
Project

An OrbiterProject holds everything necessary to render an Airflow Project. It is generated by a TranslationRuleset.translate_fn.

Diagram

classDiagram
    direction LR

    OrbiterProject --> "many" OrbiterConnection
    OrbiterProject --> "many" OrbiterDAG
    OrbiterProject --> "many" OrbiterEnvVar
    OrbiterProject --> "many" OrbiterInclude
    OrbiterProject --> "many" OrbiterPool
    OrbiterProject --> "many" OrbiterRequirement
    OrbiterProject --> "many" OrbiterVariable
    class OrbiterProject["orbiter.objects.project.OrbiterProject"] {
            connections: Dict[str, OrbiterConnection]
            dags: Dict[str, OrbiterDAG]
            env_vars: Dict[str, OrbiterEnvVar]
            includes: Dict[str, OrbiterInclude]
            pools: Dict[str, OrbiterPool]
            requirements: Set[OrbiterRequirement]
            variables: Dict[str, OrbiterVariable]
    }
    click OrbiterProject href "#orbiter.objects.project.OrbiterProject" "OrbiterProject Documentation"

    class OrbiterConnection["orbiter.objects.connection.OrbiterConnection"] {
            conn_id: str
            conn_type: str
            **kwargs
    }
    click OrbiterConnection href "#orbiter.objects.connection.OrbiterConnection" "OrbiterConnection Documentation"

    OrbiterDAG --> "many" OrbiterInclude
    OrbiterDAG --> "many" OrbiterConnection
    OrbiterDAG --> "many" OrbiterEnvVar
    OrbiterDAG --> "many" OrbiterRequirement
    OrbiterDAG --> "many" OrbiterVariable
    class OrbiterDAG["orbiter.objects.dag.OrbiterDAG"] {
            imports: List[OrbiterRequirement]
            file_path: str
            dag_id: str
            schedule: str | OrbiterTimetable | None
            catchup: bool
            start_date: DateTime
            tags: List[str]
            default_args: Dict[str, Any]
            params: Dict[str, Any]
            doc_md: str | None
            tasks: Dict[str, OrbiterOperator]
            kwargs: dict
            orbiter_kwargs: dict
            orbiter_conns: Set[OrbiterConnection]
            orbiter_vars: Set[OrbiterVariable]
            orbiter_env_vars: Set[OrbiterEnvVar]
            orbiter_includes: Set[OrbiterInclude]
    }

    class OrbiterEnvVar["orbiter.objects.env_var.OrbiterEnvVar"] {
            key: str
            value: str
    }
    click OrbiterEnvVar href "#orbiter.objects.env_var.OrbiterEnvVar" "OrbiterEnvVar Documentation"

    class OrbiterInclude["orbiter.objects.include.OrbiterInclude"] {
            filepath: str
            contents: str
    }
    click OrbiterInclude href "#orbiter.objects.include.OrbiterInclude" "OrbiterInclude Documentation"

    class OrbiterPool["orbiter.objects.pool.OrbiterPool"] {
            name: str
            description: str | None
            slots: int | None
    }
    click OrbiterPool href "#orbiter.objects.pool.OrbiterPool" "OrbiterPool Documentation"

    class OrbiterRequirement["orbiter.objects.requirement.OrbiterRequirement"] {
            package: str | None
            module: str | None
            names: List[str] | None
            sys_package: str | None
    }
    click OrbiterRequirement href "#orbiter.objects.requirement.OrbiterRequirement" "OrbiterRequirement Documentation"

    class OrbiterVariable["orbiter.objects.variable.OrbiterVariable"] {
            key: str
            value: str
    }
    click OrbiterVariable href "#orbiter.objects.variable.OrbiterVariable" "OrbiterVariable Documentation"

orbiter.objects.connection.OrbiterConnection

An Airflow Connection, rendered to an airflow_settings.yaml file.

See also the other Connection documentation.

>>> OrbiterConnection(
...     conn_id="my_conn_id", conn_type="mysql", host="localhost", port=3306, login="root"
... ).render()
{'conn_id': 'my_conn_id', 'conn_type': 'mysql', 'conn_host': 'localhost', 'conn_port': 3306, 'conn_login': 'root'}

Note

Use the conn_id utility function to generate both an OrbiterConnection and the matching connection property for an operator:

from orbiter.objects import conn_id

OrbiterTask(
    ...,  # other task arguments
    **conn_id("my_conn_id", conn_type="mysql"),
)

Parameters:

Name Type Description
conn_id str

The ID of the connection

conn_type str, optional

The type of the connection, always lowercase. Defaults to 'generic'.

**kwargs

Additional properties for the connection
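The doctest above suggests that extra keyword arguments are rendered with a conn_ prefix (host becomes conn_host, and so on). The following is a minimal, hypothetical sketch of that mapping, assuming every extra keyword is prefixed the same way; render_connection is an illustrative name, not part of orbiter, and the real OrbiterConnection.render() may special-case some fields.

```python
from typing import Any, Dict


def render_connection(conn_id: str, conn_type: str = "generic", **kwargs: Any) -> Dict[str, Any]:
    """Hypothetical sketch of OrbiterConnection.render().

    Assumption: every extra keyword becomes a ``conn_``-prefixed key.
    """
    rendered: Dict[str, Any] = {"conn_id": conn_id, "conn_type": conn_type.lower()}
    for key, value in kwargs.items():
        rendered[f"conn_{key}"] = value  # e.g. host -> conn_host
    return rendered


print(render_connection("my_conn_id", conn_type="mysql", host="localhost", port=3306, login="root"))
# {'conn_id': 'my_conn_id', 'conn_type': 'mysql', 'conn_host': 'localhost', 'conn_port': 3306, 'conn_login': 'root'}
```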

orbiter.objects.env_var.OrbiterEnvVar

Represents an environment variable, rendered to a line in a .env file.

>>> OrbiterEnvVar(key="foo", value="bar").render()
'foo=bar'

Parameters:

Name Type Description
key str

The key of the environment variable

value str

The value of the environment variable
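Since each OrbiterEnvVar renders to a single key=value line, a whole .env file is just those lines joined. This is a minimal sketch under that assumption; render_env_file is a hypothetical helper, and it does no quoting or escaping, so values containing newlines would need extra handling.

```python
from typing import Dict


def render_env_file(env_vars: Dict[str, str]) -> str:
    """Hypothetical sketch: one ``key=value`` line per variable, as OrbiterEnvVar.render() produces."""
    return "".join(f"{key}={value}\n" for key, value in env_vars.items())


contents = render_env_file({"foo": "bar", "baz": "qux"})
print(contents, end="")
# foo=bar
# baz=qux
```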

orbiter.objects.include.OrbiterInclude

Represents an included file in an /include directory.

Parameters:

Name Type Description
filepath str

The relative path (from the output directory) to write the file to

contents str

The contents of the file
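Because filepath is relative to the output directory, writing includes amounts to joining each path onto that directory and writing the contents. The sketch below illustrates this with pathlib; write_includes and the include/foo.txt path are hypothetical, and the real rendering code in orbiter may differ.

```python
import tempfile
from pathlib import Path
from typing import Dict, List


def write_includes(output_dir: Path, includes: Dict[str, str]) -> List[Path]:
    """Hypothetical sketch: write each include's contents to output_dir / filepath."""
    written: List[Path] = []
    for filepath, contents in includes.items():
        target = output_dir / filepath  # filepath is relative to the output directory
        target.parent.mkdir(parents=True, exist_ok=True)  # create include/ (and subfolders) if missing
        target.write_text(contents)
        written.append(target)
    return written


with tempfile.TemporaryDirectory() as tmp:
    paths = write_includes(Path(tmp), {"include/foo.txt": "Hello, World!"})
    print(paths[0].read_text())  # Hello, World!
```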

orbiter.objects.pool.OrbiterPool

An Airflow Pool, rendered to an airflow_settings.yaml file.

>>> OrbiterPool(name="foo", description="bar", slots=5).render()
{'pool_name': 'foo', 'pool_description': 'bar', 'pool_slot': 5}

Note

Use the pool utility function to generate both an OrbiterPool and the matching pool property for an operator:

from orbiter.objects import pool

OrbiterTask(
    ...,  # other task arguments
    **pool("my_pool"),
)

Parameters:

Name Type Description
name str

The name of the pool

description str, optional

The description of the pool

slots int, optional

The number of slots in the pool. Defaults to 128.

orbiter.objects.requirement.OrbiterRequirement

A requirement for a project (e.g. apache-airflow-providers-google), and its representation in the DAG file.

Rendered to the DAG file (as an import statement), requirements.txt, and packages.txt.

Tip

If a requirement needs multiple packages, define it as multiple OrbiterRequirement objects.

Example:

OrbiterTask(
    ...,
    imports=[
        OrbiterRequirement(package="apache-airflow-providers-google", ...),
        OrbiterRequirement(package="bigquery", sys_package="mysql", ...),
    ],
)
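To make the three render targets concrete, here is a minimal sketch of how a requirement might map to an import line, a requirements.txt entry, and a packages.txt entry. Requirement and render_requirement_files are hypothetical stand-ins, not orbiter's classes; the "from module import names" form and the sorted, de-duplicated file contents are assumptions for illustration.

```python
from dataclasses import dataclass
from typing import Iterable, Optional, Tuple


@dataclass(frozen=True)
class Requirement:
    """Hypothetical stand-in for OrbiterRequirement (not the library class)."""
    package: Optional[str] = None
    module: Optional[str] = None
    names: Tuple[str, ...] = ()
    sys_package: Optional[str] = None

    def import_line(self) -> Optional[str]:
        # DAG file: "from <module> import <names>", only when both are set
        if self.module and self.names:
            return f"from {self.module} import {', '.join(self.names)}"
        return None


def render_requirement_files(reqs: Iterable[Requirement]) -> Tuple[str, str]:
    """Return (requirements.txt, packages.txt) contents, sorted and de-duplicated."""
    reqs = list(reqs)
    pip = sorted({r.package for r in reqs if r.package})
    apt = sorted({r.sys_package for r in reqs if r.sys_package})
    return "\n".join(pip), "\n".join(apt)


bq = Requirement(
    package="apache-airflow-providers-google",
    module="airflow.providers.google.cloud.operators.bigquery",
    names=("BigQueryCreateEmptyDatasetOperator",),
)
mysql = Requirement(package="apache-airflow-providers-mysql", sys_package="mysql")
print(bq.import_line())
print(render_requirement_files([bq, mysql, bq]))
```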

Parameters:

Name Type Description
package str, optional

e.g. "apache-airflow-providers-google"

module str, optional

e.g. "airflow.providers.google.cloud.operators.bigquery", defaults to None

names List[str], optional

e.g. ["BigQueryCreateEmptyDatasetOperator"], defaults to []

sys_package str, optional

e.g. "mysql" - represents a Debian system package

orbiter.objects.variable.OrbiterVariable

An Airflow Variable, rendered to an airflow_settings.yaml file.

>>> OrbiterVariable(key="foo", value="bar").render()
{'variable_value': 'bar', 'variable_name': 'foo'}

Parameters:

Name Type Description
key str

The key of the variable

value str

The value of the variable
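Connections, pools, and variables all render to airflow_settings.yaml. The sketch below shows one way the rendered dictionaries could be nested into a single settings document, assuming the Astro CLI layout (an airflow key holding connections, pools, and variables lists). build_airflow_settings is hypothetical, and JSON is printed only to avoid a third-party YAML dependency.

```python
import json
from typing import Any, Dict, List


def build_airflow_settings(
    connections: List[Dict[str, Any]],
    pools: List[Dict[str, Any]],
    variables: List[Dict[str, Any]],
) -> Dict[str, Any]:
    """Hypothetical sketch: nest rendered objects under an ``airflow`` key
    (assumed airflow_settings.yaml layout)."""
    return {"airflow": {"connections": connections, "pools": pools, "variables": variables}}


settings = build_airflow_settings(
    connections=[{"conn_id": "my_conn_id", "conn_type": "mysql", "conn_host": "localhost"}],
    pools=[{"pool_name": "foo", "pool_description": "bar", "pool_slot": 5}],
    variables=[{"variable_value": "bar", "variable_name": "foo"}],
)
print(json.dumps(settings, indent=2))
```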

orbiter.objects.project.OrbiterProject

OrbiterProject()

Holds everything necessary to render an Airflow Project. This is generated by a TranslationRuleset.translate_fn.

Tip

Projects can be added together:

>>> OrbiterProject() + OrbiterProject()
OrbiterProject(dags=[], requirements=[], pools=[], connections=[], variables=[], env_vars=[])

and compared for equality:

>>> OrbiterProject() == OrbiterProject()
True
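One way to picture project addition is as a merge of each collection: dictionary union for the keyed collections and set union for requirements. The sketch below is a hypothetical MiniProject, not OrbiterProject; in particular, letting the right-hand project win on key collisions is an assumption (the real add_dags, for example, merges colliding DAGs with +=). The dataclass-generated __eq__ gives field-by-field equality.

```python
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class MiniProject:
    """Hypothetical stand-in for OrbiterProject, showing one way `+` and `==` could work."""
    dags: Dict[str, str] = field(default_factory=dict)
    pools: Dict[str, int] = field(default_factory=dict)
    requirements: Set[str] = field(default_factory=set)

    def __add__(self, other: "MiniProject") -> "MiniProject":
        # Keyed collections: dict union, right-hand project wins on collisions (assumption)
        # Requirements: set union, so duplicates collapse
        return MiniProject(
            dags={**self.dags, **other.dags},
            pools={**self.pools, **other.pools},
            requirements=self.requirements | other.requirements,
        )


a = MiniProject(dags={"foo": "dag_a"}, requirements={"apache-airflow"})
b = MiniProject(dags={"bar": "dag_b"}, requirements={"apache-airflow"})
merged = a + b
print(sorted(merged.dags))  # ['bar', 'foo']
print(merged.requirements)  # {'apache-airflow'}
print(MiniProject() + MiniProject() == MiniProject())  # True
```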

Parameters:

Name Type Description
connections Dict[str, OrbiterConnection]

A dictionary of OrbiterConnections

dags Dict[str, OrbiterDAG]

A dictionary of OrbiterDAGs

env_vars Dict[str, OrbiterEnvVar]

A dictionary of OrbiterEnvVars

includes Dict[str, OrbiterInclude]

A dictionary of OrbiterIncludes

pools Dict[str, OrbiterPool]

A dictionary of OrbiterPools

requirements Set[OrbiterRequirement]

A set of OrbiterRequirements
variables Dict[str, OrbiterVariable]

A dictionary of OrbiterVariables

Source code in orbiter/objects/project.py
def __init__(self):
    self.dags: Dict[str, OrbiterDAG] = dict()
    self.requirements: Set[OrbiterRequirement] = set()
    self.pools: Dict[str, OrbiterPool] = dict()
    self.connections: Dict[str, OrbiterConnection] = dict()
    self.variables: Dict[str, OrbiterVariable] = dict()
    self.env_vars: Dict[str, OrbiterEnvVar] = dict()
    self.includes: Dict[str, OrbiterInclude] = dict()

add_connections

add_connections(
    connections: (
        OrbiterConnection | Iterable[OrbiterConnection]
    ),
) -> "OrbiterProject"

Add OrbiterConnections to the Project, or override an existing connection with new properties.

>>> OrbiterProject().add_connections(OrbiterConnection(conn_id='foo')).connections
{'foo': OrbiterConnection(conn_id=foo, conn_type=generic)}

>>> OrbiterProject().add_connections(
...     [OrbiterConnection(conn_id='foo'), OrbiterConnection(conn_id='bar')]
... ).connections
{'foo': OrbiterConnection(conn_id=foo, conn_type=generic), 'bar': OrbiterConnection(conn_id=bar, conn_type=generic)}

Tip

Validation requires an OrbiterConnection to be passed

>>> # noinspection PyTypeChecker
... OrbiterProject().add_connections('foo')
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...
>>> # noinspection PyTypeChecker
>>> OrbiterProject().add_connections(['foo'])
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

Parameters:

Name Type Description
connections OrbiterConnection | Iterable[OrbiterConnection]

List of OrbiterConnections

Returns:

Type Description
OrbiterProject

self

Source code in orbiter/objects/project.py
def add_connections(
    self, connections: OrbiterConnection | Iterable[OrbiterConnection]
) -> "OrbiterProject":
    """Add [`OrbiterConnections`][orbiter.objects.connection.OrbiterConnection] to the Project
    or override an existing connection with new properties

    ```pycon
    >>> OrbiterProject().add_connections(OrbiterConnection(conn_id='foo')).connections
    {'foo': OrbiterConnection(conn_id=foo, conn_type=generic)}

    >>> OrbiterProject().add_connections(
    ...     [OrbiterConnection(conn_id='foo'), OrbiterConnection(conn_id='bar')]
    ... ).connections
    {'foo': OrbiterConnection(conn_id=foo, conn_type=generic), 'bar': OrbiterConnection(conn_id=bar, conn_type=generic)}

    ```

    !!! tip

        Validation requires an `OrbiterConnection` to be passed
        ```pycon
        >>> # noinspection PyTypeChecker
        ... OrbiterProject().add_connections('foo')
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...
        >>> # noinspection PyTypeChecker
        >>> OrbiterProject().add_connections(['foo'])
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...

        ```

    :param connections: List of [`OrbiterConnections`][orbiter.objects.connection.OrbiterConnection]
    :type connections: List[OrbiterConnection] | OrbiterConnection
    :return: self
    :rtype: OrbiterProject
    """  # noqa: E501
    for connection in (
        [connections] if isinstance(connections, OrbiterConnection) else connections
    ):
        self.connections[connection.conn_id] = connection
    return self
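Every add_* method follows the same pattern: wrap a single object in a one-item list, store each item under its key (so a later item with the same key replaces the earlier one), and return self so calls can be chained. The sketch below reproduces that pattern with hypothetical Connection and Project classes; the real method relies on pydantic validation (hence the ValidationError in the doctests), whereas this sketch raises a plain TypeError instead.

```python
from typing import Dict, Iterable, Union


class Connection:
    """Hypothetical stand-in for OrbiterConnection (only conn_id and conn_type)."""

    def __init__(self, conn_id: str, conn_type: str = "generic"):
        self.conn_id = conn_id
        self.conn_type = conn_type


class Project:
    """Hypothetical stand-in showing the add_* pattern used by OrbiterProject."""

    def __init__(self) -> None:
        self.connections: Dict[str, Connection] = {}

    def add_connections(self, connections: Union[Connection, Iterable[Connection]]) -> "Project":
        # Normalize a single object into a one-item list so both call styles share one loop
        items = [connections] if isinstance(connections, Connection) else connections
        for connection in items:
            if not isinstance(connection, Connection):
                # Stand-in for pydantic validation in the real method
                raise TypeError(f"expected Connection, got {type(connection).__name__}")
            # Keyed by conn_id: a later connection with the same ID replaces the earlier one
            self.connections[connection.conn_id] = connection
        return self  # returning self allows chaining


p = (
    Project()
    .add_connections(Connection("foo"))
    .add_connections([Connection("foo", "mysql"), Connection("bar")])
)
print({k: c.conn_type for k, c in p.connections.items()})  # {'foo': 'mysql', 'bar': 'generic'}
```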

add_dags

add_dags(
    dags: OrbiterDAG | Iterable[OrbiterDAG],
) -> "OrbiterProject"

Add OrbiterDAGs (and any OrbiterRequirements, OrbiterConns, OrbiterVars, OrbiterPools, OrbiterEnvVars, etc.) to the Project.

>>> OrbiterProject().add_dags(OrbiterDAG(dag_id='foo', file_path="")).dags['foo'].repr()
'OrbiterDAG(dag_id=foo, schedule=None, start_date=1970-01-01 00:00:00, catchup=False)'

>>> dags = OrbiterProject().add_dags(
...     [OrbiterDAG(dag_id='foo', file_path=""), OrbiterDAG(dag_id='bar', file_path="")]
... ).dags; dags['foo'].repr(), dags['bar'].repr()
... # doctest: +NORMALIZE_WHITESPACE
('OrbiterDAG(dag_id=foo, schedule=None, start_date=1970-01-01 00:00:00, catchup=False)', 'OrbiterDAG(dag_id=bar, schedule=None, start_date=1970-01-01 00:00:00, catchup=False)')

>>> # An example adding a little of everything, including deeply nested things
... from orbiter.objects.operators.bash import OrbiterBashOperator
>>> from orbiter.objects.timetables.multi_cron_timetable import OrbiterMultiCronTimetable
>>> from orbiter.objects.callbacks.smtp import OrbiterSmtpNotifierCallback
>>> OrbiterProject().add_dags(OrbiterDAG(
...     dag_id='foo', file_path="",
...     orbiter_env_vars={OrbiterEnvVar(key="foo", value="bar")},
...     orbiter_includes={OrbiterInclude(filepath='foo.txt', contents="Hello, World!")},
...     schedule=OrbiterMultiCronTimetable(cron_defs=["0 */5 * * *", "0 */3 * * *"]),
...     tasks={'foo': OrbiterTaskGroup(task_group_id="foo",
...         tasks=[OrbiterBashOperator(
...             task_id='foo', bash_command='echo "Hello, World!"',
...             orbiter_pool=OrbiterPool(name='foo', slots=1),
...             orbiter_vars={OrbiterVariable(key='foo', value='bar')},
...             orbiter_conns={OrbiterConnection(conn_id='foo')},
...             orbiter_env_vars={OrbiterEnvVar(key='foo', value='bar')},
...             on_success_callback=OrbiterSmtpNotifierCallback(
...                 to="foo@bar.com",
...                 smtp_conn_id="SMTP",
...                 orbiter_conns={OrbiterConnection(conn_id="SMTP", conn_type="smtp")}
...             )
...         )]
...     )}
... ))
... # doctest: +NORMALIZE_WHITESPACE
OrbiterProject(dags=[foo],
requirements=[OrbiterRequirements(names=[DAG], package=apache-airflow, module=airflow, sys_package=None),
OrbiterRequirements(names=[BashOperator], package=apache-airflow, module=airflow.operators.bash, sys_package=None),
OrbiterRequirements(names=[send_smtp_notification], package=apache-airflow-providers-smtp, module=airflow.providers.smtp.notifications.smtp, sys_package=None),
OrbiterRequirements(names=[TaskGroup], package=apache-airflow, module=airflow.utils.task_group, sys_package=None),
OrbiterRequirements(names=[MultiCronTimetable], package=croniter, module=multi_cron_timetable, sys_package=None),
OrbiterRequirements(names=[DateTime,Timezone], package=pendulum, module=pendulum, sys_package=None)],
pools=['foo'],
connections=['SMTP', 'foo'],
variables=['foo'],
env_vars=['foo'])

Tip

Validation requires an OrbiterDAG to be passed

>>> # noinspection PyTypeChecker
... OrbiterProject().add_dags('foo')
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...
>>> # noinspection PyTypeChecker
>>> OrbiterProject().add_dags(['foo'])
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

Parameters:

Name Type Description
dags OrbiterDAG | Iterable[OrbiterDAG]

List of OrbiterDAGs

Returns:

Type Description
OrbiterProject

self

Source code in orbiter/objects/project.py
def add_dags(self, dags: OrbiterDAG | Iterable[OrbiterDAG]) -> "OrbiterProject":
    """Add [OrbiterDAGs][orbiter.objects.dag.OrbiterDAG]
    (and any [OrbiterRequirements][orbiter.objects.requirement.OrbiterRequirement],
    [OrbiterConns][orbiter.objects.connection.OrbiterConnection],
    [OrbiterVars][orbiter.objects.variable.OrbiterVariable],
    [OrbiterPools][orbiter.objects.pool.OrbiterPool],
    [OrbiterEnvVars][orbiter.objects.env_var.OrbiterEnvVar], etc.)
    to the Project.

    ```pycon
    >>> OrbiterProject().add_dags(OrbiterDAG(dag_id='foo', file_path="")).dags['foo'].repr()
    'OrbiterDAG(dag_id=foo, schedule=None, start_date=1970-01-01 00:00:00, catchup=False)'

    >>> dags = OrbiterProject().add_dags(
    ...     [OrbiterDAG(dag_id='foo', file_path=""), OrbiterDAG(dag_id='bar', file_path="")]
    ... ).dags; dags['foo'].repr(), dags['bar'].repr()
    ... # doctest: +NORMALIZE_WHITESPACE
    ('OrbiterDAG(dag_id=foo, schedule=None, start_date=1970-01-01 00:00:00, catchup=False)', 'OrbiterDAG(dag_id=bar, schedule=None, start_date=1970-01-01 00:00:00, catchup=False)')

    >>> # An example adding a little of everything, including deeply nested things
    ... from orbiter.objects.operators.bash import OrbiterBashOperator
    >>> from orbiter.objects.timetables.multi_cron_timetable import OrbiterMultiCronTimetable
    >>> from orbiter.objects.callbacks.smtp import OrbiterSmtpNotifierCallback
    >>> OrbiterProject().add_dags(OrbiterDAG(
    ...     dag_id='foo', file_path="",
    ...     orbiter_env_vars={OrbiterEnvVar(key="foo", value="bar")},
    ...     orbiter_includes={OrbiterInclude(filepath='foo.txt', contents="Hello, World!")},
    ...     schedule=OrbiterMultiCronTimetable(cron_defs=["0 */5 * * *", "0 */3 * * *"]),
    ...     tasks={'foo': OrbiterTaskGroup(task_group_id="foo",
    ...         tasks=[OrbiterBashOperator(
    ...             task_id='foo', bash_command='echo "Hello, World!"',
    ...             orbiter_pool=OrbiterPool(name='foo', slots=1),
    ...             orbiter_vars={OrbiterVariable(key='foo', value='bar')},
    ...             orbiter_conns={OrbiterConnection(conn_id='foo')},
    ...             orbiter_env_vars={OrbiterEnvVar(key='foo', value='bar')},
    ...             on_success_callback=OrbiterSmtpNotifierCallback(
    ...                 to="foo@bar.com",
    ...                 smtp_conn_id="SMTP",
    ...                 orbiter_conns={OrbiterConnection(conn_id="SMTP", conn_type="smtp")}
    ...             )
    ...         )]
    ...     )}
    ... ))
    ... # doctest: +NORMALIZE_WHITESPACE
    OrbiterProject(dags=[foo],
    requirements=[OrbiterRequirements(names=[DAG], package=apache-airflow, module=airflow, sys_package=None),
    OrbiterRequirements(names=[BashOperator], package=apache-airflow, module=airflow.operators.bash, sys_package=None),
    OrbiterRequirements(names=[send_smtp_notification], package=apache-airflow-providers-smtp, module=airflow.providers.smtp.notifications.smtp, sys_package=None),
    OrbiterRequirements(names=[TaskGroup], package=apache-airflow, module=airflow.utils.task_group, sys_package=None),
    OrbiterRequirements(names=[MultiCronTimetable], package=croniter, module=multi_cron_timetable, sys_package=None),
    OrbiterRequirements(names=[DateTime,Timezone], package=pendulum, module=pendulum, sys_package=None)],
    pools=['foo'],
    connections=['SMTP', 'foo'],
    variables=['foo'],
    env_vars=['foo'])

    ```

    !!! tip

        Validation requires an `OrbiterDAG` to be passed
        ```pycon
        >>> # noinspection PyTypeChecker
        ... OrbiterProject().add_dags('foo')
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...
        >>> # noinspection PyTypeChecker
        >>> OrbiterProject().add_dags(['foo'])
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...

        ```

    :param dags: List of [OrbiterDAGs][orbiter.objects.dag.OrbiterDAG]
    :type dags: List[OrbiterDAG] | OrbiterDAG
    :return: self
    :rtype: OrbiterProject
    """  # noqa: E501

    # noinspection t
    def _add_recursively(
        things: Iterable[
            OrbiterOperator
            | OrbiterTaskGroup
            | OrbiterCallback
            | OrbiterTimetable
            | OrbiterDAG
        ],
    ):
        for thing in things:
            if isinstance(thing, str):
                continue
            if hasattr(thing, "orbiter_pool") and (pool := thing.orbiter_pool):
                self.add_pools(pool)
            if hasattr(thing, "orbiter_conns") and (conns := thing.orbiter_conns):
                self.add_connections(conns)
            if hasattr(thing, "orbiter_vars") and (variables := thing.orbiter_vars):
                self.add_variables(variables)
            if hasattr(thing, "orbiter_env_vars") and (
                env_vars := thing.orbiter_env_vars
            ):
                self.add_env_vars(env_vars)
            if hasattr(thing, "orbiter_includes") and (
                includes := thing.orbiter_includes
            ):
                self.add_includes(includes)
            if hasattr(thing, "imports") and (imports := thing.imports):
                self.add_requirements(imports)
            if isinstance(thing, OrbiterTaskGroup) and (tasks := thing.tasks):
                _add_recursively(tasks)
            if hasattr(thing, "__dict__") or hasattr(thing, "model_extra"):
                # If it's a pydantic model or dict, check its properties for more things to add
                _add_recursively(
                    (
                        (getattr(thing, "__dict__", {}) or dict())
                        | (getattr(thing, "model_extra", {}) or dict())
                    ).values()
                )

    for dag in [dags] if isinstance(dags, OrbiterDAG) else dags:
        dag_id = dag.dag_id

        # Add or update the DAG
        if dag_id in self.dags:
            self.dags[dag_id] += dag
        else:
            self.dags[dag_id] = dag

        # Add anything that might be in the tasks of the DAG - such as imports, Connections, etc
        _add_recursively((dag.tasks or {}).values())

        # Add anything that might be in the `dag.schedule` - such as Includes, Timetables, Connections, etc
        _add_recursively([dag])
    return self
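The core of add_dags is _add_recursively, which walks an object's attributes looking for orbiter_conns, orbiter_vars, and similar collections at any depth. The sketch below isolates that traversal idea for a single attribute name. collect_attr and the Task, Callback, and TaskGroup classes are hypothetical; unlike the method above, this sketch also descends into lists and dicts generically, ignores pydantic's model_extra, and adds a cycle guard.

```python
from typing import Any, Iterable, List, Set


def collect_attr(roots: Iterable[Any], attr: str) -> List[Any]:
    """Hypothetical sketch of the _add_recursively idea: walk nested objects and
    gather every item stored in the collection attribute ``attr``."""
    found: List[Any] = []
    seen: Set[int] = set()

    def walk(things: Iterable[Any]) -> None:
        for thing in things:
            if thing is None or isinstance(thing, (str, bytes, int, float)):
                continue  # leaf values carry nothing to collect
            if id(thing) in seen:
                continue  # cycle guard (an addition; the method above has none)
            seen.add(id(thing))
            if isinstance(thing, dict):
                walk(thing.values())
            elif isinstance(thing, (list, tuple, set, frozenset)):
                walk(thing)
            elif hasattr(thing, "__dict__"):
                found.extend(getattr(thing, attr, None) or ())
                walk(vars(thing).values())  # recurse into every attribute value

    walk(roots)
    return found


class Callback:
    def __init__(self, conns: Set[str]):
        self.orbiter_conns = conns


class Task:
    def __init__(self, task_id: str, conns: Set[str], on_success_callback: Any = None):
        self.task_id = task_id
        self.orbiter_conns = conns
        self.on_success_callback = on_success_callback


class TaskGroup:
    def __init__(self, tasks: List[Any]):
        self.tasks = tasks


group = TaskGroup([Task("foo", {"foo"}, on_success_callback=Callback({"SMTP"}))])
print(sorted(collect_attr([group], "orbiter_conns")))  # ['SMTP', 'foo']
```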

add_env_vars

add_env_vars(
    env_vars: OrbiterEnvVar | Iterable[OrbiterEnvVar],
) -> "OrbiterProject"

Add OrbiterEnvVars to the Project, or override an existing env var with new properties.

>>> OrbiterProject().add_env_vars(OrbiterEnvVar(key="foo", value="bar")).env_vars
{'foo': OrbiterEnvVar(key='foo', value='bar')}

>>> OrbiterProject().add_env_vars([OrbiterEnvVar(key="foo", value="bar")]).env_vars
{'foo': OrbiterEnvVar(key='foo', value='bar')}

Tip

Validation requires an OrbiterEnvVar to be passed

>>> # noinspection PyTypeChecker
... OrbiterProject().add_env_vars('foo')
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

>>> # noinspection PyTypeChecker
... OrbiterProject().add_env_vars(['foo'])
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

Parameters:

Name Type Description
env_vars OrbiterEnvVar | Iterable[OrbiterEnvVar]

List of OrbiterEnvVars

Returns:

Type Description
OrbiterProject

self

Source code in orbiter/objects/project.py
def add_env_vars(
    self, env_vars: OrbiterEnvVar | Iterable[OrbiterEnvVar]
) -> "OrbiterProject":
    """
    Add [OrbiterEnvVars][orbiter.objects.env_var.OrbiterEnvVar] to the Project
    or override an existing env var with new properties

    ```pycon
    >>> OrbiterProject().add_env_vars(OrbiterEnvVar(key="foo", value="bar")).env_vars
    {'foo': OrbiterEnvVar(key='foo', value='bar')}

    >>> OrbiterProject().add_env_vars([OrbiterEnvVar(key="foo", value="bar")]).env_vars
    {'foo': OrbiterEnvVar(key='foo', value='bar')}

    ```

    !!! tip

        Validation requires an `OrbiterEnvVar` to be passed
        ```pycon
        >>> # noinspection PyTypeChecker
        ... OrbiterProject().add_env_vars('foo')
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...

        >>> # noinspection PyTypeChecker
        ... OrbiterProject().add_env_vars(['foo'])
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...

        ```

    :param env_vars: List of [OrbiterEnvVar][orbiter.objects.env_var.OrbiterEnvVar]
    :type env_vars: List[OrbiterEnvVar] | OrbiterEnvVar
    :return: self
    :rtype: OrbiterProject
    """
    for env_var in [env_vars] if isinstance(env_vars, OrbiterEnvVar) else env_vars:
        self.env_vars[env_var.key] = env_var
    return self

add_includes

add_includes(
    includes: OrbiterInclude | Iterable[OrbiterInclude],
) -> "OrbiterProject"

Add OrbiterIncludes to the Project, or override an existing OrbiterInclude with new properties.

>>> OrbiterProject().add_includes(OrbiterInclude(filepath="foo", contents="bar")).includes
{'foo': OrbiterInclude(filepath='foo', contents='bar')}

>>> OrbiterProject().add_includes([OrbiterInclude(filepath="foo", contents="bar")]).includes
{'foo': OrbiterInclude(filepath='foo', contents='bar')}

Tip

Validation requires an OrbiterInclude to be passed

>>> # noinspection PyTypeChecker
... OrbiterProject().add_includes('foo')
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

>>> # noinspection PyTypeChecker
... OrbiterProject().add_includes(['foo'])
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

Parameters:

Name Type Description
includes OrbiterInclude | Iterable[OrbiterInclude]

List of OrbiterIncludes

Returns:

Type Description
OrbiterProject

self

Source code in orbiter/objects/project.py
def add_includes(
    self, includes: OrbiterInclude | Iterable[OrbiterInclude]
) -> "OrbiterProject":
    """Add [OrbiterIncludes][orbiter.objects.include.OrbiterInclude] to the Project
    or override an existing [OrbiterInclude][orbiter.objects.include.OrbiterInclude] with new properties

    ```pycon
    >>> OrbiterProject().add_includes(OrbiterInclude(filepath="foo", contents="bar")).includes
    {'foo': OrbiterInclude(filepath='foo', contents='bar')}

    >>> OrbiterProject().add_includes([OrbiterInclude(filepath="foo", contents="bar")]).includes
    {'foo': OrbiterInclude(filepath='foo', contents='bar')}

    ```

    !!! tip

        Validation requires an `OrbiterInclude` to be passed
        ```pycon
        >>> # noinspection PyTypeChecker
        ... OrbiterProject().add_includes('foo')
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...

        >>> # noinspection PyTypeChecker
        ... OrbiterProject().add_includes(['foo'])
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...

        ```
    :param includes: List of [OrbiterIncludes][orbiter.objects.include.OrbiterInclude]
    :type includes: List[OrbiterInclude] | OrbiterInclude
    :return: self
    :rtype: OrbiterProject
    """
    for include in [includes] if isinstance(includes, OrbiterInclude) else includes:
        self.includes[include.filepath] = include
    return self

add_pools

add_pools(
    pools: OrbiterPool | Iterable[OrbiterPool],
) -> "OrbiterProject"

Add OrbiterPools to the Project, or update an existing pool of the same name with new properties.

>>> OrbiterProject().add_pools(OrbiterPool(name="foo", slots=1)).pools
{'foo': OrbiterPool(name='foo', description='', slots=1)}

>>> ( OrbiterProject()
...     .add_pools([OrbiterPool(name="foo", slots=1)])
...     .add_pools([OrbiterPool(name="foo", slots=2)])
...     .pools
... )
{'foo': OrbiterPool(name='foo', description='', slots=2)}

Tip

Validation requires an OrbiterPool to be passed

>>> # noinspection PyTypeChecker
... OrbiterProject().add_pools('foo')
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...
>>> # noinspection PyTypeChecker
... OrbiterProject().add_pools(['foo'])
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

Parameters:

Name Type Description
pools OrbiterPool | Iterable[OrbiterPool]

List of OrbiterPools

Returns:

Type Description
OrbiterProject

self

Source code in orbiter/objects/project.py
def add_pools(self, pools: OrbiterPool | Iterable[OrbiterPool]) -> "OrbiterProject":
    """Add [OrbiterPool][orbiter.objects.pool.OrbiterPool] to the Project
    or override existing pools with new properties

    ```pycon
    >>> OrbiterProject().add_pools(OrbiterPool(name="foo", slots=1)).pools
    {'foo': OrbiterPool(name='foo', description='', slots=1)}

    >>> ( OrbiterProject()
    ...     .add_pools([OrbiterPool(name="foo", slots=1)])
    ...     .add_pools([OrbiterPool(name="foo", slots=2)])
    ...     .pools
    ... )
    {'foo': OrbiterPool(name='foo', description='', slots=2)}

    ```

    !!! tip

        Validation requires an `OrbiterPool` to be passed
        ```pycon
        >>> # noinspection PyTypeChecker
        ... OrbiterProject().add_pools('foo')
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...
        >>> # noinspection PyTypeChecker
        ... OrbiterProject().add_pools(['foo'])
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...

        ```
    :param pools: List of [OrbiterPools][orbiter.objects.pool.OrbiterPool]
    :type pools: List[OrbiterPool] | OrbiterPool
    :return: self
    :rtype: OrbiterProject
    """
    for pool in [pools] if isinstance(pools, OrbiterPool) else pools:
        if pool.name in self.pools:
            self.pools[pool.name] += pool
        else:
            self.pools[pool.name] = pool
    return self
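Note that add_pools does not simply replace a pool with the same name; it combines them with +=, which calls the pool's __add__. The sketch below shows one plausible merge rule with a hypothetical Pool dataclass: fields the incoming pool actually sets (not None and not empty) take precedence. That rule is an assumption chosen to match the doctest (slots 1 then 2 gives 2); OrbiterPool's real __add__ may behave differently.

```python
from dataclasses import dataclass, fields, replace
from typing import Dict, Optional


@dataclass(frozen=True)
class Pool:
    """Hypothetical stand-in for OrbiterPool."""
    name: str
    description: str = ""
    slots: Optional[int] = None

    def __add__(self, other: "Pool") -> "Pool":
        if other.name != self.name:
            raise ValueError("can only merge pools with the same name")
        # Assumption: fields the incoming pool actually sets take precedence
        updates = {
            f.name: getattr(other, f.name)
            for f in fields(other)
            if getattr(other, f.name) not in (None, "")
        }
        return replace(self, **updates)


pools: Dict[str, Pool] = {}
for pool in [Pool("foo", slots=1), Pool("foo", description="shared", slots=2)]:
    if pool.name in pools:
        pools[pool.name] += pool  # calls Pool.__add__ and stores the merged result
    else:
        pools[pool.name] = pool
print(pools["foo"])  # Pool(name='foo', description='shared', slots=2)
```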

add_requirements

add_requirements(
    requirements: (
        OrbiterRequirement | Iterable[OrbiterRequirement]
    ),
) -> "OrbiterProject"

Add OrbiterRequirements to the Project. Requirements are stored in a set, so a requirement equal to one already present is not added twice.

>>> OrbiterProject().add_requirements(
...    OrbiterRequirement(package="apache-airflow", names=['foo'], module='bar'),
... ).requirements
{OrbiterRequirements(names=[foo], package=apache-airflow, module=bar, sys_package=None)}

>>> OrbiterProject().add_requirements(
...    [OrbiterRequirement(package="apache-airflow", names=['foo'], module='bar')],
... ).requirements
{OrbiterRequirements(names=[foo], package=apache-airflow, module=bar, sys_package=None)}

Tip

Validation requires an OrbiterRequirement to be passed

>>> # noinspection PyTypeChecker
... OrbiterProject().add_requirements('foo')
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...
>>> # noinspection PyTypeChecker
>>> OrbiterProject().add_requirements(['foo'])
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

Parameters:

Name Type Description
requirements OrbiterRequirement | Iterable[OrbiterRequirement]

List of OrbiterRequirements

Returns:

Type Description
OrbiterProject

self

Source code in orbiter/objects/project.py
def add_requirements(
    self, requirements: OrbiterRequirement | Iterable[OrbiterRequirement]
) -> "OrbiterProject":
    """Add [OrbiterRequirements][orbiter.objects.requirement.OrbiterRequirement] to the Project
    or override an existing requirement with new properties

    ```pycon
    >>> OrbiterProject().add_requirements(
    ...    OrbiterRequirement(package="apache-airflow", names=['foo'], module='bar'),
    ... ).requirements
    {OrbiterRequirements(names=[foo], package=apache-airflow, module=bar, sys_package=None)}

    >>> OrbiterProject().add_requirements(
    ...    [OrbiterRequirement(package="apache-airflow", names=['foo'], module='bar')],
    ... ).requirements
    {OrbiterRequirements(names=[foo], package=apache-airflow, module=bar, sys_package=None)}

    ```

    !!! tip

        Validation requires an `OrbiterRequirement` to be passed
        ```pycon
        >>> # noinspection PyTypeChecker
        ... OrbiterProject().add_requirements('foo')
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...
        >>> # noinspection PyTypeChecker
        >>> OrbiterProject().add_requirements(['foo'])
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...

        ```
    :param requirements: List of [OrbiterRequirements][orbiter.objects.requirement.OrbiterRequirement]
    :type requirements: OrbiterRequirement | Iterable[OrbiterRequirement]
    :return: self
    :rtype: OrbiterProject
    """
    for requirement in (
        [requirements]
        if isinstance(requirements, OrbiterRequirement)
        else requirements
    ):
        self.requirements.add(requirement)
    return self
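The loop above accepts either a single requirement or any iterable of them, wrapping a lone instance in a list so both call styles share one code path. Below is a minimal self-contained sketch of that pattern. `Requirement` and `Project` are hypothetical stand-ins for `OrbiterRequirement` and `OrbiterProject`, and the explicit `TypeError` check is an assumption standing in for the pydantic validation shown in the tip. Without some check, a bare string would be iterated character by character.

```python
from dataclasses import dataclass, field
from typing import Iterable, Set, Union


# Hypothetical stand-in for OrbiterRequirement; frozen so instances are hashable
@dataclass(frozen=True)
class Requirement:
    package: str
    module: str = ""


@dataclass
class Project:
    requirements: Set[Requirement] = field(default_factory=set)

    def add_requirements(
        self, requirements: Union[Requirement, Iterable[Requirement]]
    ) -> "Project":
        # Wrap a single instance so both call styles share one loop
        items = [requirements] if isinstance(requirements, Requirement) else requirements
        for requirement in items:
            # Assumed check: a bare str is iterable and would otherwise yield characters
            if not isinstance(requirement, Requirement):
                raise TypeError(f"expected Requirement, got {type(requirement).__name__}")
            # set.add leaves the set unchanged if an equal element is already present
            self.requirements.add(requirement)
        return self


project = Project().add_requirements(Requirement("apache-airflow", "bar"))
project.add_requirements([Requirement("apache-airflow", "bar"), Requirement("pandas")])
print(len(project.requirements))  # 2
```

With set semantics, adding an element equal to an existing one keeps the original. Whether the real `OrbiterRequirement` treats a requirement with changed properties as equal depends on its own `__eq__`/`__hash__`, which this page does not show.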

add_variables

add_variables(
    variables: OrbiterVariable | Iterable[OrbiterVariable],
) -> "OrbiterProject"

Add OrbiterVariables to the Project or override an existing variable with new properties

>>> OrbiterProject().add_variables(OrbiterVariable(key="foo", value="bar")).variables
{'foo': OrbiterVariable(key='foo', value='bar')}

>>> OrbiterProject().add_variables([OrbiterVariable(key="foo", value="bar")]).variables
{'foo': OrbiterVariable(key='foo', value='bar')}

Tip

Validation requires an OrbiterVariable to be passed

>>> # noinspection PyTypeChecker
... OrbiterProject().add_variables('foo')
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...
>>> # noinspection PyTypeChecker
... OrbiterProject().add_variables(['foo'])
... # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
pydantic_core._pydantic_core.ValidationError: ...

Parameters:

Name       Type                                         Description
variables  OrbiterVariable | Iterable[OrbiterVariable]  List of OrbiterVariable

Returns:

Type            Description
OrbiterProject  self

Source code in orbiter/objects/project.py
def add_variables(
    self, variables: OrbiterVariable | Iterable[OrbiterVariable]
) -> "OrbiterProject":
    """Add [OrbiterVariables][orbiter.objects.variable.OrbiterVariable] to the Project
    or override an existing variable with new properties

    ```pycon
    >>> OrbiterProject().add_variables(OrbiterVariable(key="foo", value="bar")).variables
    {'foo': OrbiterVariable(key='foo', value='bar')}

    >>> OrbiterProject().add_variables([OrbiterVariable(key="foo", value="bar")]).variables
    {'foo': OrbiterVariable(key='foo', value='bar')}

    ```

    !!! tip

        Validation requires an `OrbiterVariable` to be passed
        ```pycon
        >>> # noinspection PyTypeChecker
        ... OrbiterProject().add_variables('foo')
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...
        >>> # noinspection PyTypeChecker
        ... OrbiterProject().add_variables(['foo'])
        ... # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        pydantic_core._pydantic_core.ValidationError: ...

        ```
    :param variables: List of [OrbiterVariable][orbiter.objects.variable.OrbiterVariable]
    :type variables: OrbiterVariable | Iterable[OrbiterVariable]
    :return: self
    :rtype: OrbiterProject
    """
    for variable in (
        [variables] if isinstance(variables, OrbiterVariable) else variables
    ):
        self.variables[variable.key] = variable
    return self
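Because variables are stored in a dict keyed by `variable.key`, adding a variable whose key already exists replaces the earlier entry; this is the "override" behavior described in the docstring. The sketch below reproduces that logic with hypothetical stand-ins `Variable` and `Project` in place of `OrbiterVariable` and `OrbiterProject`; it omits the pydantic validation.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Union


# Hypothetical stand-in for OrbiterVariable
@dataclass
class Variable:
    key: str
    value: str


@dataclass
class Project:
    variables: Dict[str, Variable] = field(default_factory=dict)

    def add_variables(
        self, variables: Union[Variable, Iterable[Variable]]
    ) -> "Project":
        items = [variables] if isinstance(variables, Variable) else variables
        for variable in items:
            # Keyed by variable.key: a later variable with the same key replaces the earlier one
            self.variables[variable.key] = variable
        return self


project = Project().add_variables(Variable(key="foo", value="bar"))
project.add_variables([Variable(key="foo", value="baz"), Variable(key="env", value="prod")])
print(project.variables["foo"].value)  # baz
print(sorted(project.variables))  # ['env', 'foo']
```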

analyze

analyze(
    output_fmt: Literal["json", "csv", "md"] = "md",
    output_file=None,
)

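Print an analysis of the project: the number of DAGs and of tasks of each type per source file, plus a Totals row. Output goes to stdout unless output_file is given, formatted as Markdown (md, the default), JSON, or CSV.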

Tip

Looks for a [task_type=XYZ] marker at the start of the task's doc_md property; if there is none, the task's class name (type(task).__name__) is used as its type.
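The task-type lookup can be exercised on its own. The sketch below copies the regular expression and fallback logic from the source of `analyze()`; `EmptyTask` and `SshTask` are hypothetical classes used only for illustration. Because the pattern is applied with `re.match`, the marker is recognized only when `doc_md` begins with it; a marker later in the text is ignored and the class name is used instead.

```python
import re

# Same pattern as in analyze(); re.match only tries position 0 of doc_md
_task_type = re.compile(r"\[task_type=(?P<task_type>[A-Za-z0-9-_]+)")


def get_task_type(task) -> str:
    match = _task_type.match(getattr(task, "doc_md", None) or "")
    # Fall back to the class name when no marker is found
    return (match.group("task_type") if match else None) or type(task).__name__


# Hypothetical task classes used only for illustration
class EmptyTask:
    doc_md = None


class SshTask:
    def __init__(self, doc_md):
        self.doc_md = doc_md


print(get_task_type(EmptyTask()))  # EmptyTask
print(get_task_type(SshTask("[task_type=Command] run a script")))  # Command
print(get_task_type(SshTask("Runs [task_type=Command]")))  # SshTask
```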

>>> from orbiter.objects.operators.empty import OrbiterEmptyOperator
>>> OrbiterProject().add_dags([
...     OrbiterDAG(file_path="", dag_id="foo", orbiter_kwargs={"file_path": "foo.py"},
...         tasks={"bar": OrbiterEmptyOperator(task_id="bar")}
...     ),
...     OrbiterDAG(file_path="", dag_id="baz", orbiter_kwargs={"file_path": "baz.py"},
...         tasks={"bop": OrbiterEmptyOperator(task_id="bop")}
...     )
... ]).analyze()
... # doctest: +ELLIPSIS
┏━...
...Analysis...
┗━...
<BLANKLINE>
<BLANKLINE>
           DAGs   OrbiterEmptyOperator
 ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
  foo.py      1                      1
  baz.py      1                      1
  Totals      2                      2
<BLANKLINE>
Source code in orbiter/objects/project.py
@validate_call
def analyze(
    self, output_fmt: Literal["json", "csv", "md"] = "md", output_file=None
):
    """Print an analysis of the project to the console.

    !!! tip

        Looks for a `[task_type=XYZ]` marker at the start of the task's `doc_md`
        property; otherwise, `type(task).__name__` is used as the task type.

    ```pycon
    >>> from orbiter.objects.operators.empty import OrbiterEmptyOperator
    >>> OrbiterProject().add_dags([
    ...     OrbiterDAG(file_path="", dag_id="foo", orbiter_kwargs={"file_path": "foo.py"},
    ...         tasks={"bar": OrbiterEmptyOperator(task_id="bar")}
    ...     ),
    ...     OrbiterDAG(file_path="", dag_id="baz", orbiter_kwargs={"file_path": "baz.py"},
    ...         tasks={"bop": OrbiterEmptyOperator(task_id="bop")}
    ...     )
    ... ]).analyze()
    ... # doctest: +ELLIPSIS
    ┏━...
    ...Analysis...
    ┗━...
    <BLANKLINE>
    <BLANKLINE>
               DAGs   OrbiterEmptyOperator
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
      foo.py      1                      1
      baz.py      1                      1
      Totals      2                      2
    <BLANKLINE>

    ```
    """
    if output_file is None:
        output_file = sys.stdout

    _task_type = re.compile(r"\[task_type=(?P<task_type>[A-Za-z0-9-_]+)")

    def get_task_type(task):
        match = _task_type.match(getattr(task, "doc_md", None) or "")
        match_or_task_type = (
            match.groupdict().get("task_type") if match else None
        ) or type(task).__name__
        return match_or_task_type

    dag_analysis = [
        {
            "file": dag.orbiter_kwargs.get("file_path", dag.file_path),
            "dag_id": dag.dag_id,
            "task_types": [get_task_type(task) for task in dag.tasks.values()],
        }
        for dag in self.dags.values()
    ]

    file_analysis = {}
    for analysis in dag_analysis:
        analysis_output = file_analysis.get(analysis["file"], {})
        analysis_output["DAGs"] = analysis_output.get("DAGs", 0) + 1
        tasks_of_type = reduce(
            lambda acc, task_type: acc | {task_type: acc.get(task_type, 0) + 1},
            analysis["task_types"],
            dict(),
        )
        analysis_output |= tasks_of_type
        file_analysis[analysis["file"]] = analysis_output

    file_analysis = [{"": k} | v for k, v in file_analysis.items()]
    totals = {"": "Totals"}
    for file in file_analysis:
        for k, v in file.items():
            if k != "":
                totals[k] = totals.get(k, 0) + v
    file_analysis.append(totals)

    if output_fmt == "json":
        import json

        json.dump(file_analysis, output_file, default=str)
    elif output_fmt == "csv":
        import csv

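        # list(totals) gives a stable column order with the file column ("") first;
        # a set would iterate its field names in arbitrary order
        writer = csv.DictWriter(output_file, fieldnames=list(totals))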
        writer.writeheader()
        writer.writerows(file_analysis)
    elif output_fmt == "md":
        from rich.console import Console
        from rich.markdown import Markdown
        from tabulate import tabulate

        console = Console(file=output_file)

        #         DAGs  EmptyOp
        # file_a     1        1
        table = tabulate(
            tabular_data=file_analysis,
            headers="keys",
            tablefmt="pipe",
            # https://github.com/Textualize/rich/issues/3027
            missingval="⠀",  # (special 'braille space' character)
        )
        console.print(
            Markdown(
                f"# Analysis\n{table}",
                style="magenta",
            )
        )
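The per-file counting and Totals row built in `analyze()` can be sketched with `collections.Counter` in place of the `reduce`-based merge. This is a swapped-in technique, not the library's code, and the input records below are hypothetical. The sketch passes CSV `fieldnames` as an ordered list rather than a set, because a Python `set` has no guaranteed iteration order and the file-name column could otherwise land anywhere.

```python
import csv
import io
from collections import Counter

# Hypothetical per-DAG records, shaped like analyze()'s dag_analysis list
dag_analysis = [
    {"file": "foo.py", "dag_id": "foo", "task_types": ["OrbiterEmptyOperator"]},
    {"file": "baz.py", "dag_id": "baz", "task_types": ["OrbiterEmptyOperator"]},
    {"file": "baz.py", "dag_id": "qux", "task_types": ["OrbiterBashOperator"] * 2},
]

# One Counter per source file: number of DAGs plus number of tasks per task type
per_file: dict[str, Counter] = {}
for analysis in dag_analysis:
    counts = per_file.setdefault(analysis["file"], Counter())
    counts["DAGs"] += 1
    counts.update(analysis["task_types"])

rows = [{"": file} | dict(counts) for file, counts in per_file.items()]
totals = {"": "Totals"} | dict(sum(per_file.values(), Counter()))
rows.append(totals)

# The "" key comes first in totals, so list(totals) is a stable column order
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=list(totals))
writer.writeheader()
writer.writerows(rows)  # missing counts are written as DictWriter's default restval ""
print(buffer.getvalue())
```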