Overview

The brain of the Orbiter framework lies in its Rules and the Rulesets that contain them.

Different Rules are applied in different scenarios, such as filtering input down to DAG candidates, mapping entries to DAGs and Tasks, defining Task dependencies, and post-processing the final Project.

Tip

To map the following input

{
    "id": "my_task",
    "command": "echo 'hi'"
}

to an Airflow BashOperator, a Rule could parse it as follows:

# Import paths here are assumptions; adjust them for your Orbiter version
from orbiter.objects.operators.bash import OrbiterBashOperator
from orbiter.rules import task_rule

@task_rule
def my_rule(val: dict):
    # Only match entries that carry a shell command
    if 'command' in val:
        return OrbiterBashOperator(task_id=val['id'], bash_command=val['command'])
    else:
        return None

This returns an OrbiterBashOperator, which becomes an Airflow BashOperator when the translation completes.
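
Rules like this are grouped into a TranslationRuleset, with one Ruleset per translation phase (described in the steps below). A minimal sketch of the wiring, assuming the constructor fields and import paths shown here (adjust for your Orbiter version):

from orbiter.file_types import FileTypeJSON  # assumed import path
from orbiter.rules.rulesets import (
    DAGFilterRuleset,
    DAGRuleset,
    PostProcessingRuleset,
    TaskDependencyRuleset,
    TaskFilterRuleset,
    TaskRuleset,
    TranslationRuleset,
)

translation_ruleset = TranslationRuleset(
    file_type={FileTypeJSON},                                   # which files to load
    dag_filter_ruleset=DAGFilterRuleset(ruleset=[]),            # file dict -> DAG candidates
    dag_ruleset=DAGRuleset(ruleset=[]),                         # DAG candidate -> OrbiterDAG
    task_filter_ruleset=TaskFilterRuleset(ruleset=[]),          # DAG dict -> Task candidates
    task_ruleset=TaskRuleset(ruleset=[my_rule]),                # Task candidate -> Operator
    task_dependency_ruleset=TaskDependencyRuleset(ruleset=[]),  # DAG -> OrbiterTaskDependency list
    post_processing_ruleset=PostProcessingRuleset(ruleset=[]),  # final OrbiterProject tweaks
)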

orbiter.default_translation.translate

translate(
    translation_ruleset, input_dir: Path
) -> OrbiterProject

Orbiter expects a folder containing text files which may have a structure like:

{"<workflow name>": { ...<workflow properties>, "<task name>": { ...<task properties>} }}
However, the files may be in any format, and the structure may be different.

The standard translation function performs the following steps:

Diagram of Orbiter Translation

  1. Find all files with the expected TranslationRuleset.file_type (.json, .xml, .yaml, etc.) in the input folder.
    • Load each file and turn it into a Python dictionary.
  2. For each file: Apply the TranslationRuleset.dag_filter_ruleset to filter down to entries that can translate to a DAG, in priority order.
    • For each dictionary: Apply the TranslationRuleset.dag_ruleset to convert the object to an OrbiterDAG, in priority order, stopping when the first rule returns a match. If no rule returns a match, the entry is filtered out.
  3. Apply the TranslationRuleset.task_filter_ruleset to filter down to entries in the DAG that can translate to a Task, in priority order.
    • For each: Apply the TranslationRuleset.task_ruleset to convert the object to a specific Task, in priority order, stopping when the first rule returns a match. If no rule returns a match, the entry is filtered out.
  4. After the DAG and Tasks are mapped, the TranslationRuleset.task_dependency_ruleset is applied in priority order, stopping when the first rule returns a match, to create a list of OrbiterTaskDependency objects, which are then added to each task in the OrbiterDAG.
  5. Apply the TranslationRuleset.post_processing_ruleset against the OrbiterProject, which can make modifications after all other rules have been applied.
  6. After translation, the OrbiterProject is rendered to the output folder.
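
Put together, the standard translation is a single call; a minimal sketch (translate returns the OrbiterProject, while the rendering in step 6 is performed by the caller, e.g. the Orbiter CLI):

from pathlib import Path

from orbiter.default_translation import translate

project = translate(translation_ruleset, input_dir=Path("workflows/"))
# `project` is an OrbiterProject; rendering it to the output
# folder (step 6) happens after translate() returns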

Info

Two modifications are made to both input and output dictionaries for a @dag_filter_rule:

  • the initial input directory is added under a __input_dir key to each dictionary
  • the relative file name is added under a __file key
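
A @dag_filter_rule can read these keys directly, for example (a sketch; the decorator import path and the workflow-detection logic are assumptions):

from orbiter.rules import dag_filter_rule  # import path assumed

@dag_filter_rule
def filter_workflows(val: dict) -> list[dict] | None:
    # __file and __input_dir are injected by translate() (see above)
    print(f"Scanning {val.get('__file')} (from {val.get('__input_dir')})")
    # Hypothetical structure: every non-dunder dict value is a workflow
    workflows = [v for k, v in val.items() if not k.startswith("__") and isinstance(v, dict)]
    return workflows or None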

Info

Information about matching rules is saved in an orbiter_meta property on each resulting orbiter object, via an OrbiterMeta object.
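
Continuing the sketch above, you could audit which rules matched after translating (the project.dags mapping and the printed OrbiterMeta contents are assumptions):

for dag_id, dag in project.dags.items():
    # orbiter_meta is an OrbiterMeta recording the rule(s) that matched
    print(dag_id, getattr(dag, "orbiter_meta", None))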

Info

Input is saved via an orbiter_kwargs property on each object. This can be useful for later OrbiterTaskDependency or TranslationRuleset.post_processing_ruleset modifications.
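
For example, a @task_dependency_rule can recover the original input from the DAG it receives (a sketch; the decorator import path and the 'next' property are assumptions):

from orbiter.objects.dag import OrbiterDAG
from orbiter.objects.task import OrbiterTaskDependency
from orbiter.rules import task_dependency_rule  # import path assumed

@task_dependency_rule
def my_dependencies(val: OrbiterDAG) -> list[OrbiterTaskDependency] | None:
    # orbiter_kwargs holds the kwargs the DAG rule matched on, e.g. dag_dict
    original = (val.orbiter_kwargs or {}).get("dag_dict", {})
    return [
        OrbiterTaskDependency(task_id=task_id, downstream=props["next"])
        for task_id, props in original.items()
        if isinstance(props, dict) and "next" in props
    ] or None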

Source code in orbiter/default_translation.py
@validate_call
def translate(translation_ruleset, input_dir: Path) -> OrbiterProject:
    """
    Orbiter expects a folder containing text files which may have a structure like:
    ```json
    {"<workflow name>": { ...<workflow properties>, "<task name>": { ...<task properties>} }}
    ```
    However, the files may be in any format, and the structure may be different.

    The standard translation function performs the following steps:

    ![Diagram of Orbiter Translation](../orbiter_diagram.png)

    1. [**Find all files**][orbiter.rules.rulesets.TranslationRuleset.get_files_with_extension] with the expected
        [`TranslationRuleset.file_type`][orbiter.rules.rulesets.TranslationRuleset]
        (`.json`, `.xml`, `.yaml`, etc.) in the input folder.
        - [**Load each file**][orbiter.rules.rulesets.TranslationRuleset.loads] and turn it into a Python Dictionary.
    2. **For each file:** Apply the [`TranslationRuleset.dag_filter_ruleset`][orbiter.rules.rulesets.DAGFilterRuleset]
        to filter down to entries that can translate to a DAG, in priority order.
        - **For each dictionary**: Apply the [`TranslationRuleset.dag_ruleset`][orbiter.rules.rulesets.DAGRuleset],
        to convert the object to an [`OrbiterDAG`][orbiter.objects.dag.OrbiterDAG],
        in priority-order, stopping when the first rule returns a match.
        If no rule returns a match, the entry is filtered.
    3. Apply the [`TranslationRuleset.task_filter_ruleset`][orbiter.rules.rulesets.TaskFilterRuleset]
        to filter down to entries in the DAG that can translate to a Task, in priority-order.
        - **For each:** Apply the [`TranslationRuleset.task_ruleset`][orbiter.rules.rulesets.TaskRuleset],
            to convert the object to a specific Task, in priority-order, stopping when the first rule returns a match.
            If no rule returns a match, the entry is filtered.
    4. After the DAG and Tasks are mapped, the
        [`TranslationRuleset.task_dependency_ruleset`][orbiter.rules.rulesets.TaskDependencyRuleset]
        is applied in priority-order, stopping when the first rule returns a match,
        to create a list of
        [`OrbiterTaskDependency`][orbiter.objects.task.OrbiterTaskDependency],
        which are then added to each task in the
        [`OrbiterDAG`][orbiter.objects.dag.OrbiterDAG]
    5. Apply the [`TranslationRuleset.post_processing_ruleset`][orbiter.rules.rulesets.PostProcessingRuleset],
        against the [`OrbiterProject`][orbiter.objects.project.OrbiterProject], which can make modifications after all
        other rules have been applied.
    6. After translation - the [`OrbiterProject`][orbiter.objects.project.OrbiterProject]
        is rendered to the output folder.

    !!! info

        Two modifications are made to both input and output dictionaries for a
        [`@dag_filter_rule`][orbiter.rules.DAGFilterRule]:

        - the initial input directory is added under a `__input_dir` key to each dictionary
        - the relative file name is added under a `__file` key

    !!! info
        Information about matching rules is saved in an `orbiter_meta` property on each resulting orbiter object, via
        an [`OrbiterMeta`][orbiter.meta.OrbiterMeta] object.

    !!! info
        Input is saved via an `orbiter_kwargs` property on each object. This can be useful for later
        [`OrbiterTaskDependency`][orbiter.objects.task.OrbiterTaskDependency] or
        [`TranslationRuleset.post_processing_ruleset`][orbiter.rules.rulesets.PostProcessingRuleset] modifications.
    """
    validate_translate_function_inputs(translation_ruleset, input_dir)

    logger.debug("Creating an empty OrbiterProject")
    project = OrbiterProject()

    logger.info("Finding all files with the expected file type")
    files: enumerate[tuple[Path, dict]] = enumerate(translation_ruleset.get_files_with_extension(input_dir))

    def filter_dags(i: int, file: Path, input_dict: dict) -> tuple[list[dict], str]:
        """Step 1) Filter Dags from a file"""
        file_relative_to_input_dir_parent = file_relative_to_parent(file, input_dir)
        file_log_prefix = f"[File {i}={file_relative_to_input_dir_parent}]"
        logger.info(f"{file_log_prefix} Translating file")

        logger.debug(f"{file_log_prefix} Extracting DAG candidates")
        dag_dicts: list[dict] = translation_ruleset.dag_filter_ruleset.apply_ruleset(
            input_dict=input_dict, file=file_relative_to_input_dir_parent, input_dir=input_dir
        )
        logger.debug(f"{file_log_prefix} Found {len(dag_dicts)} DAG candidates")
        return dag_dicts, file_log_prefix

    def extract_dag(dag_dict: dict, file_log_prefix: str) -> tuple[OrbiterDAG | None, str]:
        """Step 2) Extract Dag from filtered Dag"""
        logger.debug(f"{file_log_prefix} Translating DAG Candidate to DAG")
        dag: OrbiterDAG = translation_ruleset.dag_ruleset.apply_ruleset(dag_dict=dag_dict)
        if dag:
            dag_log_prefix = f"{file_log_prefix}[DAG={dag.dag_id}]"
            return dag, dag_log_prefix
        else:
            return None, file_log_prefix

    def filter_tasks(
        dag_dict: dict,
        dag_log_prefix: str,
    ) -> list[dict]:
        """Step 3) Filter tasks from a filtered Dag"""
        logger.debug(f"{dag_log_prefix} Extracting Task Candidates to Tasks")
        task_dicts: list[dict] = translation_ruleset.task_filter_ruleset.apply_ruleset(dag_dict=dag_dict)
        logger.debug(f"{dag_log_prefix} Found {len(task_dicts)} Task candidates")
        return task_dicts

    def extract_tasks(task_dicts: list[dict], dag_log_prefix: str) -> list[OrbiterOperator | OrbiterTaskGroup]:
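        """Step 3, continued) Translate filtered Task Candidates into Tasks"""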
        logger.debug(f"{dag_log_prefix} Translating Task Candidates to Tasks")
        tasks = []
        for task_dict in task_dicts:
            task: OrbiterOperator | OrbiterTaskGroup = translation_ruleset.task_ruleset.apply_ruleset(
                task_dict=task_dict
            )
            if task is None:
                logger.warning(f"{dag_log_prefix} Couldn't extract task from expected task_dict={task_dict}")
                continue
            tasks.append(task)
        logger.debug(f"{dag_log_prefix} Adding {len(tasks)} tasks")
        return tasks

    def extract_task_dependencies(dag: OrbiterDAG, dag_log_prefix: str) -> None:
        """Step 4) Extract task dependencies from a filtered dag, add in-place"""
        logger.debug(f"{dag_log_prefix} Extracting Task Dependencies to apply to Tasks")
        task_dependencies: list[OrbiterTaskDependency] = translation_ruleset.task_dependency_ruleset.apply_ruleset(
            dag=dag
        )
        if not len(task_dependencies):
            logger.warning(f"{dag_log_prefix} Couldn't find task dependencies in dag={trim_dict(dag.orbiter_kwargs)}")

        logger.debug(f"{dag_log_prefix} Adding Task Dependencies to Tasks")
        for task_dependency in task_dependencies:
            if parent := dag.get_task_dependency_parent(task_dependency):
                parent.tasks[task_dependency.task_id].add_downstream(task_dependency)
            else:
                logger.warning(f"{dag_log_prefix} Couldn't find task_id={task_dependency.task_id} in tasks")
                continue

    # Translate each file individually - Default
    if not getattr(translation_ruleset.config, "upfront", False):
        for i, (file, input_dict) in files:
            dag_dicts, file_log_prefix = filter_dags(i, file, input_dict)
            for dag_dict in dag_dicts:
                dag, dag_log_prefix = extract_dag(dag_dict, file_log_prefix)
                if dag is None:
                    logger.warning(f"{file_log_prefix} Couldn't extract DAG from dag_dict={dag_dict}")
                    continue

                dag.add_tasks(extract_tasks(filter_tasks(dag_dict, dag_log_prefix), dag_log_prefix))
                extract_task_dependencies(dag, dag_log_prefix)

                logger.debug(f"{dag_log_prefix} Adding DAG {dag.dag_id} to project")
                project.add_dags(dag)

    # Filter against all files upfront
    else:
        import itertools

        def filter_dag_candidates_in_file(candidate_input: CandidateInput) -> list[tuple[str, dict, list[dict]]]:
            """Filter Dag and Task candidates from a file, upfront"""
            dag_dicts, file_log_prefix = filter_dags(
                candidate_input.i, candidate_input.file, candidate_input.input_dict
            )
            return [
                (file_log_prefix, dag_dict, filter_tasks(dag_dict, f"{file_log_prefix}[DAG=???]"))
                for dag_dict in dag_dicts
            ]

        if getattr(translation_ruleset.config, "parallel", False):
            import sys
            import pathos

            logger.info("Filtering DAG and Task Candidates from all files in parallel")
            with (  # noqa
                # pyinstaller/exe can't do multiproc
                pathos.multiprocessing.ThreadPool
                if getattr(sys, "frozen", False)
                else pathos.multiprocessing.ProcessingPool
            )() as pool:
                dag_and_task_candidates_all_files = pool.map(
                    filter_dag_candidates_in_file,
                    (
                        CandidateInput(i, input_dict, file, input_dir, translation_ruleset)
                        for i, (file, input_dict) in files
                    ),
                )
        else:
            logger.info("Filtering DAG and Task Candidates from all files in serial")
            dag_and_task_candidates_all_files = map(
                filter_dag_candidates_in_file,
                (
                    CandidateInput(i, input_dict, file, input_dir, translation_ruleset)
                    for i, (file, input_dict) in files
                ),
            )

        logger.info("Extracting Dags and Tasks in serial")
        for file_log_prefix, dag_dict, task_dicts in itertools.chain(*dag_and_task_candidates_all_files):
            dag, dag_log_prefix = extract_dag(dag_dict, file_log_prefix)
            if dag is None:
                logger.warning(f"{file_log_prefix} Couldn't extract DAG from dag_dict={dag_dict}")
                continue
            dag.add_tasks(extract_tasks(task_dicts, dag_log_prefix))
            extract_task_dependencies(dag, dag_log_prefix)

            logger.debug(f"{dag_log_prefix} Adding DAG {dag.dag_id} to project")
            project.add_dags(dag)

    logger.debug("Applying post-processing ruleset")
    translation_ruleset.post_processing_ruleset.apply_ruleset(project=project)

    return project