Asset

DAG Factory supports Airflow's Asset feature, which lets you define data assets that DAGs can emit or depend on.

To leverage assets, use the __type__ annotation to define asset metadata. You can then reference assets in inlets and outlets to emit and track asset events, or in a DAG's schedule to trigger the DAG whenever the asset is updated.
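
Each __type__ entry tells DAG Factory which class to instantiate, with the sibling keys passed as constructor arguments. The asset entries in the example below correspond roughly to this Python object (a sketch, assuming Airflow 3's airflow.sdk.Asset):

from airflow.sdk import Asset

# What the YAML `__type__: airflow.sdk.Asset` entry resolves to
data_asset = Asset(name="data_asset", uri="file:///$AIRFLOW_HOME/data.csv")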

Example DAG

First, define your DAG configuration in a YAML file:

asset_triggered_dags.yml
default:
  default_args:
    start_date: 2025-01-01
  catchup: false

producer_dag:
  schedule: "@daily"
  tasks:
    produce_data:
      operator: "airflow.providers.standard.operators.python.PythonOperator"
      python_callable: sample.generate_data
      outlets:
        - __type__: airflow.sdk.Asset
          uri: "file:///$AIRFLOW_HONE/data.csv"
          name: "data_asset"

consumer_dag:
  schedule:
    - __type__: airflow.sdk.Asset
      uri: "file:///$AIRFLOW_HONE/data.csv"
      name: "data_asset"
  tasks:
    consume_data:
      operator: "airflow.providers.standard.operators.bash.BashOperator"
      bash_command: "echo 'Asset was updated, running DAG!'"

Then, you can load this YAML configuration dynamically:

asset_triggered_dags.py
import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
from dagfactory import load_yaml_dags

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "asset_triggered_dags.yml")

load_yaml_dags(
    globals_dict=globals(),
    config_filepath=config_file,
)
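
If you keep several YAML configs side by side, load_yaml_dags can also scan a folder instead of taking a single file. A sketch, assuming a recent DAG Factory version where the dags_folder and suffix parameters are available:

load_yaml_dags(
    globals_dict=globals(),
    dags_folder=str(CONFIG_ROOT_DIR),
    suffix=[".yml"],  # only pick up files ending in .yml
)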