Cosmos-managed virtual environment execution mode#

The virtualenv mode runs dbt commands from Python virtual environments created and managed by Cosmos. Unlike ExecutionMode.LOCAL, this mode removes the need to create a virtual environment at deployment time, while still avoiding package conflicts between dbt and Airflow. It is intended for cases where:

  • You can’t install dbt directly in the Airflow environment, whether alongside Airflow’s own dependencies or in a dedicated virtual environment created at deployment time.

  • Multiple dbt installations are required, and you prefer Cosmos to manage them without modifying the Airflow deployment.

  • Speed is not a concern, and you can afford for Cosmos to create and update the Python virtual environment during the execution of each dbt node.

In most cases, the local execution mode with ExecutionConfig.dbt_executable_path is the preferred option, as it allows you to manage the dbt environment during the Airflow deployment process, instead of per-dbt node execution.
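As a point of comparison, the preferred local-mode setup mentioned above can be sketched as follows (the virtualenv path is illustrative and assumes dbt was installed there during deployment):

```python
from cosmos import ExecutionConfig, ExecutionMode

# Local mode pointing at a dbt executable installed at deployment time.
# "/opt/dbt-venv/bin/dbt" is a hypothetical path; use wherever your
# deployment process installs dbt.
execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.LOCAL,
    dbt_executable_path="/opt/dbt-venv/bin/dbt",
)
```

With this configuration, no virtualenv is created at task runtime, so each dbt node starts faster than in virtualenv mode.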

When you use virtualenv mode, you are responsible for declaring which version of dbt to use via the py_requirements argument. Set this argument directly on operator instances, or pass it through operator_args when you instantiate DbtDag or DbtTaskGroup.
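A minimal sketch of pinning the dbt version through operator_args (the adapter version, project path, and dag_id are illustrative, and profile_config is assumed to be defined elsewhere):

```python
from pathlib import Path

from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProjectConfig

my_virtualenv_dag = DbtDag(
    dag_id="my_virtualenv_dag",  # hypothetical name
    project_config=ProjectConfig(Path("/usr/local/airflow/dbt/jaffle_shop")),
    profile_config=profile_config,  # assumed defined elsewhere in the module
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.VIRTUALENV),
    operator_args={
        # Pinning the adapter (and, transitively, dbt-core) makes the
        # managed virtualenv reproducible; the version is an example only.
        "py_requirements": ["dbt-postgres==1.8.0"],
    },
)
```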

Similar to the local execution mode, Cosmos translates Airflow connections into a format dbt understands by creating a dbt profiles file (profiles.yml). Also like the local execution mode, Cosmos will by default use an existing partial_parse.msgpack, if one is available, to speed up parsing.
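If the cached partial_parse.msgpack causes issues (for example, stale state between runs), partial parsing can be turned off on the project configuration; a sketch, with an illustrative project path:

```python
from pathlib import Path

from cosmos import ProjectConfig

project_config = ProjectConfig(
    Path("/usr/local/airflow/dbt/jaffle_shop"),  # hypothetical project path
    partial_parse=False,  # ignore partial_parse.msgpack and force a full parse
)
```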

Some drawbacks of the virtualenv approach:

  • It is slower than local mode because it may create and install a new Python virtual environment for each Cosmos dbt task run, depending on the Airflow executor and on whether ExecutionConfig.virtualenv_dir is set.

  • If dbt is unavailable in the Airflow scheduler, the default LoadMode.DBT_LS will not work. In this scenario, you must use a parsing method that does not rely on dbt, such as LoadMode.MANIFEST.

  • Only InvocationMode.SUBPROCESS is currently supported; attempting to use InvocationMode.DBT_RUNNER raises an error.

Example usage:

from datetime import datetime
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ExecutionConfig, ExecutionMode, ProjectConfig

# DBT_ROOT_PATH and profile_config are assumed to be defined elsewhere in the module.

@dag(
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def example_virtualenv() -> None:
    start_task = EmptyOperator(task_id="start-venv-examples")
    end_task = EmptyOperator(task_id="end-venv-examples")

    # This first task group creates a new Cosmos virtualenv every time a task
    # runs and deletes it afterwards.
    # It is much slower than setting `virtualenv_dir`.
    tmp_venv_task_group = DbtTaskGroup(
        group_id="tmp-venv-group",
        # dbt/cosmos-specific parameters
        project_config=ProjectConfig(
            DBT_ROOT_PATH / "jaffle_shop",
        ),
        profile_config=profile_config,
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.VIRTUALENV,
            # Without setting virtualenv_dir="/some/path/persistent-venv",
            # Cosmos creates a new Python virtualenv for each dbt task being executed
        ),
        operator_args={
            "py_system_site_packages": False,
            "py_requirements": ["dbt-postgres"],
            "install_deps": True,
            "emit_datasets": False,  # Example of how to not set inlets and outlets
            # --------------------------------------------------------------------------
            # For the sake of avoiding additional latency observed while uploading files for each of the tasks, the
            # below callback functions to be executed are commented, but you can uncomment them if you'd like to
            # enable callback execution.
            # Callback function to upload files using Airflow Object Storage and Cosmos remote_target_path setting
            # "callback": upload_to_cloud_storage,
            # --------------------------------------------------------------------------
            # Alternative callback function to upload files from the target directory to remote store e.g. AWS S3
            # "callback": upload_to_aws_s3,
            # "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"}
            # --------------------------------------------------------------------------
        },
    )

    # The following task group reuses the Cosmos-managed Python virtualenv across multiple tasks.
    # It runs approximately 70% faster than the previous TaskGroup.
    cached_venv_task_group = DbtTaskGroup(
        group_id="cached-venv-group",
        # dbt/cosmos-specific parameters
        project_config=ProjectConfig(
            DBT_ROOT_PATH / "jaffle_shop",
        ),
        profile_config=profile_config,
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.VIRTUALENV,
            # We can set the argument `virtualenv_dir` if we want Cosmos to create one Python virtualenv
            # and reuse that to run all the dbt tasks within the same worker node
            virtualenv_dir=Path("/tmp/persistent-venv2"),
        ),
        operator_args={
            "py_system_site_packages": False,
            "py_requirements": ["dbt-postgres"],
            "install_deps": True,
        },
    )

    start_task >> [tmp_venv_task_group, cached_venv_task_group] >> end_task


example_virtualenv()