Skip to content

Translation

orbiter.rules.rulesets.TranslationRuleset

A Ruleset is a collection of Rules that are evaluated in priority order

A TranslationRuleset is a container for Rulesets, which applies to a specific translation

>>> TranslationRuleset(
...   file_type={FileTypeJSON},                                      # Has a file type
...   translate_fn=fake_translate,                                  # and can have a callable
...   # translate_fn="orbiter.rules.translate.fake_translate",      # or a qualified name to a function
...   dag_filter_ruleset={"ruleset": [{"rule": lambda x: None}]},   # Rulesets can be dict within dicts
...   dag_ruleset=DAGRuleset(ruleset=[Rule(rule=lambda x: None)]),  # or objects within objects
...   task_filter_ruleset=EMPTY_RULESET,                            # or a mix
...   task_ruleset=EMPTY_RULESET,
...   task_dependency_ruleset=EMPTY_RULESET,                        # Omitted for brevity
...   post_processing_ruleset=EMPTY_RULESET,
... )
TranslationRuleset(...)

Parameters:

Name Type Description
file_type Set[Type[FileType]]

FileType to translate

dag_filter_ruleset DAGFilterRuleset | dict
dag_ruleset DAGRuleset | dict
task_filter_ruleset TaskFilterRuleset | dict
task_ruleset TaskRuleset | dict
task_dependency_ruleset TaskDependencyRuleset | dict
post_processing_ruleset PostProcessingRuleset | dict
translate_fn Callable[[TranslationRuleset, Path], OrbiterProject] | str | TranslateFn

Either a qualified name to a function (e.g. path.to.file.function), or a function reference, with the signature:
(translation_ruleset: TranslationRuleset, input_dir: Path) -> OrbiterProject

Methods:

Name Description
dumps

Convert Python dictionary back to source string form, useful for testing

get_ext

Get the first file extension for this ruleset

get_files_with_extension

A generator that yields files with a specific extension(s) in a directory

loads

Converts a file of a supported type into a Python dictionary "intermediate representation" form, prior to any rulesets being applied.

test

Test an input against the whole ruleset.

dumps

dumps(input_dict: dict, ext: str | None = None) -> str

Convert Python dictionary back to source string form, useful for testing

Parameters:

Name Type Description
input_dict dict

The dictionary to convert to a string

ext str | None

The file type extension to dump as, defaults to first 'file_type' in the set

Returns:

Type Description
str

The string representation of the input_dict, in the file_type format

Source code in orbiter/rules/rulesets.py
@validate_call
def dumps(self, input_dict: dict, ext: str | None = None) -> str:
    """
    Convert Python dictionary back to source string form, useful for testing

    The requested extension is compared case-insensitively against each
    file type's ``extension`` set, the same way ``loads`` matches file suffixes.

    :param input_dict: The dictionary to convert to a string
    :type input_dict: dict
    :param ext: The file type extension to dump as (e.g. ``"json"`` or ``"JSON"``),
        defaults to first 'file_type' in the set
    :type ext: str | None
    :return: The string representation of the input_dict, in the file_type format
    :rtype: str
    :raises TypeError: if ``ext`` is given and matches no extension of any ``file_type``
    """
    # Normalise once; `None` means "take the first file type encountered"
    wanted = None if ext is None else ext.lower()
    for file_type in self.file_type:
        # Extensions are lower-cased before comparing, so a lower-cased
        # `ext` can match regardless of how the extension set is cased
        if wanted is None or wanted in {known.lower() for known in file_type.extension}:
            return file_type.dump_fn(input_dict)
    raise TypeError(f"Invalid file_type={ext}")

get_ext

get_ext() -> str

Get the first file extension for this ruleset

>>> EMPTY_TRANSLATION_RULESET.get_ext()
'JSON'
Source code in orbiter/rules/rulesets.py
def get_ext(self) -> str:
    """
    Return the first extension of the first file type in this ruleset

    ```pycon
    >>> EMPTY_TRANSLATION_RULESET.get_ext()
    'JSON'

    ```

    :return: the first extension found when iterating the first entry of ``file_type``
    :rtype: str
    """
    # Take whichever file type iteration yields first ...
    first_file_type = next(iter(self.file_type))
    # ... and then whichever of its extensions iteration yields first
    return next(iter(first_file_type.extension))

get_files_with_extension

get_files_with_extension(
    input_dir: Path,
) -> Generator[Path, dict]

A generator that yields files with a specific extension(s) in a directory

Parameters:

Name Type Description
input_dir Path

The directory to search in

Returns:

Type Description
Generator[Path, dict]

Generator item of (Path, dict) for each file found

Source code in orbiter/rules/rulesets.py
def get_files_with_extension(self, input_dir: Path) -> Generator[tuple[Path, dict], None, None]:
    """
    A generator that yields files with a specific extension(s) in a directory

    Walks ``input_dir`` and tries to load every file found with ``self.loads``.
    Files for which ``self.loads`` raises ``TypeError`` are logged at debug level and skipped.

    :param input_dir: The directory to search in
    :type input_dir: Path
    :return: Generator item of (Path, dict) for each file found
    :rtype: Generator[tuple[Path, dict], None, None]
    """
    # Use `Path.walk` when the Path object provides it, otherwise fall back to the backport helper
    walker = input_dir.walk() if hasattr(input_dir, "walk") else _backport_walk(input_dir)
    for directory, _, files in walker:
        logger.debug(f"Checking directory={directory}")
        for file_name in files:
            file = directory / file_name
            # Only the load is guarded: a TypeError from `loads` means "not a file for this ruleset"
            try:
                # load the file and convert it into a python dict
                loaded = self.loads(file)
            except TypeError:
                logger.debug(f"File={file} not of correct type, skipping...")
                continue
            # Return the file path together with its parsed contents
            yield file, loaded

loads

loads(file: Path) -> dict

Converts a file of a supported type into a Python dictionary "intermediate representation" form, prior to any rulesets being applied.

Parameters:

Name Type Description
file Path

The file to load

Returns:

Type Description
dict

The dictionary representation of the input_str

Source code in orbiter/rules/rulesets.py
@validate_call
def loads(self, file: Path) -> dict:
    """
    Read a file and parse it into the Python dictionary "intermediate representation",
    before any rulesets are applied.

    Each entry of ``file_type`` whose extensions match the file's suffix (case-insensitively)
    is tried in turn; a loader that fails is logged and the next candidate is tried.

    :param file: The file to load
    :type file: Path
    :return: The dictionary representation of the file contents
    :rtype: dict
    :raises TypeError: if no matching file type successfully loads the file
    """
    suffix = file.suffix.lower()
    for candidate in self.file_type:
        accepted_suffixes = {f".{extension.lower()}" for extension in candidate.extension}
        if suffix not in accepted_suffixes:
            continue
        try:
            return candidate.load_fn(file.read_text())
        except Exception as e:
            # Best-effort: report the failure and fall through to the next file type
            logger.error(f"Error loading file={file}! Skipping!\n{e}")
    raise TypeError(f"Invalid file_type={file.suffix}, does not match file_type={self.file_type}")

test

test(input_value: str | dict) -> OrbiterProject

Test an input against the whole ruleset. The input can be either an 'input_dict' (a parsed python dict) or an 'input_str' (raw value).

Parameters:

Name Type Description
input_value str | dict

The input to test can be either a dict (passed to translate_ruleset.dumps() before translate_ruleset.loads()) or a string (read directly by translate_ruleset.loads())

Returns:

Type Description
OrbiterProject

OrbiterProject produced after applying the ruleset

Source code in orbiter/rules/rulesets.py
def test(self, input_value: str | dict) -> OrbiterProject:
    """
    Run a single input through the whole ruleset.

    The input is written to a uniquely named file (using the extension from ``get_ext()``)
    inside a temporary directory, and ``translate_fn`` is then invoked on that directory.

    :param input_value: The input to test, either
        a dict (serialized with `translate_ruleset.dumps()` before being written)
        or a string (written as-is)
    :type input_value: str | dict
    :return: OrbiterProject produced after applying the ruleset
    :rtype: OrbiterProject
    """
    with TemporaryDirectory() as scratch_dir:
        file_name = f"{uuid.uuid4()}.{self.get_ext()}"
        file = Path(scratch_dir) / file_name
        if isinstance(input_value, dict):
            contents = self.dumps(input_value)
        else:
            contents = input_value
        file.write_text(contents)
        # Translate while the temporary directory still exists
        return self.translate_fn(translation_ruleset=self, input_dir=file.parent)

orbiter.file_types

Classes:

Name Description
FileType

Abstract Base File Type

FileTypeJIL

JIL File Type

FileTypeJSON

JSON File Type

FileTypeXML

XML File Type

FileTypeYAML

YAML File Type

Functions:

Name Description
xmltodict_parse

Calls xmltodict.parse and does post-processing fixes.

FileType

Abstract Base File Type

Parameters:

Name Type Description
extension Set[str]

The file extension(s) for this file type

load_fn Callable[[str], dict]

The function to load the file into a dictionary for this file type

dump_fn Callable[[dict], str]

The function to dump a dictionary to a string for this file type

FileTypeJIL

JIL File Type

Parameters:

Name Type Description
extension Set[str]

JIL

load_fn Callable[[str], dict]

custom JIL loading function

dump_fn Callable[[dict], str]

custom JIL dumping function

FileTypeJSON

JSON File Type

>>> out = FileTypeJSON.dump_fn({'a': 1}); out
'{"a": 1}'
>>> FileTypeJSON.load_fn(out)
{'a': 1}

Parameters:

Name Type Description
extension Set[str]

JSON

load_fn Callable[[str], dict]

json.loads

dump_fn Callable[[dict], str]

json.dumps

FileTypeXML

XML File Type

Note

This class uses a custom xmltodict_parse method to standardize the output to a list of dictionaries

>>> out = FileTypeXML.dump_fn({'a': 1}); out
'<?xml version="1.0" encoding="utf-8"?>\n<a>1</a>'
>>> FileTypeXML.load_fn(out)
{'a': '1'}

Parameters:

Name Type Description
extension Set[str]

XML

load_fn Callable[[str], dict]

xmltodict_parse

dump_fn Callable[[dict], str]

xmltodict.unparse

FileTypeYAML

YAML File Type

>>> out = FileTypeYAML.dump_fn({'a': 1}); out
'a: 1\n'
>>> FileTypeYAML.load_fn(out)
{'a': 1}

Parameters:

Name Type Description
extension Set[str]

YAML, YML

load_fn Callable[[str], dict]

yaml.safe_load

dump_fn Callable[[dict], str]

yaml.safe_dump

xmltodict_parse

xmltodict_parse(input_str: str) -> Any

Calls xmltodict.parse and does post-processing fixes.

Note

The original xmltodict.parse method returns EITHER:

  • a dict (one child element of type)
  • or a list of dict (many child element of type)

This behavior can be confusing, and is an issue with the original xml spec being referenced.

This method deviates by standardizing to the latter case (always a list[dict]).

All XML elements will be a list of dictionaries, even if there's only one element.

>>> xmltodict_parse("")
Traceback (most recent call last):
xml.parsers.expat.ExpatError: no element found: line 1, column 0
>>> xmltodict_parse("<a></a>")
{'a': None}
>>> xmltodict_parse("<a foo='bar'></a>")
{'a': [{'@foo': 'bar'}]}
>>> xmltodict_parse("<a foo='bar'><foo bar='baz'></foo></a>")  # Singleton - gets modified
{'a': [{'@foo': 'bar', 'foo': [{'@bar': 'baz'}]}]}
>>> xmltodict_parse("<a foo='bar'><foo bar='baz'><bar><bop></bop></bar></foo></a>")  # Nested Singletons - modified
{'a': [{'@foo': 'bar', 'foo': [{'@bar': 'baz', 'bar': [{'bop': None}]}]}]}
>>> xmltodict_parse("<a foo='bar'><foo bar='baz'></foo><foo bing='bop'></foo></a>")
{'a': [{'@foo': 'bar', 'foo': [{'@bar': 'baz'}, {'@bing': 'bop'}]}]}

Parameters:

Name Type Description
input_str str

The XML string to parse

Returns:

Type Description
dict

The parsed XML

Source code in orbiter/file_types.py
def xmltodict_parse(input_str: str) -> Any:
    """Parse XML with `xmltodict.parse`, then normalize the shape of the result.

    !!! note

        The original [`xmltodict.parse`](https://pypi.org/project/xmltodict/) method returns EITHER:

        - a dict (one child element of type)
        - or a list of dict (many child element of type)

        This behavior can be confusing, and is an issue with the original xml spec being referenced.

        **This method deviates by standardizing to the latter case (always a `list[dict]`).**

        **Every element that parses to a dict is wrapped in a list, even if there's only one element.**

    ```pycon
    >>> xmltodict_parse("")
    Traceback (most recent call last):
    xml.parsers.expat.ExpatError: no element found: line 1, column 0
    >>> xmltodict_parse("<a></a>")
    {'a': None}
    >>> xmltodict_parse("<a foo='bar'></a>")
    {'a': [{'@foo': 'bar'}]}
    >>> xmltodict_parse("<a foo='bar'><foo bar='baz'></foo></a>")  # Singleton - gets modified
    {'a': [{'@foo': 'bar', 'foo': [{'@bar': 'baz'}]}]}
    >>> xmltodict_parse("<a foo='bar'><foo bar='baz'><bar><bop></bop></bar></foo></a>")  # Nested Singletons - modified
    {'a': [{'@foo': 'bar', 'foo': [{'@bar': 'baz', 'bar': [{'bop': None}]}]}]}
    >>> xmltodict_parse("<a foo='bar'><foo bar='baz'></foo><foo bing='bop'></foo></a>")
    {'a': [{'@foo': 'bar', 'foo': [{'@bar': 'baz'}, {'@bing': 'bop'}]}]}

    ```
    :param input_str: The XML string to parse
    :type input_str: str
    :return: The parsed XML
    :rtype: dict
    """

    # noinspection t
    def _fix(node):
        """Recursively normalize `node` in place so every dict-valued element becomes a one-item list."""
        if isinstance(node, list):
            # lists are already in the target shape; only their members need visiting
            for member in node:
                _fix(member)
        elif isinstance(node, dict):
            for key, value in node.items():
                # "@"-prefixed keys are attributes of an element, everything else is a child element
                if key.startswith("@"):
                    continue
                if isinstance(value, dict):
                    # THE FIX
                    # a lone child element is wrapped so it looks like the many-children case
                    node[key] = [value]
                _fix(value)

    parsed = xmltodict.parse(input_str)
    _fix(parsed)
    return parsed

Rulesets

orbiter.rules.rulesets.Ruleset

A list of rules, which are evaluated to generate different types of output

You must pass a Rule (or dict with the schema of Rule)

>>> from orbiter.rules import rule
>>> @rule
... def x(val):
...    return None
>>> Ruleset(ruleset=[x, {"rule": lambda: None}])
... # doctest: +ELLIPSIS
Ruleset(ruleset=[Rule(...), Rule(...)])

Note

You can't pass non-Rules

>>> # noinspection PyTypeChecker
... Ruleset(ruleset=[None])
... # doctest: +ELLIPSIS
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

Parameters:

Name Type Description
ruleset List[Rule | Callable[[Any], Any | None]]

List of Rule (or dict with the schema of Rule)

Methods:

Name Description
apply

Apply all rules in ruleset to a single item, in priority order, removing any None results.

apply_many

Apply a ruleset to each item in collection (such as dict().items())

apply

apply(
    take_first: bool = False, **kwargs
) -> List[Any] | Any

Apply all rules in ruleset to a single item, in priority order, removing any None results.

A ruleset with one rule can produce up to one result

>>> from orbiter.rules import rule

>>> @rule
... def gt_4(val):
...     return str(val) if val > 4 else None
>>> Ruleset(ruleset=[gt_4]).apply(val=5)
['5']

Many rules can produce many results, one for each rule.

>>> @rule
... def gt_3(val):
...    return str(val) if val > 3 else None
>>> Ruleset(ruleset=[gt_4, gt_3]).apply(val=5)
['5', '5']

The take_first flag will evaluate rules in the ruleset and return the first match

>>> Ruleset(ruleset=[gt_4, gt_3]).apply(val=5, take_first=True)
'5'

If nothing matched, an empty list is returned

>>> @rule
... def always_none(val):
...     return None
>>> @rule
... def more_always_none(val):
...     return None
>>> Ruleset(ruleset=[always_none, more_always_none]).apply(val=5)
[]

If nothing matched, and take_first=True, None is returned

>>> Ruleset(ruleset=[always_none, more_always_none]).apply(val=5, take_first=True)
... # None

Tip

If no input is given, an error is raised

>>> Ruleset(ruleset=[always_none]).apply()
Traceback (most recent call last):
RuntimeError: No values provided! Supply at least one key=val pair as kwargs!

Parameters:

Name Type Description
take_first bool

only take the first (if any) result from the ruleset application

kwargs

key=val pairs to pass to the evaluated rule function

Returns:

Type Description
List[Any] | Any | None

List of rules that evaluated to Any (in priority order), or an empty list, or Any (if take_first=True)

Raises:

Type Description
RuntimeError

if no key=val pairs are provided as kwargs

RuntimeError

if the Rule raises an exception

Source code in orbiter/rules/rulesets.py
@validate_call
def apply(self, take_first: bool = False, **kwargs) -> List[Any] | Any:
    """
    Evaluate every rule in the ruleset against **a single item**, in priority order,
    keeping only the results that are not `None`.

    A ruleset with one rule can produce **up to one** result
    ```pycon
    >>> from orbiter.rules import rule

    >>> @rule
    ... def gt_4(val):
    ...     return str(val) if val > 4 else None
    >>> Ruleset(ruleset=[gt_4]).apply(val=5)
    ['5']

    ```

    Many rules can produce many results, one for each rule.
    ```pycon
    >>> @rule
    ... def gt_3(val):
    ...    return str(val) if val > 3 else None
    >>> Ruleset(ruleset=[gt_4, gt_3]).apply(val=5)
    ['5', '5']

    ```

    The `take_first` flag will evaluate rules in the ruleset and return the first match
    ```pycon
    >>> Ruleset(ruleset=[gt_4, gt_3]).apply(val=5, take_first=True)
    '5'

    ```

    If nothing matched, an empty list is returned
    ```pycon
    >>> @rule
    ... def always_none(val):
    ...     return None
    >>> @rule
    ... def more_always_none(val):
    ...     return None
    >>> Ruleset(ruleset=[always_none, more_always_none]).apply(val=5)
    []

    ```

    If nothing matched, and `take_first=True`, `None` is returned
    ```pycon
    >>> Ruleset(ruleset=[always_none, more_always_none]).apply(val=5, take_first=True)
    ... # None

    ```

    !!! tip

        If no input is given, an error is raised
        ```pycon
        >>> Ruleset(ruleset=[always_none]).apply()
        Traceback (most recent call last):
        RuntimeError: No values provided! Supply at least one key=val pair as kwargs!

        ```

    :param take_first: only take the first (if any) result from the ruleset application
    :type take_first: bool
    :param kwargs: key=val pairs to pass to the evaluated rule function
    :returns: List of rule results that were not `None` (in priority order),
                or an empty list,
                or a single result / `None` (if `take_first=True`)
    :rtype: List[Any] | Any | None
    :raises RuntimeError: if no key=val pairs are supplied as kwargs
    """
    if not kwargs:
        raise RuntimeError("No values provided! Supply at least one key=val pair as kwargs!")

    matched = []
    for current_rule in self._sorted():
        outcome = current_rule(**kwargs)
        if outcome is None:
            # this rule did not match, move on to the next one
            continue

        # OrbiterProject / OrbiterDAG inputs are not echoed into the debug log
        loggable_input = "val" in kwargs and not isinstance(kwargs["val"], (OrbiterProject, OrbiterDAG))
        logger.debug(
            "---------\n"
            f"[RULESET MATCHED] '{self.__class__.__module__}.{self.__class__.__name__}'\n"
            f"[RULE MATCHED] '{current_rule.__name__}'\n"
            f"[INPUT] {trim_dict(kwargs) if loggable_input else '<Skipping...>'}\n"
            f"[RETURN] {trim_dict(outcome)}\n"
            f"---------"
        )
        if take_first:
            # first match wins, no need to evaluate the remaining rules
            return outcome
        matched.append(outcome)

    if take_first and not matched:
        return None
    return matched

apply_many

apply_many(
    input_val: Collection[Any], take_first: bool = False
) -> List[List[Any]] | List[Any]

Apply a ruleset to each item in collection (such as dict().items()) and return any results that are not None

You can turn the output of apply_many into a dict, if the rule takes and returns a tuple

>>> from itertools import chain
>>> from orbiter.rules import rule

>>> @rule
... def filter_for_type_folder(val):
...   (key, val) = val
...   return (key, val) if val.get('Type', '') == 'Folder' else None
>>> ruleset = Ruleset(ruleset=[filter_for_type_folder])
>>> input_dict = {
...    "a": {"Type": "Folder"},
...    "b": {"Type": "File"},
...    "c": {"Type": "Folder"},
... }
>>> dict(chain(*chain(ruleset.apply_many(input_dict.items()))))
... # use dict(chain(*chain(...))), if using `take_first=False`, to turn many results back into dict
{'a': {'Type': 'Folder'}, 'c': {'Type': 'Folder'}}
>>> dict(ruleset.apply_many(input_dict.items(), take_first=True))
... # use dict(...) directly, if using `take_first=True`, to turn results back into dict
{'a': {'Type': 'Folder'}, 'c': {'Type': 'Folder'}}

Tip

You cannot pass input without length

>>> ruleset.apply_many({})
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
RuntimeError: Input is not Collection[Any] with length!

Parameters:

Name Type Description
input_val Collection[Any]

List to evaluate ruleset over

take_first bool

Only take the first (if any) result from each ruleset application

Returns:

Type Description
List[List[Any]] | List[Any]

List of list with all non-null evaluations for each item
or list of the first non-null evaluation for each item (if take_first=True)

Raises:

Type Description
RuntimeError

if input_val is empty

RuntimeError

if the Rule raises an exception

Source code in orbiter/rules/rulesets.py
def apply_many(
    self,
    input_val: Collection[Any],
    take_first: bool = False,
) -> List[List[Any]] | List[Any]:
    """
    Apply a ruleset to each item in collection (such as `dict().items()`)
    and return any results that are not `None`

    You can turn the output of `apply_many` into a dict, if the rule takes and returns a tuple
    ```pycon
    >>> from itertools import chain
    >>> from orbiter.rules import rule

    >>> @rule
    ... def filter_for_type_folder(val):
    ...   (key, val) = val
    ...   return (key, val) if val.get('Type', '') == 'Folder' else None
    >>> ruleset = Ruleset(ruleset=[filter_for_type_folder])
    >>> input_dict = {
    ...    "a": {"Type": "Folder"},
    ...    "b": {"Type": "File"},
    ...    "c": {"Type": "Folder"},
    ... }
    >>> dict(chain(*chain(ruleset.apply_many(input_dict.items()))))
    ... # use dict(chain(*chain(...))), if using `take_first=True`, to turn many results back into dict
    {'a': {'Type': 'Folder'}, 'c': {'Type': 'Folder'}}
    >>> dict(ruleset.apply_many(input_dict.items(), take_first=True))
    ... # use dict(...) directly, if using `take_first=True`, to turn results back into dict
    {'a': {'Type': 'Folder'}, 'c': {'Type': 'Folder'}}

    ```
    !!! tip

        You cannot pass input without length
        ```pycon
        >>> ruleset.apply_many({})
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        RuntimeError: Input is not Collection[Any] with length!

        ```
    :param input_val: List to evaluate ruleset over
    :type input_val: Collection[Any]
    :param take_first: Only take the first (if any) result from each ruleset application
    :type take_first: bool
    :returns: List of list with all non-null evaluations for each item<br>
              or list of the first non-null evaluation for each item (if `take_first=True`)
    :rtype: List[List[Any]] | List[Any]
    :raises RuntimeError: if the Ruleset or input_vals are empty
    :raises RuntimeError: if the Rule raises an exception
    """
    # Validate Input
    if not input_val or not len(input_val):
        raise RuntimeError("Input is not `Collection[Any]` with length!")

    return [
        results[0] if take_first else results
        for item in input_val
        if (results := self.apply(take_first=False, val=item)) is not None and len(results)
    ]

orbiter.rules.rulesets.DAGFilterRuleset

Bases: Ruleset

Ruleset of DAGFilterRule

orbiter.rules.rulesets.DAGRuleset

Bases: Ruleset

Ruleset of DAGRule

orbiter.rules.rulesets.TaskFilterRuleset

Bases: Ruleset

Ruleset of TaskFilterRule

orbiter.rules.rulesets.TaskRuleset

Bases: Ruleset

Ruleset of TaskRule

orbiter.rules.rulesets.TaskDependencyRuleset

Bases: Ruleset

Ruleset of TaskDependencyRule

orbiter.rules.rulesets.PostProcessingRuleset

Bases: Ruleset

Ruleset of PostProcessingRule