Datasets

DAG Factory supports Airflow’s Datasets.

Datasets Outlets and Inlets

To leverage datasets, specify the Dataset in the outlets and inlets keys of the configuration file. The outlets and inlets keys take a list of strings representing dataset URIs. In the schedule key of the consumer DAG, set the datasets the DAG should be scheduled against; the key takes a list of dataset URIs. The consumer DAG runs once all the specified datasets have been updated.

Example: Outlet and Inlet

example_dag_datasets_outlet_inlet.yml
producer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG producer simple datasets"
  schedule_interval: "0 5 * * *"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
      inlets: [ 's3://bucket_example/raw/dataset1_source.json' ]
      outlets: [ 's3://bucket_example/raw/dataset1.json' ]
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [ task_1 ]
      inlets: [ 's3://bucket_example/raw/dataset2_source.json' ]
      outlets: [ 's3://bucket_example/raw/dataset2.json' ]
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule: [ 's3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json' ]
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
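
For orientation, here is roughly what DAG Factory builds from the configuration above, written as plain Airflow code. This is a hand-written sketch, not the library's actual generated output; the file name equivalent_datasets_dags.py is illustrative, and it assumes Airflow 2.4+ where Dataset and dataset-based schedules are available.

equivalent_datasets_dags.py
# Illustrative sketch of the DAGs the YAML above describes.
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

dataset1 = Dataset("s3://bucket_example/raw/dataset1.json")
dataset2 = Dataset("s3://bucket_example/raw/dataset2.json")

with DAG(
    dag_id="producer_dag",
    start_date=datetime(2024, 1, 1),
    schedule="0 5 * * *",
) as producer_dag:
    task_1 = BashOperator(
        task_id="task_1",
        bash_command="echo 1",
        inlets=[Dataset("s3://bucket_example/raw/dataset1_source.json")],
        outlets=[dataset1],
    )
    task_2 = BashOperator(
        task_id="task_2",
        bash_command="echo 2",
        inlets=[Dataset("s3://bucket_example/raw/dataset2_source.json")],
        outlets=[dataset2],
    )
    task_1 >> task_2

with DAG(
    dag_id="consumer_dag",
    start_date=datetime(2024, 1, 1),
    schedule=[dataset1, dataset2],  # runs after both datasets are updated
) as consumer_dag:
    BashOperator(task_id="task_1", bash_command="echo 'consumer datasets'")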

(Image: datasets_example.png)

Conditional Dataset Scheduling

Minimum Requirements:

  • dag-factory 0.22.0+
  • Apache Airflow 2.9+

Logical operators for datasets

Airflow supports two logical operators for combining dataset conditions:

  • AND (&): Specifies that the DAG should be triggered only after all of the specified datasets have been updated.
  • OR (|): Specifies that the DAG should be triggered when any of the specified datasets is updated.

These operators let you express more complex dataset update conditions, making your workflows more dynamic and flexible.
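
Under the hood, these conditions correspond to Airflow's dataset expressions, available since Airflow 2.9, where Dataset objects are combined with the & and | operators. As a point of reference, a minimal plain-Airflow sketch of the condition used in the examples below:

from airflow.datasets import Dataset

dataset_1 = Dataset("s3://bucket-cjmm/raw/dataset_custom_1")
dataset_2 = Dataset("s3://bucket-cjmm/raw/dataset_custom_2")
dataset_3 = Dataset("s3://bucket-cjmm/raw/dataset_custom_3")

# Trigger when both dataset_1 AND dataset_2 are updated,
# OR when dataset_3 alone is updated.
condition = (dataset_1 & dataset_2) | dataset_3

# In a plain-Airflow DAG file this would be passed as
# DAG(schedule=condition, ...).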

Examples of Conditional Dataset Scheduling

Below are examples demonstrating how to configure a consumer DAG using conditional dataset scheduling.

Example 1: String Condition
example_dataset_condition_string.yml
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule:
    datasets: "((s3://bucket-cjmm/raw/dataset_custom_1 & s3://bucket-cjmm/raw/dataset_custom_2) | s3://bucket-cjmm/raw/dataset_custom_3)"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
Example 2: YAML Syntax
example_dataset_yaml_syntax.yml
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule:
    datasets:
      !or
        - !and
          - "s3://bucket-cjmm/raw/dataset_custom_1"
          - "s3://bucket-cjmm/raw/dataset_custom_2"
        - "s3://bucket-cjmm/raw/dataset_custom_3"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
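
Both forms above describe the same condition: (dataset_custom_1 AND dataset_custom_2) OR dataset_custom_3. Whichever form you choose, the YAML file is loaded like any other DAG Factory config, via a small Python file in your DAGs folder. A minimal sketch; the file name generate_dags.py is illustrative:

generate_dags.py
from dagfactory import load_yaml_dags

# By default this scans the Airflow DAGs folder for YAML config files
# and registers the generated DAGs in this module's globals, where the
# Airflow scheduler can discover them.
load_yaml_dags(globals_dict=globals())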

Visualization

The following diagrams illustrate the dataset conditions described in the example configurations:

  1. s3://bucket-cjmm/raw/dataset_custom_1 and s3://bucket-cjmm/raw/dataset_custom_2 must both be updated for the first condition to be satisfied.
  2. Alternatively, s3://bucket-cjmm/raw/dataset_custom_3 alone can satisfy the condition.

(Images: Graph Conditional Dataset 1, Graph Conditional Dataset 2)