ExecutionMode.WATCHER_KUBERNETES: High-Performance dbt Execution in Kubernetes#
Added in version 1.13.0.
The ExecutionMode.WATCHER_KUBERNETES execution mode combines the speed of ExecutionMode.WATCHER (see Introducing ExecutionMode.WATCHER: Experimental High-Performance dbt Execution in Cosmos) with the isolation of the Kubernetes Execution Mode.
This execution mode is ideal for users who:
Want to leverage the performance benefits of the watcher execution mode
Need to run dbt in isolated Kubernetes pods
Prefer not to install dbt in their Airflow deployment
Background#
The ExecutionMode.WATCHER execution mode (see Introducing ExecutionMode.WATCHER: Experimental High-Performance dbt Execution in Cosmos), introduced in Cosmos 1.11.0, significantly reduces dbt pipeline run times by running dbt as a single command while maintaining model-level observability in Airflow.
However, the original ExecutionMode.WATCHER requires dbt to be installed alongside Airflow. The ExecutionMode.WATCHER_KUBERNETES removes this limitation by running the dbt command inside Kubernetes pods, similar to ExecutionMode.KUBERNETES.
For more details on the watcher concept and how it works, please refer to the Introducing ExecutionMode.WATCHER: Experimental High-Performance dbt Execution in Cosmos documentation.
How to Use#
Users previously using ExecutionMode.KUBERNETES can simply change the execution_mode to ExecutionMode.WATCHER_KUBERNETES.
The following example shows how to configure a DbtDag with ExecutionMode.WATCHER_KUBERNETES:
from cosmos import DbtDag
from cosmos.config import ExecutionConfig
from cosmos.constants import ExecutionMode
dag = DbtDag(
    dag_id="jaffle_shop_watcher_kubernetes",
    # ... other DAG parameters ...
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.WATCHER_KUBERNETES,
        dbt_project_path=K8S_PROJECT_DIR,
    ),
    operator_args={
        "image": DBT_IMAGE,
        "get_logs": True,
        "log_events_on_failure": True,
    },
)
Key differences from ExecutionMode.KUBERNETES:
The execution_mode is set to ExecutionMode.WATCHER_KUBERNETES instead of ExecutionMode.KUBERNETES
The producer task runs the entire dbt build command in a single Kubernetes pod
Consumer tasks (sensors) watch for the completion of their corresponding dbt models
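If you want to confirm this topology, one quick way is to print the tasks Cosmos rendered for the DbtDag above; a minimal sketch, assuming the dag object from the previous example is in scope:
# Minimal sketch: list the tasks Cosmos rendered for the DbtDag defined above.
# With ExecutionMode.WATCHER_KUBERNETES you should see a single producer task that
# runs `dbt build` in one pod, plus one consumer (sensor) task per dbt resource.
for task in dag.tasks:
    print(task.task_id, "->", type(task).__name__)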
For the complete setup including Kubernetes secrets, Docker image configuration, and profile setup, refer to the Kubernetes Execution Mode documentation.
Performance Gains#
Early benchmarks using the jaffle_shop_watcher_kubernetes DAG show significant improvements:
| Execution Mode | Total Runtime |
|---|---|
| ExecutionMode.KUBERNETES | 00:00:32.155 |
| ExecutionMode.WATCHER_KUBERNETES | 00:00:11.783 |
This represents approximately a 63% reduction in total DAG runtime.
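The figure can be verified from the runtimes in the table above:
# Quick arithmetic check of the quoted improvement (runtimes in seconds, from the table above).
kubernetes_runtime = 32.155
watcher_kubernetes_runtime = 11.783
reduction = (kubernetes_runtime - watcher_kubernetes_runtime) / kubernetes_runtime
print(f"{reduction:.0%}")  # prints 63%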
The performance improvement comes from:
Running dbt as a single command (reducing Kubernetes pod startup overhead)
Leveraging dbt’s native threading capabilities
Eliminating repeated dbt initialization for each model
Known Limitations#
Kubernetes Provider Version Compatibility#
ExecutionMode.WATCHER_KUBERNETES does not work with older versions of the apache-airflow-providers-cncf-kubernetes provider (<=10.7.0).
Please ensure you have a compatible version installed:
pip install "apache-airflow-providers-cncf-kubernetes>10.7.0"
We successfully tested against the most recent release of the provider (10.12.2).
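If you are unsure which version is installed in your Airflow environment, one way to check is via the Python standard library (a minimal sketch):
# Print the installed provider version; it should be greater than 10.7.0.
from importlib.metadata import version

print(version("apache-airflow-providers-cncf-kubernetes"))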
Support for KPO deferrable mode#
The producer task created by ExecutionMode.WATCHER_KUBERNETES can be set to deferrable mode as long as:
The correct version of the apache-airflow-providers-cncf-kubernetes provider is installed (>=10.12.2). This version fixed a bug (PR) that prevented setting callbacks and parsing the logs when the Kubernetes Operator ran using deferrable. The experience should be further improved once this other PR is merged.
pip install "apache-airflow-providers-cncf-kubernetes>=10.12.2"
The arguments deferrable=True and is_delete_operator_pod=True are set:
dag = DbtDag(
    dag_id="jaffle_shop_watcher_kubernetes",
    # ... other DAG parameters ...
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.WATCHER_KUBERNETES,
        dbt_project_path=K8S_PROJECT_DIR,
    ),
    operator_args={
        "deferrable": True,
        "is_delete_operator_pod": True,
        "image": DBT_IMAGE,
        "get_logs": True,
        "log_events_on_failure": True,
    },
)
Conversely, the consumer tasks, which subclass DbtConsumerWatcherKubernetesSensor, run in deferrable mode by default when operating as sensors. They can also operate in deferrable mode if they run dbt themselves upon retry.
Mandatory operator_args#
The operator_args must define get_logs and log_events_on_failure:
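# Both keys are required; the values below mirror the examples earlier on this page.
operator_args = {
    "image": DBT_IMAGE,
    "get_logs": True,
    "log_events_on_failure": True,
}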
Other Inherited Limitations#
The following limitations from ExecutionMode.WATCHER also apply to ExecutionMode.WATCHER_KUBERNETES:
Individual dbt Operators: Only DbtSeedWatcherKubernetesOperator, DbtSnapshotWatcherKubernetesOperator, and DbtRunWatcherKubernetesOperator are implemented. The DbtTestWatcherKubernetesOperator is currently a placeholder.
Test behavior: The TestBehavior.AFTER_EACH is not supported. Tests are run as part of the dbt build command by the producer task.
Source freshness nodes: The dbt build command does not run source freshness checks.
For more details on these limitations, refer to the Introducing ExecutionMode.WATCHER: Experimental High-Performance dbt Execution in Cosmos documentation.
Example DAG#
Below is a complete example of a DAG using ExecutionMode.WATCHER_KUBERNETES:
"""
## Jaffle Shop Airflow DAG using ExecutionMode.WATCHER_KUBERNETES
[Jaffle Shop](https://github.com/dbt-labs/jaffle_shop) is a fictional eCommerce store. This dbt project originates from
dbt Labs as an example project with dummy data to demonstrate a working dbt core project.
This DAG uses Cosmos in a way that keeps a clear split between Airflow and dbt:
- The Airflow DAG is built from the dbt manifest.json file
- The dbt commands are run inside Kubernetes pods
This allows users to avoid installing dbt in their Airflow deployment.
This approach is a hybrid between the Cosmos ExecutionMode.KUBERNETES:
https://astronomer.github.io/astronomer-cosmos/getting_started/kubernetes.html#kubernetes
And the Cosmos ExecutionMode.WATCHER:
https://astronomer.github.io/astronomer-cosmos/getting_started/watcher-execution-mode.html
"""
import os
from pathlib import Path
from airflow.providers.cncf.kubernetes.secret import Secret
from pendulum import datetime
from cosmos import DbtDag
from cosmos.config import (
    ExecutionConfig,
    ProfileConfig,
    ProjectConfig,
    RenderConfig,
)
from cosmos.constants import ExecutionMode, LoadMode
DEFAULT_DBT_ROOT_PATH = Path(__file__).resolve().parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
AIRFLOW_DBT_PROJECT_DIR = DBT_ROOT_PATH / "jaffle_shop"
K8S_PROJECT_DIR = "dags/dbt/jaffle_shop"
KBS_DBT_PROFILES_YAML_FILEPATH = Path(K8S_PROJECT_DIR) / "profiles.yml"
DBT_IMAGE = "dbt-jaffle-shop:1.0.0"
project_seeds = [{"project": "jaffle_shop", "seeds": ["raw_customers", "raw_payments", "raw_orders"]}]
postgres_password_secret = Secret(
    deploy_type="env",
    deploy_target="POSTGRES_PASSWORD",
    secret="postgres-secrets",
    key="password",
)
postgres_host_secret = Secret(
    deploy_type="env",
    deploy_target="POSTGRES_HOST",
    secret="postgres-secrets",
    key="host",
)
operator_args = {
    "deferrable": False,
    "image": DBT_IMAGE,
    "get_logs": True,
    "is_delete_operator_pod": False,
    "log_events_on_failure": True,
    "secrets": [postgres_password_secret, postgres_host_secret],
    "env_vars": {
        "POSTGRES_DB": "postgres",
        "POSTGRES_SCHEMA": "public",
        "POSTGRES_USER": "postgres",
    },
    "retries": 0,
}
profile_config = ProfileConfig(
    profile_name="postgres_profile", target_name="dev", profiles_yml_filepath=KBS_DBT_PROFILES_YAML_FILEPATH
)
project_config = ProjectConfig(
    project_name="jaffle_shop",
    manifest_path=AIRFLOW_DBT_PROJECT_DIR / "target/manifest.json",
)
render_config = RenderConfig(load_method=LoadMode.DBT_MANIFEST)
# Currently airflow dags test ignores priority_weight and weight_rule, for this reason, we're setting the following in the CI only:
if os.getenv("CI"):
    operator_args["trigger_rule"] = "all_success"
dag = DbtDag(
    dag_id="jaffle_shop_watcher_kubernetes",
    start_date=datetime(2022, 11, 27),
    doc_md=__doc__,
    catchup=False,
    # Cosmos-specific parameters:
    project_config=project_config,
    profile_config=profile_config,
    render_config=render_config,
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.WATCHER_KUBERNETES,
        dbt_project_path=K8S_PROJECT_DIR,
    ),
    operator_args=operator_args,
)
Prerequisites#
Before using ExecutionMode.WATCHER_KUBERNETES, ensure you have:
A Kubernetes cluster configured and accessible from your Airflow deployment
A Docker image containing your dbt project and profile
The apache-airflow-providers-cncf-kubernetes provider installed (version >10.7.0)
For detailed setup instructions, refer to the Kubernetes Execution Mode documentation.
Summary#
ExecutionMode.WATCHER_KUBERNETES provides:
✅ ~63% faster dbt DAG runs compared to ExecutionMode.KUBERNETES
✅ Isolation between dbt and Airflow dependencies
✅ Model-level visibility in Airflow
✅ Easy migration from ExecutionMode.KUBERNETES
This execution mode is ideal for teams who want the performance benefits of the watcher mode while maintaining the isolation provided by Kubernetes execution.