# Datasets
DAG Factory supports Airflow’s Datasets.
## Datasets Outlets and Inlets
To leverage datasets, you need to specify the `Dataset` in the `outlets` and `inlets` keys in the configuration file. The `outlets` and `inlets` keys should contain a list of strings representing dataset locations.
In the `schedule` key of the consumer DAG, you can set the `Dataset` the DAG should be scheduled against. The key should contain a list of dataset locations. The consumer DAG will run once all of the specified datasets become available.
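For orientation, here is a rough sketch of the plain-Airflow code these keys correspond to. This is illustrative, not DAG Factory's actual generated code; the dataset URIs and DAG ids are taken from the example below:

```python
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

# Producer: task_1 declares the dataset it reads (inlets) and updates (outlets).
with DAG(dag_id="producer_dag", start_date=datetime(2024, 1, 1), schedule="0 5 * * *"):
    BashOperator(
        task_id="task_1",
        bash_command="echo 1",
        inlets=[Dataset("s3://bucket_example/raw/dataset1_source.json")],
        outlets=[Dataset("s3://bucket_example/raw/dataset1.json")],
    )

# Consumer: scheduled on the dataset rather than on a time interval.
with DAG(
    dag_id="consumer_dag",
    start_date=datetime(2024, 1, 1),
    schedule=[Dataset("s3://bucket_example/raw/dataset1.json")],
):
    BashOperator(task_id="task_1", bash_command="echo 'consumer datasets'")
```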
### Example: Outlet and Inlet
```yaml
producer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG producer simple datasets"
  schedule_interval: "0 5 * * *"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
      inlets: [ 's3://bucket_example/raw/dataset1_source.json' ]
      outlets: [ 's3://bucket_example/raw/dataset1.json' ]
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [ task_1 ]
      inlets: [ 's3://bucket_example/raw/dataset2_source.json' ]
      outlets: [ 's3://bucket_example/raw/dataset2.json' ]
```
```yaml
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule: [ 's3://bucket_example/raw/dataset1.json', 's3://bucket_example/raw/dataset2.json' ]
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
```
## Conditional Dataset Scheduling

Minimum requirements:

- dag-factory 0.22.0+
- Apache Airflow® 2.9+
### Logical operators for datasets

Airflow supports two logical operators for combining dataset conditions:

- **AND (`&`)**: Specifies that the DAG should be triggered only after all of the specified datasets have been updated.
- **OR (`|`)**: Specifies that the DAG should be triggered when any of the specified datasets is updated.
These operators enable you to configure your Airflow workflows to use more complex dataset update conditions, making them more dynamic and flexible.
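For comparison, a minimal sketch of the same kind of condition expressed directly in Airflow (2.9+), where `Dataset` objects are combined with `&` and `|`; the URIs mirror the examples below:

```python
from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator

d1 = Dataset("s3://bucket-cjmm/raw/dataset_custom_1")
d2 = Dataset("s3://bucket-cjmm/raw/dataset_custom_2")
d3 = Dataset("s3://bucket-cjmm/raw/dataset_custom_3")

# Run when both d1 and d2 have been updated, or when d3 alone is updated.
with DAG(
    dag_id="consumer_dag",
    start_date=datetime(2024, 1, 1),
    schedule=((d1 & d2) | d3),
):
    BashOperator(task_id="task_1", bash_command="echo 'consumer datasets'")
```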
### Examples of Conditional Dataset Scheduling
Below are examples demonstrating how to configure a consumer DAG using conditional dataset scheduling.
#### Example 1: String Condition
```yaml
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule:
    datasets: "((s3://bucket-cjmm/raw/dataset_custom_1 & s3://bucket-cjmm/raw/dataset_custom_2) | s3://bucket-cjmm/raw/dataset_custom_3)"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
```
#### Example 2: YAML Syntax
```yaml
consumer_dag:
  default_args:
    owner: "example_owner"
    retries: 1
    start_date: '2024-01-01'
  description: "Example DAG consumer simple datasets"
  schedule:
    datasets: !or
      - !and
        - "s3://bucket-cjmm/raw/dataset_custom_1"
        - "s3://bucket-cjmm/raw/dataset_custom_2"
      - "s3://bucket-cjmm/raw/dataset_custom_3"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 'consumer datasets'"
```
### Visualization

The dataset conditions described in the example configurations resolve as follows:

- `s3://bucket-cjmm/raw/dataset_custom_1` and `s3://bucket-cjmm/raw/dataset_custom_2` must both be updated for the first condition to be satisfied.
- Alternatively, `s3://bucket-cjmm/raw/dataset_custom_3` alone can satisfy the condition.