Workflow

Airflow workflows are represented by a DAG which is a Directed Acyclic Graph (of Tasks).
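
To make the "directed acyclic graph" idea concrete, here is a minimal sketch that uses only the Python standard library. The task names and the `downstream` mapping are hypothetical and are not part of Orbiter. It only illustrates that a graph of tasks without cycles always has a valid run order, while a graph with a cycle does not.

```python
from graphlib import CycleError, TopologicalSorter

# Hypothetical tasks: each key maps to the set of tasks that run after it.
downstream = {
    "extract": {"transform"},
    "transform": {"load"},
    "load": set(),
}


def execution_order(downstream: dict[str, set[str]]) -> list[str]:
    """Return one valid run order, or raise CycleError if the graph has a cycle."""
    # TopologicalSorter expects node -> predecessors, so invert the mapping.
    upstream: dict[str, set[str]] = {task: set() for task in downstream}
    for task, children in downstream.items():
        for child in children:
            upstream.setdefault(child, set()).add(task)
    return list(TopologicalSorter(upstream).static_order())


order = execution_order(downstream)
print(order)
```

Passing a mapping such as `{"a": {"b"}, "b": {"a"}}` raises `CycleError`, which is why such a graph cannot be a DAG.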

Diagram

classDiagram
    direction LR
    class OrbiterRequirement["orbiter.objects.requirement.OrbiterRequirement"] {
            package: str | None
            module: str | None
            names: List[str] | None
            sys_package: str | None
    }

    OrbiterDAG "via schedule" --> OrbiterTimetable
    OrbiterDAG --> "many" OrbiterOperator
    OrbiterDAG --> "many" OrbiterTaskGroup
    OrbiterDAG --> "many" OrbiterRequirement
    class OrbiterDAG["orbiter.objects.dag.OrbiterDAG"] {
            imports: List[OrbiterRequirement]
            file_path: str
            dag_id: str
            schedule: str | OrbiterTimetable | None
            catchup: bool
            start_date: DateTime
            tags: List[str]
            default_args: Dict[str, Any]
            params: Dict[str, Any]
            doc_md: str | None
            tasks: Dict[str, OrbiterOperator]
            kwargs: dict
            orbiter_kwargs: dict
            orbiter_conns: Set[OrbiterConnection]
            orbiter_vars: Set[OrbiterVariable]
            orbiter_env_vars: Set[OrbiterEnvVar]
            orbiter_includes: Set[OrbiterInclude]
    }
    click OrbiterDAG href "#orbiter.objects.dag.OrbiterDAG" "OrbiterDAG Documentation"

    OrbiterTaskGroup --> "many" OrbiterRequirement
    class OrbiterTaskGroup["orbiter.objects.task.OrbiterTaskGroup"] {
            task_group_id: str
            tasks: List[OrbiterOperator | OrbiterTaskGroup]
            add_downstream(str | List[str] | OrbiterTaskDependency)
    }

    OrbiterOperator --> "many" OrbiterRequirement
    OrbiterOperator --> "one" OrbiterPool
    OrbiterOperator --> "many" OrbiterConnection
    OrbiterOperator --> "many" OrbiterVariable
    OrbiterOperator --> "many" OrbiterEnvVar
    class OrbiterOperator["orbiter.objects.task.OrbiterOperator"] {
            imports: List[OrbiterRequirement]
            operator: str
            task_id: str
            pool: str | None
            pool_slots: int | None
            trigger_rule: str | None
            downstream: Set[str]
            add_downstream(str | List[str] | OrbiterTaskDependency)
    }

    class OrbiterTimetable["orbiter.objects.timetables.OrbiterTimetable"] {
            imports: List[OrbiterRequirement]
            orbiter_includes: Set[OrbiterInclude]
            **kwargs: dict
    }
    click OrbiterTimetable href "#orbiter.objects.timetables.OrbiterTimetable" "OrbiterTimetable Documentation"

    class OrbiterConnection["orbiter.objects.connection.OrbiterConnection"] {
            conn_id: str
            conn_type: str
            **kwargs
    }

    class OrbiterEnvVar["orbiter.objects.env_var.OrbiterEnvVar"] {
            key: str
            value: str
    }

    class OrbiterPool["orbiter.objects.pool.OrbiterPool"] {
            name: str
            description: str | None
            slots: int | None
    }

    class OrbiterVariable["orbiter.objects.variable.OrbiterVariable"] {
            key: str
            value: str
    }

orbiter.objects.dag.OrbiterDAG

Represents an Airflow DAG, with its tasks and dependencies.

Renders to a .py file in the /dags folder.

Parameters:

file_path (str): File path of the DAG, relative to the /dags folder (file_path=my_dag.py would render to dags/my_dag.py)

dag_id (str): The dag_id. Must be unique and snake_case. Good practice is to set dag_id == file_path

schedule (str | OrbiterTimetable, optional): The schedule for the DAG. Defaults to None (only runs when manually triggered)

catchup (bool, optional): Whether to catch up on runs from the start_date to now, on first run. Defaults to False

start_date (DateTime, optional): The start date for the DAG. Defaults to Unix Epoch

tags (List[str], optional): Tags for the DAG, used for sorting and filtering in the Airflow UI

default_args (Dict[str, Any], optional): Default arguments for any tasks in the DAG

params (Dict[str, Any], optional): Params for the DAG

doc_md (str, optional): Documentation for the DAG with markdown support

kwargs (dict, optional): Additional keyword arguments to pass to the DAG

**OrbiterBase: OrbiterBase inherited properties
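
The documented defaults can be summarized in a small stand-in. This is a sketch only: `DagSpec` and `SNAKE_CASE` are hypothetical names, not Orbiter's classes, and the snake_case check and the UTC timezone on the epoch are assumptions made for illustration.

```python
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

# Assumed definition of snake_case: lowercase words joined by single underscores.
SNAKE_CASE = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")


@dataclass
class DagSpec:
    """Hypothetical stand-in mirroring the documented parameters; not Orbiter's class."""

    file_path: str  # relative to the /dags folder
    dag_id: str  # unique and snake_case
    schedule: Optional[str] = None  # None: only runs when manually triggered
    catchup: bool = False
    start_date: datetime = datetime(1970, 1, 1, tzinfo=timezone.utc)  # Unix epoch
    tags: List[str] = field(default_factory=list)
    default_args: Dict[str, Any] = field(default_factory=dict)
    params: Dict[str, Any] = field(default_factory=dict)
    doc_md: Optional[str] = None

    def __post_init__(self) -> None:
        # Assumed check, for illustration: enforce the snake_case rule for dag_id.
        if not SNAKE_CASE.match(self.dag_id):
            raise ValueError(f"dag_id {self.dag_id!r} is not snake_case")


spec = DagSpec(file_path="my_dag.py", dag_id="my_dag", tags=["migrated"])
print(spec.schedule, spec.catchup, spec.start_date.isoformat())
```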

add_tasks

add_tasks(
    tasks: (
        OrbiterOperator
        | OrbiterTaskGroup
        | Iterable[OrbiterOperator | OrbiterTaskGroup]
    ),
) -> "OrbiterDAG"

Add one or more OrbiterOperators or OrbiterTaskGroups to the DAG

>>> from orbiter.objects.operators.empty import OrbiterEmptyOperator
>>> OrbiterDAG(file_path="", dag_id="foo").add_tasks(OrbiterEmptyOperator(task_id="bar")).tasks
{'bar': bar_task = EmptyOperator(task_id='bar')}

>>> OrbiterDAG(file_path="", dag_id="foo").add_tasks([OrbiterEmptyOperator(task_id="bar")]).tasks
{'bar': bar_task = EmptyOperator(task_id='bar')}

Tip

Validation requires an OrbiterTaskGroup, an OrbiterOperator (or subclass), or a list of either to be passed

>>> # noinspection PyTypeChecker
... OrbiterDAG(file_path="", dag_id="foo").add_tasks("bar")
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...
>>> # noinspection PyTypeChecker
... OrbiterDAG(file_path="", dag_id="foo").add_tasks(["bar"])
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

Parameters:

tasks (OrbiterOperator | OrbiterTaskGroup | Iterable[OrbiterOperator | OrbiterTaskGroup]): A single OrbiterOperator or OrbiterTaskGroup (or subclass), or an iterable of them

Returns:

OrbiterDAG: self

Source code in orbiter/objects/dag.py
def add_tasks(
    self,
    tasks: (
        OrbiterOperator
        | OrbiterTaskGroup
        | Iterable[OrbiterOperator | OrbiterTaskGroup]
    ),
) -> "OrbiterDAG":
    """
    Add one or more [`OrbiterOperators`][orbiter.objects.task.OrbiterOperator] to the DAG

    ```pycon
    >>> from orbiter.objects.operators.empty import OrbiterEmptyOperator
    >>> OrbiterDAG(file_path="", dag_id="foo").add_tasks(OrbiterEmptyOperator(task_id="bar")).tasks
    {'bar': bar_task = EmptyOperator(task_id='bar')}

    >>> OrbiterDAG(file_path="", dag_id="foo").add_tasks([OrbiterEmptyOperator(task_id="bar")]).tasks
    {'bar': bar_task = EmptyOperator(task_id='bar')}

    ```

    !!! tip

        Validation requires an `OrbiterTaskGroup`, `OrbiterOperator` (or subclass), or list of either to be passed
        ```pycon
        >>> # noinspection PyTypeChecker
        ... OrbiterDAG(file_path="", dag_id="foo").add_tasks("bar")
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...
        >>> # noinspection PyTypeChecker
        ... OrbiterDAG(file_path="", dag_id="foo").add_tasks(["bar"])
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...

        ```
    :param tasks: List of [OrbiterOperator][orbiter.objects.task.OrbiterOperator], or OrbiterTaskGroup or subclass
    :type tasks: OrbiterOperator | OrbiterTaskGroup | Iterable[OrbiterOperator | OrbiterTaskGroup]
    :return: self
    :rtype: OrbiterDAG
    """
    if (
        isinstance(tasks, OrbiterOperator)
        or isinstance(tasks, OrbiterTaskGroup)
        or issubclass(type(tasks), OrbiterOperator)
    ):
        tasks = [tasks]

    for task in tasks:
        try:
            task_id = getattr(task, "task_id", None) or getattr(
                task, "task_group_id"
            )
        except AttributeError:
            raise AttributeError(
                f"Task {task} does not have a task_id or task_group_id attribute"
            )
        self.tasks[task_id] = task
    return self
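
The pattern in the source above (wrap a single item in a list, key each entry by its ID, return self so calls can be chained) can be tried without Orbiter installed. The following is a minimal sketch with hypothetical stand-in classes `Task`, `TaskGroup`, and `Dag`. It does not reproduce the pydantic validation that the real method relies on, so invalid input surfaces here as an `AttributeError` rather than a `ValidationError`.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Union


@dataclass
class Task:
    """Hypothetical stand-in for an operator, identified by task_id."""

    task_id: str


@dataclass
class TaskGroup:
    """Hypothetical stand-in for a task group, identified by task_group_id."""

    task_group_id: str


Item = Union[Task, TaskGroup]


@dataclass
class Dag:
    tasks: Dict[str, Item] = field(default_factory=dict)

    def add_tasks(self, tasks: Union[Item, Iterable[Item]]) -> "Dag":
        # Normalize a single task or group into a one-element list.
        if isinstance(tasks, (Task, TaskGroup)):
            tasks = [tasks]
        for task in tasks:
            # Prefer task_id, fall back to task_group_id.
            key = getattr(task, "task_id", None) or getattr(task, "task_group_id", None)
            if key is None:
                raise AttributeError(f"{task!r} has no task_id or task_group_id")
            self.tasks[key] = task
        # Returning self allows chained calls.
        return self


dag = Dag().add_tasks(Task("extract")).add_tasks([Task("load"), TaskGroup("cleanup")])
print(list(dag.tasks))
```

Because entries are stored in a dict keyed by ID, adding a second task with the same ID replaces the first.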

Timetables

orbiter.objects.timetables.OrbiterTimetable

An Airflow Timetable reference.

Utilizes OrbiterInclude to add a file to a /plugins folder to register the timetable.

Parameters:

**kwargs: Any other kwargs to provide to the Timetable

orbiter.objects.timetables.multi_cron_timetable

orbiter.objects.timetables.multi_cron_timetable.OrbiterMultiCronTimetable

An Airflow Timetable that can be supplied with multiple cron strings.

>>> OrbiterMultiCronTimetable(cron_defs=["*/5 * * * *", "*/7 * * * *"])
MultiCronTimetable(cron_defs=['*/5 * * * *', '*/7 * * * *'])

Parameters:

cron_defs (List[str]): A list of cron strings

timezone (str): The timezone to use for the timetable

period_length (int): The length of the period

period_unit (str): The unit of the period
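
As a rough illustration of what supplying several cron strings could mean, the sketch below assumes that a run is due whenever any one of the expressions matches. That semantic is an assumption, not something stated above. The helpers `minutes_for` and `combined_minutes` are hypothetical, handle only simple minute fields (`*`, `*/n`, and comma lists), and ignore the other four cron fields.

```python
def minutes_for(minute_field: str) -> set[int]:
    """Expand a simple cron minute field into the minutes of the hour it matches."""
    if minute_field == "*":
        return set(range(60))
    if minute_field.startswith("*/"):
        return set(range(0, 60, int(minute_field[2:])))
    return {int(part) for part in minute_field.split(",")}


def combined_minutes(cron_defs: list[str]) -> list[int]:
    """Union of the minutes at which any of the cron strings would fire in an hour."""
    fired: set[int] = set()
    for cron in cron_defs:
        # Only the first (minute) field is considered in this simplified sketch.
        fired |= minutes_for(cron.split()[0])
    return sorted(fired)


schedule = combined_minutes(["*/5 * * * *", "*/7 * * * *"])
print(schedule)
```

With the two expressions from the example above, minutes 0 and 35 match both strings but appear only once in the combined result.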