# Asset
DAG Factory supports Airflow Assets, which let you define data assets that DAGs can emit or depend on.

To leverage assets, use the `__type__` annotation to define asset metadata in your YAML configuration. You can then reference assets in a task's `inlets` and `outlets` to emit and track asset events, or in a DAG's `schedule` to trigger the DAG whenever an asset is updated.
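In YAML, an asset reference takes the following shape (the values here are placeholders; the example below shows them in context):

```yaml
outlets:
  - __type__: airflow.sdk.Asset
    uri: "file:///path/to/data.csv"
    name: "my_asset"
```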
## Example DAG
First, define your DAG configuration in a YAML file:

`asset_triggered_dags.yml`

```yaml
default:
  default_args:
    start_date: 2025-01-01
  catchup: false

producer_dag:
  schedule: "@daily"
  tasks:
    produce_data:
      operator: "airflow.providers.standard.operators.python.PythonOperator"
      python_callable: sample.generate_data
      outlets:
        - __type__: airflow.sdk.Asset
          uri: "file:///$AIRFLOW_HOME/data.csv"
          name: "data_asset"

consumer_dag:
  schedule:
    - __type__: airflow.sdk.Asset
      uri: "file:///$AIRFLOW_HOME/data.csv"
      name: "data_asset"
  tasks:
    consume_data:
      operator: "airflow.providers.standard.operators.bash.BashOperator"
      bash_command: "echo 'Asset was updated, running DAG!'"
```
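The producer's `python_callable` points to a `sample.generate_data` function that is not shown here. A minimal sketch of what such a module might contain, assuming the function simply writes the CSV declared as the outlet asset (only the module and function names come from the config above; the body is illustrative):

```python
# sample.py - hypothetical module backing the python_callable above.
# Only the module/function names come from the YAML; the body is an assumption.
import os


def generate_data():
    # Write a small CSV to the same path declared as the task's outlet asset.
    path = os.path.join(os.environ["AIRFLOW_HOME"], "data.csv")
    with open(path, "w") as f:
        f.write("id,value\n1,42\n")
```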
Then, you can load this YAML configuration dynamically:

`asset_triggered_dags.py`

```python
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
from dagfactory import load_yaml_dags

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "asset_triggered_dags.yml")

load_yaml_dags(
    globals_dict=globals(),
    config_filepath=config_file,
)
```
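Once loaded, each successful run of `producer_dag` emits an update event for `data_asset` through the task's `outlets`, which in turn triggers `consumer_dag`. For reference, the `consumer_dag` entry corresponds roughly to the following hand-written DAG (a sketch assuming Airflow 3's `airflow.sdk` API, matching the `__type__` used in the YAML; dag-factory generates the actual DAG from the config):

```python
# Rough hand-written equivalent of consumer_dag, for comparison only.
from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG, Asset

data_asset = Asset(name="data_asset", uri="file:///$AIRFLOW_HOME/data.csv")

with DAG(dag_id="consumer_dag", schedule=[data_asset]):
    BashOperator(
        task_id="consume_data",
        bash_command="echo 'Asset was updated, running DAG!'",
    )
```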