Rules
orbiter.rules.Rule ¶
A Rule contains a python function that is evaluated and produces something
(typically an Object) or nothing
A Rule can be created from a decorator
>>> @rule(priority=1)
... def my_rule(val):
... return 1
>>> isinstance(my_rule, Rule)
True
>>> my_rule(val={})
1
The function in a rule takes one parameter (val), and must always evaluate to something or nothing.
>>> some_rule = Rule(rule=lambda val: 4) # returns something
>>> some_rule(val={}) # always takes "val", in this case a dict
4
>>> other_rule = Rule(rule=lambda val: None) # returns nothing
>>> other_rule(val={})
val may be a dictionary or a different type.
Refer to each type of rule for more details.
Tip
If the returned value is an Orbiter Object,
the input kwargs are saved in a special orbiter_kwargs property,
along with information about the matching rule in an orbiter_meta property.
>>> from orbiter.rules import task_rule
>>> @task_rule(params_doc={"id": "BashOperator.task_id", "command": "BashOperator.bash_command"})
... def my_rule(val: dict):
... '''This rule takes that and gives this'''
... from orbiter.objects.operators.bash import OrbiterBashOperator
... if 'command' in val:
... return OrbiterBashOperator(task_id=val['id'], bash_command=val['command'])
... else:
... return None
>>> kwargs = {"id": "foo", "command": "echo 'hello world'", "unvisited_key": "bar"}
>>> match = my_rule(val=kwargs)
>>> type(match)
<class 'orbiter.objects.operators.bash.OrbiterBashOperator'>
>>> match.orbiter_kwargs
{'val': {'id': 'foo', 'command': "echo 'hello world'", 'unvisited_key': 'bar'}}
>>> match.orbiter_meta
... # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
OrbiterMeta(matched_rule_source='@task_rule...',
matched_rule_docstring='This rule takes that and gives this',
matched_rule_params_doc={'id': 'BashOperator.task_id', 'command': 'BashOperator.bash_command'},
matched_rule_name='my_rule',
matched_rule_priority=1,
visited_keys=['command', 'id'])
Note
A Rule must have a rule property and extra properties cannot be passed
Parameters:
| Name | Type | Description |
|---|---|---|
rule |
Callable[[dict | Any], Any | None]
|
Python function to evaluate. Takes a single argument and returns something or nothing |
priority |
(int, optional)
|
Higher priority rules are evaluated first, must be greater than 0. Default is 0 |
params_doc |
(dict[str, str], optional)
|
What the rule takes as input (key), and gives as output (value) |
orbiter.rules.DAGFilterRule ¶
Bases: Rule
The @dag_filter_rule decorator creates a DAGFilterRule
Hint
In addition to filtering, a DAGFilterRule can also map input to a more reasonable output for later processing
orbiter.rules.DAGRule ¶
orbiter.rules.TaskFilterRule ¶
Bases: Rule
A @task_filter_rule decorator creates a TaskFilterRule
Hint
In addition to filtering, a TaskFilterRule can also map input to a more reasonable output for later processing
Parameters:
| Name | Type | Description |
|---|---|---|
val |
dict
|
A dictionary of the task |
Returns:
| Type | Description |
|---|---|
List[dict] | None
|
A list of dictionaries of possible tasks or |
orbiter.rules.TaskRule ¶
Bases: Rule
A @task_rule decorator creates a TaskRule
@task_rule
def foo(val: dict) -> OrbiterOperator | OrbiterTaskGroup:
if "id" in val and "command" in val:
return OrbiterBashOperator(
task_id=val["id"], bash_command=val["command"]
)
else:
return None
Parameters:
| Name | Type | Description |
|---|---|---|
val |
dict
|
A dictionary of the task |
Returns:
| Type | Description |
|---|---|
OrbiterOperator | OrbiterTaskGroup | None
|
A subclass of |
orbiter.rules.TaskDependencyRule ¶
Bases: Rule
An @task_dependency_rule decorator creates a TaskDependencyRule,
which takes an OrbiterDAG
and returns a list[OrbiterTaskDependency] or None
@task_dependency_rule
def foo(val: OrbiterDAG) -> OrbiterTaskDependency:
return [OrbiterTaskDependency(task_id="upstream", downstream="downstream")]
Parameters:
| Name | Type | Description |
|---|---|---|
val |
OrbiterDAG
|
An |
Returns:
| Type | Description |
|---|---|
List[OrbiterTaskDependency] | None
|
A list of |
orbiter.rules.PostProcessingRule ¶
Bases: Rule
An @post_processing_rule decorator creates a PostProcessingRule,
which takes an OrbiterProject,
after all other rules have been applied, and modifies it in-place.
@post_processing_rule
def foo(val: OrbiterProject) -> None:
val.dags["foo"].tasks["bar"].doc = "Hello World"
Parameters:
| Name | Type | Description |
|---|---|---|
val |
OrbiterProject
|
Returns:
| Type | Description |
|---|---|
None
|
|