Callbacks#
Note
Feature available when using ExecutionMode.LOCAL and ExecutionMode.VIRTUALENV.
Most dbt commands output one or more artifacts, such as semantic_manifest.json, manifest.json, catalog.json, run_results.json, and sources.json, in the target folder, which by default resides in the dbt project's root folder.
However, since Cosmos creates a temporary folder to run each dbt command, this folder vanishes by the end of the Cosmos task execution, alongside the artifacts created by dbt.
Many users care about those artifacts and want to perform additional actions after running the dbt command. Some examples of usage:
Upload the artifacts to object storage;
Run a command after the dbt command runs, such as montecarlo; or
Define other custom behaviours based on a specific artifact.
To support these use cases, Cosmos allows users to define functions, called callbacks, that run as part of the task execution before the target folder is deleted.
Users can define their own callback methods or, since Cosmos 1.8.0, leverage the built-in callbacks available in the cosmos/io.py module. These functions illustrate how to upload the generated dbt artifacts to remote cloud storage providers such as AWS S3, GCP GCS, and Azure WASB.
There are two ways users can leverage Cosmos' auxiliary callback functions:
When instantiating a Cosmos operator; or
When using DbtDag or DbtTaskGroup (users can define a callback that will apply to all tasks).
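For reference, the built-in callbacks live in the cosmos.io module and can be imported directly; the names below are the helpers used in the examples that follow:

from cosmos.io import (
    upload_to_aws_s3,
    upload_to_azure_wasb,
    upload_to_cloud_storage,
    upload_to_gcp_gs,
)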
Example: Using Callbacks with a Single Operator#
To demonstrate how to specify a callback function for uploading files from the target directory, here’s an example using a single operator in an Airflow DAG:
from cosmos import DbtSeedLocalOperator
from cosmos.io import upload_to_aws_s3

# profile_config and DBT_PROJ_DIR are assumed to be defined earlier in the DAG file
seed_operator = DbtSeedLocalOperator(
    profile_config=profile_config,
    project_dir=DBT_PROJ_DIR,
    task_id="seed",
    dbt_cmd_flags=["--select", "raw_customers"],
    install_deps=True,
    append_env=True,
    # --------------------------------------------------------------
    # Callback function to upload artifacts to AWS S3
    callback=upload_to_aws_s3,
    callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
    # --------------------------------------------------------------
    # Callback function to upload artifacts to GCP GCS
    # callback=upload_to_gcp_gs,
    # callback_args={"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
    # --------------------------------------------------------------
    # Callback function to upload artifacts to Azure WASB
    # callback=upload_to_azure_wasb,
    # callback_args={"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
    # --------------------------------------------------------------
)
Example: Using DbtDag or DbtTaskGroup#
If you're using Airflow 2.8 or later, you can leverage the remote_target_path configuration to upload files from the target directory to remote storage. Below is an example of how to define a callback helper function in your DbtDag that utilizes this configuration:
from datetime import datetime

from cosmos import DbtDag, ProjectConfig
from cosmos.io import upload_to_cloud_storage

# DBT_ROOT_PATH and profile_config are assumed to be defined earlier in the DAG file
cosmos_callback_dag = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(
        DBT_ROOT_PATH / "jaffle_shop",
    ),
    profile_config=profile_config,
    operator_args={
        "install_deps": True,  # install any necessary dependencies before running any dbt command
        "full_refresh": True,  # used only in dbt commands that support this flag
        # --------------------------------------------------------------
        # Callback function to upload files using Airflow Object Storage and the Cosmos remote_target_path setting on Airflow 2.8 and above
        "callback": upload_to_cloud_storage,
        # --------------------------------------------------------------
        # Callback function to upload files to AWS S3, works for Airflow < 2.8 too
        # "callback": upload_to_aws_s3,
        # "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
        # --------------------------------------------------------------
        # Callback function to upload files to GCP GCS, works for Airflow < 2.8 too
        # "callback": upload_to_gcp_gs,
        # "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
        # --------------------------------------------------------------
        # Callback function to upload files to Azure WASB, works for Airflow < 2.8 too
        # "callback": upload_to_azure_wasb,
        # "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
        # --------------------------------------------------------------
    },
    # normal dag parameters
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="cosmos_callback_dag",
    default_args={"retries": 2},
)
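For reference, remote_target_path (and the companion remote_target_path_conn_id, when your storage requires an explicit connection) is set in the [cosmos] section of the Airflow configuration, or through the corresponding AIRFLOW__COSMOS__* environment variables. A minimal sketch, assuming an S3 bucket and an existing aws_s3_conn connection:

[cosmos]
remote_target_path = s3://cosmos-artifacts-upload/target_dir/
remote_target_path_conn_id = aws_s3_conn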
An example of what the data uploaded to GCS looks like when using upload_to_gcp_gs in a DbtDag:
[Screenshot: GCS bucket listing showing the uploaded target folder contents]
The path naming convention is:
Bucket configured by the user
Name of the DAG
DAG Run identifier
Task ID
Task retry identifier
Target folder with its contents
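Putting those together, an uploaded artifact lands at a path shaped like the following (the run and retry identifiers here are illustrative):

cosmos-artifacts-upload/cosmos_callback_dag/manual__2023-01-01T00:00:00+00:00/seed/1/target/run_results.json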
Users who prefer a different structure or format can implement their own methods, optionally basing them on the Cosmos standard ones.
Custom Callbacks#
The helper functions introduced in Cosmos 1.8.0 are examples of how callback functions can be written. Users are not limited to these predefined functions; they can also create custom callback functions to meet specific needs.
Cosmos passes a few arguments to these functions, including the path to the dbt project directory and the Airflow task context, which includes DAG and task instance metadata.
Below is an example of a callback method that raises an exception if a query takes more than 10 seconds to run:
from typing import Any


def error_if_slow(project_dir: str, **kwargs: Any) -> None:
    """
    An example of a custom callback that errors if a particular query is slow.

    :param project_dir: Path of the project directory used by Cosmos to run the dbt command
    """
    import json
    from pathlib import Path

    slow_query_threshold = 10  # seconds
    # dbt writes its artifacts to the target folder inside the project directory
    run_results_path = Path(project_dir, "target", "run_results.json")
    if run_results_path.exists():
        with open(run_results_path) as fp:
            run_results = json.load(fp)
        # run_results.json stores one entry per executed node under the "results" key
        for result in run_results["results"]:
            node_name = result["unique_id"]
            execution_time = result["execution_time"]
            if execution_time > slow_query_threshold:
                raise TimeoutError(f"The query for the node {node_name} took too long: {execution_time}")
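A custom callback is wired in exactly the same way as the built-in helpers. A minimal sketch, reusing the profile_config and DBT_PROJ_DIR names from the earlier example:

from cosmos import DbtRunLocalOperator

run_operator = DbtRunLocalOperator(
    profile_config=profile_config,
    project_dir=DBT_PROJ_DIR,
    task_id="run",
    callback=error_if_slow,  # this callback takes no extra arguments, so callback_args is omitted
)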
Users can use the same approach to call the data observability platform montecarlo or other services.
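As a sketch of that pattern, a callback can shell out to whatever CLI the external service provides; the command name and flags below are purely hypothetical placeholders, not a real montecarlo invocation:

import subprocess
from typing import Any


def notify_observability_service(project_dir: str, **kwargs: Any) -> None:
    """Hypothetical callback that hands the dbt artifacts to an external CLI."""
    # "observability-cli" and its arguments are placeholders; substitute the real tool's command
    subprocess.run(
        ["observability-cli", "import", "--run-results", f"{project_dir}/target/run_results.json"],
        check=True,
    )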
Limitations and Contributions#
Callback support is available only when using ExecutionMode.LOCAL and ExecutionMode.VIRTUALENV.
Contributions to extend this functionality to other execution modes are welcome and encouraged. You can reference the implementation for ExecutionMode.LOCAL to add support for other modes.