Cosmos-managed virtual environment execution mode
The virtualenv mode runs dbt commands from Python virtual environments created and managed by Cosmos. Unlike ExecutionMode.LOCAL, this mode removes the need to create a virtual environment when the Airflow deployment is built, while still avoiding package conflicts. It is intended for cases where:

- You can't install dbt directly in the Airflow environment, either alongside Airflow or in a dedicated virtual environment.
- Multiple dbt installations are required, and you prefer Cosmos to manage them without modifying the Airflow deployment.
- Speed is not a concern, and you can afford for Cosmos to create and update the Python virtual environment during the execution of each dbt node.
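Conceptually, the per-task work this mode performs can be sketched with the standard library's venv module. This is a simplified illustration, not Cosmos's actual implementation; the dbt install and invocation steps are shown as comments because they require network access:

```python
import sys
import tempfile
import venv
from pathlib import Path

# Simplified sketch of what virtualenv mode does for each dbt task:
# 1. create a throwaway virtual environment,
# 2. install the declared py_requirements into it,
# 3. run the dbt command with that environment's interpreter.
work_dir = Path(tempfile.mkdtemp())
venv_dir = work_dir / "cosmos-venv"
venv.create(venv_dir, with_pip=False)  # a real setup would use with_pip=True

bin_dir = "Scripts" if sys.platform == "win32" else "bin"
python_bin = venv_dir / bin_dir / ("python.exe" if sys.platform == "win32" else "python")

# The environment has its own interpreter, isolated from Airflow's.
# With network access, you would then install and invoke dbt, e.g.:
# subprocess.run([python_bin, "-m", "pip", "install", "dbt-postgres"], check=True)
# subprocess.run([venv_dir / bin_dir / "dbt", "run", "--project-dir", "..."], check=True)
```

This repeated create-install cycle is the source of the latency discussed below, and what the `virtualenv_dir` setting amortizes.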
In most cases, the local execution mode with ExecutionConfig.dbt_executable_path is the preferred option, as it allows you to manage the dbt environment during the Airflow deployment process rather than at each dbt node execution.
When you use virtualenv mode, you are responsible for declaring which version of dbt to use via the py_requirements argument. Set this argument directly on operator instances, or pass it in operator_args when you instantiate DbtDag or DbtTaskGroup.
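For example, a dbt adapter version can be pinned through operator_args (a minimal sketch; the package version, project path, and profile_config are illustrative or assumed to be defined elsewhere):

```python
from cosmos import DbtDag, ExecutionConfig, ExecutionMode, ProjectConfig

example_dag = DbtDag(
    dag_id="pinned_dbt_version",
    project_config=ProjectConfig("/path/to/jaffle_shop"),
    profile_config=profile_config,  # assumed defined elsewhere
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.VIRTUALENV),
    operator_args={
        # Pinning avoids surprises when the venv is rebuilt per task run
        "py_requirements": ["dbt-postgres==1.8.1"],
    },
)
```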
Similar to the local execution mode, Cosmos converts Airflow Connections into a format dbt understands by creating a dbt profile file (profiles.yml).
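For instance, an Airflow Postgres connection can be translated into a dbt profile with one of Cosmos's built-in profile mappings (the connection ID and schema below are illustrative):

```python
from cosmos import ProfileConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

profile_config = ProfileConfig(
    profile_name="jaffle_shop",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="postgres_default",         # illustrative Airflow connection ID
        profile_args={"schema": "public"},  # extra dbt profile fields
    ),
)
```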
Also similar to the local execution mode, Cosmos will by default attempt to use a partial_parse.msgpack if one exists to speed up parsing.
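If reusing the cached parse is undesirable (for example, when partial_parse.msgpack was generated in a different environment), recent Cosmos versions expose a flag on ProjectConfig to disable it; a sketch with an illustrative project path:

```python
from cosmos import ProjectConfig

project_config = ProjectConfig(
    "/path/to/jaffle_shop",
    partial_parse=False,  # ignore any existing partial_parse.msgpack
)
```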
Some drawbacks of the virtualenv approach:

- It is slower than local because it may create and update a new Python virtual environment for each Cosmos dbt task run, depending on the Airflow executor and on whether you set the ExecutionConfig.virtualenv_dir configuration.
- If dbt is unavailable in the Airflow scheduler, the default LoadMode.DBT_LS will not work. In this scenario, you must use a parsing method that does not rely on dbt, such as LoadMode.DBT_MANIFEST.
- Only InvocationMode.SUBPROCESS is currently supported; attempting to use InvocationMode.DBT_RUNNER will raise an error.
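A minimal sketch of parsing from a pre-compiled manifest instead of dbt ls, for schedulers without dbt installed (all paths are illustrative, and profile_config is assumed to be defined elsewhere):

```python
from cosmos import (
    DbtTaskGroup,
    ExecutionConfig,
    ExecutionMode,
    LoadMode,
    ProjectConfig,
    RenderConfig,
)

manifest_task_group = DbtTaskGroup(
    group_id="manifest-parsed-group",
    project_config=ProjectConfig(
        "/path/to/jaffle_shop",
        manifest_path="/path/to/jaffle_shop/target/manifest.json",  # pre-compiled manifest
        project_name="jaffle_shop",
    ),
    render_config=RenderConfig(load_method=LoadMode.DBT_MANIFEST),
    execution_config=ExecutionConfig(execution_mode=ExecutionMode.VIRTUALENV),
    profile_config=profile_config,  # assumed defined elsewhere
)
```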
Example of how to use:

from datetime import datetime
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

from cosmos import DbtTaskGroup, ExecutionConfig, ExecutionMode, ProjectConfig

# DBT_ROOT_PATH and profile_config are assumed to be defined elsewhere in the module.


@dag(
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def example_virtualenv() -> None:
    start_task = EmptyOperator(task_id="start-venv-examples")
    end_task = EmptyOperator(task_id="end-venv-examples")

    # This first task group creates a new Cosmos virtualenv every time a task is run
    # and deletes it afterwards.
    # It is much slower than if the user sets the `virtualenv_dir`.
    tmp_venv_task_group = DbtTaskGroup(
        group_id="tmp-venv-group",
        # dbt/cosmos-specific parameters
        project_config=ProjectConfig(
            DBT_ROOT_PATH / "jaffle_shop",
        ),
        profile_config=profile_config,
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.VIRTUALENV,
            # Without setting virtualenv_dir="/some/path/persistent-venv",
            # Cosmos creates a new Python virtualenv for each dbt task being executed
        ),
        operator_args={
            "py_system_site_packages": False,
            "py_requirements": ["dbt-postgres"],
            "install_deps": True,
            "emit_datasets": False,  # Example of how to not set inlets and outlets
            # --------------------------------------------------------------------------
            # To avoid the additional latency of uploading files for each task, the
            # callback functions below are commented out; uncomment them if you'd like
            # to enable callback execution.
            # Callback function to upload files using Airflow Object Storage and the
            # Cosmos remote_target_path setting:
            # "callback": upload_to_cloud_storage,
            # --------------------------------------------------------------------------
            # Alternative callback function to upload files from the target directory
            # to a remote store, e.g. AWS S3:
            # "callback": upload_to_aws_s3,
            # "callback_args": {"aws_conn_id": "aws_s3_conn", "bucket_name": "cosmos-artifacts-upload"},
            # --------------------------------------------------------------------------
        },
    )

    # The following task group reuses the Cosmos-managed Python virtualenv across
    # multiple tasks. It runs approximately 70% faster than the previous TaskGroup.
    cached_venv_task_group = DbtTaskGroup(
        group_id="cached-venv-group",
        # dbt/cosmos-specific parameters
        project_config=ProjectConfig(
            DBT_ROOT_PATH / "jaffle_shop",
        ),
        profile_config=profile_config,
        execution_config=ExecutionConfig(
            execution_mode=ExecutionMode.VIRTUALENV,
            # Set `virtualenv_dir` if you want Cosmos to create one Python virtualenv
            # and reuse it to run all the dbt tasks within the same worker node
            virtualenv_dir=Path("/tmp/persistent-venv2"),
        ),
        operator_args={
            "py_system_site_packages": False,
            "py_requirements": ["dbt-postgres"],
            "install_deps": True,
        },
    )

    start_task >> [tmp_venv_task_group, cached_venv_task_group] >> end_task


example_virtualenv()