Workflow
In Airflow, a workflow is represented by a DAG: a Directed Acyclic Graph of Tasks.
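The "acyclic" part is what guarantees that Airflow can always find an order in which to run the tasks. The following is a minimal sketch that uses Python's standard-library `graphlib` (Python 3.9+), not Orbiter or Airflow. Each task maps to the set of tasks it waits on, and a run order exists only if the dependencies contain no cycle. The helper `execution_order` and the task names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(upstream: dict[str, set[str]]) -> list[str]:
    """Return one valid run order, given each task's upstream (prerequisite) tasks.

    Raises ValueError if the dependencies contain a cycle, i.e. they do not form a DAG.
    """
    try:
        return list(TopologicalSorter(upstream).static_order())
    except CycleError as err:
        # err.args[1] holds the nodes that form the detected cycle
        raise ValueError(f"not a DAG, cycle found: {err.args[1]}") from err


# Hypothetical tasks forming a chain: extract -> transform -> load
print(execution_order({"transform": {"extract"}, "load": {"transform"}}))
# ['extract', 'transform', 'load']
```

With a cycle such as `{"a": {"b"}, "b": {"a"}}`, no order exists and the sketch raises `ValueError`.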
Diagram
```mermaid
classDiagram
direction LR
OrbiterDAG --> "many" OrbiterOperator
OrbiterDAG --> "many" OrbiterTaskGroup
OrbiterDAG --> "many" OrbiterRequirement
OrbiterDAG --> "many" OrbiterCallback
class OrbiterDAG["orbiter.objects.dag.OrbiterDAG"] {
imports: List[OrbiterRequirement]
file_path: str
dag_id: str
schedule: str | OrbiterTimetable | None
catchup: bool
start_date: DateTime
tags: List[str]
default_args: Dict[str, Any]
params: Dict[str, Any]
doc_md: str | None
tasks: Dict[str, OrbiterOperator]
kwargs: dict
orbiter_kwargs: dict
orbiter_conns: Set[OrbiterConnection]
orbiter_vars: Set[OrbiterVariable]
orbiter_env_vars: Set[OrbiterEnvVar]
orbiter_includes: Set[OrbiterInclude]
on_failure_callback: OrbiterCallback
on_success_callback: OrbiterCallback
on_retry_callback: OrbiterCallback
on_skipped_callback: OrbiterCallback
on_execute_callback: OrbiterCallback
sla_miss_callback: OrbiterCallback
}
click OrbiterDAG href "#orbiter.objects.dag.OrbiterDAG" "OrbiterDAG Documentation"
OrbiterTaskGroup --> "many" OrbiterRequirement
class OrbiterTaskGroup["orbiter.objects.task_group.OrbiterTaskGroup"] {
task_group_id: str
tasks: Dict[str, OrbiterOperator | OrbiterTaskGroup]
add_downstream(str | List[str] | OrbiterTaskDependency)
}
click OrbiterTaskGroup href "../tasks/#orbiter.objects.task_group.OrbiterTaskGroup" "OrbiterTaskGroup Documentation"
OrbiterOperator --> "many" OrbiterRequirement
OrbiterOperator --> "one" OrbiterPool
OrbiterOperator --> "many" OrbiterConnection
OrbiterOperator --> "many" OrbiterVariable
OrbiterOperator --> "many" OrbiterEnvVar
class OrbiterOperator["orbiter.objects.task.OrbiterOperator"] {
imports: List[OrbiterRequirement]
operator: str
task_id: str
pool: str | None
pool_slots: int | None
trigger_rule: str | None
on_failure_callback: OrbiterCallback
on_success_callback: OrbiterCallback
on_retry_callback: OrbiterCallback
on_skipped_callback: OrbiterCallback
on_execute_callback: OrbiterCallback
sla_miss_callback: OrbiterCallback
downstream: Set[str]
add_downstream(str | List[str] | OrbiterTaskDependency)
}
click OrbiterOperator href "../tasks/#orbiter.objects.task.OrbiterOperator" "OrbiterOperator Documentation"
class OrbiterTimetable["orbiter.objects.timetables.OrbiterTimetable"] {
}
click OrbiterTimetable href "#orbiter.objects.timetables.OrbiterTimetable" "OrbiterTimetable Documentation"
OrbiterTimetable "implements" <|-- OrbiterMultipleCronTriggerTimetable
class OrbiterMultipleCronTriggerTimetable["orbiter.objects.timetables.multiple_cron_trigger_timetable.OrbiterMultipleCronTriggerTimetable"] {
timetable = "MultipleCronTriggerTimetable"
crons: List[str]
timezone: str
interval: timedelta
run_immediately: bool
}
click OrbiterMultipleCronTriggerTimetable href "#orbiter.objects.timetables.multiple_cron_trigger_timetable.OrbiterMultipleCronTriggerTimetable" "OrbiterMultipleCronTriggerTimetable Documentation"
class OrbiterConnection["orbiter.objects.connection.OrbiterConnection"] {
conn_id: str
conn_type: str
**kwargs
}
click OrbiterConnection href "../project/#orbiter.objects.connection.OrbiterConnection" "OrbiterConnection Documentation"
class OrbiterEnvVar["orbiter.objects.env_var.OrbiterEnvVar"] {
key: str
value: str
}
click OrbiterEnvVar href "../project/#orbiter.objects.env_var.OrbiterEnvVar" "OrbiterEnvVar Documentation"
class OrbiterPool["orbiter.objects.pool.OrbiterPool"] {
name: str
description: str | None
slots: int | None
}
click OrbiterPool href "../project/#orbiter.objects.pool.OrbiterPool" "OrbiterPool Documentation"
class OrbiterRequirement["orbiter.objects.requirement.OrbiterRequirement"] {
package: str | None
module: str | None
names: List[str] | None
sys_package: str | None
}
click OrbiterRequirement href "../project/#orbiter.objects.requirement.OrbiterRequirement" "OrbiterRequirement Documentation"
class OrbiterVariable["orbiter.objects.variable.OrbiterVariable"] {
key: str
value: str
}
click OrbiterVariable href "../project/#orbiter.objects.variable.OrbiterVariable" "OrbiterVariable Documentation"
```
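The diagram shows that each operator stores its dependencies as a `downstream: Set[str]` of task IDs, filled through `add_downstream(...)`. The following is a minimal sketch of that bookkeeping using a hypothetical `SketchOperator` dataclass. It is not Orbiter's implementation. It only accepts a `str` or a list of `str` (no `OrbiterTaskDependency`), and the `edges` helper is also hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class SketchOperator:
    """Hypothetical stand-in for OrbiterOperator: keeps only task_id and downstream."""

    task_id: str
    downstream: set[str] = field(default_factory=set)

    def add_downstream(self, other: str | list[str]) -> None:
        # Accept a single task_id or a list of task_ids
        ids = [other] if isinstance(other, str) else other
        self.downstream.update(ids)


def edges(tasks: dict[str, SketchOperator]) -> set[tuple[str, str]]:
    """Collect (upstream, downstream) pairs from every task's downstream set."""
    return {(t.task_id, d) for t in tasks.values() for d in t.downstream}


# A dict of tasks keyed by task_id, similar in shape to OrbiterDAG.tasks
tasks = {tid: SketchOperator(tid) for tid in ("extract", "transform", "load", "notify")}
tasks["extract"].add_downstream("transform")
tasks["transform"].add_downstream(["load", "notify"])
print(sorted(edges(tasks)))
# [('extract', 'transform'), ('transform', 'load'), ('transform', 'notify')]
```

Storing task IDs rather than object references keeps each task independent of the others. The graph is only assembled when all tasks are collected in one place, such as a DAG.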
orbiter.objects.dag.OrbiterDAG
Represents an Airflow DAG, with its tasks and dependencies.
Renders to a .py file in the /dags folder.
Parameters:
| Name | Type | Description |
|---|---|---|
| file_path | str | File path of the DAG, relative to the /dags folder |
| dag_id | str | The DAG's unique identifier |
| schedule | str \| OrbiterTimetable, optional | The schedule for the DAG. Defaults to None (only runs when manually triggered) |
| catchup | bool, optional | Whether to catch up on runs from the start_date |
| start_date | DateTime, optional | The start date for the DAG. Defaults to the Unix Epoch |
| tags | List[str], optional | Tags for the DAG, used for sorting and filtering in the Airflow UI |
| default_args | Dict[str, Any], optional | Default arguments for any tasks in the DAG |
| params | Dict[str, Any], optional | Params for the DAG |
| doc_md | str, optional | Documentation for the DAG, with Markdown support |
| kwargs | dict, optional | Additional keyword arguments to pass to the DAG |
| **OrbiterBase | | OrbiterBase inherited properties |
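To show how these parameters map onto an Airflow `DAG(...)` call in a file under /dags, here is a minimal sketch. The function `render_dag_file` is hypothetical, and the generated text is an illustrative shape, not Orbiter's actual renderer or output. The `catchup=False` default is this sketch's own choice. The sketch only covers a few parameters.

```python
from __future__ import annotations

from pathlib import PurePosixPath


def render_dag_file(
    file_path: str,
    dag_id: str,
    schedule: str | None = None,
    catchup: bool = False,  # sketch's own default, not necessarily Orbiter's
    tags: list[str] | None = None,
) -> tuple[PurePosixPath, str]:
    """Hypothetical renderer: return (output path, Python source) for a DAG file."""
    path = PurePosixPath("dags") / file_path  # file_path is relative to /dags
    lines = [
        "from datetime import datetime",
        "",
        "from airflow import DAG",
        "",
        "with DAG(",
        f"    dag_id={dag_id!r},",
        f"    schedule={schedule!r},",  # None means manual triggers only
        "    start_date=datetime(1970, 1, 1),",  # Unix Epoch default
        f"    catchup={catchup!r},",
        f"    tags={list(tags or [])!r},",
        "):",
        "    pass  # task definitions would be rendered here",
    ]
    return path, "\n".join(lines) + "\n"


path, source = render_dag_file("my_dag.py", "my_dag", schedule="@daily", tags=["example"])
print(path)  # dags/my_dag.py
print(source)
```

The generated source only needs to be valid Python for this sketch. Compiling it with `compile(source, "my_dag.py", "exec")` checks the syntax without importing Airflow.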
Timetables
orbiter.objects.timetables.OrbiterTimetable
An Airflow Timetable reference.
Uses OrbiterInclude to add a file to the /plugins folder that registers the timetable.
Parameters:
| Name | Type | Description |
|---|---|---|
| **kwargs | | Any other kwargs to provide to the Timetable |
orbiter.objects.timetables.multiple_cron_trigger_timetable.OrbiterMultipleCronTriggerTimetable
An Airflow MultipleCronTriggerTimetable, which can be supplied with multiple cron strings.
>>> OrbiterMultipleCronTriggerTimetable(crons=["*/5 * * * *", "*/7 * * * *"])
MultipleCronTriggerTimetable('*/5 * * * *', '*/7 * * * *', timezone='UTC')
Parameters:
| Name | Type | Description |
|---|---|---|
| crons | List[str] | A list of cron strings |
| timezone | str | The timezone to use for the timetable |
| interval | timedelta | Sets the DAG's data interval |
| run_immediately | bool | Whether to run immediately on DAG creation |
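With several cron strings, the DAG fires whenever any of them matches, so the next run is the earliest upcoming match across all crons. The following is a minimal standard-library sketch of that idea. It is not Airflow's implementation, which supports full cron syntax, timezones, `interval` and `run_immediately`. The sketch only understands the minute field (`*`, `*/N` or a plain integer) and requires the other four fields to be `*`. The helper names `_minute_matches` and `next_run` are hypothetical.

```python
from __future__ import annotations

from datetime import datetime, timedelta


def _minute_matches(field: str, minute: int) -> bool:
    """Match a cron minute field; only '*', '*/N' and a plain integer are supported."""
    if field == "*":
        return True
    if field.startswith("*/"):
        return minute % int(field[2:]) == 0
    return minute == int(field)


def next_run(crons: list[str], after: datetime) -> datetime:
    """Return the earliest whole minute strictly after `after` matched by ANY cron."""
    for cron in crons:
        # This sketch only understands the minute field; the other fields must be '*'
        if cron.split()[1:] != ["*"] * 4:
            raise ValueError(f"unsupported cron in this sketch: {cron!r}")
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(60):  # every supported minute field matches within 60 minutes
        if any(_minute_matches(c.split()[0], candidate.minute) for c in crons):
            return candidate
        candidate += timedelta(minutes=1)
    raise ValueError("no matching minute found")


crons = ["*/5 * * * *", "*/7 * * * *"]  # the same crons as the doctest above
t = datetime(2024, 1, 1, 10, 0)
for _ in range(4):
    t = next_run(crons, t)
    print(t.strftime("%H:%M"))  # 10:05, 10:07, 10:10, 10:14
```

Taking the minimum over all crons is what distinguishes this from a single cron schedule. Runs from `*/5` and `*/7` interleave rather than one schedule replacing the other.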