Dynamic tasks

DAG Factory supports Airflow’s Dynamic Task Mapping, enabling workflows to dynamically create tasks at runtime. This approach allows the number of tasks to be determined during execution, usually based on the outcome of a preceding task, rather than being predefined during DAG authoring.

Example: Defining Dynamic Tasks

Below is an example configuration for implementing dynamic tasks using DAG Factory:

example_dynamic_task_mapping.yml
test_expand:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "test expand"
  schedule_interval: "0 3 * * *"
  default_view: "graph"
  tasks:
    process:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: consume_value
      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
      partial:
        op_kwargs:
          fixed_param: "test"
      expand:
        op_args:
            request.output
      dependencies: [request]
    # This task is intentionally placed after the "process" task to demonstrate that DAG Factory does not require tasks
    # to be topologically ordered in the YAML file according to their dependencies.
    request:
      operator: airflow.operators.python.PythonOperator
      python_callable_name: make_list
      python_callable_file: $CONFIG_ROOT_DIR/expand_tasks.py
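The two callables referenced by this configuration might look like the following sketch (the real expand_tasks.py may differ; the list contents and print output here are illustrative):

```python
# Hypothetical expand_tasks.py (a sketch; only the function names come from the YAML above).

def make_list():
    # The returned value is pushed to XCom. Because `process` expands over
    # op_args, each element must itself be a list of positional arguments,
    # and each element produces one mapped task instance.
    return [[1], [2], [3], [4]]

def consume_value(value, fixed_param):
    # `value` is one element of make_list()'s output; `fixed_param` comes from
    # the `partial` section and is identical for every mapped task.
    print(f"{fixed_param}: {value}")
    return value
```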

Explanation of the Configuration

  1. request Task:

    • Generates a list of items using the make_list function from the expand_tasks.py module.
    • This task serves as the input provider for the dynamically mapped tasks.
  2. process Task:

    • Dynamically generates one task for each item in the list produced by the request task.
    • The expand argument is used to create these tasks at runtime, with request.output supplying the input list.
    • Additionally, the partial argument is used to specify fixed parameters (op_kwargs) that are applied to all dynamically generated tasks.

How It Works

  • Dynamic Task Creation: The expand keyword allows the process task to spawn multiple tasks at runtime, each processing a single item from the list output of the request task.

  • Fixed Parameters: The partial keyword ensures that common parameters, such as fixed_param, are passed to every dynamically created task instance.
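The interplay of partial and expand mirrors functools.partial plus a fan-out over a list. A minimal, Airflow-free sketch of the semantics (the helper expand_over and the inputs are illustrative, not DAG Factory API):

```python
from functools import partial

def consume_value(value, fixed_param):
    return f"{fixed_param}:{value}"

def expand_over(func, items):
    # One "mapped task" per item, mirroring expand(op_args=...).
    return [func(item) for item in items]

# `partial` pins fixed_param for every call; `expand_over` fans out at runtime,
# so the number of calls depends on the length of the (runtime-produced) list.
mapped = expand_over(partial(consume_value, fixed_param="test"), [1, 2, 3])
# mapped == ["test:1", "test:2", "test:3"]
```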

Benefits of Dynamic Task Mapping with DAG Factory

  • Flexibility: Handle varying input sizes and conditions dynamically without modifying the DAG definition.
  • Scalability: Efficiently process large datasets by leveraging Airflow’s parallel execution capabilities.
  • Simplicity: Define dynamic workflows declaratively using YAML, minimizing boilerplate code.

Airflow mapped tasks view

Below, you can see a list of mapped tasks generated dynamically as part of the process task.

example_dynamic_task_mapping.png

Advanced Dynamic Task Mapping with DAG Factory

Below, we explain the different methods for defining dynamic task mapping, illustrated by the provided example configuration.

Dynamic Task Mapping advanced usage
example_taskflow:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "Example of TaskFlow powered DAG that includes dynamic task mapping"
  schedule_interval: "0 3 * * *"
  default_view: "graph"
  tasks:
    some_number:
      decorator: airflow.decorators.task
      python_callable: sample.some_number
    numbers_list:
      decorator: airflow.decorators.task
      python_callable_name: build_numbers_list
      python_callable_file: $CONFIG_ROOT_DIR/sample.py
    another_numbers_list:
      decorator: airflow.decorators.task
      python_callable: sample.build_numbers_list
    double_number_from_arg:
      decorator: airflow.decorators.task
      python_callable: sample.double
      number: 2
    double_number_from_task:
      decorator: airflow.decorators.task
      python_callable: sample.double
      number: +some_number  # the prefix + tells DagFactory to resolve this value as the task `some_number`, previously defined
    double_number_with_dynamic_task_mapping_static:
      decorator: airflow.decorators.task
      python_callable: sample.double
      expand:
          number:
            - 1
            - 3
            - 5
    double_number_with_dynamic_task_mapping_taskflow:
      decorator: airflow.decorators.task
      python_callable: sample.double
      expand:
          number: +numbers_list  # the prefix + tells DagFactory to resolve this value as the task `numbers_list`, previously defined
    multiply_with_multiple_parameters:
      decorator: airflow.decorators.task
      python_callable: sample.multiply
      expand:
          a: +numbers_list  # the prefix + tells DagFactory to resolve this value as the task `numbers_list`, previously defined
          b: +another_numbers_list # the prefix + tells DagFactory to resolve this value as the task `another_numbers_list`, previously defined
    double_number_with_dynamic_task_and_partial:
      decorator: airflow.decorators.task
      python_callable: sample.double_with_label
      expand:
          number: +numbers_list  # the prefix + tells DagFactory to resolve this value as the task `numbers_list`, previously defined
      partial:
          label: True
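The callables referenced above might be defined as follows (a hypothetical sample.py; the function names match the YAML, but the bodies and return values are assumptions):

```python
# Hypothetical sample.py (a sketch; only the function names come from the YAML above).

def some_number():
    return 4

def build_numbers_list():
    return [2, 4, 6]

def double(number):
    return number * 2

def multiply(a, b):
    return a * b

def double_with_label(number, label=False):
    # When label=True (supplied via `partial` above), return a human-readable
    # label alongside the doubled value.
    result = number * 2
    return (result, f"{number} doubled is {result}") if label else result
```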

The example above illustrates advanced usage of Dynamic Task Mapping with DAG Factory (the callable functions used in the example are kept in sample.py):

  1. Static Input Mapping

    The task double_number_with_dynamic_task_mapping_static shows how dynamic tasks can be created using static lists as input. Three tasks are created, each processing one number.

  2. Task-Generated Input Mapping

    The task double_number_with_dynamic_task_mapping_taskflow shows how tasks can use outputs from other tasks as input for dynamic task mapping. The prefix + tells DAG Factory to resolve this value as the task numbers_list, previously defined.

  3. Mapping with Multiple Inputs

    The task multiply_with_multiple_parameters shows how dynamic task mapping can combine outputs from multiple tasks as input parameters.
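When multiple parameters are expanded, as in multiply_with_multiple_parameters, Airflow creates one mapped task instance per element of the cross product of the inputs. A small Airflow-free sketch of that behaviour (the example lists stand in for the outputs of the referenced tasks):

```python
from itertools import product

def multiply(a, b):
    return a * b

# Expanding over two 3-element lists yields 3 x 3 = 9 mapped task instances,
# one per (a, b) combination.
a_values = [2, 4, 6]  # stands in for +numbers_list
b_values = [2, 4, 6]  # stands in for +another_numbers_list
results = [multiply(a, b) for a, b in product(a_values, b_values)]
```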

Named Mapping in Dynamic Tasks with DAG Factory

Starting with Airflow 2.9, the map_index_template feature allows custom mapping names for dynamic tasks based on a user-defined key. DAG Factory fully supports this feature, enabling users to name tasks dynamically in a meaningful way at runtime. This can be useful for tracing and debugging tasks.

Below is an example of how to configure and use custom names for mapped tasks:

example_map_index_template.yml
# Requires Airflow 2.9 or higher
example_map_index_template:
  default_args:
    owner: "custom_owner"
    start_date: 2 days
  description: "Example of TaskFlow powered DAG that includes dynamic task mapping"
  schedule_interval: "0 3 * * *"
  default_view: "graph"
  tasks:
    dynamic_task_with_named_mapping:
      decorator: airflow.decorators.task
      python_callable: sample.extract_last_name
      map_index_template: "{{ custom_mapping_key }}"
      expand:
        full_name:
          - Lucy Black
          - Vera Santos
          - Marks Spencer

How it works

  1. map_index_template: Customizes the naming of dynamically mapped tasks using a Jinja2 expression. In this example, it uses custom_mapping_key from the task context to define task names.
  2. expand: Dynamically generates one task for each entry in the full_name list:
    • Lucy Black
    • Vera Santos
    • Marks Spencer
  3. Dynamic Task Naming: The custom_mapping_key is set to the first name of each person (e.g., Lucy, Vera, and Marks) by the callable function extract_last_name, which is kept in sample.py.
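A sketch of what extract_last_name could look like. The real callable would obtain the context via Airflow's get_current_context(); here the context is passed in explicitly so the example stays Airflow-free, and the implementation is an assumption:

```python
def extract_last_name(full_name, context):
    # Split "First Last" and expose the first name under custom_mapping_key,
    # which the Jinja2 expression "{{ custom_mapping_key }}" then renders as
    # the map index. The last name is returned as the task's result.
    first_name, last_name = full_name.split(" ")
    context["custom_mapping_key"] = first_name
    return last_name
```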

Airflow named mapped tasks view

The image below shows that the map_index gets the first name of the person in the mapped tasks with the above configuration.

example_map_index_template.png

Scope and limitations

The Airflow documentation on dynamic task mapping provides various examples of this feature. While the previous sections have discussed the forms supported by DAG Factory, it’s important to note the scenarios that have not been tested or are known to be unsupported.

The following cases are tested and expected to work (you can refer to previous sections on how to use them with DAG Factory):

The following cases are untested but are expected to work:

The following cases are untested and may not work: