Rules

orbiter.rules.Rule

A Rule wraps a Python function that is evaluated and returns either something (typically an Object) or nothing.

A Rule can be created with a decorator:

>>> @rule(priority=1)
... def my_rule(val):
...     return 1
>>> isinstance(my_rule, Rule)
True
>>> my_rule(val={})
1

The function in a rule takes one parameter (val), and must always evaluate to something or nothing.

>>> some_rule = Rule(rule=lambda val: 4)  # returns something
>>> some_rule(val={})  # always takes "val", in this case a dict
4
>>> other_rule = Rule(rule=lambda val: None)  # returns nothing
>>> other_rule(val={})

Depending on the type of rule, the input val may be a dictionary or a different type. Refer to each type of rule for more details.

Tip

If the returned value is an Orbiter Object, the input kwargs are saved in a special orbiter_kwargs property, along with information about the matching rule in an orbiter_meta property.

>>> from orbiter.rules import task_rule
>>> @task_rule(params_doc={"id": "BashOperator.task_id", "command": "BashOperator.bash_command"})
... def my_rule(val: dict):
...     '''This rule takes that and gives this'''
...     from orbiter.objects.operators.bash import OrbiterBashOperator
...     if 'command' in val:
...         return OrbiterBashOperator(task_id=val['id'], bash_command=val['command'])
...     else:
...         return None
>>> kwargs = {"id": "foo", "command": "echo 'hello world'", "unvisited_key": "bar"}
>>> match = my_rule(val=kwargs)
>>> type(match)
<class 'orbiter.objects.operators.bash.OrbiterBashOperator'>
>>> match.orbiter_kwargs
{'val': {'id': 'foo', 'command': "echo 'hello world'", 'unvisited_key': 'bar'}}
>>> match.orbiter_meta
... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
OrbiterMeta(matched_rule_source='@task_rule...',
    matched_rule_docstring='This rule takes that and gives this',
    matched_rule_params_doc={'id': 'BashOperator.task_id', 'command': 'BashOperator.bash_command'},
    matched_rule_name='my_rule',
    matched_rule_priority=1,
    visited_keys=['command', 'id'])
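
One way to picture how visited_keys could be collected is a dictionary that records which keys a rule reads. The sketch below is self-contained and does not import orbiter; TrackingDict and bash_like_rule are hypothetical names for illustration, and this is not necessarily how orbiter tracks visited keys internally.

```python
class TrackingDict(dict):
    """Hypothetical dict that remembers which keys were read."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.visited = set()

    def __getitem__(self, key):
        # Record every subscript read, e.g. val["id"]
        self.visited.add(key)
        return super().__getitem__(key)

    def get(self, key, default=None):
        # dict.get does not go through __getitem__, so record it here too
        self.visited.add(key)
        return super().get(key, default)


def bash_like_rule(val):
    # Same shape as my_rule above, but returns a plain dict (assumption:
    # a stand-in for OrbiterBashOperator so the sketch runs on its own)
    if "command" in val:
        return {"task_id": val["id"], "bash_command": val["command"]}
    return None


val = TrackingDict(
    {"id": "foo", "command": "echo 'hello world'", "unvisited_key": "bar"}
)
result = bash_like_rule(val)
visited_keys = sorted(val.visited)
print(visited_keys)  # ['command', 'id']
```

Keys that the rule never reads, such as unvisited_key here, stay out of the list, which makes it easy to spot input that no rule has handled.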

Note

A Rule must have a rule property, and extra properties cannot be passed:

>>> # noinspection Pydantic
... Rule(rule=lambda: None, not_a_prop="???")
... # doctest: +ELLIPSIS
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

Parameters:

Name Type Description
rule Callable[[dict | Any], Any | None]

Python function to evaluate. Takes a single argument and returns something or nothing

priority (int, optional)

Higher-priority rules are evaluated first. Must be 0 or greater; the default is 0

params_doc (dict[str, str], optional)

What the rule takes as input (key), and gives as output (value)
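
To illustrate how priority interacts with the "something or nothing" contract, here is a minimal sketch that does not import orbiter. MiniRule and first_match are hypothetical stand-ins, and stopping at the first non-None result is an assumption made for this example rather than a statement about orbiter's internals.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class MiniRule:
    """Hypothetical stand-in for a Rule: a callable plus a priority."""

    rule: Callable[[Any], Optional[Any]]
    priority: int = 0

    def __call__(self, val):
        return self.rule(val)


def first_match(rules, val):
    # Evaluate higher-priority rules first; stop at the first result
    # that is "something" (assumed behavior for this sketch)
    for r in sorted(rules, key=lambda r: r.priority, reverse=True):
        result = r(val)
        if result is not None:
            return result
    return None


rules = [
    # Low priority: matches anything, acts as a fallback
    MiniRule(rule=lambda val: "fallback", priority=1),
    # High priority: matches only input with a "command" key
    MiniRule(rule=lambda val: "bash" if "command" in val else None, priority=10),
]

print(first_match(rules, {"command": "ls"}))      # bash
print(first_match(rules, {"query": "select 1"}))  # fallback
```

Giving a catch-all rule the lowest priority lets more specific rules claim their input first.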

orbiter.rules.DAGFilterRule

Bases: Rule

The @dag_filter_rule decorator creates a DAGFilterRule

@dag_filter_rule
def foo(val: dict) -> List[dict]:
    return [{"dag_id": "foo"}]

Hint

In addition to filtering, a DAGFilterRule can also map input to a more reasonable output for later processing
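
As a rough sketch of filtering and mapping in one step, the plain function below keeps only some entries and reshapes them for later rules. It is written without the decorator so that it runs on its own; the input keys "jobs", "type", and "name" are made up for this illustration.

```python
def find_dags(val: dict) -> list:
    # Keep only entries that look like workflows (filtering), and add a
    # "dag_id" key so later rules can rely on it (mapping)
    return [
        {"dag_id": job["name"], **job}
        for job in val.get("jobs", [])
        if job.get("type") == "workflow"
    ]


source = {
    "jobs": [
        {"name": "etl", "type": "workflow"},
        {"name": "cfg", "type": "settings"},
    ]
}
dags = find_dags(source)
print(dags)  # [{'dag_id': 'etl', 'name': 'etl', 'type': 'workflow'}]
```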

orbiter.rules.DAGRule

Bases: Rule

A @dag_rule decorator creates a DAGRule

Tip

A __file key, containing the file path of the input, is added to the original input.

@dag_rule
def foo(val: dict) -> OrbiterDAG | None:
    if "id" in val:
        return OrbiterDAG(dag_id=val["id"], file_path=f"{val['id']}.py")
    else:
        return None
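
The __file key can be useful for naming the output. The helper below is a hypothetical sketch that assumes __file holds a path-like string; it derives a Python file name from the input file's name and could feed the file_path argument shown above.

```python
from pathlib import PurePosixPath


def output_path(val: dict) -> str:
    # Assumption: val["__file"] is a path-like string such as
    # "workflows/daily_load.xml"
    stem = PurePosixPath(val["__file"]).stem
    return f"{stem}.py"


print(output_path({"id": "foo", "__file": "workflows/daily_load.xml"}))  # daily_load.py
```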

orbiter.rules.TaskFilterRule

Bases: Rule

A @task_filter_rule decorator creates a TaskFilterRule

@task_filter_rule
def foo(val: dict) -> List[dict] | None:
    return [{"task_id": "foo"}]

Hint

In addition to filtering, a TaskFilterRule can also map input to a more reasonable output for later processing

Parameters:

Name Type Description
val dict

A dictionary of the task

Returns:

Type Description
List[dict] | None

A list of dictionaries of possible tasks or None

orbiter.rules.TaskRule

Bases: Rule

A @task_rule decorator creates a TaskRule

@task_rule
def foo(val: dict) -> OrbiterOperator | OrbiterTaskGroup | None:
    if "id" in val and "command" in val:
        return OrbiterBashOperator(
            task_id=val["id"], bash_command=val["command"]
        )
    else:
        return None

Parameters:

Name Type Description
val dict

A dictionary of the task

Returns:

Type Description
OrbiterOperator | OrbiterTaskGroup | None

A subclass of OrbiterOperator or OrbiterTaskGroup or None

orbiter.rules.TaskDependencyRule

Bases: Rule

An @task_dependency_rule decorator creates a TaskDependencyRule, which takes an OrbiterDAG and returns a list[OrbiterTaskDependency] or None

@task_dependency_rule
def foo(val: OrbiterDAG) -> List[OrbiterTaskDependency] | None:
    return [OrbiterTaskDependency(task_id="upstream", downstream="downstream")]

Parameters:

Name Type Description
val OrbiterDAG

Returns:

Type Description
List[OrbiterTaskDependency] | None

A list of OrbiterTaskDependency or None
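
The following sketch shows one way a dependency list could be derived from per-task upstream information. It does not import orbiter: Dep is a hypothetical stand-in for OrbiterTaskDependency, using the same task_id and downstream fields as the example above, and the shape of the tasks mapping is an assumption.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Dep:
    """Hypothetical stand-in for OrbiterTaskDependency."""

    task_id: str
    downstream: str


def dependencies_from(tasks: dict):
    # tasks maps each task id to the ids of its upstream tasks
    # (an assumed input shape for this sketch)
    deps = [
        Dep(task_id=upstream, downstream=task_id)
        for task_id, upstreams in tasks.items()
        for upstream in upstreams
    ]
    # Return nothing when there are no dependencies
    return deps or None


tasks = {"extract": [], "transform": ["extract"], "load": ["transform"]}
deps = dependencies_from(tasks)
print(deps)
```

Returning None for an empty result keeps the sketch in line with the "list or None" return type described above.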

orbiter.rules.PostProcessingRule

Bases: Rule

An @post_processing_rule decorator creates a PostProcessingRule, which takes an OrbiterProject, after all other rules have been applied, and modifies it in-place.

@post_processing_rule
def foo(val: OrbiterProject) -> None:
    val.dags["foo"].tasks["bar"].doc = "Hello World"

Parameters:

Name Type Description
val OrbiterProject

Returns:

Type Description
None

None
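
For a slightly larger in-place change, the sketch below walks every task in a project and fills in a missing doc. It uses SimpleNamespace objects as hypothetical stand-ins for OrbiterProject, its DAGs, and its tasks, so it runs without orbiter; the task_id attribute on the stand-in task is an assumption.

```python
from types import SimpleNamespace


def add_default_docs(project) -> None:
    # Modify the project in place; nothing is returned
    for dag in project.dags.values():
        for task in dag.tasks.values():
            if task.doc is None:
                task.doc = f"Migrated task {task.task_id}"


# Stand-in objects that mimic the dags -> tasks -> doc nesting used above
bar = SimpleNamespace(task_id="bar", doc=None)
baz = SimpleNamespace(task_id="baz", doc="Already documented")
project = SimpleNamespace(dags={"foo": SimpleNamespace(tasks={"bar": bar, "baz": baz})})

returned = add_default_docs(project)
print(project.dags["foo"].tasks["bar"].doc)  # Migrated task bar
```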