Translation

orbiter.rules.rulesets.TranslationRuleset

A Ruleset is a collection of Rules that are evaluated in priority order

A TranslationRuleset is a container for Rulesets, which applies to a specific translation

>>> TranslationRuleset(
...   file_type={FileTypeJSON},                                      # Has a file type
...   translate_fn=fake_translate,                                  # and can have a callable
...   # translate_fn="orbiter.rules.translate.fake_translate",      # or a qualified name to a function
...   dag_filter_ruleset={"ruleset": [{"rule": lambda x: None}]},   # Rulesets can be dict within dicts
...   dag_ruleset=DAGRuleset(ruleset=[Rule(rule=lambda x: None)]),  # or objects within objects
...   task_filter_ruleset=EMPTY_RULESET,                            # or a mix
...   task_ruleset=EMPTY_RULESET,
...   task_dependency_ruleset=EMPTY_RULESET,                        # Omitted for brevity
...   post_processing_ruleset=EMPTY_RULESET,
... )
TranslationRuleset(...)

Parameters:

Name Type Description
file_type Set[Type[FileType]]

FileType to translate

dag_filter_ruleset DAGFilterRuleset | dict
dag_ruleset DAGRuleset | dict
task_filter_ruleset TaskFilterRuleset | dict
task_ruleset TaskRuleset | dict
task_dependency_ruleset TaskDependencyRuleset | dict
post_processing_ruleset PostProcessingRuleset | dict
translate_fn Callable[[TranslationRuleset, Path], OrbiterProject] | str | TranslateFn

Either a qualified name to a function (e.g. path.to.file.function), or a function reference, with the signature:
(translation_ruleset: TranslationRuleset, input_dir: Path) -> OrbiterProject
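
To illustrate what passing a qualified name implies, the sketch below resolves a dotted path to a callable using only the standard library. The helper name `resolve_qualified_name` is hypothetical and not part of orbiter; this page does not show how orbiter itself performs the lookup.

```python
import importlib
from typing import Callable


def resolve_qualified_name(qualified_name: str) -> Callable:
    """Hypothetical helper: turn 'package.module.function' into the function object."""
    module_name, _, attr_name = qualified_name.rpartition(".")
    if not module_name:
        raise ValueError(f"Expected a dotted path, got {qualified_name!r}")
    module = importlib.import_module(module_name)
    fn = getattr(module, attr_name)
    if not callable(fn):
        raise TypeError(f"{qualified_name!r} is not callable")
    return fn


# Resolve a standard-library function by its dotted path
dumps = resolve_qualified_name("json.dumps")
```

A function reference avoids the lookup entirely, which is why both forms are accepted for translate_fn.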

Methods:

Name Description
dumps

Convert Python dictionary back to source string form, useful for testing

get_ext

Get the first file extension for this ruleset

get_files_with_extension

A generator that yields files with a specific extension(s) in a directory

loads

Converts a file of a supported type into a Python dictionary "intermediate representation" form, prior to any rulesets being applied.

test

Test an input against the whole ruleset.

dumps

dumps(input_dict: dict, ext: str | None = None) -> str

Convert Python dictionary back to source string form, useful for testing

Parameters:

Name Type Description
input_dict dict

The dictionary to convert to a string

ext str | None

The file type extension to dump as, defaults to first 'file_type' in the set

Returns:

Type Description
str

The string representation of the input_dict, in the file_type format

Source code in orbiter/rules/rulesets.py
@validate_call
def dumps(self, input_dict: dict, ext: str | None = None) -> str:
    """
    Convert Python dictionary back to source string form, useful for testing

    :param input_dict: The dictionary to convert to a string
    :type input_dict: dict
    :param ext: The file type extension to dump as, defaults to first 'file_type' in the set
    :type ext: str | None
    :return str: The string representation of the input_dict, in the file_type format
    :rtype: str
    """
    for file_type in self.file_type:
        if ext is None or ext.lower() in file_type.extension:
            return file_type.dump_fn(input_dict)
    raise TypeError(f"Invalid file_type={ext}")

get_ext

get_ext() -> str

Get the first file extension for this ruleset

>>> EMPTY_TRANSLATION_RULESET.get_ext()
'JSON'

Source code in orbiter/rules/rulesets.py
def get_ext(self) -> str:
    """
    Get the first file extension for this ruleset

    ```pycon
    >>> EMPTY_TRANSLATION_RULESET.get_ext()
    'JSON'

    ```
    """
    return next(iter(next(iter(self.file_type)).extension))
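
Because both file_type and extension are sets, "first" here means whatever `next(iter(...))` happens to yield. The sketch below uses plain sets as stand-ins for a file type's extension attribute (the values are illustrative) to show that the result is only predictable when the set has a single member.

```python
# Stand-ins for a file type's `extension` set (illustrative values)
single = {"JSON"}
multiple = {"YAML", "YML"}

first_single = next(iter(single))      # always 'JSON': only one member
first_multiple = next(iter(multiple))  # one of the two; set order is not guaranteed
```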

get_files_with_extension

get_files_with_extension(
    input_dir: Path,
) -> Generator[Tuple[Path, dict]]

A generator that yields files with a specific extension(s) in a directory

Parameters:

Name Type Description
input_dir Path

The directory to search in

Returns:

Type Description
Generator[Path, dict]

Generator item of (Path, dict) for each file found

Source code in orbiter/rules/rulesets.py
def get_files_with_extension(self, input_dir: Path) -> Generator[Tuple[Path, dict]]:
    """
    A generator that yields files with a specific extension(s) in a directory

    :param input_dir: The directory to search in
    :type input_dir: Path
    :return: Generator item of (Path, dict) for each file found
    :rtype: Generator[Path, dict]
    """
    for directory, _, files in input_dir.walk() if hasattr(input_dir, "walk") else _backport_walk(input_dir):
        logger.debug(f"Checking directory={directory}")
        for file in files:
            file = directory / file
            try:
                dict_or_list = self.loads(file)
                # If the file is list-like (not a dict), return each item in the list
                if isinstance(dict_or_list, Collection) and not isinstance(dict_or_list, Mapping):
                    for item in dict_or_list:
                        yield file, item
                else:
                    yield file, dict_or_list
            except TypeError:
                logger.debug(f"File={file} not of correct type, skipping...")
                continue
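
The flattening behavior is easiest to see on a small directory. The sketch below is a simplified stand-in, not orbiter's implementation: it handles only JSON, uses `os.walk`, and the function name `iter_json_items` is hypothetical.

```python
import json
import os
from collections.abc import Collection, Mapping
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Iterator, Tuple


def iter_json_items(input_dir: Path) -> Iterator[Tuple[Path, Any]]:
    """Hypothetical stand-in: walk a directory, load .json files, flatten top-level lists."""
    for directory, _, files in os.walk(input_dir):
        for name in sorted(files):
            file = Path(directory) / name
            if file.suffix.lower() != ".json":
                continue  # plays the role of the TypeError branch: skip other types
            loaded = json.loads(file.read_text())
            if isinstance(loaded, Collection) and not isinstance(loaded, Mapping):
                # List-like file: yield each item separately, paired with the same path
                for item in loaded:
                    yield file, item
            else:
                yield file, loaded


with TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "one.json").write_text('{"a": 1}')
    (root / "many.json").write_text('[{"b": 2}, {"c": 3}]')
    (root / "notes.txt").write_text("ignored")
    found = [(path.name, item) for path, item in iter_json_items(root)]
```

A file whose top level is a list therefore contributes one (Path, dict) pair per list item, while a file whose top level is a dict contributes exactly one pair.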

loads

loads(file: Path) -> dict

Converts a file of a supported type into a Python dictionary "intermediate representation" form, prior to any rulesets being applied.

Parameters:

Name Type Description
file Path

The file to load

Returns:

Type Description
dict

The dictionary representation of the file's contents

Source code in orbiter/rules/rulesets.py
@validate_call
def loads(self, file: Path) -> dict:
    """
    Converts all files of type into a Python dictionary "intermediate representation" form,
    prior to any rulesets being applied.

    :param file: The file to load
    :type file: Path
    :return: The dictionary representation of the input_str
    :rtype: dict
    """
    for file_type in self.file_type:
        if file.suffix.lower() in {f".{ext.lower()}" for ext in file_type.extension}:
            try:
                return file_type.load_fn(file.read_text())
            except Exception as e:
                logger.error(f"Error loading file={file}! Skipping!\n{e}")
                continue
    raise TypeError(f"Invalid file_type={file.suffix}, does not match file_type={self.file_type}")
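
The suffix check above is case-insensitive: each extension name is lower-cased and prefixed with a dot before being compared with the lower-cased file suffix. A minimal sketch of that matching step, using a hypothetical `LOADERS` list in place of the file_type set:

```python
import json
from pathlib import Path
from typing import Callable

# Hypothetical stand-in for the `file_type` set: (extension names, load function)
LOADERS = [(frozenset({"JSON"}), json.loads)]


def pick_loader(file: Path) -> Callable[[str], dict]:
    """Return the load function whose extensions match the file suffix, ignoring case."""
    for extensions, load_fn in LOADERS:
        if file.suffix.lower() in {f".{ext.lower()}" for ext in extensions}:
            return load_fn
    raise TypeError(f"Invalid file_type={file.suffix}")


loader = pick_loader(Path("Workflow.JSON"))
```

Note that in loads itself, a file that matches by suffix but fails to parse is logged and then falls through to the same TypeError, so callers such as get_files_with_extension skip it.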

test

test(input_value: str | dict) -> OrbiterProject

Test an input against the whole ruleset. The input can be either a parsed Python dict or a raw string.

Parameters:

Name Type Description
input_value str | dict

The input to test. It can be either a dict (passed to translate_ruleset.dumps() before translate_ruleset.loads()) or a string (read directly by translate_ruleset.loads()).

Returns:

Type Description
OrbiterProject

OrbiterProject produced after applying the ruleset

Source code in orbiter/rules/rulesets.py
def test(self, input_value: str | dict) -> OrbiterProject:
    """
    Test an input against the whole ruleset.
    - 'input_dict' (a parsed python dict)
    - or 'input_str' (raw value) to test against the ruleset.

    :param input_value: The input to test
        can be either a dict (passed to `translate_ruleset.dumps()` before `translate_ruleset.loads()`)
        or a string (read directly by `translate_ruleset.loads()`)
    :type input_value: str | dict
    :return: OrbiterProject produced after applying the ruleset
    :rtype: OrbiterProject
    """
    with TemporaryDirectory() as tempdir:
        file = Path(tempdir) / f"{uuid.uuid4()}.{self.get_ext()}"
        file.write_text(self.dumps(input_dict=input_value) if isinstance(input_value, dict) else input_value)
        return self.translate_fn(translation_ruleset=self, input_dir=file.parent)
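
The flow above (serialize if needed, write to a temporary file, translate the containing directory) can be sketched without orbiter. Here JSON stands in for the ruleset's file type, and `list_loaded_files` is a hypothetical translate function that simply returns what it loads.

```python
import json
import uuid
from pathlib import Path
from tempfile import TemporaryDirectory


def list_loaded_files(input_dir: Path) -> list:
    """Hypothetical translate function: load every .json file in the directory."""
    return [json.loads(p.read_text()) for p in sorted(input_dir.glob("*.json"))]


def run_test(input_value) -> list:
    """Mirror the flow of `test`: dump dicts, write strings as-is, translate the directory."""
    with TemporaryDirectory() as tempdir:
        file = Path(tempdir) / f"{uuid.uuid4()}.json"
        file.write_text(json.dumps(input_value) if isinstance(input_value, dict) else input_value)
        return list_loaded_files(file.parent)


from_dict = run_test({"a": 1})
from_str = run_test('{"a": 1}')
```

Both calls reach the translate function through the same file-based path, which is why a dict input and its serialized string are interchangeable.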

orbiter.file_types

Classes:

Name Description
FileType

Abstract Base File Type

FileTypeJIL

JIL File Type

FileTypeJSON

JSON File Type

FileTypePython

Python File Type

FileTypeXML

XML File Type

FileTypeYAML

YAML File Type

Functions:

Name Description
parse_jil

Parses JIL string into a dictionary.

xmltodict_parse

Calls xmltodict.parse and does post-processing fixes.

FileType

Abstract Base File Type

Parameters:

Name Type Description
extension Set[str]

The file extension(s) for this file type

load_fn Callable[[str], dict]

The function to load the file into a dictionary for this file type

dump_fn Callable[[dict], str]

The function to dump a dictionary to a string for this file type

FileTypeJIL

JIL File Type

Parameters:

Name Type Description
extension Set[str]

JIL

load_fn Callable[[str], dict]

custom JIL loading function

dump_fn Callable[[dict], str]

JIL dumping function not yet implemented, raises an error

FileTypeJSON

JSON File Type

>>> out = FileTypeJSON.dump_fn({'a': 1}); out
'{"a": 1}'
>>> FileTypeJSON.load_fn(out)
{'a': 1}

Parameters:

Name Type Description
extension Set[str]

JSON

load_fn Callable[[str], dict]

json.loads

dump_fn Callable[[dict], str]

json.dumps

FileTypePython

Python File Type

Parameters:

Name Type Description
extension Set[str]

PY

load_fn Callable[[str], dict]

Python AST loading function (via ast2json)

dump_fn Callable[[dict], str]

Python dumping function not yet implemented, raises an error

FileTypeXML

XML File Type

Note

This class uses a custom xmltodict_parse method to standardize the output to a list of dictionaries

>>> out = FileTypeXML.dump_fn({'a': 1}); out
'<?xml version="1.0" encoding="utf-8"?>\n<a>1</a>'
>>> FileTypeXML.load_fn(out)
{'a': '1'}

Parameters:

Name Type Description
extension Set[str]

XML

load_fn Callable[[str], dict]

xmltodict_parse

dump_fn Callable[[dict], str]

xmltodict.unparse

FileTypeYAML

YAML File Type

>>> out = FileTypeYAML.dump_fn({'a': 1}); out
'a: 1\n'
>>> FileTypeYAML.load_fn(out)
{'a': 1}

Parameters:

Name Type Description
extension Set[str]

YAML, YML

load_fn Callable[[str], dict]

yaml.safe_load

dump_fn Callable[[dict], str]

yaml.safe_dump

parse_jil

parse_jil(s: str) -> dict[str, list[dict]]

Parses JIL string into a dictionary.

>>> parse_jil(r'''insert_job: TEST.ECHO  job_type: CMD  /* INLINE COMMENT */
... owner: foo
... /* MULTILINE
...     COMMENT */
... machine: bar
... command: echo "Hello World"''')
{'jobs': [{'insert_job': 'TEST.ECHO', 'job_type': 'CMD', 'owner': 'foo', 'machine': 'bar', 'command': 'echo "Hello World"'}]}

Source code in orbiter/file_types.py
def parse_jil(s: str) -> dict[str, list[dict]]:
    """Parses JIL string into a dictionary.

    ```pycon
    >>> parse_jil(r'''insert_job: TEST.ECHO  job_type: CMD  /* INLINE COMMENT */
    ... owner: foo
    ... /* MULTILINE
    ...     COMMENT */
    ... machine: bar
    ... command: echo "Hello World"''')
    {'jobs': [{'insert_job': 'TEST.ECHO', 'job_type': 'CMD', 'owner': 'foo', 'machine': 'bar', 'command': 'echo "Hello World"'}]}

    ```
    """
    return {k: [dict(job) for job in v] for k, v in JilParser(None).parse_jobs_from_str(s).items()}

xmltodict_parse

xmltodict_parse(input_str: str) -> Any

Calls xmltodict.parse and does post-processing fixes.

Note

The original xmltodict.parse method returns EITHER:

  • a dict (one child element of a given type)
  • or a list of dicts (many child elements of a given type)

This behavior can be confusing, and is an issue with the original xml spec being referenced.

This method deviates by standardizing to the latter case (always a list[dict]).

All XML elements will be a list of dictionaries, even if there's only one element.

>>> xmltodict_parse("")
Traceback (most recent call last):
xml.parsers.expat.ExpatError: no element found: line 1, column 0
>>> xmltodict_parse("<a></a>")
{'a': None}
>>> xmltodict_parse("<a foo='bar'></a>")
{'a': [{'@foo': 'bar'}]}
>>> xmltodict_parse("<a foo='bar'><foo bar='baz'></foo></a>")  # Singleton - gets modified
{'a': [{'@foo': 'bar', 'foo': [{'@bar': 'baz'}]}]}
>>> xmltodict_parse("<a foo='bar'><foo bar='baz'><bar><bop></bop></bar></foo></a>")  # Nested Singletons - modified
{'a': [{'@foo': 'bar', 'foo': [{'@bar': 'baz', 'bar': [{'bop': None}]}]}]}
>>> xmltodict_parse("<a foo='bar'><foo bar='baz'></foo><foo bing='bop'></foo></a>")
{'a': [{'@foo': 'bar', 'foo': [{'@bar': 'baz'}, {'@bing': 'bop'}]}]}
>>> xmltodict_parse("<a>&lt;?xml version=&apos;1.0&apos; encoding=&apos;UTF-16&apos;?&gt;&lt;Properties version=&apos;1.1&apos;&gt;&lt;/Properties&gt;</a>")
{'a': {'Properties': [{'@version': '1.1'}]}}
>>> xmltodict_parse('''<Source>&lt;Activity mc:Ignorable="sap sap2010 sads"
...   x:Class="Activity" sap2010:WorkflowViewState.IdRef="Activity_1"
...   xmlns="http://schemas.microsoft.com/netfx/2009/xaml/activities"
...   xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"&gt;
...     &lt;TextExpression.NamespacesForImplementation&gt;
...       &lt;sco:Collection x:TypeArguments="x:String"&gt;
...         &lt;x:String&gt;System&lt;/x:String&gt;
...       &lt;/sco:Collection&gt;
...     &lt;/TextExpression.NamespacesForImplementation&gt;
... &lt;/Activity&gt;</Source>
... ''')  # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
{'Source': {'Activity': [{'@mc:Ignorable': 'sap sap2010 sads',... 'TextExpression.NamespacesForImplementation': [{'sco:Collection': [{...}]}]}]}}

Parameters:

Name Type Description
input_str str

The XML string to parse

Returns:

Type Description
dict

The parsed XML

Source code in orbiter/file_types.py
def xmltodict_parse(input_str: str) -> Any:
    """Calls `xmltodict.parse` and does post-processing fixes.

    !!! note

        The original [`xmltodict.parse`](https://pypi.org/project/xmltodict/) method returns EITHER:

        - a dict (one child element of type)
        - or a list of dict (many child element of type)

        This behavior can be confusing, and is an issue with the original xml spec being referenced.

        **This method deviates by standardizing to the latter case (always a `list[dict]`).**

        **All XML elements will be a list of dictionaries, even if there's only one element.**

    ```pycon
    >>> xmltodict_parse("")
    Traceback (most recent call last):
    xml.parsers.expat.ExpatError: no element found: line 1, column 0
    >>> xmltodict_parse("<a></a>")
    {'a': None}
    >>> xmltodict_parse("<a foo='bar'></a>")
    {'a': [{'@foo': 'bar'}]}
    >>> xmltodict_parse("<a foo='bar'><foo bar='baz'></foo></a>")  # Singleton - gets modified
    {'a': [{'@foo': 'bar', 'foo': [{'@bar': 'baz'}]}]}
    >>> xmltodict_parse("<a foo='bar'><foo bar='baz'><bar><bop></bop></bar></foo></a>")  # Nested Singletons - modified
    {'a': [{'@foo': 'bar', 'foo': [{'@bar': 'baz', 'bar': [{'bop': None}]}]}]}
    >>> xmltodict_parse("<a foo='bar'><foo bar='baz'></foo><foo bing='bop'></foo></a>")
    {'a': [{'@foo': 'bar', 'foo': [{'@bar': 'baz'}, {'@bing': 'bop'}]}]}
    >>> xmltodict_parse("<a>&lt;?xml version=&apos;1.0&apos; encoding=&apos;UTF-16&apos;?&gt;&lt;Properties version=&apos;1.1&apos;&gt;&lt;/Properties&gt;</a>")
    {'a': {'Properties': [{'@version': '1.1'}]}}
    >>> xmltodict_parse('''<Source>&lt;Activity mc:Ignorable="sap sap2010 sads"
    ...   x:Class="Activity" sap2010:WorkflowViewState.IdRef="Activity_1"
    ...   xmlns="http://schemas.microsoft.com/netfx/2009/xaml/activities"
    ...   xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"&gt;
    ...     &lt;TextExpression.NamespacesForImplementation&gt;
    ...       &lt;sco:Collection x:TypeArguments="x:String"&gt;
    ...         &lt;x:String&gt;System&lt;/x:String&gt;
    ...       &lt;/sco:Collection&gt;
    ...     &lt;/TextExpression.NamespacesForImplementation&gt;
    ... &lt;/Activity&gt;</Source>
    ... ''')  # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    {'Source': {'Activity': [{'@mc:Ignorable': 'sap sap2010 sads',... 'TextExpression.NamespacesForImplementation': [{'sco:Collection': [{...}]}]}]}}

    ```
    :param input_str: The XML string to parse
    :type input_str: str
    :return: The parsed XML
    :rtype: dict
    """

    def _fix_escaped_xml(v):
        try:
            parsed_unescaped_xml = xmltodict.parse(v)
            _fix(parsed_unescaped_xml)
            return parsed_unescaped_xml
        except Exception as e:
            logger.debug(f"Error parsing escaped XML: {e}")

    # noinspection t, D
    def _fix(d):
        """fix the dict in place, recursively, standardizing on a list of dict even if there's only one entry."""
        # if it's a dict, descend to fix
        if isinstance(d, dict):
            for k, v in d.items():
                # @keys are properties of elements, non-@keys are elements
                if not k.startswith("@"):
                    if isinstance(v, dict):
                        # THE FIX
                        # any non-@keys should be a list of dict, even if there's just one of the element
                        d[k] = [v]
                        _fix(v)
                    else:
                        _fix(v)
                if isinstance(v, str) and (any(v.startswith(i) for i in ("<?xml", "<?XML")) or ("xmlns:" in v)):
                    d[k] = _fix_escaped_xml(v)
        # if it's a list, descend to fix
        if isinstance(d, list):
            for v in d:
                _fix(v)

    output = xmltodict.parse(input_str)
    _fix(output)
    return output
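
The core of the normalization can be tried on plain dictionaries, without xmltodict installed. The sketch below reproduces only the singleton-to-list step; it deliberately omits the re-parsing of escaped XML strings, and the name `listify_singletons` is hypothetical.

```python
def listify_singletons(d):
    """Wrap every dict-valued, non-'@' key in a list, recursively and in place."""
    if isinstance(d, dict):
        for k, v in d.items():
            # '@' keys are element attributes; everything else is a child element
            if not k.startswith("@"):
                if isinstance(v, dict):
                    d[k] = [v]  # replacing a value keeps the dict size stable during iteration
                listify_singletons(v)
    elif isinstance(d, list):
        for v in d:
            listify_singletons(v)
    return d


# Shape of xmltodict output for a single nested child element
single_child = listify_singletons({"a": {"@foo": "bar", "foo": {"@bar": "baz"}}})
# Shape of xmltodict output when the child element repeats
many_children = listify_singletons({"a": {"foo": [{"@bar": "baz"}, {"@bing": "bop"}]}})
```

After the fix, rules can always iterate over a child element's value without first checking whether it is a dict or a list.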

Rulesets

orbiter.rules.rulesets.Ruleset

A list of rules, which are evaluated to generate different types of output

You must pass a Rule (or dict with the schema of Rule)

>>> from orbiter.rules import rule
>>> @rule
... def x(val):
...    return None
>>> GenericRuleset(ruleset=[x, {"rule": lambda: None}])
... # doctest: +ELLIPSIS
GenericRuleset(ruleset=[Rule(...), ...])

Note

You can't pass non-Rules

>>> # noinspection PyTypeChecker
... GenericRuleset(ruleset=[None])
... # doctest: +ELLIPSIS
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

Parameters:

Name Type Description
ruleset List[Rule | Callable[[Any], Any | None]]

List of Rule (or dict with the schema of Rule)

Methods:

Name Description
apply

Apply all rules in ruleset to a single item, in priority order, removing any None results.

apply_many

Apply a ruleset to each item in collection (such as dict().items())

apply

apply(
    take_first: bool = False, **kwargs
) -> List[Any] | Any

Apply all rules in ruleset to a single item, in priority order, removing any None results.

A ruleset with one rule can produce up to one result

>>> from orbiter.rules import rule

>>> @rule
... def gt_4(val):
...     return str(val) if val > 4 else None
>>> GenericRuleset(ruleset=[gt_4]).apply(val=5)
['5']

Many rules can produce many results, one for each rule.

>>> @rule
... def gt_3(val):
...    return str(val) if val > 3 else None
>>> GenericRuleset(ruleset=[gt_4, gt_3]).apply(val=5)
['5', '5']

The take_first flag will evaluate rules in the ruleset and return the first match

>>> GenericRuleset(ruleset=[gt_4, gt_3]).apply(val=5, take_first=True)
'5'

If nothing matched, an empty list is returned

>>> @rule
... def always_none(val):
...     return None
>>> @rule
... def more_always_none(val):
...     return None
>>> GenericRuleset(ruleset=[always_none, more_always_none]).apply(val=5)
[]

If nothing matched, and take_first=True, None is returned

>>> GenericRuleset(ruleset=[always_none, more_always_none]).apply(val=5, take_first=True)
... # None

Tip

If no input is given, an error is raised

>>> GenericRuleset(ruleset=[always_none]).apply()
Traceback (most recent call last):
RuntimeError: No values provided! Supply at least one key=val pair as kwargs!

Parameters:

Name Type Description
take_first bool

only take the first (if any) result from the ruleset application

kwargs

key=val pairs to pass to the evaluated rule function

Returns:

Type Description
List[Any] | Any | None

List of the non-None results returned by the rules (in priority order), or an empty list, or a single result or None (if take_first=True)

Raises:

Type Description
RuntimeError

if no key=val pairs are provided as kwargs

RuntimeError

if the Rule raises an exception

Source code in orbiter/rules/rulesets.py
@validate_call
def apply(self, take_first: bool = False, **kwargs) -> List[Any] | Any:
    """
    Apply all rules in ruleset **to a single item**, in priority order, removing any `None` results.

    A ruleset with one rule can produce **up to one** result
    ```pycon
    >>> from orbiter.rules import rule

    >>> @rule
    ... def gt_4(val):
    ...     return str(val) if val > 4 else None
    >>> GenericRuleset(ruleset=[gt_4]).apply(val=5)
    ['5']

    ```

    Many rules can produce many results, one for each rule.
    ```pycon
    >>> @rule
    ... def gt_3(val):
    ...    return str(val) if val > 3 else None
    >>> GenericRuleset(ruleset=[gt_4, gt_3]).apply(val=5)
    ['5', '5']

    ```

    The `take_first` flag will evaluate rules in the ruleset and return the first match
    ```pycon
    >>> GenericRuleset(ruleset=[gt_4, gt_3]).apply(val=5, take_first=True)
    '5'

    ```

    If nothing matched, an empty list is returned
    ```pycon
    >>> @rule
    ... def always_none(val):
    ...     return None
    >>> @rule
    ... def more_always_none(val):
    ...     return None
    >>> GenericRuleset(ruleset=[always_none, more_always_none]).apply(val=5)
    []

    ```

    If nothing matched, and `take_first=True`, `None` is returned
    ```pycon
    >>> GenericRuleset(ruleset=[always_none, more_always_none]).apply(val=5, take_first=True)
    ... # None

    ```

    !!! tip

        If no input is given, an error is returned
        ```pycon
        >>> GenericRuleset(ruleset=[always_none]).apply()
        Traceback (most recent call last):
        RuntimeError: No values provided! Supply at least one key=val pair as kwargs!

        ```

    :param take_first: only take the first (if any) result from the ruleset application
    :type take_first: bool
    :param kwargs: key=val pairs to pass to the evaluated rule function
    :returns: List of rules that evaluated to `Any` (in priority order),
                or an empty list,
                or `Any` (if `take_first=True`)
    :rtype: List[Any] | Any | None
    :raises RuntimeError: if the Ruleset is empty or input_val is None
    :raises RuntimeError: if the Rule raises an exception
    """
    if not len(kwargs):
        raise RuntimeError("No values provided! Supply at least one key=val pair as kwargs!")
    results = []
    for _rule in self._sorted():
        result = _rule(**kwargs)
        should_show_input = "val" in kwargs and not (
            isinstance(kwargs["val"], OrbiterProject) or isinstance(kwargs["val"], OrbiterDAG)
        )
        if result is not None:
            logger.debug(
                "---------\n"
                f"[RULESET MATCHED] '{self.__class__.__module__}.{self.__class__.__name__}'\n"
                f"[RULE MATCHED] '{_rule.__name__}'\n"
                f"[INPUT] {trim_dict(kwargs) if should_show_input else '<Skipping...>'}\n"
                f"[RETURN] {trim_dict(result)}\n"
                f"---------"
            )
            results.append(result)
            if take_first:
                return result
    return None if take_first and not len(results) else results
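
The return shapes are the part most worth internalizing. The sketch below is a stand-in that uses plain functions instead of Rule objects and assumes the list is already in priority order; `apply_rules` is a hypothetical name, and logging and validation are left out.

```python
from typing import Any, Callable, List


def apply_rules(rules: List[Callable[..., Any]], take_first: bool = False, **kwargs):
    """Stand-in for `apply`: evaluate rules in order and drop None results."""
    if not kwargs:
        raise RuntimeError("No values provided! Supply at least one key=val pair as kwargs!")
    results = []
    for rule_fn in rules:
        result = rule_fn(**kwargs)
        if result is not None:
            if take_first:
                return result  # stop at the first match
            results.append(result)
    return None if take_first else results


def gt_4(val):
    return str(val) if val > 4 else None


def gt_3(val):
    return str(val) if val > 3 else None


all_matches = apply_rules([gt_4, gt_3], val=5)
first_match = apply_rules([gt_4, gt_3], val=5, take_first=True)
no_match = apply_rules([gt_4, gt_3], val=1)
no_first_match = apply_rules([gt_4, gt_3], val=1, take_first=True)
```

Callers therefore need to handle three shapes: a list (possibly empty) by default, and either a single result or None with take_first=True.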

apply_many

apply_many(
    input_val: Collection[Any], take_first: bool = False
) -> List[List[Any]] | List[Any]

Apply a ruleset to each item in collection (such as dict().items()) and return any results that are not None

You can turn the output of apply_many into a dict, if the rule takes and returns a tuple

>>> from itertools import chain
>>> from orbiter.rules import rule

>>> @rule
... def filter_for_type_folder(val):
...   (key, val) = val
...   return (key, val) if val.get('Type', '') == 'Folder' else None
>>> ruleset = GenericRuleset(ruleset=[filter_for_type_folder])
>>> input_dict = {
...    "a": {"Type": "Folder"},
...    "b": {"Type": "File"},
...    "c": {"Type": "Folder"},
... }
>>> dict(chain(*chain(ruleset.apply_many(input_dict.items()))))
... # use dict(chain(*chain(...))), if using `take_first=False`, to turn many results back into dict
{'a': {'Type': 'Folder'}, 'c': {'Type': 'Folder'}}
>>> dict(ruleset.apply_many(input_dict.items(), take_first=True))
... # use dict(...) directly, if using `take_first=True`, to turn results back into dict
{'a': {'Type': 'Folder'}, 'c': {'Type': 'Folder'}}

Tip

You cannot pass input without length

>>> ruleset.apply_many({})
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
RuntimeError: Input is not Collection[Any] with length!

Parameters:

Name Type Description
input_val Collection[Any]

List to evaluate ruleset over

take_first bool

Only take the first (if any) result from each ruleset application

Returns:

Type Description
List[List[Any]] | List[Any]

List of list with all non-null evaluations for each item
or list of the first non-null evaluation for each item (if take_first=True)

Raises:

Type Description
RuntimeError

if the Ruleset or input_vals are empty

RuntimeError

if the Rule raises an exception

Source code in orbiter/rules/rulesets.py
def apply_many(
    self,
    input_val: Collection[Any],
    take_first: bool = False,
) -> List[List[Any]] | List[Any]:
    """
    Apply a ruleset to each item in collection (such as `dict().items()`)
    and return any results that are not `None`

    You can turn the output of `apply_many` into a dict, if the rule takes and returns a tuple
    ```pycon
    >>> from itertools import chain
    >>> from orbiter.rules import rule

    >>> @rule
    ... def filter_for_type_folder(val):
    ...   (key, val) = val
    ...   return (key, val) if val.get('Type', '') == 'Folder' else None
    >>> ruleset = GenericRuleset(ruleset=[filter_for_type_folder])
    >>> input_dict = {
    ...    "a": {"Type": "Folder"},
    ...    "b": {"Type": "File"},
    ...    "c": {"Type": "Folder"},
    ... }
    >>> dict(chain(*chain(ruleset.apply_many(input_dict.items()))))
    ... # use dict(chain(*chain(...))), if using `take_first=False`, to turn many results back into dict
    {'a': {'Type': 'Folder'}, 'c': {'Type': 'Folder'}}
    >>> dict(ruleset.apply_many(input_dict.items(), take_first=True))
    ... # use dict(...) directly, if using `take_first=True`, to turn results back into dict
    {'a': {'Type': 'Folder'}, 'c': {'Type': 'Folder'}}

    ```
    !!! tip

        You cannot pass input without length
        ```pycon
        >>> ruleset.apply_many({})
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        RuntimeError: Input is not Collection[Any] with length!

        ```
    :param input_val: List to evaluate ruleset over
    :type input_val: Collection[Any]
    :param take_first: Only take the first (if any) result from each ruleset application
    :type take_first: bool
    :returns: List of list with all non-null evaluations for each item<br>
              or list of the first non-null evaluation for each item (if `take_first=True`)
    :rtype: List[List[Any]] | List[Any]
    :raises RuntimeError: if the Ruleset or input_vals are empty
    :raises RuntimeError: if the Rule raises an exception
    """
    # Validate Input
    if not input_val or not len(input_val):
        raise RuntimeError("Input is not `Collection[Any]` with length!")

    return [
        results[0] if take_first else results
        for item in input_val
        if (results := self.apply(take_first=False, val=item)) is not None and len(results)
    ]
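
The two conversions to a dict differ only because the two modes return different shapes. The sketch below uses literal lists that mirror the output of the doctest above, so it runs without orbiter.

```python
from itertools import chain

# Shape with take_first=False: one inner list of results per matching item
nested = [[("a", {"Type": "Folder"})], [("c", {"Type": "Folder"})]]
# Shape with take_first=True: one result per matching item
flat = [("a", {"Type": "Folder"}), ("c", {"Type": "Folder"})]

from_nested = dict(chain(*nested))  # flatten one level, then build the dict
from_flat = dict(flat)              # already a list of (key, value) pairs
```

Items for which no rule matched are absent from both shapes, which is why key "b" does not appear in either result.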

orbiter.rules.rulesets.DAGFilterRuleset

Bases: Ruleset

Ruleset of DAGFilterRule

Methods:

Name Description
apply_ruleset

Apply all rules from DAGFilterRuleset to filter down to keys that look like they can be translated to a DAG

apply_ruleset

apply_ruleset(
    input_dict: dict, file: Path, input_dir: Path
) -> list[dict]

Apply all rules from DAGFilterRuleset to filter down to keys that look like they can be translated to a DAG, in priority order.

Note

The file is added under a __file key to both input and output dictionaries, for use in future rules. The input dir is added under a __input_dir key to both input and output dictionaries, for use in future rules.

Parameters:

Name Type Description
input_dict dict

The input dictionary to filter, e.g. the file that was loaded and converted into a python dict

file Path

The file relative to the input directory's parent (is added under '__file' key)

input_dir Path

The input directory (is added under '__input_dir' key)

Returns:

Type Description
list[dict]

A list of dictionaries that look like they can be translated to a DAG

Source code in orbiter/rules/rulesets.py
def apply_ruleset(self, input_dict: dict, file: Path, input_dir: Path) -> list[dict]:
    """Apply all rules from [DAG Filter Ruleset][orbiter.rules.ruleset.DAGFilterRuleset] to filter down to keys
    that look like they can be translated to a DAG, in priority order.

    !!! note

        The file is added under a `__file` key to both input and output dictionaries, for use in future rules.
        The input dir is added under a `__input_dir` key to both input and output dictionaries, for use in future rules.

    :param input_dict: The input dictionary to filter, e.g. the file that was loaded and converted into a python dict
    :param file: The file relative to the input directory's parent (is added under '__file' key)
    :param input_dir: The input directory (is added under '__input_dir' key)
    :return: A list of dictionaries that look like they can be translated to a DAG
    """

    def with_file(d: dict) -> dict:
        try:
            return {"__file": file, "__input_dir": input_dir} | d
        except TypeError as e:
            logger.opt(exception=e).debug("Unable to add one or both of `__file` or `__input_dir` keys")
            return d

    return [with_file(dag_dict) for dag_dict in functools.reduce(add, self.apply(val=with_file(input_dict)), [])]
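
Two details of the return statement are easy to miss: `functools.reduce(add, ..., [])` concatenates the per-rule result lists into one list, and the `|` merge lets keys already present in the dictionary win over the injected ones. A small sketch with hypothetical paths and a hypothetical `dag_id` key (dict `|` requires Python 3.9 or later):

```python
import functools
from operator import add
from pathlib import Path

# Hypothetical paths, for illustration only
file, input_dir = Path("workflows/demo.json"), Path("workflows")


def with_file(d: dict) -> dict:
    # `d` is the right-hand operand of `|`, so its keys override the injected ones
    return {"__file": file, "__input_dir": input_dir} | d


# Each matching rule returns a list of candidate dicts; there is one list per rule
rule_results = [[{"dag_id": "one"}], [{"dag_id": "two"}, {"dag_id": "three"}]]
candidates = [with_file(d) for d in functools.reduce(add, rule_results, [])]
```

If no rule matches, the reduce call falls back to its initial value and the result is an empty list.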

orbiter.rules.rulesets.DAGRuleset

Bases: Ruleset

Ruleset of DAGRule

Methods:

Name Description
apply_ruleset

Apply all rules from DAGRuleset to convert the object to an OrbiterDAG, in priority order, stopping when the first rule returns a match.

apply_ruleset

apply_ruleset(dag_dict: dict) -> OrbiterDAG | None

Apply all rules from DAGRuleset to convert the object to an OrbiterDAG, in priority order, stopping when the first rule returns a match. If no rule returns a match, the entry is filtered.

Note

The file is retained via OrbiterDAG.orbiter_kwargs['val']['__file'], for use in future rules.

Parameters:

Name Type Description
dag_dict dict

DAG Candidate, filtered via @dag_filter_rules

Returns:

Type Description
OrbiterDAG | None

An OrbiterDAG object

Source code in orbiter/rules/rulesets.py
def apply_ruleset(self, dag_dict: dict) -> OrbiterDAG | None:
    """Apply all rules from `DAGRuleset` to convert the object to an `OrbiterDAG`,
    in priority order, stopping when the first rule returns a match.
    If no rule returns a match, the entry is filtered.

    !!! note

        The file is retained via `OrbiterDAG.orbiter_kwargs['val']['__file']`, for use in future rules.

    :param dag_dict: DAG Candidate, filtered via @dag_filter_rules
    :return: An `OrbiterDAG` object
    """
    return self.apply(val=dag_dict, take_first=True)

orbiter.rules.rulesets.TaskFilterRuleset

Bases: Ruleset

Ruleset of TaskFilterRule

Methods:

Name Description
apply_ruleset

Apply all rules from TaskFilterRuleset to filter down to keys that look like they can be translated to a Task, in priority order.

apply_ruleset

apply_ruleset(dag_dict: dict) -> list[dict]

Apply all rules from TaskFilterRuleset to filter down to keys that look like they can be translated to a Task, in priority order.

Many entries in the dag_dict result in many task_dicts.

Parameters:

Name Type Description
dag_dict dict

The dict to filter - the same dict that was passed to the DAG ruleset

Returns:

Type Description
list[dict]

A list of dictionaries that look like they can be translated to a Task

Source code in orbiter/rules/rulesets.py
def apply_ruleset(self, dag_dict: dict) -> list[dict]:
    """Apply all rules from `TaskFilterRuleset` to filter down to keys that look like they can be translated
     to a Task, in priority order.

    Many entries in the dag_dict result in many task_dicts.

    :param dag_dict: The dict to filter - the same dict that was passed to the DAG ruleset
    :return: A list of dictionaries that look like they can be translated to a Task
    """
    return functools.reduce(add, self.apply(val=dag_dict), [])

orbiter.rules.rulesets.TaskRuleset

Bases: Ruleset

Ruleset of TaskRule

Methods:

Name Description
apply_ruleset

Apply all rules from TaskRuleset to convert the object to a Task, in priority order, stopping when the first rule returns a match.

apply_ruleset

apply_ruleset(
    task_dict: dict,
) -> OrbiterOperator | OrbiterTaskGroup

Apply all rules from TaskRuleset to convert the object to a Task, in priority order, stopping when the first rule returns a match.

One task_dict makes up to one task (unless it produces a TaskGroup, which can contain many tasks). If no rule returns a match, the entry is filtered.

Parameters:

Name Type Description
task_dict dict

A dict to translate - what was returned from the Task Filter ruleset

Returns:

Type Description
OrbiterOperator | OrbiterTaskGroup

An OrbiterOperator (or descendant) or OrbiterTaskGroup

Source code in orbiter/rules/rulesets.py
def apply_ruleset(self, task_dict: dict) -> OrbiterOperator | OrbiterTaskGroup:
    """Apply all rules from `TaskRuleset` to convert the object to a Task, in priority order,
    stopping when the first rule returns a match.

    One task_dict makes up to one task (unless it produces a TaskGroup, which can contain many tasks).
    If no rule returns a match, the entry is filtered.

    :param task_dict: A dict to translate - what was returned from the Task Filter ruleset
    :return: An [OrbiterOperator][orbiter.objects.task.OrbiterOperator] (or descendant) or [OrbiterTaskGroup][orbiter.objects.task.OrbiterTaskGroup]
    """
    return self.apply(val=task_dict, take_first=True)

orbiter.rules.rulesets.TaskDependencyRuleset

Bases: Ruleset

Ruleset of TaskDependencyRule

Methods:

Name Description
apply_ruleset

Apply all rules from TaskDependencyRuleset to create a list of OrbiterTaskDependency, which are then added in-place to each task in the OrbiterDAG.

apply_ruleset

apply_ruleset(
    dag: OrbiterDAG,
) -> list[OrbiterTaskDependency]

Apply all rules from TaskDependencyRuleset to create a list of OrbiterTaskDependency, which are then added in-place to each task in the OrbiterDAG.

Parameters:

Name Type Description
dag OrbiterDAG

The DAG to add the task dependencies to

Returns:

Type Description
list[OrbiterTaskDependency]

A list of OrbiterTaskDependency

Source code in orbiter/rules/rulesets.py
def apply_ruleset(self, dag: OrbiterDAG) -> list[OrbiterTaskDependency]:
    """Apply all rules from `TaskDependencyRuleset` to create a list of
    [`OrbiterTaskDependency`][orbiter.objects.task.OrbiterTaskDependency],
    which are then added in-place to each task in the [`OrbiterDAG`][orbiter.objects.dag.OrbiterDAG].

    :param dag: The DAG to add the task dependencies to
    :return: A list of [`OrbiterTaskDependency`][orbiter.objects.task.OrbiterTaskDependency]
    """
    return list(chain(*self.apply(val=dag))) or []

orbiter.rules.rulesets.PostProcessingRuleset

Bases: Ruleset

Ruleset of PostProcessingRule

Methods:

Name Description
apply_ruleset

Apply all rules from PostProcessingRuleset to modify project in-place

apply_ruleset

apply_ruleset(project: OrbiterProject) -> None

Apply all rules from PostProcessingRuleset to modify project in-place

Parameters:

Name Type Description
project OrbiterProject

The OrbiterProject to modify

Returns:

Type Description
None

None

Source code in orbiter/rules/rulesets.py
def apply_ruleset(self, project: OrbiterProject) -> None:
    """Apply all rules from `PostProcessingRuleset` to modify project in-place

    :param project: The OrbiterProject to modify
    :return: None
    """
    self.apply(val=project, take_first=False)