
Workflow

An Airflow workflow is represented by a DAG: a Directed Acyclic Graph of Tasks.
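To illustrate what "directed" and "acyclic" mean for tasks, the sketch below models a DAG as a mapping from each task_id to the task_ids downstream of it, then uses the standard library's graphlib to compute an order in which every task runs after its upstream tasks. graphlib raises CycleError if the dependencies loop back on themselves. The task names are hypothetical, and this is not how Orbiter or Airflow store DAGs internally.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(downstream: dict[str, set[str]]) -> list[str]:
    """Return an order in which every task runs after all of its upstream tasks.

    `downstream` maps each task_id to the task_ids that depend on it.
    Raises graphlib.CycleError if the graph contains a cycle (is not a DAG).
    """
    sorter = TopologicalSorter()
    for task_id, children in downstream.items():
        sorter.add(task_id)  # register tasks even if nothing is upstream of them
        for child in children:
            # add(node, *predecessors): child must run after task_id
            sorter.add(child, task_id)
    return list(sorter.static_order())


# Hypothetical tasks: extract -> transform -> load
print(execution_order({"extract": {"transform"}, "transform": {"load"}, "load": set()}))
# ['extract', 'transform', 'load']

try:
    execution_order({"a": {"b"}, "b": {"a"}})
except CycleError:
    print("cycle detected: not a valid DAG")
```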

Diagram

classDiagram
    direction LR

    OrbiterDAG "via schedule" --> OrbiterTimetable
    OrbiterDAG --> "many" OrbiterOperator
    OrbiterDAG --> "many" OrbiterTaskGroup
    OrbiterDAG --> "many" OrbiterRequirement
    class OrbiterDAG["orbiter.objects.dag.OrbiterDAG"] {
            imports: List[OrbiterRequirement]
            file_path: str
            dag_id: str
            schedule: str | OrbiterTimetable | None
            catchup: bool
            start_date: DateTime
            tags: List[str]
            default_args: Dict[str, Any]
            params: Dict[str, Any]
            doc_md: str | None
            tasks: Dict[str, OrbiterOperator]
            kwargs: dict
            orbiter_kwargs: dict
            orbiter_conns: Set[OrbiterConnection]
            orbiter_vars: Set[OrbiterVariable]
            orbiter_env_vars: Set[OrbiterEnvVar]
            orbiter_includes: Set[OrbiterInclude]
    }
    click OrbiterDAG href "#orbiter.objects.dag.OrbiterDAG" "OrbiterDAG Documentation"

    OrbiterTaskGroup --> "many" OrbiterRequirement
    class OrbiterTaskGroup["orbiter.objects.task.OrbiterTaskGroup"] {
            task_group_id: str
            tasks: Dict[str, OrbiterOperator | OrbiterTaskGroup]
            add_downstream(str | List[str] | OrbiterTaskDependency)
    }

    OrbiterOperator --> "many" OrbiterRequirement
    OrbiterOperator --> "one" OrbiterPool
    OrbiterOperator --> "many" OrbiterConnection
    OrbiterOperator --> "many" OrbiterVariable
    OrbiterOperator --> "many" OrbiterEnvVar
    class OrbiterOperator["orbiter.objects.task.OrbiterOperator"] {
            imports: List[OrbiterRequirement]
            operator: str
            task_id: str
            pool: str | None
            pool_slots: int | None
            trigger_rule: str | None
            downstream: Set[str]
            add_downstream(str | List[str] | OrbiterTaskDependency)
    }

    class OrbiterTimetable["orbiter.objects.timetables.OrbiterTimetable"] {
            imports: List[OrbiterRequirement]
            orbiter_includes: Set[OrbiterInclude]
            **kwargs: dict
    }
    click OrbiterTimetable href "#orbiter.objects.timetables.OrbiterTimetable" "OrbiterTimetable Documentation"

    class OrbiterConnection["orbiter.objects.connection.OrbiterConnection"] {
            conn_id: str
            conn_type: str
            **kwargs
    }

    class OrbiterEnvVar["orbiter.objects.env_var.OrbiterEnvVar"] {
            key: str
            value: str
    }

    class OrbiterPool["orbiter.objects.pool.OrbiterPool"] {
            name: str
            description: str | None
            slots: int | None
    }

    class OrbiterRequirement["orbiter.objects.requirement.OrbiterRequirement"] {
            package: str | None
            module: str | None
            names: List[str] | None
            sys_package: str | None
    }

    class OrbiterVariable["orbiter.objects.variable.OrbiterVariable"] {
            key: str
            value: str
    }
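The diagram shows that both OrbiterOperator and OrbiterTaskGroup expose add_downstream, and that an operator records its dependencies as downstream: Set[str] of task_ids. The sketch below is a simplified, hypothetical stand-in (SketchOperator is not part of Orbiter) showing how such a method can accept either one task_id or a list of them. It omits the OrbiterTaskDependency form, and returning self for chaining is a design choice of this sketch, not something this page states about Orbiter.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class SketchOperator:
    """Hypothetical, simplified stand-in for a task with downstream edges."""

    task_id: str
    operator: str = "EmptyOperator"
    downstream: set[str] = field(default_factory=set)

    def add_downstream(self, other: str | list[str]) -> SketchOperator:
        # Accept a single task_id or a list of task_ids. Storing them in a set
        # means adding the same edge twice has no effect.
        if isinstance(other, str):
            self.downstream.add(other)
        else:
            self.downstream.update(other)
        return self  # allows chained calls


start = SketchOperator(task_id="start")
start.add_downstream("extract").add_downstream(["validate", "extract"])
print(sorted(start.downstream))  # ['extract', 'validate']
```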

orbiter.objects.dag.OrbiterDAG

Represents an Airflow DAG, with its tasks and dependencies.

Renders to a .py file in the /dags folder.

Parameters:

| Name | Type | Description |
|---|---|---|
| `file_path` | `str` | File path of the DAG, relative to the `/dags` folder (`file_path=my_dag.py` renders to `dags/my_dag.py`) |
| `dag_id` | `str` | The DAG's ID. Must be unique and snake_case. Good practice is to match it to `file_path` without the `.py` extension (e.g. `dag_id=my_dag` for `file_path=my_dag.py`) |
| `schedule` | `str \| OrbiterTimetable`, optional | The schedule for the DAG. Defaults to `None` (the DAG only runs when triggered manually) |
| `catchup` | `bool`, optional | Whether to create ("catch up") runs for every interval between `start_date` and now when the DAG first runs. Defaults to `False` |
| `start_date` | `DateTime`, optional | The start date for the DAG. Defaults to the Unix epoch |
| `tags` | `List[str]`, optional | Tags for the DAG, used for sorting and filtering in the Airflow UI |
| `default_args` | `Dict[str, Any]`, optional | Default arguments for every task in the DAG |
| `params` | `Dict[str, Any]`, optional | Params for the DAG |
| `doc_md` | `str`, optional | Documentation for the DAG, with Markdown support |
| `kwargs` | `dict`, optional | Additional keyword arguments to pass to the DAG |
| `**OrbiterBase` | | Properties inherited from `OrbiterBase` |
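To make file_path, dag_id and the defaults above concrete, here is a hypothetical helper (render_dag_file is not an Orbiter function) that resolves where a DAG file would be written and emits a minimal Airflow DAG definition using the documented defaults: no schedule, catchup disabled, and a start date at the Unix epoch. The snake_case regex and the generated text are assumptions for illustration; the code Orbiter actually renders will differ.

```python
from __future__ import annotations

import re
from datetime import datetime, timezone
from pathlib import PurePosixPath

# Hypothetical snake_case rule: lowercase words separated by single underscores
SNAKE_CASE = re.compile(r"[a-z][a-z0-9]*(?:_[a-z0-9]+)*")
UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def render_dag_file(
    dag_id: str,
    file_path: str,
    schedule: str | None = None,  # None: only runs when triggered manually
    catchup: bool = False,  # don't create runs for past intervals
    start_date: datetime = UNIX_EPOCH,
    tags: list[str] | None = None,
) -> tuple[PurePosixPath, str]:
    """Return (output path, source code) for a minimal, hypothetical DAG file."""
    if not SNAKE_CASE.fullmatch(dag_id):
        raise ValueError(f"dag_id must be snake_case, got {dag_id!r}")
    # file_path is relative to the dags folder: my_dag.py -> dags/my_dag.py
    path = PurePosixPath("dags") / file_path
    tag_list = tags or []
    source = (
        "from datetime import datetime\n\n"
        "from airflow import DAG\n\n"
        "with DAG(\n"
        f"    dag_id={dag_id!r},\n"
        f"    schedule={schedule!r},\n"
        f"    catchup={catchup!r},\n"
        f"    start_date=datetime.fromisoformat({start_date.isoformat()!r}),\n"
        f"    tags={tag_list!r},\n"
        "):\n"
        "    pass  # tasks would be rendered here\n"
    )
    return path, source


path, source = render_dag_file(dag_id="my_dag", file_path="my_dag.py", tags=["orbiter"])
print(path)  # dags/my_dag.py
print(source)
```

Keeping dag_id aligned with the file name, as recommended above, makes it easy to find the file that defines a DAG shown in the Airflow UI.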

Timetables

Modules:

| Name | Description |
|---|---|
| `multi_cron_timetable` | |

Classes:

| Name | Description |
|---|---|
| `OrbiterTimetable` | An Airflow Timetable reference. |

orbiter.objects.timetables.OrbiterTimetable

An Airflow Timetable reference.

Uses an OrbiterInclude to add a file to the /plugins folder that registers the timetable with Airflow.

Parameters:

| Name | Type | Description |
|---|---|---|
| `**kwargs` | | Any other keyword arguments to pass to the Timetable |
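A timetable reference therefore has two parts: the constructor call that appears in the DAG file, and a plugin file that registers the timetable class (Airflow expects custom timetables to be registered through a plugin). The sketch below, built around a hypothetical TimetableRef class rather than Orbiter's own, renders both from a class name and its keyword arguments. The module name passed to plugin_source is an assumption; AirflowPlugin and its timetables attribute are Airflow's plugin mechanism.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class TimetableRef:
    """Hypothetical stand-in for a timetable reference: a class name plus kwargs."""

    class_name: str
    kwargs: dict[str, Any] = field(default_factory=dict)

    def render(self) -> str:
        # The constructor call as it would appear in the rendered DAG file
        args = ", ".join(f"{key}={value!r}" for key, value in self.kwargs.items())
        return f"{self.class_name}({args})"

    def plugin_source(self, module: str) -> str:
        # Contents of a file for the plugins folder that registers the class
        return (
            "from airflow.plugins_manager import AirflowPlugin\n"
            f"from {module} import {self.class_name}\n\n\n"
            f"class {self.class_name}Plugin(AirflowPlugin):\n"
            f"    name = {self.class_name.lower()!r}\n"
            f"    timetables = [{self.class_name}]\n"
        )


ref = TimetableRef("MultiCronTimetable", {"cron_defs": ["*/5 * * * *", "*/7 * * * *"]})
print(ref.render())
# MultiCronTimetable(cron_defs=['*/5 * * * *', '*/7 * * * *'])
print(ref.plugin_source("multi_cron_timetable"))  # hypothetical module name
```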

orbiter.objects.timetables.multi_cron_timetable

Classes:

| Name | Description |
|---|---|
| `OrbiterMultiCronTimetable` | An Airflow Timetable that can be supplied with multiple cron strings. |

orbiter.objects.timetables.multi_cron_timetable.OrbiterMultiCronTimetable

An Airflow Timetable that can be supplied with multiple cron strings.

>>> OrbiterMultiCronTimetable(cron_defs=["*/5 * * * *", "*/7 * * * *"])
MultiCronTimetable(cron_defs=['*/5 * * * *', '*/7 * * * *'])
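Presumably, a timetable with several cron strings is due whenever any one of them fires, so its next run is the earliest of their individual next fire times. The sketch below illustrates that idea under a strong simplifying assumption: it only understands cron strings of the form "*/N * * * *" (every N minutes), and next_fire and next_multi_cron are hypothetical helpers. It is not the MultiCronTimetable implementation, which would rely on a full cron parser.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def next_fire(cron: str, after: datetime) -> datetime:
    """Next time strictly after `after` that matches a '*/N * * * *' cron string."""
    minute, *rest = cron.split()
    if not minute.startswith("*/") or rest != ["*", "*", "*", "*"]:
        raise ValueError(f"this sketch only supports '*/N * * * *', got {cron!r}")
    step = int(minute[2:])
    if not 1 <= step <= 59:
        raise ValueError(f"minute step must be between 1 and 59, got {step}")
    # Start at the next whole minute and advance until the minute field matches.
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    while candidate.minute % step != 0:
        candidate += timedelta(minutes=1)
    return candidate


def next_multi_cron(cron_defs: list[str], after: datetime) -> datetime:
    # A run is due as soon as any of the cron strings fires: take the earliest.
    return min(next_fire(cron, after) for cron in cron_defs)


after = datetime(2024, 1, 1, 0, 10, tzinfo=timezone.utc)
print(next_multi_cron(["*/5 * * * *", "*/7 * * * *"], after))
# 2024-01-01 00:14:00+00:00  ('*/7' fires at :14, before '*/5' fires at :15)
```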

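Parameters:

| Name | Type | Description |
|---|---|---|
| `cron_defs` | `List[str]` | A list of cron strings |
| `timezone` | `str` | The timezone to use for the timetable |
| `period_length` | `int` | The length of the period, in units of `period_unit` |
| `period_unit` | `str` | The unit in which `period_length` is expressed |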
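The parameter descriptions do not say exactly how period_length and period_unit are applied. One plausible reading, assumed here and not confirmed by this page, is that together they set the length of each run's data interval, ending at the run time. The hypothetical data_interval helper below turns the pair into a timedelta by keyword expansion, so under this assumption period_unit would be a timedelta keyword such as "minutes" or "hours".

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone

# timedelta keyword arguments this sketch accepts as period units
ALLOWED_UNITS = {"weeks", "days", "hours", "minutes", "seconds"}


def data_interval(run_at: datetime, period_length: int, period_unit: str) -> tuple[datetime, datetime]:
    """Hypothetical: (start, end) of a data interval of the given length ending at run_at."""
    if period_unit not in ALLOWED_UNITS:
        raise ValueError(f"period_unit must be one of {sorted(ALLOWED_UNITS)}, got {period_unit!r}")
    # e.g. period_length=30, period_unit="minutes" -> timedelta(minutes=30)
    period = timedelta(**{period_unit: period_length})
    return run_at - period, run_at


start, end = data_interval(datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc), 30, "minutes")
print(start.isoformat(), end.isoformat())
# 2024-01-01T11:30:00+00:00 2024-01-01T12:00:00+00:00
```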