Callbacks#

Cosmos supports callback functions that run after a task finishes executing when using ExecutionMode.LOCAL or ExecutionMode.VIRTUALENV. These callbacks can be used for various purposes, such as uploading files from the target directory to remote storage. While this feature has been available for some time, users may not be fully aware of its capabilities.

With the Cosmos 1.8.0 release, several helper functions were added to the cosmos/io.py module. These are example callbacks that can be hooked into Cosmos DAGs to upload files from the project’s target directory to remote cloud storage providers such as AWS S3, GCP GS, and Azure WASB.

Example: Using Callbacks with a Single Operator#

To demonstrate how to specify a callback function for uploading files from the target directory, here’s an example using a single operator in an Airflow DAG:

    from cosmos.io import upload_to_aws_s3  # other helpers: upload_to_gcp_gs, upload_to_azure_wasb
    from cosmos.operators.local import DbtSeedLocalOperator

    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
        # --------------------------------------------------------------
        # Callback function to upload artifacts to AWS S3
        callback=upload_to_aws_s3,
        callback_args={"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
        # --------------------------------------------------------------
        # Callback function to upload artifacts to GCP GS
        # callback=upload_to_gcp_gs,
        # callback_args={"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
        # --------------------------------------------------------------
        # Callback function to upload artifacts to Azure WASB
        # callback=upload_to_azure_wasb,
        # callback_args={"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
        # --------------------------------------------------------------
    )

Example: Using Callbacks with remote_target_path (Airflow 2.8+)#

If you’re using Airflow 2.8 or later, you can leverage the remote_target_path configuration to upload files from the target directory to remote storage. Below is an example of how to define a callback helper function in your DbtDag that utilizes this configuration; a sketch of how the remote_target_path setting itself can be configured follows the example:

    from datetime import datetime

    from cosmos import DbtDag, ProjectConfig
    from cosmos.io import upload_to_cloud_storage

    cosmos_callback_dag = DbtDag(
        # dbt/cosmos-specific parameters
        project_config=ProjectConfig(
            DBT_ROOT_PATH / "jaffle_shop",
        ),
        profile_config=profile_config,
        operator_args={
            "install_deps": True,  # install any necessary dependencies before running any dbt command
            "full_refresh": True,  # used only in dbt commands that support this flag
            # --------------------------------------------------------------
            # Callback function to upload files using Airflow Object Storage and the Cosmos remote_target_path setting on Airflow 2.8 and above
            "callback": upload_to_cloud_storage,
            # --------------------------------------------------------------
            # Callback function to upload files to AWS S3, works for Airflow < 2.8 too
            # "callback": upload_to_aws_s3,
            # "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
            # --------------------------------------------------------------
            # Callback function to upload files to GCP GS, works for Airflow < 2.8 too
            # "callback": upload_to_gcp_gs,
            # "callback_args": {"gcp_conn_id": "gcp_gs_conn", "bucket_name": "cosmos-artifacts-upload"},
            # --------------------------------------------------------------
            # Callback function to upload files to Azure WASB, works for Airflow < 2.8 too
            # "callback": upload_to_azure_wasb,
            # "callback_args": {"azure_conn_id": "azure_wasb_conn", "container_name": "cosmos-artifacts-upload"},
            # --------------------------------------------------------------
        },
        # normal dag parameters
        schedule_interval="@daily",
        start_date=datetime(2023, 1, 1),
        catchup=False,
        dag_id="cosmos_callback_dag",
        default_args={"retries": 2},
    )
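
The upload_to_cloud_storage helper relies on the remote_target_path setting to know where to write the files. As a rough sketch (the bucket path and connection ID below are placeholders, and remote_target_path_conn_id is only needed when your storage scheme requires an Airflow connection), the setting can be supplied through the [cosmos] section of the Airflow configuration:

    [cosmos]
    remote_target_path = s3://cosmos-artifacts-upload/target_dir/
    remote_target_path_conn_id = aws_s3_conn

The equivalent environment variables follow the usual Airflow convention, e.g. AIRFLOW__COSMOS__REMOTE_TARGET_PATH and AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID.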

Custom Callbacks#

The helper functions introduced in Cosmos 1.8.0 are just examples of how callback functions can be written and passed to Cosmos DAGs. Users are not limited to using these predefined functions — they can also create their own custom callback functions to meet specific needs. These custom functions can be provided to Cosmos DAGs, where they will receive the path to the cloned project directory and the Airflow task context, which includes DAG and task instance metadata.
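
For illustration only (the function name below is hypothetical and not part of Cosmos), a custom callback is simply a callable that accepts the project directory path plus keyword arguments; the exact keyword arguments it receives depend on the callback_args you pass and on how Cosmos forwards the task context, so a defensive lookup is used here:

    import os

    def inspect_artifacts(project_dir: str, **kwargs) -> None:
        """Hypothetical custom callback: log the artifacts dbt left in the target directory."""
        context = kwargs.get("context", {})  # Airflow task context, if provided by Cosmos
        task_instance = context.get("task_instance")
        target_dir = os.path.join(project_dir, "target")
        artifacts = os.listdir(target_dir) if os.path.isdir(target_dir) else []
        print(f"Task {task_instance} produced artifacts: {artifacts}")

Such a function can then be passed to an operator or DbtDag in the same way as the shipped helpers, e.g. callback=inspect_artifacts together with any callback_args the function expects.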

Limitations and Contributions#

Currently, callback support is available only when using ExecutionMode.LOCAL and ExecutionMode.VIRTUALENV. Contributions to extend this functionality to other execution modes are welcome and encouraged. You can reference the implementation for ExecutionMode.LOCAL to add support for other modes.