ExecutionMode.WATCHER_KUBERNETES: High-Performance dbt Execution in Kubernetes#

Added in version 1.13.0.

The ExecutionMode.WATCHER_KUBERNETES combines the speed of ExecutionMode.WATCHER (described in Introducing ExecutionMode.WATCHER: Experimental High-Performance dbt Execution in Cosmos) with the isolation of the Kubernetes Execution Mode.

This execution mode is ideal for users who:

  • Want to leverage the performance benefits of the watcher execution mode

  • Need to run dbt in isolated Kubernetes pods

  • Prefer not to install dbt in their Airflow deployment


Background#

ExecutionMode.WATCHER, introduced in Cosmos 1.11.0 and described in Introducing ExecutionMode.WATCHER: Experimental High-Performance dbt Execution in Cosmos, significantly reduces dbt pipeline run times by running dbt as a single command while maintaining model-level observability in Airflow.

However, the original ExecutionMode.WATCHER requires dbt to be installed alongside Airflow. The ExecutionMode.WATCHER_KUBERNETES removes this limitation by running the dbt command inside Kubernetes pods, similar to ExecutionMode.KUBERNETES.

For more details on the watcher concept and how it works, please refer to the Introducing ExecutionMode.WATCHER: Experimental High-Performance dbt Execution in Cosmos documentation.


How to Use#

Users who currently use ExecutionMode.KUBERNETES can simply change the execution_mode to ExecutionMode.WATCHER_KUBERNETES.

The following example shows how to configure a DbtDag with ExecutionMode.WATCHER_KUBERNETES:

from cosmos import DbtDag
from cosmos.config import ExecutionConfig
from cosmos.constants import ExecutionMode

dag = DbtDag(
    dag_id="jaffle_shop_watcher_kubernetes",
    # ... other DAG parameters ...
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.WATCHER_KUBERNETES,
        dbt_project_path=K8S_PROJECT_DIR,
    ),
    operator_args={
        "image": DBT_IMAGE,
        "get_logs": True,
        "log_events_on_failure": True,
    },
)

Key differences from ExecutionMode.KUBERNETES:

  • The execution_mode is set to ExecutionMode.WATCHER_KUBERNETES instead of ExecutionMode.KUBERNETES

  • The producer task runs the entire dbt build command in a single Kubernetes pod

  • Consumer tasks (sensors) watch for the completion of their corresponding dbt models

For the complete setup including Kubernetes secrets, Docker image configuration, and profile setup, refer to the Kubernetes Execution Mode documentation.
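
Once the DAG is rendered, you can confirm this producer/consumer structure by inspecting the tasks Cosmos generated. The snippet below is a minimal sketch placed after the DbtDag definition; the exact task ids and operator class names depend on your dbt project and Cosmos version:

# Minimal sketch: list the tasks Cosmos generated for the watcher Kubernetes mode.
# Exact task ids depend on your dbt project; the shape described below is illustrative.
for task in sorted(dag.tasks, key=lambda t: t.task_id):
    print(f"{task.task_id}: {type(task).__name__}")

# Expected shape: a single producer task that runs `dbt build` in one pod,
# plus one watcher (sensor) task per seed, snapshot, and model.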


Performance Gains#

Early benchmarks using the jaffle_shop_watcher_kubernetes DAG show significant improvements:

Execution Mode                      Total Runtime
----------------------------------  -------------
ExecutionMode.KUBERNETES            00:00:32.155
ExecutionMode.WATCHER_KUBERNETES    00:00:11.783

This represents approximately a 63% reduction in total DAG runtime.

The performance improvement comes from:

  • Running dbt as a single command (reducing Kubernetes pod startup overhead)

  • Leveraging dbt’s native threading capabilities

  • Eliminating repeated dbt initialization for each model


Known Limitations#

Kubernetes Provider Version Compatibility#

ExecutionMode.WATCHER_KUBERNETES does not work with older versions of the apache-airflow-providers-cncf-kubernetes provider (<=10.7.0).

Please ensure you have a compatible version installed:

pip install "apache-airflow-providers-cncf-kubernetes>10.7.0"

We successfully tested against the most recent release of the provider (10.12.2).
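
If you want to verify the installed provider version at runtime, a quick sanity check (a sketch, assuming the packaging library is available in your environment, as it is in standard Airflow installations) could look like this:

from importlib.metadata import version

from packaging.version import Version

# Sanity check: ExecutionMode.WATCHER_KUBERNETES requires a provider newer than 10.7.0.
provider_version = Version(version("apache-airflow-providers-cncf-kubernetes"))
assert provider_version > Version("10.7.0"), f"Provider {provider_version} is too old"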

Support for KPO deferrable mode#

The producer task created by ExecutionMode.WATCHER_KUBERNETES can be set to deferrable mode as long as:

  • The correct version of the apache-airflow-providers-cncf-kubernetes provider is installed (>=10.12.2). This version fixed a bug (PR) that prevented setting callbacks and parsing the logs when the Kubernetes Operator runs in deferrable mode. The experience should be further improved once this other PR is merged.

pip install "apache-airflow-providers-cncf-kubernetes>=10.12.2"
  • The arguments deferrable=True and is_delete_operator_pod=True are set:

dag = DbtDag(
    dag_id="jaffle_shop_watcher_kubernetes",
    # ... other DAG parameters ...
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.WATCHER_KUBERNETES,
        dbt_project_path=K8S_PROJECT_DIR,
    ),
    operator_args={
        "deferrable": True,
        "is_delete_operator_pod": True,
        "image": DBT_IMAGE,
        "get_logs": True,
        "log_events_on_failure": True,
    },
)

In contrast, the consumer tasks, which subclass DbtConsumerWatcherKubernetesSensor, run in deferrable mode by default when operating as sensors. They can also operate in deferrable mode if they end up running dbt themselves upon retry.

Mandatory operator_args#

The operator_args must define both get_logs and log_events_on_failure.
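
A minimal sketch of such an operator_args dictionary, mirroring the values used in the examples in this document (DBT_IMAGE is assumed to point at an image that contains your dbt project and profile):

operator_args = {
    "image": DBT_IMAGE,  # image containing the dbt project and profile
    "get_logs": True,  # required: stream pod logs back to Airflow
    "log_events_on_failure": True,  # required: log Kubernetes events when a pod fails
}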

Other Inherited Limitations#

The following limitations from ExecutionMode.WATCHER also apply to ExecutionMode.WATCHER_KUBERNETES:

  • Individual dbt Operators: Only DbtSeedWatcherKubernetesOperator, DbtSnapshotWatcherKubernetesOperator, and DbtRunWatcherKubernetesOperator are implemented. The DbtTestWatcherKubernetesOperator is currently a placeholder.

  • Test behavior: The TestBehavior.AFTER_EACH is not supported. Tests are run as part of the dbt build command by the producer task.

  • Source freshness nodes: The dbt build command does not run source freshness checks.

For more details on these limitations, refer to the Introducing ExecutionMode.WATCHER: Experimental High-Performance dbt Execution in Cosmos documentation.


Example DAG#

Below is a complete example of a DAG using ExecutionMode.WATCHER_KUBERNETES:

"""
## Jaffle Shop Airflow DAG using ExecutionMode.WATCHER_KUBERNETES

[Jaffle Shop](https://github.com/dbt-labs/jaffle_shop) is a fictional eCommerce store. This dbt project originates from
dbt Labs as an example project with dummy data to demonstrate a working dbt core project.

This DAG uses Cosmos with a clear split between Airflow and dbt:
- The Airflow DAG is built from the dbt manifest.json file
- The dbt commands are run inside Kubernetes pods

This means users do not have to install dbt in their Airflow deployment.

This approach is a hybrid between the Cosmos ExecutionMode.KUBERNETES:
https://astronomer.github.io/astronomer-cosmos/getting_started/kubernetes.html#kubernetes

And the Cosmos ExecutionMode.WATCHER:
https://astronomer.github.io/astronomer-cosmos/getting_started/watcher-execution-mode.html
"""

import os
from pathlib import Path

from airflow.providers.cncf.kubernetes.secret import Secret
from pendulum import datetime

from cosmos import DbtDag
from cosmos.config import (
    ExecutionConfig,
    ProfileConfig,
    ProjectConfig,
    RenderConfig,
)
from cosmos.constants import ExecutionMode, LoadMode

DEFAULT_DBT_ROOT_PATH = Path(__file__).resolve().parent / "dbt"

DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
AIRFLOW_DBT_PROJECT_DIR = DBT_ROOT_PATH / "jaffle_shop"

K8S_PROJECT_DIR = "dags/dbt/jaffle_shop"
KBS_DBT_PROFILES_YAML_FILEPATH = Path(K8S_PROJECT_DIR) / "profiles.yml"

DBT_IMAGE = "dbt-jaffle-shop:1.0.0"

project_seeds = [{"project": "jaffle_shop", "seeds": ["raw_customers", "raw_payments", "raw_orders"]}]

postgres_password_secret = Secret(
    deploy_type="env",
    deploy_target="POSTGRES_PASSWORD",
    secret="postgres-secrets",
    key="password",
)

postgres_host_secret = Secret(
    deploy_type="env",
    deploy_target="POSTGRES_HOST",
    secret="postgres-secrets",
    key="host",
)

operator_args = {
    "deferrable": False,
    "image": DBT_IMAGE,
    "get_logs": True,
    "is_delete_operator_pod": False,
    "log_events_on_failure": True,
    "secrets": [postgres_password_secret, postgres_host_secret],
    "env_vars": {
        "POSTGRES_DB": "postgres",
        "POSTGRES_SCHEMA": "public",
        "POSTGRES_USER": "postgres",
    },
    "retry": 0,
}

profile_config = ProfileConfig(
    profile_name="postgres_profile", target_name="dev", profiles_yml_filepath=KBS_DBT_PROFILES_YAML_FILEPATH
)

project_config = ProjectConfig(
    project_name="jaffle_shop",
    manifest_path=AIRFLOW_DBT_PROJECT_DIR / "target/manifest.json",
)

render_config = RenderConfig(load_method=LoadMode.DBT_MANIFEST)


# Currently, airflow dags test ignores priority_weight and weight_rule; for this reason, we set the following in CI only:
if os.getenv("CI"):
    operator_args["trigger_rule"] = "all_success"

dag = DbtDag(
    dag_id="jaffle_shop_watcher_kubernetes",
    start_date=datetime(2022, 11, 27),
    doc_md=__doc__,
    catchup=False,
    # Cosmos-specific parameters:
    project_config=project_config,
    profile_config=profile_config,
    render_config=render_config,
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.WATCHER_KUBERNETES,
        dbt_project_path=K8S_PROJECT_DIR,
    ),
    operator_args=operator_args,
)

Prerequisites#

Before using ExecutionMode.WATCHER_KUBERNETES, ensure you have:

  1. A Kubernetes cluster configured and accessible from your Airflow deployment

  2. A Docker image containing your dbt project and profile

  3. The apache-airflow-providers-cncf-kubernetes provider installed (version >10.7.0)

For detailed setup instructions, refer to the Kubernetes Execution Mode documentation.


Summary#

ExecutionMode.WATCHER_KUBERNETES provides:

  • Roughly 63% shorter total DAG runtime compared to ExecutionMode.KUBERNETES, based on early benchmarks

  • Isolation between dbt and Airflow dependencies

  • Model-level visibility in Airflow

  • Easy migration from ExecutionMode.KUBERNETES

This execution mode is ideal for teams who want the performance benefits of the watcher mode while maintaining the isolation provided by Kubernetes execution.