Operator arguments#
Arguments can be passed to Cosmos operators in two ways: either when directly instantiating Cosmos operators, or by defining operator_args within a DbtDag or DbtTaskGroup instance. The value of operator_args should be a dictionary that becomes the underlying operators’ kwargs.
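As a minimal sketch of the first approach, operator arguments can be passed when directly instantiating an individual Cosmos operator. The connection id, project path, and profile values below are illustrative assumptions, not part of Cosmos:

from cosmos import ProfileConfig
from cosmos.operators.local import DbtRunLocalOperator
from cosmos.profiles import PostgresUserPasswordProfileMapping

profile_config = ProfileConfig(
    profile_name="jaffle_shop",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="example_postgres_conn",  # hypothetical Airflow connection id
        profile_args={"schema": "public"},
    ),
)

run_models = DbtRunLocalOperator(
    task_id="dbt_run",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",  # hypothetical project path
    profile_config=profile_config,
    # Operator arguments passed directly at instantiation:
    env={"MY_ENVVAR": "some-value"},
    append_env=True,
    fail_fast=True,
)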
Example of how to set Kubernetes-specific operator arguments:
DbtDag(
    # ...
    operator_args={
        "queue": "kubernetes",
        "image": "dbt-jaffle-shop:1.0.0",
        "image_pull_policy": "Always",
        "get_logs": True,
        "is_delete_operator_pod": False,
        "namespace": "default",
    },
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.KUBERNETES,
    ),
)
Example of setting a Cosmos-specific operator argument:
DbtDag(
    # ...
    operator_args={"dbt_cmd_global_flags": ["--cache-selected-only"]},
)
Overriding operator arguments per dbt node (or group of nodes)#
Added in version 1.8.0.
Cosmos 1.8 introduced the capability for users to customise the operator arguments per dbt node, or per group of dbt nodes. This can be done by defining the arguments via a dbt meta property alongside other dbt project configurations.
Let’s say there is a DbtTaskGroup that sets a default pool to run all the dbt tasks, but a user would like the model expensive to run in a separate pool. Users could use either operator_args or default_args to define the default behaviour:
dbt_task_group = DbtTaskGroup(
    # ...
    profile_config=profile_config,  # a previously defined ProfileConfig instance
    default_args={"pool": "default_pool"},
)
While configuring, in the model’s YAML properties file (for example, a schema.yml), a different behaviour for the model “expensive”, which should use the “expensive-pool”:
version: 2
models:
  - name: expensive
    description: description
    meta:
      cosmos:
        operator_kwargs:
          pool: expensive-pool
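To override arguments for a group of dbt nodes instead of a single model, the same cosmos meta could be set at folder level in dbt_project.yml using dbt’s +meta config syntax. The project and folder names below are illustrative, and this sketch assumes dbt resolves the folder-level meta onto each node for Cosmos to read:

models:
  jaffle_shop:
    marts:
      +meta:
        cosmos:
          operator_kwargs:
            pool: expensive-pool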
More information about this feature can be found in Airflow Configuration Overrides with Astronomer Cosmos.
To learn how to customise the profile per dbt model or Cosmos task, check Customising the profile config per dbt node.
Summary of Cosmos-specific arguments#
Sample usage#
DbtTaskGroup(
    # ...
    operator_args={
        "append_env": True,
        "dbt_cmd_flags": ["--models", "stg_customers"],
        "dbt_cmd_global_flags": ["--cache-selected-only"],
        "dbt_executable_path": Path("/home/user/dbt"),
        "env": {"MY_ENVVAR": "some-value"},
        "fail_fast": True,
        "no_version_check": True,
        "quiet": True,
        "vars": {
            "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}",
            "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}",
        },
        "warn_error": True,
        "cancel_query_on_kill": False,
        "output_encoding": "utf-8",
        "skip_exit_code": 1,
    }
)
Template fields#
Some of the operator args are template fields for your convenience.
These template fields can be useful for hooking into Airflow Params, or for more advanced customization with XComs.
The following operator args support templating, and are accessible both through the DbtDag and DbtTaskGroup constructors and when using the operators standalone:
env
vars
full_refresh (for the build, seed, and run operators since Cosmos 1.4)
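As a sketch of hooking a templated value into Airflow Params through operator_args (the dag_id and param name below are illustrative; see also the note below about LoadMode.DBT_LS):

from cosmos import DbtDag

dag = DbtDag(
    # ...
    dag_id="my_dbt_dag",
    params={"country_code": "US"},
    operator_args={
        # Resolved at task run time from the DAG-level Airflow Param:
        "vars": {"country_code": "{{ params.country_code }}"},
        "env": {"COUNTRY_CODE": "{{ params.country_code }}"},
    },
)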
Note
Using Jinja templating for env and vars may cause problems when using LoadMode.DBT_LS to render your DAG.
The following template fields are only available when using the operators in a standalone context (starting in Cosmos 1.4):
select
exclude
selector
models
Since Airflow resolves template fields during DAG execution and not during DAG parsing, the args above cannot be templated via DbtDag and DbtTaskGroup, because both need to select dbt nodes during DAG parsing.
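As a sketch of templating one of these standalone-only fields (the task id, project path, and param name are illustrative, and profile_config is assumed to be a previously defined ProfileConfig instance):

from cosmos.operators.local import DbtRunLocalOperator

# "select" is a template field on the standalone operator, so it can be
# resolved from an Airflow Param at run time.
run_selected = DbtRunLocalOperator(
    task_id="dbt_run_selected",
    project_dir="/usr/local/airflow/dags/dbt/jaffle_shop",
    profile_config=profile_config,
    select="{{ params.dbt_select }}",
)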
Additionally, the SQL for compiled dbt models is stored in the template fields, which is viewable in the Airflow UI for each task run. This is provided for telemetry on task execution, and is not an operator arg. For more information about this, see the Compiled SQL docs.