Skip to content

Overview

Airflow Tasks are units of work. An Operator is a pre-defined task with specific functionality.

Operators can be looked up in the Astronomer Registry.

The easiest way to create an operator in a translation is to use an existing subclass of OrbiterOperator (e.g. OrbiterBashOperator).

If an OrbiterOperator subclass doesn't exist for your use case, you can:

1) Utilize OrbiterTask

from orbiter.objects.requirement import OrbiterRequirement
from orbiter.objects.task import OrbiterTask
from orbiter.rules import task_rule

@task_rule
def my_rule(val: dict):
    # Example task rule: builds a generic OrbiterTask. The operator that gets
    # rendered is inferred from the first *Operator or *Sensor name listed in
    # `imports` (here, TriggerDagRunOperator).
    return OrbiterTask(
        task_id="my_task",
        imports=[OrbiterRequirement(
            package="apache-airflow",
            module="airflow.operators.trigger_dagrun",
            names=["TriggerDagRunOperator"],
        )],
        # Placeholder: supply the operator's remaining keyword arguments here.
        ...
    )

2) Create a new subclass of OrbiterOperator, which can be beneficial if you are using it frequently in separate @task_rules

from orbiter.objects.task import OrbiterOperator
from orbiter.objects.requirement import OrbiterRequirement
from orbiter.rules import task_rule

class OrbiterTriggerDagRunOperator(OrbiterOperator):
    # Define the imports required for the operator, and the operator name.
    # The operator name must match the class imported via `names` below,
    # since it is the name rendered in the generated task.
    imports = [
        OrbiterRequirement(
            package="apache-airflow",
            module="airflow.operators.trigger_dagrun",
            names=["TriggerDagRunOperator"],
        )
    ]
    operator: str = "TriggerDagRunOperator"

    # Add the fields that should be rendered in the output
    render_attributes = OrbiterOperator.render_attributes + [
        ...
    ]

    # Add the fields that are required for the operator here, with their types
    # Not all Airflow Operator fields are required, just the ones you will use.
    trigger_dag_id: str
    ...

@task_rule
def my_rule(val: dict):
    # Example task rule: return an instance of the custom operator subclass;
    # `...` stands in for its field values (e.g. task_id, trigger_dag_id).
    return OrbiterTriggerDagRunOperator(...)

Diagram

classDiagram
    direction LR
    OrbiterOperator "implements" <|-- OrbiterTask
    OrbiterOperator --> "many" OrbiterCallback
    class OrbiterOperator["orbiter.objects.task.OrbiterOperator"] {
            imports: List[OrbiterRequirement]
            operator: str
            task_id: str
            pool: str | None
            pool_slots: int | None
            trigger_rule: str | None
            downstream: Set[str]
            add_downstream(str | List[str] | OrbiterTaskDependency)
    }
    click OrbiterOperator href "#orbiter.objects.task.OrbiterOperator" "OrbiterOperator Documentation"

    class OrbiterTask["orbiter.objects.task.OrbiterTask"] {
        <<OrbiterOperator>>
            imports: List[OrbiterRequirement]
            task_id: str
            **kwargs
    }
    click OrbiterTask href "#orbiter.objects.task.OrbiterTask" "OrbiterTask Documentation"

    OrbiterOperator "implements" <|-- OrbiterBashOperator
    class OrbiterBashOperator["orbiter.objects.operators.bash.OrbiterBashOperator"] {
        <<OrbiterOperator>>
            operator = "BashOperator"
            task_id: str
            bash_command: str
    }
    click OrbiterBashOperator href "Operators_and_Callbacks/operators#orbiter.objects.operators.bash.OrbiterBashOperator" "OrbiterBashOperator Documentation"

    OrbiterOperator "implements" <|-- OrbiterEmailOperator
    class OrbiterEmailOperator["orbiter.objects.operators.smtp.OrbiterEmailOperator"] {
        <<OrbiterOperator>>
            operator = "EmailOperator"
            task_id: str
            to: str | list[str]
            subject: str
            html_content: str
            files: list | None
            conn_id: str
    }
    click OrbiterEmailOperator href "Operators_and_Callbacks/operators#orbiter.objects.operators.smtp.OrbiterEmailOperator" "OrbiterEmailOperator Documentation"

  OrbiterOperator  "implements" <|--  OrbiterEmptyOperator
    class OrbiterEmptyOperator["orbiter.objects.operators.empty.OrbiterEmptyOperator"] {
        <<OrbiterOperator>>
            operator = "EmptyOperator"
            task_id: str
    }
    click OrbiterEmptyOperator href "Operators_and_Callbacks/operators#orbiter.objects.operators.empty.OrbiterEmptyOperator" "OrbiterEmptyOperator Documentation"

    OrbiterOperator "implements" <|-- OrbiterPythonOperator
    class OrbiterPythonOperator["orbiter.objects.operators.python.OrbiterPythonOperator"] {
        <<OrbiterOperator>>
            operator = "PythonOperator"
            task_id: str
            python_callable: Callable
            op_args: list | None
            op_kwargs: dict | None
    }
    click OrbiterPythonOperator href "Operators_and_Callbacks/operators#orbiter.objects.operators.python.OrbiterPythonOperator" "OrbiterPythonOperator Documentation"

    OrbiterOperator "implements" <|-- OrbiterSQLExecuteQueryOperator
    class OrbiterSQLExecuteQueryOperator["orbiter.objects.operators.sql.OrbiterSQLExecuteQueryOperator"] {
        <<OrbiterOperator>>
            operator = "SQLExecuteQueryOperator"
            task_id: str
            conn_id: str
            sql: str
    }
    click OrbiterSQLExecuteQueryOperator href "Operators_and_Callbacks/operators#orbiter.objects.operators.sql.OrbiterSQLExecuteQueryOperator" "OrbiterSQLExecuteQueryOperator Documentation"

    OrbiterOperator "implements" <|-- OrbiterSSHOperator
    class OrbiterSSHOperator["orbiter.objects.operators.ssh.OrbiterSSHOperator"] {
        <<OrbiterOperator>>
            operator = "SSHOperator"
            task_id: str
            ssh_conn_id: str
            command: str
            environment: Dict[str, str] | None
    }
    click OrbiterSSHOperator href "Operators_and_Callbacks/operators#orbiter.objects.operators.ssh.OrbiterSSHOperator" "OrbiterSSHOperator Documentation"

    class OrbiterCallback["orbiter.objects.callbacks.OrbiterCallback"] {
            function: str
    }
    click OrbiterCallback href "Operators_and_Callbacks/callbacks#orbiter.objects.callbacks.OrbiterCallback" "OrbiterCallback Documentation"

    OrbiterCallback "implements" <|--  OrbiterSmtpNotifierCallback
    class OrbiterSmtpNotifierCallback["orbiter.objects.callbacks.smtp.OrbiterSmtpNotifierCallback"] {
        <<OrbiterCallback>>
            to: str
            from_email: str
            smtp_conn_id: str
            subject: str
            html_content: str
            cc: str | Iterable[str]
    }
    click OrbiterSmtpNotifierCallback href "Operators_and_Callbacks/callbacks#orbiter.objects.callbacks.smtp.OrbiterSmtpNotifierCallback" "OrbiterSmtpNotifierCallback Documentation"

orbiter.objects.task.OrbiterOperator

Abstract class representing a Task in Airflow.

Must be subclassed (such as OrbiterBashOperator, or OrbiterTask).

Subclassing Example:

>>> from orbiter.objects import OrbiterRequirement
>>> class OrbiterMyOperator(OrbiterOperator):
...   imports: ImportList = [OrbiterRequirement(package="apache-airflow")]
...   operator: str = "MyOperator"

>>> foo = OrbiterMyOperator(task_id="task_id"); foo
task_id_task = MyOperator(task_id='task_id')

Adding single downstream tasks:

>>> from orbiter.ast_helper import render_ast
>>> render_ast(foo.add_downstream("downstream")._downstream_to_ast())
'task_id_task >> downstream_task'

Adding multiple downstream tasks:

>>> render_ast(foo.add_downstream(["a", "b"])._downstream_to_ast())
'task_id_task >> [a_task, b_task, downstream_task]'

Note

Validation - task_id in OrbiterTaskDependency must match this task_id

>>> foo.add_downstream(OrbiterTaskDependency(task_id="other", downstream="bar")).downstream
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
ValueError: Task dependency ... has a different task_id than task_id

Parameters:

Name Type Description
imports List[OrbiterRequirement]

List of requirements for the operator

task_id str

The task_id for the operator, must be unique and snake_case

trigger_rule str, optional

Conditions under which to start the task (docs)

pool str, optional

Name of the pool to use

pool_slots int, optional

Slots for this task to occupy

operator str, optional

Operator name

downstream Set[str], optional

Downstream tasks, defaults to set()

**kwargs

Other properties that may be passed to operator

**OrbiterBase

OrbiterBase inherited properties

orbiter.objects.task.OrbiterTaskDependency

Represents a task dependency, which is added to either an OrbiterOperator or an OrbiterTaskGroup.

Parameters:

Name Type Description
task_id str

The task_id for the operator

downstream str | List[str]

downstream task(s)

orbiter.objects.task.OrbiterTask

A generic version of OrbiterOperator that can be instantiated directly.

The operator that is instantiated is inferred from the imports field, via the first *Operator or *Sensor import.

View info for specific operators at the Astronomer Registry.

>>> from orbiter.objects.requirement import OrbiterRequirement
>>> OrbiterTask(task_id="foo", bash_command="echo 'hello world'", other=1, imports=[
...   OrbiterRequirement(package="apache-airflow", module="airflow.operators.bash", names=["BashOperator"])
... ])
foo_task = BashOperator(task_id='foo', bash_command="echo 'hello world'", other=1)

>>> def foo():
...   pass
>>> OrbiterTask(task_id="foo", python_callable=foo, other=1, imports=[
...   OrbiterRequirement(package="apache-airflow", module="airflow.sensors.python", names=["PythonSensor"])
... ])
def foo():
    pass
foo_task = PythonSensor(task_id='foo', other=1, python_callable=foo)

Parameters:

Name Type Description
task_id str

The task_id for the operator. Must be unique and snake_case

imports List[OrbiterRequirement]

List of requirements for the operator. The Operator is inferred from first *Operator or *Sensor imported.

**kwargs

Any other keyword arguments to be passed to the operator

orbiter.objects.task_group.OrbiterTaskGroup

Represents a TaskGroup in Airflow, which contains multiple tasks

>>> from orbiter.objects.operators.bash import OrbiterBashOperator
>>> from orbiter.ast_helper import render_ast
>>> OrbiterTaskGroup(task_group_id="foo", tasks=[
...   OrbiterBashOperator(task_id="b", bash_command="b"),
...   OrbiterBashOperator(task_id="a", bash_command="a").add_downstream("b"),
... ], downstream={"c"})
with TaskGroup(group_id='foo') as foo:
    b_task = BashOperator(task_id='b', bash_command='b')
    a_task = BashOperator(task_id='a', bash_command='a')
    a_task >> b_task

>>> render_ast(OrbiterTaskGroup(task_group_id="foo", tasks=[], downstream={"c"})._downstream_to_ast())
'foo >> c_task'

Parameters:

Name Type Description
task_group_id str

The id of the TaskGroup

tasks List[OrbiterOperator | OrbiterTaskGroup]

The tasks in the TaskGroup

**OrbiterBase

OrbiterBase inherited properties