Traditional Operators: Using YAML instead of Python

Traditional operators are Airflow's original building blocks. While they are robust and diverse, they can lead to boilerplate-heavy DAGs compared to the newer TaskFlow API.

Most of the Airflow providers come with built-in traditional operators. Some examples include BashOperator, PythonOperator, KubernetesPodOperator, and PostgresOperator.

Below, we illustrate how to represent an Airflow DAG using traditional operators and how to define the same DAG using DAG Factory. Ultimately, both implementations use the same Airflow operators. The main difference is the language used to declare the workflow: one uses Python and the other uses YAML.

Goal

Let's say we'd like to create a workflow that performs the following:

  1. Retrieve the top ten stories from Hacker News using the Hacker News API.
  2. Fetch the details of the top two stories using the Hacker News API.
  3. Summarize the selected stories as Markdown, using Python.

We will implement the first two steps using BashOperator and the third step using PythonOperator. The last task will generate a Markdown snippet similar to:

| title                                                                       | url                                                                                                                    |
|:----------------------------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------|
| I keep turning my Google Sheets into phone-friendly webapps                 | https://arstechnica.com/gadgets/2024/12/making-tiny-no-code-webapps-out-of-spreadsheets-is-a-weirdly-fulfilling-hobby/ |
| Coconut by Meta AI – Better LLM Reasoning with Chain of Continuous Thought? | https://aipapersacademy.com/chain-of-continuous-thought/                                                               |
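
For context, the Hacker News API exposes the ranked top-story IDs at /v0/topstories.json and each story's details at /v0/item/<id>.json. The snippet below is only an illustration of the data shape, using the requests library; it is not part of the workflow itself:

import requests

# Returns a JSON array of story IDs, ordered by rank.
top_ids = requests.get("https://hacker-news.firebaseio.com/v0/topstories.json", timeout=10).json()

# Each item endpoint returns a JSON object with fields such as "title" and "url".
first_story = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{top_ids[0]}.json", timeout=10).json()
print(first_story["title"], first_story.get("url"))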

The main logic is implemented as plain Python functions in hacker_news.py:

hacker_news.py
import json

import pandas as pd


def summarize(**kwargs):
    """
    Given the Airflow context provided to this function, extract the Hacker News records from the XComs of its
    upstream tasks and summarize them as a Markdown table.
    """
    ti = kwargs["ti"]
    upstream_task_ids = ti.task.upstream_task_ids  # Get upstream task IDs dynamically
    values = [json.loads(ti.xcom_pull(task_ids=task_id)) for task_id in upstream_task_ids]

    df = pd.DataFrame(values)
    selected_columns = ["title", "url"]
    df = df[selected_columns]
    markdown_output = df.to_markdown(index=False)  # rendering to Markdown requires the "tabulate" package
    print(markdown_output)
    return markdown_output
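
Because summarize() only relies on the "ti" entry of the Airflow context, it can be exercised outside Airflow with a stand-in task instance. The snippet below is a hypothetical local test, not part of the project: FakeTask and FakeTaskInstance mimic just the pieces of Airflow's TaskInstance that the function touches, and pandas plus tabulate must be installed for to_markdown to work.

import json

from hacker_news import summarize


class FakeTask:
    # Mirrors the upstream task IDs the real DAG would have.
    upstream_task_ids = ["fetch_first_top_news", "fetch_second_top_news"]


class FakeTaskInstance:
    task = FakeTask()

    def xcom_pull(self, task_ids):
        # Each upstream BashOperator pushes the raw JSON of one Hacker News item as its XCom.
        samples = {
            "fetch_first_top_news": json.dumps({"title": "Story one", "url": "https://example.com/1"}),
            "fetch_second_top_news": json.dumps({"title": "Story two", "url": "https://example.com/2"}),
        }
        return samples[task_ids]


print(summarize(ti=FakeTaskInstance()))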

Implementation

For reference, the following workflows were run using Airflow 2.10.2 and DAG Factory 0.21.0.

Plain Airflow Python DAG

example_hackernews_plain_airflow.py
from datetime import datetime

from airflow.models.dag import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python import PythonOperator
from hacker_news import summarize

with DAG(dag_id="example_hackernews_plain_airflow", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

    fetch_top_ten_news = BashOperator(
        task_id="fetch_top_ten_news",
        bash_command="curl -s https://hacker-news.firebaseio.com/v0/topstories.json  | jq -c -r '.[0:10]'",
    )

    fetch_first_top_news = BashOperator(
        task_id="fetch_first_top_news",
        bash_command="""
            echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[0]' |  xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'
        """,
    )

    fetch_second_top_news = BashOperator(
        task_id="fetch_second_news",
        bash_command="""
            echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[1]' |  xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'
        """,
    )

    summarize_task = PythonOperator(task_id="summarize", python_callable=summarize)

    fetch_top_ten_news >> [fetch_first_top_news, fetch_second_top_news] >> summarize_task
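
For quick local iteration, DAG objects in Airflow 2.5+ expose a test() method that runs every task of the DAG in a single process. A minimal sketch, assuming an initialized Airflow metadata database and curl plus jq available locally, is to append the following to the file above:

if __name__ == "__main__":
    # Runs the whole DAG in-process for debugging purposes.
    dag.test()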

Alternative DAG Factory YAML

example_hackernews_dagfactory.yml
example_hackernews_dagfactory:
  default_args:
    start_date: 2022-03-04
  tasks:
    fetch_top_ten_news:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "curl -s https://hacker-news.firebaseio.com/v0/topstories.json  | jq -c -r '.[0:10]'"
    fetch_first_top_news:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[0]' |  xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'"
      dependencies: [fetch_top_ten_news]
    fetch_second_top_news:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo {{ task_instance.xcom_pull(task_ids='fetch_top_ten_news') }} | jq -c -r '.[1]' |  xargs -I {} curl -s 'https://hacker-news.firebaseio.com/v0/item/{}.json'"
      dependencies: [fetch_top_ten_news]
    summarize:
      operator: airflow.operators.python.PythonOperator
      python_callable: hacker_news.summarize
      dependencies: [fetch_first_top_news, fetch_second_top_news]
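
Besides load_yaml_dags (shown later on this page), a single YAML file can also be turned into a DAG with the DagFactory class. The sketch below assumes the YAML above is saved as example_hackernews_dagfactory.yml under the Airflow DAGs folder; the exact path is illustrative:

import dagfactory

# Register the DAG(s) described in the YAML workflow into the module's globals().
example_dag_factory = dagfactory.DagFactory("/usr/local/airflow/dags/comparison/example_hackernews_dagfactory.yml")
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())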

Comparison

Goal

Both implementations accomplish the same goal and result in the expected Markdown table.

Airflow Graph view

As shown in the screenshots below, the DAG created using Python with standard Airflow and the DAG created using YAML and DAG Factory look identical, both from a graph topology perspective and in terms of the underlying operators being used.

Graph view: Plain Airflow Python DAG

[Screenshot: Graph view of the plain Airflow Python DAG]

Graph view: Alternative DAG Factory YAML

[Screenshot: Graph view of the DAG Factory YAML DAG]

Airflow Code view

From an Airflow UI perspective, the content displayed in the "Code" view is the main difference between the two implementations. Airflow renders the original Python DAG as expected, but for the YAML DAGs it displays the Python file that loads the DAG Factory YAML files:

example_load_yaml_dags.py
import os
from pathlib import Path

from dagfactory import load_yaml_dags

# Directory containing the DAG Factory YAML workflows; it can be overridden through the CONFIG_ROOT_DIR environment variable.
DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))
config_dir = str(CONFIG_ROOT_DIR / "comparison")

load_yaml_dags(
    globals_dict=globals(),
    dags_folder=config_dir,
)
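
For this loader to pick up the workflow, the YAML file needs to live under the "comparison" subdirectory of the configured root. A possible layout, assuming the default CONFIG_ROOT_DIR, could be:

/usr/local/airflow/dags/
├── example_load_yaml_dags.py                 # parsed by the Airflow scheduler, loads the YAML DAGs
└── comparison/
    └── example_hackernews_dagfactory.yml     # the workflow defined earlier on this page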

Code view: Plain Airflow Python DAG

[Screenshot: Code view of the plain Airflow Python DAG]

Code view: Alternative DAG Factory YAML

[Screenshot: Code view of the DAG Factory YAML DAG]

To overcome this limitation, DAG Factory appends the YAML content to the DAG Documentation so users can better troubleshoot the DAG:

[Screenshot: YAML configuration rendered in the DAG Documentation]