Operator arguments
There are two ways to pass arguments to Cosmos operators: pass them directly when instantiating a Cosmos operator, or define operator_args on a DbtDag or DbtTaskGroup instance.
The value of operator_args should be a dictionary; its entries become the keyword arguments (kwargs) of the underlying operators.
Example of how to set Kubernetes-specific operator arguments:
from cosmos import DbtDag, ExecutionConfig
from cosmos.constants import ExecutionMode

DbtDag(
    # ...
    operator_args={
        "queue": "kubernetes",
        "image": "dbt-jaffle-shop:1.0.0",
        "image_pull_policy": "Always",
        "get_logs": True,
        "is_delete_operator_pod": False,
        "namespace": "default",
    },
    execution_config=ExecutionConfig(
        execution_mode=ExecutionMode.KUBERNETES,
    ),
)
Example of setting a Cosmos-specific operator argument:
DbtDag(
    # ...
    operator_args={"dbt_cmd_global_flags": ["--cache-selected-only"]},
)
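To picture how a single operator_args dictionary ends up as keyword arguments on every generated operator, here is a minimal sketch in plain Python. It is not Cosmos code: FakeOperator and build_operators are hypothetical names used only for illustration, and the real operators accept many more parameters.

```python
from typing import Any


class FakeOperator:
    """Hypothetical stand-in for an Airflow operator; not a Cosmos class."""

    def __init__(self, task_id: str, **kwargs: Any) -> None:
        self.task_id = task_id
        self.kwargs = kwargs


def build_operators(
    node_ids: list[str], operator_args: dict[str, Any]
) -> list[FakeOperator]:
    # Every generated operator receives the same operator_args as kwargs.
    return [FakeOperator(task_id=node_id, **operator_args) for node_id in node_ids]


operators = build_operators(
    ["stg_customers", "stg_orders"],
    {"dbt_cmd_global_flags": ["--cache-selected-only"], "queue": "kubernetes"},
)
for op in operators:
    print(op.task_id, op.kwargs)
```

The point of the sketch is that operator_args is applied uniformly: a key set there affects every task rendered from the dbt project, not just one.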
Summary of Cosmos-specific arguments
Sample usage
from pathlib import Path

DbtTaskGroup(
    # ...
    operator_args={
        "append_env": True,
        "dbt_cmd_flags": ["--models", "stg_customers"],
        "dbt_cmd_global_flags": ["--cache-selected-only"],
        "dbt_executable_path": Path("/home/user/dbt"),
        "env": {"MY_ENVVAR": "some-value"},
        "fail_fast": True,
        "no_version_check": True,
        "quiet": True,
        "vars": {
            "start_time": "{{ data_interval_start.strftime('%Y%m%d%H%M%S') }}",
            "end_time": "{{ data_interval_end.strftime('%Y%m%d%H%M%S') }}",
        },
        "warn_error": True,
        "cancel_query_on_kill": False,
        "output_encoding": "utf-8",
        "skip_exit_code": 1,
    },
)
Example: using interceptors to set vars and env at runtime (e.g. from Airflow context or connections):
def set_runtime_vars(context, task):
    task.vars = {
        "run_id": context["run_id"],
        "execution_date": str(context["data_interval_start"]),
    }
    task.env = {"MY_ENV": "value"}


DbtTaskGroup(
    # ...
    operator_args={"interceptors": [set_runtime_vars]},
)
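The interceptor above can be exercised outside Airflow to check what it sets. The sketch below is plain Python and makes two assumptions that are not confirmed by this page: that each interceptor is called with (context, task) in list order, and that it runs before the dbt command is built. run_interceptors is a hypothetical helper, and the SimpleNamespace object only stands in for a real Cosmos operator.

```python
from types import SimpleNamespace
from typing import Any, Callable


def set_runtime_vars(context: dict[str, Any], task: Any) -> None:
    # Same logic as the interceptor in the example above.
    task.vars = {
        "run_id": context["run_id"],
        "execution_date": str(context["data_interval_start"]),
    }
    task.env = {"MY_ENV": "value"}


def run_interceptors(
    interceptors: list[Callable[[dict[str, Any], Any], None]],
    context: dict[str, Any],
    task: Any,
) -> None:
    # Hypothetical driver: call each interceptor in order (an assumption).
    for interceptor in interceptors:
        interceptor(context, task)


# Stand-ins for the Airflow context and the operator instance.
task = SimpleNamespace(vars={}, env={})
context = {
    "run_id": "manual__2024-01-01",
    "data_interval_start": "2024-01-01T00:00:00+00:00",
}

run_interceptors([set_runtime_vars], context, task)
print(task.vars)
print(task.env)
```

Testing an interceptor this way keeps the function free of Airflow imports, which makes it easy to unit test before wiring it into operator_args.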
Template fields
Some operator args are template fields, so Airflow renders them with Jinja at run time.
This is useful for hooking into Airflow Params, or for more advanced customization with XComs.
The following operator args support templating. They can be set through the DbtDag and DbtTaskGroup constructors as well as on operators instantiated directly:
- env
- vars
- full_refresh (for the build, seed, and run operators, since Cosmos 1.4)
- dbt_cmd_flags
Note
Using Jinja templating for env and vars may cause problems when using LoadMode.DBT_LS to render your DAG.
Example usage of templated dbt_cmd_flags for microbatch models with event-time ranges:
from airflow.models.param import Param

DbtDag(
    # ... other parameters
    operator_args={
        "dbt_cmd_flags": [
            "{% if params.EVENT_TIME_START %}--event-time-start{% endif %}",
            "{% if params.EVENT_TIME_START %}{{ params.EVENT_TIME_START }}{% endif %}",
            "{% if params.EVENT_TIME_END %}--event-time-end{% endif %}",
            "{% if params.EVENT_TIME_END %}{{ params.EVENT_TIME_END }}{% endif %}",
            "--select",
            "{{ params.MODEL_NAME }}",
        ]
    },
    params={
        "EVENT_TIME_START": Param(default=None, type=["null", "string"]),
        "EVENT_TIME_END": Param(default=None, type=["null", "string"]),
        "MODEL_NAME": Param(default=None, type=["null", "string"]),
    },
)
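To make the conditional entries in the example above easier to read, the following sketch reproduces in plain Python what each templated string would render to for a given set of params. It is a hand-written stand-in for Jinja rendering, not Airflow or Cosmos code, and rendered_flags is a hypothetical name. It does not show how entries that render to an empty string are handled afterwards.

```python
from typing import Any, Optional


def rendered_flags(params: dict[str, Optional[Any]]) -> list[str]:
    """Mimic what each templated dbt_cmd_flags entry renders to."""
    start = params.get("EVENT_TIME_START")
    end = params.get("EVENT_TIME_END")
    return [
        # "{% if ... %}...{% endif %}" renders to "" when the param is falsy.
        "--event-time-start" if start else "",
        str(start) if start else "",
        "--event-time-end" if end else "",
        str(end) if end else "",
        "--select",
        # "{{ params.MODEL_NAME }}" has no guard, so it is always rendered.
        str(params.get("MODEL_NAME")),
    ]


flags = rendered_flags(
    {
        "EVENT_TIME_START": "2024-01-01",
        "EVENT_TIME_END": None,
        "MODEL_NAME": "my_model",
    }
)
print(flags)
```

Each flag and its value are guarded by the same condition, so they appear or disappear together; the unguarded MODEL_NAME entry is always present, which is worth keeping in mind given its default of None.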
The following template fields can only be templated when the operators are used standalone, that is, instantiated directly rather than generated by DbtDag or DbtTaskGroup (starting in Cosmos 1.4):
- select
- exclude
- selector
- models
Because Airflow resolves template fields during DAG execution rather than DAG parsing, the args above cannot be templated through DbtDag or DbtTaskGroup: both need to select dbt nodes at parse time.
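The timing issue can be illustrated with a small sketch in plain Python. At parse time a templated value is still the raw Jinja string, so any node selection based on it sees the template text rather than the final value. select_nodes is a hypothetical helper, and matching on exact node names is a deliberate simplification of dbt's selection syntax.

```python
def select_nodes(node_names: list[str], select: str) -> list[str]:
    # Simplified selection: keep nodes whose name equals the select value.
    return [name for name in node_names if name == select]


nodes = ["stg_customers", "stg_orders"]

# What a parser sees before Airflow has rendered the template.
parse_time_value = "{{ params.MODEL_NAME }}"
# What the same field would hold once rendered during task execution.
run_time_value = "stg_customers"

print(select_nodes(nodes, parse_time_value))  # []
print(select_nodes(nodes, run_time_value))  # ['stg_customers']
```

A standalone operator avoids the problem because it does not need to decide which nodes exist while the DAG is parsed; the selection is only evaluated when the task runs.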
Output-only template fields
A small number of template fields on the local execution mode operators are
output-only: Cosmos populates them as the task runs so the values appear in the
Airflow UI, but any value passed in via operator_args is silently
overwritten and has no effect.
- compiled_sql: the SQL Cosmos compiled for a model. See the Compiled SQL docs for how it is populated and how to disable it via should_store_compiled_sql.
- freshness: the JSON Cosmos captures from dbt source freshness when source nodes run, reset on every task instance.