Skip to content

Overview

Objects are returned from Rules during a translation, and are rendered to produce an Apache Airflow Project

An OrbiterProject holds everything necessary to render an Airflow Project. It is generated by a TranslationRuleset.translate_fn.

Diagram of Orbiter Translation

Workflows are represented by a OrbiterDAG which is a Directed Acyclic Graph (of Tasks).

OrbiterOperators represent Airflow Tasks, which are units of work. An Operator is a pre-defined task with specific functionality.

orbiter.objects.OrbiterBase

AbstractBaseClass for Orbiter objects, provides a number of properties

Parameters:

Name Type Description
imports List[OrbiterRequirement]

List of OrbiterRequirement objects

orbiter_kwargs (dict, optional)

Optional dictionary of keyword arguments, to preserve what was originally parsed by a rule

orbiter_conns (Set[OrbiterConnection], optional)

Optional set of OrbiterConnection objects

orbiter_env_vars (Set[OrbiterEnvVar], optional)

Optional set of OrbiterEnvVar objects

orbiter_includes (Set[OrbiterInclude], optional)

Optional set of OrbiterInclude objects

orbiter_vars (Set[OrbiterVariable], optional)

Optional set of OrbiterVariable objects

Methods:

Name Description
add_connections

Add OrbiterConnections to orbiter_conns

add_env_vars

Add OrbiterEnvVars to orbiter_env_vars

add_includes

Add OrbiterIncludes to orbiter_includes

add_requirements

Add OrbiterRequirements to imports

add_variables

Add OrbiterVariables to orbiter_vars

add_connections

add_connections(
    connections: (
        OrbiterConnection
        | Iterable[OrbiterConnection]
        | None
    ) = None,
) -> Self

Add OrbiterConnections to orbiter_conns

>>> from orbiter.objects.operators.bash import OrbiterBashOperator
>>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_connections(
...     connections=OrbiterConnection(conn_id='postgres')
... ).orbiter_conns
{OrbiterConnection(conn_id=postgres, conn_type=generic)}
>>> sorted(OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_connections(connections=[
...     OrbiterConnection(conn_id='postgres'),
...     OrbiterConnection(conn_id='mysql')
... ]).orbiter_conns, key=str)
[OrbiterConnection(conn_id=mysql, conn_type=generic), OrbiterConnection(conn_id=postgres, conn_type=generic)]
>>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_connections(
...     connections=None
... ).orbiter_conns is None
True

Parameters:

Name Type Description
connections OrbiterConnection | Iterable[OrbiterConnection] | None

Single, iterable of OrbiterConnection, or None

Returns:

Type Description
Self

self

Source code in orbiter/objects/__init__.py
@validate_call()
def add_connections(self, connections: OrbiterConnection | Iterable[OrbiterConnection] | None = None) -> Self:
    """Add [OrbiterConnections][orbiter.objects.connection.OrbiterConnection] to orbiter_conns

    ```pycon
    >>> from orbiter.objects.operators.bash import OrbiterBashOperator
    >>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_connections(
    ...     connections=OrbiterConnection(conn_id='postgres')
    ... ).orbiter_conns
    {OrbiterConnection(conn_id=postgres, conn_type=generic)}
    >>> sorted(OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_connections(connections=[
    ...     OrbiterConnection(conn_id='postgres'),
    ...     OrbiterConnection(conn_id='mysql')
    ... ]).orbiter_conns, key=str)
    [OrbiterConnection(conn_id=mysql, conn_type=generic), OrbiterConnection(conn_id=postgres, conn_type=generic)]
    >>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_connections(
    ...     connections=None
    ... ).orbiter_conns is None
    True

    ```
    :param connections: Single, iterable of [OrbiterConnection][orbiter.objects.connection.OrbiterConnection], or None
    :type connections: OrbiterConnection | Iterable[OrbiterConnection] | None
    :return: self
    :rtype: Self
    """
    if not connections:
        return self
    if self.orbiter_conns is None:
        self.orbiter_conns = set()
    for _connection in [connections] if isinstance(connections, OrbiterConnection) else connections:
        self.orbiter_conns.add(_connection)
    return self

add_env_vars

add_env_vars(
    env_vars: (
        OrbiterEnvVar | Iterable[OrbiterEnvVar] | None
    ) = None,
) -> Self

Add OrbiterEnvVars to orbiter_env_vars

>>> from orbiter.objects.operators.bash import OrbiterBashOperator
>>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_env_vars(
...     env_vars=OrbiterEnvVar(key='ENV', value='prod')
... ).orbiter_env_vars
{OrbiterEnvVar(key='ENV', value='prod')}
>>> sorted(OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_env_vars(env_vars=[
...     OrbiterEnvVar(key='ENV', value='prod'),
...     OrbiterEnvVar(key='REGION', value='us-west-2')
... ]).orbiter_env_vars, key=str)
[OrbiterEnvVar(key='ENV', value='prod'), OrbiterEnvVar(key='REGION', value='us-west-2')]
>>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_env_vars(
...     env_vars=None
... ).orbiter_env_vars is None
True

Parameters:

Name Type Description
env_vars OrbiterEnvVar | Iterable[OrbiterEnvVar] | None

Single, iterable of OrbiterEnvVar, or None

Returns:

Type Description
Self

self

Source code in orbiter/objects/__init__.py
@validate_call()
def add_env_vars(self, env_vars: OrbiterEnvVar | Iterable[OrbiterEnvVar] | None = None) -> Self:
    """Add [OrbiterEnvVars][orbiter.objects.env_var.OrbiterEnvVar] to orbiter_env_vars

    ```pycon
    >>> from orbiter.objects.operators.bash import OrbiterBashOperator
    >>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_env_vars(
    ...     env_vars=OrbiterEnvVar(key='ENV', value='prod')
    ... ).orbiter_env_vars
    {OrbiterEnvVar(key='ENV', value='prod')}
    >>> sorted(OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_env_vars(env_vars=[
    ...     OrbiterEnvVar(key='ENV', value='prod'),
    ...     OrbiterEnvVar(key='REGION', value='us-west-2')
    ... ]).orbiter_env_vars, key=str)
    [OrbiterEnvVar(key='ENV', value='prod'), OrbiterEnvVar(key='REGION', value='us-west-2')]
    >>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_env_vars(
    ...     env_vars=None
    ... ).orbiter_env_vars is None
    True

    ```
    :param env_vars: Single, iterable of [OrbiterEnvVar][orbiter.objects.env_var.OrbiterEnvVar], or None
    :type env_vars: OrbiterEnvVar | Iterable[OrbiterEnvVar] | None
    :return: self
    :rtype: Self
    """
    if not env_vars:
        return self
    if self.orbiter_env_vars is None:
        self.orbiter_env_vars = set()
    for _env_var in [env_vars] if isinstance(env_vars, OrbiterEnvVar) else env_vars:
        self.orbiter_env_vars.add(_env_var)
    return self

add_includes

add_includes(
    includes: (
        OrbiterInclude | Iterable[OrbiterInclude] | None
    ) = None,
) -> Self

Add OrbiterIncludes to orbiter_includes

>>> from orbiter.objects.operators.bash import OrbiterBashOperator
>>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_includes(
...     includes=OrbiterInclude(filepath='utils.py', contents='# Utils')
... ).orbiter_includes
{OrbiterInclude(filepath='utils.py', contents='# Utils')}
>>> sorted(OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_includes(includes=[
...     OrbiterInclude(filepath='utils.py', contents='# Utils'),
...     OrbiterInclude(filepath='helpers.py', contents='# Helpers')
... ]).orbiter_includes, key=str)
[OrbiterInclude(filepath='helpers.py', contents='# Helpers'), OrbiterInclude(filepath='utils.py', contents='# Utils')]
>>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_includes(
...     includes=None
... ).orbiter_includes is None
True

Parameters:

Name Type Description
includes OrbiterInclude | Iterable[OrbiterInclude] | None

Single, iterable of OrbiterInclude, or None

Returns:

Type Description
Self

self

Source code in orbiter/objects/__init__.py
@validate_call()
def add_includes(self, includes: OrbiterInclude | Iterable[OrbiterInclude] | None = None) -> Self:
    """Add [OrbiterIncludes][orbiter.objects.include.OrbiterInclude] to orbiter_includes

    ```pycon
    >>> from orbiter.objects.operators.bash import OrbiterBashOperator
    >>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_includes(
    ...     includes=OrbiterInclude(filepath='utils.py', contents='# Utils')
    ... ).orbiter_includes
    {OrbiterInclude(filepath='utils.py', contents='# Utils')}
    >>> sorted(OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_includes(includes=[
    ...     OrbiterInclude(filepath='utils.py', contents='# Utils'),
    ...     OrbiterInclude(filepath='helpers.py', contents='# Helpers')
    ... ]).orbiter_includes, key=str)
    [OrbiterInclude(filepath='helpers.py', contents='# Helpers'), OrbiterInclude(filepath='utils.py', contents='# Utils')]
    >>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_includes(
    ...     includes=None
    ... ).orbiter_includes is None
    True

    ```
    :param includes: Single, iterable of [OrbiterInclude][orbiter.objects.include.OrbiterInclude], or None
    :type includes: OrbiterInclude | Iterable[OrbiterInclude] | None
    :return: self
    :rtype: Self
    """
    if not includes:
        return self
    if self.orbiter_includes is None:
        self.orbiter_includes = set()
    for _include in [includes] if isinstance(includes, OrbiterInclude) else includes:
        self.orbiter_includes.add(_include)
    return self

add_requirements

add_requirements(
    requirements: (
        OrbiterRequirement
        | Iterable[OrbiterRequirement]
        | None
    ) = None,
) -> Self

Add OrbiterRequirements to imports

>>> from orbiter.objects.operators.bash import OrbiterBashOperator
>>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_requirements(
...     requirements=OrbiterRequirement(package='apache-airflow', names=['DAG'], module='airflow')
... ).imports
[OrbiterRequirement(names=[BashOperator], package=apache-airflow, module=airflow.operators.bash, sys_package=None), OrbiterRequirement(names=[DAG], package=apache-airflow, module=airflow, sys_package=None)]
>>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_requirements(requirements=[
...     OrbiterRequirement(package='pandas', names=['DataFrame'], module='pandas'),
...     OrbiterRequirement(package='numpy', names=['array'], module='numpy')
... ]).imports  # doctest: +ELLIPSIS
[OrbiterRequirement(names=[BashOperator], package=apache-airflow, module=airflow.operators.bash, sys_package=None), OrbiterRequirement(names=[DataFrame], package=pandas, module=pandas, sys_package=None), OrbiterRequirement(names=[array], package=numpy, module=numpy, sys_package=None)]
>>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_requirements(requirements=None).imports
[OrbiterRequirement(names=[BashOperator], package=apache-airflow, module=airflow.operators.bash, sys_package=None)]
>>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_requirements(requirements=[]).imports
[OrbiterRequirement(names=[BashOperator], package=apache-airflow, module=airflow.operators.bash, sys_package=None)]

Parameters:

Name Type Description
requirements OrbiterRequirement | Iterable[OrbiterRequirement] | None

Single, list of OrbiterRequirement, or None

Returns:

Type Description
Self

self

Source code in orbiter/objects/__init__.py
@validate_call()
def add_requirements(self, requirements: OrbiterRequirement | Iterable[OrbiterRequirement] | None = None) -> Self:
    """Add [OrbiterRequirements][orbiter.objects.requirement.OrbiterRequirement] to imports

    ```pycon
    >>> from orbiter.objects.operators.bash import OrbiterBashOperator
    >>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_requirements(
    ...     requirements=OrbiterRequirement(package='apache-airflow', names=['DAG'], module='airflow')
    ... ).imports
    [OrbiterRequirement(names=[BashOperator], package=apache-airflow, module=airflow.operators.bash, sys_package=None), OrbiterRequirement(names=[DAG], package=apache-airflow, module=airflow, sys_package=None)]
    >>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_requirements(requirements=[
    ...     OrbiterRequirement(package='pandas', names=['DataFrame'], module='pandas'),
    ...     OrbiterRequirement(package='numpy', names=['array'], module='numpy')
    ... ]).imports  # doctest: +ELLIPSIS
    [OrbiterRequirement(names=[BashOperator], package=apache-airflow, module=airflow.operators.bash, sys_package=None), OrbiterRequirement(names=[DataFrame], package=pandas, module=pandas, sys_package=None), OrbiterRequirement(names=[array], package=numpy, module=numpy, sys_package=None)]
    >>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_requirements(requirements=None).imports
    [OrbiterRequirement(names=[BashOperator], package=apache-airflow, module=airflow.operators.bash, sys_package=None)]
    >>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_requirements(requirements=[]).imports
    [OrbiterRequirement(names=[BashOperator], package=apache-airflow, module=airflow.operators.bash, sys_package=None)]

    ```
    :param requirements: Single, list of [OrbiterRequirement][orbiter.objects.requirement.OrbiterRequirement], or None
    :type requirements: OrbiterRequirement | Iterable[OrbiterRequirement] | None
    :return: self
    :rtype: Self
    """
    if not requirements:
        return self
    for _requirement in [requirements] if isinstance(requirements, OrbiterRequirement) else requirements:
        self.imports.append(_requirement)
    return self

add_variables

add_variables(
    variables: (
        OrbiterVariable | Iterable[OrbiterVariable] | None
    ) = None,
) -> Self

Add OrbiterVariables to orbiter_vars

>>> from orbiter.objects.operators.bash import OrbiterBashOperator
>>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_variables(
...     variables=OrbiterVariable(key='db_host', value='localhost')
... ).orbiter_vars
{OrbiterVariable(key='db_host', value='localhost')}
>>> sorted(OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_variables(variables=[
...     OrbiterVariable(key='db_host', value='localhost'),
...     OrbiterVariable(key='db_port', value='5432')
... ]).orbiter_vars, key=str)
[OrbiterVariable(key='db_host', value='localhost'), OrbiterVariable(key='db_port', value='5432')]
>>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_variables(
...     variables=None
... ).orbiter_vars is None
True

Parameters:

Name Type Description
variables OrbiterVariable | Iterable[OrbiterVariable] | None

Single, iterable of OrbiterVariable, or None

Returns:

Type Description
Self

self

Source code in orbiter/objects/__init__.py
@validate_call()
def add_variables(self, variables: OrbiterVariable | Iterable[OrbiterVariable] | None = None) -> Self:
    """Add [OrbiterVariables][orbiter.objects.variable.OrbiterVariable] to orbiter_vars

    ```pycon
    >>> from orbiter.objects.operators.bash import OrbiterBashOperator
    >>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_variables(
    ...     variables=OrbiterVariable(key='db_host', value='localhost')
    ... ).orbiter_vars
    {OrbiterVariable(key='db_host', value='localhost')}
    >>> sorted(OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_variables(variables=[
    ...     OrbiterVariable(key='db_host', value='localhost'),
    ...     OrbiterVariable(key='db_port', value='5432')
    ... ]).orbiter_vars, key=str)
    [OrbiterVariable(key='db_host', value='localhost'), OrbiterVariable(key='db_port', value='5432')]
    >>> OrbiterBashOperator(task_id='foo', bash_command='echo hello').add_variables(
    ...     variables=None
    ... ).orbiter_vars is None
    True

    ```
    :param variables: Single, iterable of [OrbiterVariable][orbiter.objects.variable.OrbiterVariable], or None
    :type variables: OrbiterVariable | Iterable[OrbiterVariable] | None
    :return: self
    :rtype: Self
    """
    if not variables:
        return self
    if self.orbiter_vars is None:
        self.orbiter_vars = set()
    for _variable in [variables] if isinstance(variables, OrbiterVariable) else variables:
        self.orbiter_vars.add(_variable)
    return self