Rules

orbiter.rules.Rule

A Rule contains a Python function that is evaluated and produces something (typically an Object) or nothing.

A Rule can be created from a decorator:

>>> @rule(priority=1)
... def my_rule(val):
...     return 1
>>> isinstance(my_rule, Rule)
True
>>> my_rule(val={})
1

The function in a Rule takes one parameter (val) and must always return either a value or None.

>>> Rule(rule=lambda val: 4)({})
4
>>> Rule(rule=lambda val: None)({})

Tip

If the returned value is an Orbiter Object, the keyword arguments passed to the rule are saved on it in a special orbiter_kwargs property.

>>> from orbiter.objects.dag import OrbiterDAG
>>> @rule
... def my_rule(foo):
...     return OrbiterDAG(dag_id="", file_path="")
>>> my_rule(foo="bar").orbiter_kwargs
{'foo': 'bar'}
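
The mechanism described in this tip can be illustrated without orbiter installed. The sketch below shows one way a decorator could record keyword arguments on the returned object. `SketchObject` and `sketch_rule` are hypothetical stand-ins, not part of orbiter, and orbiter's actual mechanism may differ.

```python
import functools
from typing import Any, Callable


class SketchObject:
    """Hypothetical stand-in for an Orbiter Object."""

    def __init__(self) -> None:
        self.orbiter_kwargs: dict = {}


def sketch_rule(func: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap a function so its keyword arguments are recorded on a SketchObject result."""

    @functools.wraps(func)
    def wrapper(**kwargs: Any) -> Any:
        result = func(**kwargs)
        # Only results of the stand-in object type carry the recorded kwargs.
        if isinstance(result, SketchObject):
            result.orbiter_kwargs = kwargs
        return result

    return wrapper


@sketch_rule
def my_rule(foo):
    return SketchObject()


@sketch_rule
def plain_rule(foo):
    return 1  # not a SketchObject, so nothing is recorded


recorded = my_rule(foo="bar").orbiter_kwargs
```

Here `recorded` holds the keyword arguments of the call, while `plain_rule` returns its value unchanged.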

Note

A Rule must have a rule property, and extra properties cannot be passed.

>>> # noinspection Pydantic
... Rule(rule=lambda: None, not_a_prop="???")
... # doctest: +ELLIPSIS
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

Parameters:

Name Type Description
rule Callable[[dict | Any], Any | None]

Python function to evaluate. Takes a single argument and returns something or nothing

priority int, optional

Higher-priority rules are evaluated first. Must be 0 or greater; the default is 0.
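
Priority ordering combines with the "something or nothing" return convention. The sketch below is self-contained and does not use orbiter: `SketchRule` and `first_match` are hypothetical names, and it assumes that the first rule returning a non-None value wins. It illustrates the idea and is not orbiter's implementation.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class SketchRule:
    """Hypothetical stand-in for a rule: a callable plus a priority."""

    rule: Callable[[Any], Optional[Any]]
    priority: int = 0

    def __call__(self, val: Any) -> Optional[Any]:
        return self.rule(val)


def first_match(rules: list, val: Any) -> Optional[Any]:
    """Evaluate rules from highest to lowest priority; return the first non-None result."""
    for r in sorted(rules, key=lambda r: r.priority, reverse=True):
        result = r(val)
        if result is not None:
            return result
    return None


rules = [
    # Low-priority catch-all that always produces something.
    SketchRule(rule=lambda val: "fallback", priority=1),
    # High-priority rule that only matches inputs with a "command" key.
    SketchRule(rule=lambda val: "bash" if "command" in val else None, priority=10),
]

matched = first_match(rules, {"command": "echo hi"})
unmatched = first_match(rules, {"query": "select 1"})
```

The specific rule is tried first because of its higher priority. The catch-all is reached only when the specific rule returns None.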

orbiter.rules.DAGFilterRule

Bases: Rule

The @dag_filter_rule decorator creates a DAGFilterRule

@dag_filter_rule
def foo(val: dict) -> List[dict]:
    return [{"dag_id": "foo"}]

Hint

In addition to filtering, a DAGFilterRule can also reshape the input into a form that is easier for later rules to process.
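
A filter function can both filter and map in a single pass. The plain function below is a minimal sketch of that: the input shape (a `jobs` list with `name`, `cron`, and `enabled` keys) and the function name `sketch_dag_filter` are assumptions made for illustration. In an actual translation the function would be decorated with @dag_filter_rule.

```python
from typing import List


def sketch_dag_filter(val: dict) -> List[dict]:
    """Keep only enabled jobs and reshape each into a flat dict (hypothetical input shape)."""
    return [
        {"dag_id": job["name"], "schedule": job.get("cron")}
        for job in val.get("jobs", [])
        if job.get("enabled", True)  # jobs without the key are treated as enabled
    ]


source = {
    "jobs": [
        {"name": "daily_load", "cron": "0 0 * * *"},
        {"name": "old_report", "enabled": False},
    ]
}
candidates = sketch_dag_filter(source)
```

Only `daily_load` survives the filter, and its keys are renamed so later rules can read `dag_id` directly.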

orbiter.rules.DAGRule

Bases: Rule

A @dag_rule decorator creates a DAGRule

Tip

A __file key, containing the file path of the input, is added to the original input.

@dag_rule
def foo(val: dict) -> OrbiterDAG | None:
    if "id" in val:
        return OrbiterDAG(dag_id=val["id"], file_path=f"{val['id']}.py")
    else:
        return None
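
The __file key can be useful when the input has no identifier of its own. The sketch below derives a DAG id and output path from it, returning a plain dict in place of an OrbiterDAG so that it runs without orbiter. It assumes the `__file` value is a string or path-like object. The function name `sketch_dag_fields` is hypothetical.

```python
from pathlib import Path
from typing import Optional


def sketch_dag_fields(val: dict) -> Optional[dict]:
    """Derive a DAG id and output path from the input's __file key (assumed path-like)."""
    if "__file" not in val:
        return None
    stem = Path(val["__file"]).stem  # file name without directory or extension
    return {"dag_id": stem, "file_path": f"{stem}.py"}


fields = sketch_dag_fields({"__file": "workflows/demo.json", "id": "ignored"})
```

The resulting values could then be passed as the dag_id and file_path arguments shown in the example above.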

orbiter.rules.TaskFilterRule

Bases: Rule

A @task_filter_rule decorator creates a TaskFilterRule

@task_filter_rule
def foo(val: dict) -> List[dict] | None:
    return [{"task_id": "foo"}]

Hint

In addition to filtering, a TaskFilterRule can also reshape the input into a form that is easier for later rules to process.

Parameters:

Name Type Description
val dict

A dictionary of the task

Returns:

Type Description
List[dict] | None

A list of dictionaries of possible tasks or None

orbiter.rules.TaskRule

Bases: Rule

A @task_rule decorator creates a TaskRule

@task_rule
def foo(val: dict) -> OrbiterOperator | OrbiterTaskGroup | None:
    if "id" in val and "command" in val:
        return OrbiterBashOperator(
            task_id=val["id"], bash_command=val["command"]
        )
    else:
        return None

Parameters:

Name Type Description
val dict

A dictionary of the task

Returns:

Type Description
OrbiterOperator | OrbiterTaskGroup | None

A subclass of OrbiterOperator or OrbiterTaskGroup or None

orbiter.rules.TaskDependencyRule

Bases: Rule

An @task_dependency_rule decorator creates a TaskDependencyRule, which takes an OrbiterDAG and returns a list[OrbiterTaskDependency] or None

@task_dependency_rule
def foo(val: OrbiterDAG) -> list[OrbiterTaskDependency] | None:
    return [OrbiterTaskDependency(task_id="upstream", downstream="downstream")]

Parameters:

Name Type Description
val OrbiterDAG

Returns:

Type Description
List[OrbiterTaskDependency] | None

A list of OrbiterTaskDependency or None
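
A dependency list is often derived from the source data instead of being hard-coded. The sketch below builds upstream-to-downstream pairs from a plain dict of tasks. `SketchDependency` is a hypothetical stand-in for OrbiterTaskDependency, the `depends_on` key is an assumed input convention, and a plain dict replaces the OrbiterDAG so the example runs without orbiter.

```python
from typing import List, NamedTuple, Optional


class SketchDependency(NamedTuple):
    """Hypothetical stand-in for OrbiterTaskDependency."""

    task_id: str
    downstream: str


def sketch_dependencies(tasks: dict) -> Optional[List[SketchDependency]]:
    """Turn each task's assumed "depends_on" list into upstream -> downstream pairs."""
    deps = [
        SketchDependency(task_id=upstream, downstream=task_id)
        for task_id, task in tasks.items()
        for upstream in task.get("depends_on", [])
    ]
    return deps or None  # nothing found is reported as None, not an empty list


tasks = {
    "extract": {},
    "transform": {"depends_on": ["extract"]},
    "load": {"depends_on": ["transform"]},
}
deps = sketch_dependencies(tasks)
```

Returning None when no dependencies are found mirrors the "list or None" return convention documented above.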

orbiter.rules.PostProcessingRule

Bases: Rule

An @post_processing_rule decorator creates a PostProcessingRule, which takes an OrbiterProject, after all other rules have been applied, and modifies it in-place.

@post_processing_rule
def foo(val: OrbiterProject) -> None:
    val.dags["foo"].tasks["bar"].doc = "Hello World"

Parameters:

Name Type Description
val OrbiterProject

Returns:

Type Description
None

None