Overview
Airflow Tasks are units of work. An Operator is a pre-defined task with specific functionality.
Operators can be looked up in the Astronomer Registry.
The easiest way to create an operator in a translation is to use an existing subclass of OrbiterOperator (e.g. OrbiterBashOperator).
If an OrbiterOperator subclass doesn't exist for your use case, you can:
1) Utilize OrbiterTask
from orbiter.objects.requirement import OrbiterRequirement
from orbiter.objects.task import OrbiterTask
from orbiter.rules import task_rule


@task_rule
def my_rule(val: dict):
    """Translate a matched input into a generic ``OrbiterTask``.

    ``OrbiterTask`` does not hard-code an operator class: it infers the
    operator from the first ``*Operator`` or ``*Sensor`` name listed in
    ``imports`` (here, ``TriggerDagRunOperator``).

    :param val: the input handed to the rule (presumably the parsed
        source-workflow element the rule matched — confirm against the
        translation ruleset)
    """
    return OrbiterTask(
        task_id="my_task",
        imports=[
            OrbiterRequirement(
                package="apache-airflow",
                module="airflow.operators.trigger_dagrun",
                names=["TriggerDagRunOperator"],
            )
        ],
        # ...plus any other keyword arguments the inferred operator accepts,
        # e.g. trigger_dag_id="my_other_dag"
    )
2) Create a new subclass of OrbiterOperator, which can be beneficial if you use it frequently in separate @task_rules:
from orbiter.objects.task import OrbiterOperator
from orbiter.objects.requirement import OrbiterRequirement
from orbiter.rules import task_rule


class OrbiterTriggerDagRunOperator(OrbiterOperator):
    """Orbiter operator that renders an Airflow ``TriggerDagRunOperator`` task."""

    # Define the imports required for the operator, and the operator name.
    # Overridden fields keep a type annotation, just like ``operator`` below.
    imports: list[OrbiterRequirement] = [
        OrbiterRequirement(
            package="apache-airflow",
            module="airflow.operators.trigger_dagrun",
            names=["TriggerDagRunOperator"],
        )
    ]
    # Must be the operator class named in ``imports`` above
    operator: str = "TriggerDagRunOperator"

    # Add the fields that should be rendered in the output
    render_attributes = OrbiterOperator.render_attributes + [
        "trigger_dag_id",
        # ...any other fields you declare below
    ]

    # Add the fields that are required for the operator here, with their types.
    # Not all Airflow Operator fields are required, just the ones you will use.
    trigger_dag_id: str
    # ...


@task_rule
def my_rule(val: dict):
    """Translate a matched input into an ``OrbiterTriggerDagRunOperator``."""
    return OrbiterTriggerDagRunOperator(
        task_id="my_task",
        trigger_dag_id="my_other_dag",
        # ...other fields declared on the class
    )
Diagram
classDiagram
    %% Class hierarchy of Orbiter tasks: OrbiterOperator, its subclasses,
    %% and the OrbiterCallback types an operator can reference.
    %% `operator = "..."` is the Airflow operator class each subclass renders.
    direction LR
    OrbiterOperator "implements" <|-- OrbiterTask
    OrbiterOperator --> "many" OrbiterCallback
    class OrbiterOperator["orbiter.objects.task.OrbiterOperator"] {
        imports: List[OrbiterRequirement]
        operator: str
        task_id: str
        pool: str | None
        pool_slots: int | None
        trigger_rule: str | None
        downstream: Set[str]
        add_downstream(str | List[str] | OrbiterTaskDependency)
    }
    click OrbiterOperator href "#orbiter.objects.task.OrbiterOperator" "OrbiterOperator Documentation"
    class OrbiterTask["orbiter.objects.task.OrbiterTask"] {
        <<OrbiterOperator>>
        imports: List[OrbiterRequirement]
        task_id: str
        **kwargs
    }
    click OrbiterTask href "#orbiter.objects.task.OrbiterTask" "OrbiterTask Documentation"

    %% Concrete operator subclasses
    OrbiterOperator "implements" <|-- OrbiterBashOperator
    class OrbiterBashOperator["orbiter.objects.operators.bash.OrbiterBashOperator"] {
        <<OrbiterOperator>>
        operator = "BashOperator"
        task_id: str
        bash_command: str
    }
    click OrbiterBashOperator href "Operators_and_Callbacks/operators#orbiter.objects.operators.bash.OrbiterBashOperator" "OrbiterBashOperator Documentation"
    OrbiterOperator "implements" <|-- OrbiterEmailOperator
    class OrbiterEmailOperator["orbiter.objects.operators.smtp.OrbiterEmailOperator"] {
        <<OrbiterOperator>>
        operator = "EmailOperator"
        task_id: str
        to: str | list[str]
        subject: str
        html_content: str
        files: list | None
        conn_id: str
    }
    click OrbiterEmailOperator href "Operators_and_Callbacks/operators#orbiter.objects.operators.smtp.OrbiterEmailOperator" "OrbiterEmailOperator Documentation"
    OrbiterOperator "implements" <|-- OrbiterEmptyOperator
    class OrbiterEmptyOperator["orbiter.objects.operators.empty.OrbiterEmptyOperator"] {
        <<OrbiterOperator>>
        operator = "EmptyOperator"
        task_id: str
    }
    click OrbiterEmptyOperator href "Operators_and_Callbacks/operators#orbiter.objects.operators.empty.OrbiterEmptyOperator" "OrbiterEmptyOperator Documentation"
    OrbiterOperator "implements" <|-- OrbiterPythonOperator
    class OrbiterPythonOperator["orbiter.objects.operators.python.OrbiterPythonOperator"] {
        <<OrbiterOperator>>
        operator = "PythonOperator"
        task_id: str
        python_callable: Callable
        op_args: list | None
        op_kwargs: dict | None
    }
    click OrbiterPythonOperator href "Operators_and_Callbacks/operators#orbiter.objects.operators.python.OrbiterPythonOperator" "OrbiterPythonOperator Documentation"
    OrbiterOperator "implements" <|-- OrbiterSQLExecuteQueryOperator
    class OrbiterSQLExecuteQueryOperator["orbiter.objects.operators.sql.OrbiterSQLExecuteQueryOperator"] {
        <<OrbiterOperator>>
        operator = "SQLExecuteQueryOperator"
        task_id: str
        conn_id: str
        sql: str
    }
    click OrbiterSQLExecuteQueryOperator href "Operators_and_Callbacks/operators#orbiter.objects.operators.sql.OrbiterSQLExecuteQueryOperator" "OrbiterSQLExecuteQueryOperator Documentation"
    OrbiterOperator "implements" <|-- OrbiterSSHOperator
    class OrbiterSSHOperator["orbiter.objects.operators.ssh.OrbiterSSHOperator"] {
        <<OrbiterOperator>>
        operator = "SSHOperator"
        task_id: str
        ssh_conn_id: str
        command: str
        environment: Dict[str, str] | None
    }
    click OrbiterSSHOperator href "Operators_and_Callbacks/operators#orbiter.objects.operators.ssh.OrbiterSSHOperator" "OrbiterSSHOperator Documentation"

    %% Callbacks referenced by operators
    class OrbiterCallback["orbiter.objects.callbacks.OrbiterCallback"] {
        function: str
    }
    click OrbiterCallback href "Operators_and_Callbacks/callbacks#orbiter.objects.callbacks.OrbiterCallback" "OrbiterCallback Documentation"
    OrbiterCallback "implements" <|-- OrbiterSmtpNotifierCallback
    class OrbiterSmtpNotifierCallback["orbiter.objects.callbacks.smtp.OrbiterSmtpNotifierCallback"] {
        <<OrbiterCallback>>
        to: str
        from_email: str
        smtp_conn_id: str
        subject: str
        html_content: str
        cc: str | Sequence[str]
    }
    click OrbiterSmtpNotifierCallback href "Operators_and_Callbacks/callbacks#orbiter.objects.callbacks.smtp.OrbiterSmtpNotifierCallback" "OrbiterSmtpNotifierCallback Documentation"
orbiter.objects.task.OrbiterOperator
Abstract class representing a Task in Airflow.
Must be subclassed (such as OrbiterBashOperator or OrbiterTask).
Subclassing Example:
>>> from orbiter.objects import OrbiterRequirement
>>> class OrbiterMyOperator(OrbiterOperator):
... imports: ImportList = [OrbiterRequirement(package="apache-airflow")]
... operator: str = "MyOperator"
>>> foo = OrbiterMyOperator(task_id="task_id"); foo
task_id_task = MyOperator(task_id='task_id')
Adding single downstream tasks:
>>> from orbiter.ast_helper import render_ast
>>> render_ast(foo.add_downstream("downstream")._downstream_to_ast())
'task_id_task >> downstream_task'
Adding multiple downstream tasks:
>>> render_ast(foo.add_downstream(["a", "b"])._downstream_to_ast())
'task_id_task >> [a_task, b_task, downstream_task]'
Note
Validation: the task_id of any OrbiterTaskDependency added here must match this operator's task_id.
Parameters:
Name | Type | Description |
---|---|---|
imports |
List[OrbiterRequirement]
|
List of requirements for the operator |
task_id |
str
|
The task_id for the operator |
trigger_rule |
str, optional
|
Conditions under which to start the task (docs) |
pool |
str, optional
|
Name of the pool to use |
pool_slots |
int, optional
|
Slots for this task to occupy |
operator |
str, optional
|
Operator name |
downstream |
Set[str], optional
|
Downstream tasks, defaults to set() |
**kwargs |
Other properties that may be passed to operator |
|
**OrbiterBase |
OrbiterBase inherited properties |
orbiter.objects.task.OrbiterTaskDependency
Represents a task dependency, which is added to either an OrbiterOperator or an OrbiterTaskGroup.
Parameters:
Name | Type | Description |
---|---|---|
task_id |
str
|
The task_id for the operator |
downstream |
str | List[str]
|
downstream task(s) |
orbiter.objects.task.OrbiterTask
A generic version of OrbiterOperator that can be instantiated directly.
The operator that is instantiated is inferred from the imports field, via the first *Operator or *Sensor import.
View info for specific operators at the Astronomer Registry.
>>> from orbiter.objects.requirement import OrbiterRequirement
>>> OrbiterTask(task_id="foo", bash_command="echo 'hello world'", other=1, imports=[
... OrbiterRequirement(package="apache-airflow", module="airflow.operators.bash", names=["BashOperator"])
... ])
foo_task = BashOperator(task_id='foo', bash_command="echo 'hello world'", other=1)
>>> def foo():
... pass
>>> OrbiterTask(task_id="foo", python_callable=foo, other=1, imports=[
... OrbiterRequirement(package="apache-airflow", module="airflow.sensors.python", names=["PythonSensor"])
... ])
def foo():
pass
foo_task = PythonSensor(task_id='foo', other=1, python_callable=foo)
Parameters:
Name | Type | Description |
---|---|---|
task_id |
str
|
The task_id for the operator |
imports |
List[OrbiterRequirement]
|
List of requirements for the operator. The Operator is inferred from the first *Operator or *Sensor import |
**kwargs |
Any other keyword arguments to be passed to the operator |
orbiter.objects.task_group.OrbiterTaskGroup
Represents a TaskGroup in Airflow, which contains multiple tasks.
>>> from orbiter.objects.operators.bash import OrbiterBashOperator
>>> from orbiter.ast_helper import render_ast
>>> OrbiterTaskGroup(task_group_id="foo").add_tasks([
... OrbiterBashOperator(task_id="b", bash_command="b"),
... OrbiterBashOperator(task_id="a", bash_command="a").add_downstream("b"),
... ]).add_downstream("c")
with TaskGroup(group_id='foo') as foo:
b_task = BashOperator(task_id='b', bash_command='b')
a_task = BashOperator(task_id='a', bash_command='a')
a_task >> b_task
>>> render_ast(OrbiterTaskGroup(task_group_id="foo", downstream={"c"})._downstream_to_ast())
'foo >> c_task'
Parameters:
Name | Type | Description |
---|---|---|
task_group_id |
str
|
The id of the TaskGroup |
tasks |
Dict[str, OrbiterOperator | OrbiterTaskGroup]
|
The tasks in the TaskGroup |
**OrbiterBase |
OrbiterBase inherited properties |