Overview

The brain of the Orbiter framework is its Rules and the Rulesets that contain them.

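Different Rules are applied in different scenarios, such as filtering input down to DAG or Task candidates, converting those candidates to DAGs or Tasks, extracting task dependencies, and post-processing the translated project.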

Tip

To map the following input

{
    "id": "my_task",
    "command": "echo 'hi'"
}

to an Airflow BashOperator, a Rule could parse it as follows:

@task_rule
def my_rule(val):
    if 'command' in val:
        return OrbiterBashOperator(task_id=val['id'], bash_command=val['command'])
    else:
        return None

This returns an OrbiterBashOperator, which will become an Airflow BashOperator when the translation completes.
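
To see how such a rule behaves on matching and non-matching input without installing Orbiter, here is a minimal stand-alone sketch. `FakeBashOperator` and the pass-through `task_rule` decorator below are hypothetical stand-ins written for this illustration, not Orbiter's own classes; only the shape of `my_rule` comes from the example above.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeBashOperator:
    """Hypothetical stand-in for OrbiterBashOperator (illustration only)."""

    task_id: str
    bash_command: str


def task_rule(func: Callable) -> Callable:
    """Hypothetical pass-through decorator; the real @task_rule does more."""
    return func


@task_rule
def my_rule(val: dict) -> Optional[FakeBashOperator]:
    # Match only entries that carry a 'command' key
    if "command" in val:
        return FakeBashOperator(task_id=val["id"], bash_command=val["command"])
    # Returning None signals "no match" to the caller
    return None


matched = my_rule({"id": "my_task", "command": "echo 'hi'"})
unmatched = my_rule({"id": "other_task", "script": "run.sh"})
print(matched)
print(unmatched)
```

The second call returns `None` because the entry has no `command` key, which is how a rule declines an input it does not recognize.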

orbiter.rules.rulesets.translate

translate(
    translation_ruleset, input_dir: Path
) -> OrbiterProject

Orbiter expects a folder containing text files which may have a structure like:

{"<workflow name>": { ...<workflow properties>, "<task name>": { ...<task properties>} }}
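
As a concrete illustration of that shape, the sketch below parses a small JSON document with Python's standard `json` module. The workflow name `my_workflow`, the `schedule` property, and the convention that nested dictionaries are tasks are all invented for this example; a real input format decides these for itself.

```python
import json

# Hypothetical input file contents: one workflow holding one task
raw = """
{
  "my_workflow": {
    "schedule": "daily",
    "my_task": {"id": "my_task", "command": "echo 'hi'"}
  }
}
"""

loaded = json.loads(raw)
workflow_name, workflow = next(iter(loaded.items()))

# Assumption for this example only: nested dicts are task entries,
# everything else is a workflow-level property
task_entries = {k: v for k, v in workflow.items() if isinstance(v, dict)}
properties = {k: v for k, v in workflow.items() if not isinstance(v, dict)}

print(workflow_name, properties, task_entries)
```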

The standard translation function performs the following steps:

Diagram of Orbiter Translation

  1. Find all files with the expected TranslationRuleset.file_type (.json, .xml, .yaml, etc.) in the input folder.
    • Load each file and turn it into a Python dictionary.
  2. For each file: Apply the TranslationRuleset.dag_filter_ruleset to filter down to entries that can translate to a DAG, in priority order. The file name is added under a __file key to both input and output dictionaries for the DAG Filter rule.
    • For each dictionary: Apply the TranslationRuleset.dag_ruleset, to convert the object to an OrbiterDAG, in priority-order, stopping when the first rule returns a match. If no rule returns a match, the entry is filtered.
  3. Apply the TranslationRuleset.task_filter_ruleset to filter down to entries in the DAG that can translate to a Task, in priority-order.
    • For each: Apply the TranslationRuleset.task_ruleset, to convert the object to a specific Task, in priority-order, stopping when the first rule returns a match. If no rule returns a match, the entry is filtered.
  4. After the DAG and Tasks are mapped, the TranslationRuleset.task_dependency_ruleset is applied in priority-order, stopping when the first rule returns a match, to create a list of OrbiterTaskDependency objects, which are then added to each task in the OrbiterDAG.
  5. Apply the TranslationRuleset.post_processing_ruleset, against the OrbiterProject, which can make modifications after all other rules have been applied.
  6. After translation, the OrbiterProject is rendered to the output folder.
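
The filter-then-convert pattern in steps 2 and 3 can be sketched in a few lines of plain Python. This is a simplified model, not Orbiter's implementation: `apply_rules`, the `(priority, function)` tuples, and the tuple return values are hypothetical, and the sketch assumes that a higher priority number runs first.

```python
from typing import Any, Callable, List, Tuple

Rule = Tuple[int, Callable[[dict], Any]]  # (priority, rule function)


def apply_rules(rules: List[Rule], val: dict, take_first: bool = False) -> Any:
    """Run rules in priority order; None from a rule means 'no match'."""
    results = []
    # Assumption: higher priority number is evaluated first
    for _, rule in sorted(rules, key=lambda r: r[0], reverse=True):
        result = rule(val)
        if result is None:
            continue
        if take_first:
            return result  # stop at the first matching rule
        results.append(result)
    return None if take_first else results


def task_filter(dag_dict: dict):
    # Filter rule: one DAG dict in, many candidate task dicts out
    return [v for v in dag_dict.values() if isinstance(v, dict)] or None


def bash_rule(val: dict):
    return ("bash", val["id"]) if "command" in val else None


def fallback_rule(val: dict):
    return ("empty", val["id"])  # matches anything the other rules skipped


dag_dict = {"a": {"id": "a", "command": "ls"}, "b": {"id": "b"}}
# Each filter rule returns a list, so flatten the list of lists
task_dicts = [d for found in apply_rules([(1, task_filter)], dag_dict) for d in found]
tasks = [
    apply_rules([(1, fallback_rule), (2, bash_rule)], d, take_first=True)
    for d in task_dicts
]
print(tasks)
```

Entry `a` is claimed by `bash_rule` because it runs first and matches; entry `b` falls through to `fallback_rule`. With no fallback, `b` would come back as `None` and be dropped, mirroring "if no rule returns a match, the entry is filtered."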
Source code in orbiter/rules/rulesets.py
@validate_call
def translate(translation_ruleset, input_dir: Path) -> OrbiterProject:
    """
    Orbiter expects a folder containing text files which may have a structure like:
    ```json
    {"<workflow name>": { ...<workflow properties>, "<task name>": { ...<task properties>} }}
    ```

    The standard translation function performs the following steps:

    ![Diagram of Orbiter Translation](../orbiter_diagram.png)

    1. [**Find all files**][orbiter.rules.rulesets.TranslationRuleset.get_files_with_extension] with the expected
        [`TranslationRuleset.file_type`][orbiter.rules.rulesets.TranslationRuleset]
        (`.json`, `.xml`, `.yaml`, etc.) in the input folder.
        - [**Load each file**][orbiter.rules.rulesets.TranslationRuleset.loads] and turn it into a Python Dictionary.
    2. **For each file:** Apply the [`TranslationRuleset.dag_filter_ruleset`][orbiter.rules.rulesets.DAGFilterRuleset]
        to filter down to entries that can translate to a DAG, in priority order.
        The file name is added under a `__file` key to both input and output dictionaries for the DAG Filter rule.
        - **For each dictionary**: Apply the [`TranslationRuleset.dag_ruleset`][orbiter.rules.rulesets.DAGRuleset],
        to convert the object to an [`OrbiterDAG`][orbiter.objects.dag.OrbiterDAG],
        in priority-order, stopping when the first rule returns a match.
        If no rule returns a match, the entry is filtered.
    3. Apply the [`TranslationRuleset.task_filter_ruleset`][orbiter.rules.rulesets.TaskFilterRuleset]
        to filter down to entries in the DAG that can translate to a Task, in priority-order.
        - **For each:** Apply the [`TranslationRuleset.task_ruleset`][orbiter.rules.rulesets.TaskRuleset],
            to convert the object to a specific Task, in priority-order, stopping when the first rule returns a match.
            If no rule returns a match, the entry is filtered.
    4. After the DAG and Tasks are mapped, the
        [`TranslationRuleset.task_dependency_ruleset`][orbiter.rules.rulesets.TaskDependencyRuleset]
        is applied in priority-order, stopping when the first rule returns a match,
        to create a list of
        [`OrbiterTaskDependency`][orbiter.objects.task.OrbiterTaskDependency],
        which are then added to each task in the
        [`OrbiterDAG`][orbiter.objects.dag.OrbiterDAG]
    5. Apply the [`TranslationRuleset.post_processing_ruleset`][orbiter.rules.rulesets.PostProcessingRuleset],
        against the [`OrbiterProject`][orbiter.objects.project.OrbiterProject], which can make modifications after all
        other rules have been applied.
    6. After translation - the [`OrbiterProject`][orbiter.objects.project.OrbiterProject]
        is rendered to the output folder.
    """
    if not isinstance(translation_ruleset, TranslationRuleset):
        raise RuntimeError(
            f"Error! type(translation_ruleset)=={type(translation_ruleset)}!=TranslationRuleset! Exiting!"
        )

    # Create an initial OrbiterProject
    project = OrbiterProject()

    for i, (file, input_dict) in enumerate(translation_ruleset.get_files_with_extension(input_dir)):
        logger.info(f"Translating [File {i}]={file.resolve()}")

        # DAG FILTER Ruleset - filter down to keys suspected of being translatable to a DAG, in priority order.
        # Add __file DAG FILTER inputs and outputs, so it's available for both DAG and DAG FILTER rules
        __file_addition = {"__file": (input_dir / file.relative_to(input_dir))}
        dag_dicts: List[dict] = [
            __file_addition | dag_dict
            for dag_dict in functools.reduce(
                add,
                translation_ruleset.dag_filter_ruleset.apply(val=input_dict | __file_addition),
                [],
            )
        ]
        logger.debug(f"Found {len(dag_dicts)} DAG candidates in {file.resolve()}")
        for dag_dict in dag_dicts:
            # DAG Ruleset - convert the object to an `OrbiterDAG` via `dag_ruleset`,
            #         in priority-order, stopping when the first rule returns a match
            dag: OrbiterDAG | None = translation_ruleset.dag_ruleset.apply(
                val=dag_dict,
                take_first=True,
            )
            if dag is None:
                logger.warning(
                    f"Couldn't extract DAG from dag_dict={dag_dict} with dag_ruleset={translation_ruleset.dag_ruleset}"
                )
                continue
            dag.orbiter_kwargs["file_path"] = file.relative_to(input_dir)

            tasks = {}
            # TASK FILTER Ruleset - Many entries in dag_dict -> Many task_dict
            task_dicts = functools.reduce(
                add,
                translation_ruleset.task_filter_ruleset.apply(val=dag_dict),
                [],
            )
            logger.debug(f"Found {len(task_dicts)} Task candidates in {dag.dag_id} in {file.resolve()}")
            for task_dict in task_dicts:
                # TASK Ruleset one -> one
                task: OrbiterOperator = translation_ruleset.task_ruleset.apply(val=task_dict, take_first=True)
                if task is None:
                    logger.warning(f"Couldn't extract task from expected task_dict={task_dict}")
                    continue

                _add_task_deduped(task, tasks)
            logger.debug(f"Adding {len(tasks)} tasks to DAG {dag.dag_id}")
            dag.add_tasks(tasks.values())

            # Dag-Level TASK DEPENDENCY Ruleset
            task_dependencies: List[OrbiterTaskDependency] = (
                list(chain(*translation_ruleset.task_dependency_ruleset.apply(val=dag))) or []
            )
            if not len(task_dependencies):
                logger.warning(f"Couldn't find task dependencies in dag={trim_dict(dag_dict)}")
            for task_dependency in task_dependencies:
                task_dependency: OrbiterTaskDependency
                if parent := _get_parent_for_task_dependency(task_dependency, dag):
                    parent.tasks[task_dependency.task_id].add_downstream(task_dependency)
                else:
                    logger.warning(f"Couldn't find task_id={task_dependency.task_id} in tasks for dag_id={dag.dag_id}")
                    continue

            logger.debug(f"Adding DAG {dag.dag_id} to project")
            project.add_dags(dag)

    # POST PROCESSING Ruleset
    translation_ruleset.post_processing_ruleset.apply(val=project, take_first=False)

    return project
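
Two idioms in the listing above are easy to misread, so here is a small stand-alone sketch of each. It assumes that `add` in the listing is `operator.add` (the excerpt does not show its imports) and uses invented file names and dictionaries; the `|` dict union requires Python 3.9 or later.

```python
import functools
from operator import add

# 1. Flattening: each filter rule returns a list, so apply() yields a list of lists.
#    reduce(add, ..., []) concatenates them; the [] initial value covers "no matches".
rule_outputs = [[{"dag": "a"}], [{"dag": "b"}, {"dag": "c"}]]
flattened = functools.reduce(add, rule_outputs, [])
empty = functools.reduce(add, [], [])

# 2. Dict union: on duplicate keys, the right-hand operand wins.
file_addition = {"__file": "workflow.json"}
dag_dict = {"dag": "a", "__file": "from_rule.json"}
rule_wins = file_addition | dag_dict  # like `__file_addition | dag_dict`
file_wins = dag_dict | file_addition  # like `input_dict | __file_addition`

print(flattened, empty, rule_wins["__file"], file_wins["__file"])
```

Read this way, the listing overwrites any `__file` key already present in the input, while a `__file` value returned by a DAG filter rule takes precedence in its output.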