Scheduling#

Because Cosmos uses Airflow to power scheduling, you can leverage Airflow’s scheduling capabilities to schedule your dbt projects. This includes cron-based scheduling, timetables, and data-aware scheduling. For more information on Airflow’s scheduling capabilities, see the Airflow documentation or the Astronomer documentation.

Time-Based Scheduling#

To schedule a dbt project on a time-based schedule, you can use Airflow’s scheduling options. For example, to run a dbt project every day starting on January 1, 2023, you can use the following DAG:

from datetime import datetime

from cosmos import DbtDag

jaffle_shop = DbtDag(
    # ...
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
)
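
Any schedule value accepted by Airflow works here. As a minimal sketch (assuming the same elided project configuration as above), a cron expression can be used instead of a preset:

from datetime import datetime

from cosmos import DbtDag

jaffle_shop = DbtDag(
    # ...
    start_date=datetime(2023, 1, 1),
    # run at 06:00 every weekday, using standard cron syntax
    schedule="0 6 * * 1-5",
)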

Data-Aware Scheduling#

Apache Airflow® 2.4 introduced the concept of scheduling based on Datasets.

By default, if Airflow 2.4 or higher is used, Cosmos emits Airflow Datasets when running dbt projects. This allows you to use Airflow’s data-aware scheduling capabilities to schedule your dbt projects. Cosmos emits datasets using the OpenLineage URI format, as detailed in the OpenLineage Naming Convention.

Cosmos calculates these URIs during task execution using the OpenLineage Integration Common library.

This block illustrates a Cosmos-generated dataset for Postgres:

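# postgres://{host}:{port}/{database}.{schema}.{table}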
Dataset("postgres://host:5432/database.schema.table")

For example, let’s say you have:

  • A dbt project (project_one) with a model called my_model that runs daily

  • A second dbt project (project_two) with a model called my_other_model that you want to run immediately after my_model

We assume the database is Postgres, the host is host, the database name is database, and the schema is schema.

Then, you can use Airflow’s data-aware scheduling capabilities to schedule my_other_model to run after my_model. For example, you can use the following DAGs:

from datetime import datetime

from airflow.datasets import Dataset
from cosmos import DbtDag

project_one = DbtDag(
    # ...
    start_date=datetime(2023, 1, 1),
    schedule="@daily",
)

project_two = DbtDag(
    # ...
    # runs whenever project_one materializes my_model
    schedule=[Dataset("postgres://host:5432/database.schema.my_model")],
    dbt_project_name="project_two",
)

In this scenario, project_one runs once a day and project_two runs immediately after project_one. You can view these dependencies in Airflow’s UI.
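
A list of datasets behaves as a logical AND: the downstream DAG runs only once every dataset in the list has been updated. As a sketch, a third hypothetical project that waits for both models (reusing the assumed host, database, and schema from above):

from airflow.datasets import Dataset
from cosmos import DbtDag

project_three = DbtDag(
    # ...
    # scheduled only after BOTH upstream models have been materialized
    schedule=[
        Dataset("postgres://host:5432/database.schema.my_model"),
        Dataset("postgres://host:5432/database.schema.my_other_model"),
    ],
)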

Examples#

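As a concrete illustration, consider a Cosmos DAG along the following lines. This is a minimal sketch: the dag_id, project path, and profile details are assumptions; all that matters is that the dbt project contains an orders model materialized in the public schema of the postgres database referenced below.

from datetime import datetime

from cosmos import DbtDag, ProfileConfig, ProjectConfig

basic_cosmos_dag = DbtDag(
    # hypothetical dbt project containing an "orders" model
    project_config=ProjectConfig("/usr/local/airflow/dbt/jaffle_shop"),
    # hypothetical profile pointing at the Postgres instance in the dataset URI
    profile_config=ProfileConfig(
        profile_name="jaffle_shop",
        target_name="dev",
        profiles_yml_filepath="/usr/local/airflow/dbt/jaffle_shop/profiles.yml",
    ),
    schedule="@daily",
    start_date=datetime(2024, 9, 1),
    dag_id="basic_cosmos_dag",
)
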
A DAG like this will trigger the following DAG to be run (when using Cosmos 1.1 or newer with Airflow 2.4 or newer):

from datetime import datetime
from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.empty import EmptyOperator


with DAG(
    "dataset_triggered_dag",
    description="A DAG that should be triggered via Dataset",
    start_date=datetime(2024, 9, 1),
    schedule=[Dataset(uri="postgres://0.0.0.0:5434/postgres.public.orders")],
) as dag:
    t1 = EmptyOperator(
        task_id="task_1",
    )
    t2 = EmptyOperator(
        task_id="task_2",
    )

    t1 >> t2

From Cosmos 1.7 and Airflow 2.10 onwards, it is also possible to trigger DAGs by using DatasetAlias:

from datetime import datetime
from airflow import DAG
from airflow.datasets import DatasetAlias
from airflow.operators.empty import EmptyOperator


with DAG(
    "datasetalias_triggered_dag",
    description="A DAG that should be triggered via Dataset alias",
    start_date=datetime(2024, 9, 1),
    schedule=[DatasetAlias(name="basic_cosmos_dag__orders__run")],
) as dag:

    t3 = EmptyOperator(
        task_id="task_3",
    )

    t3

Known Limitations#

Airflow 2.9 and below#

If using Cosmos with Airflow 2.9 or earlier, users will experience the following issues:

  • The task inlets and outlets generated by Cosmos will not be seen in the Airflow UI

  • The scheduler logs will contain many messages saying “Orphaning unreferenced dataset”

Example of scheduler logs:

scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.stg_customers'
scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.stg_payments'
scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.stg_orders'
scheduler | [2023-09-08T10:18:34.252+0100] {scheduler_job_runner.py:1742} INFO - Orphaning unreferenced dataset 'postgres://0.0.0.0:5432/postgres.public.customers'

References about the root cause of these issues:

Airflow 2.10.0 and 2.10.1#

If using Cosmos with Airflow 2.10.0 or 2.10.1, the two issues previously described are resolved, since Cosmos uses DatasetAlias to support the dynamic creation of datasets during task execution. However, users may face sqlalchemy.orm.exc.FlushError errors if they attempt to run Cosmos-powered DAGs using airflow dags test with these versions.

We’ve reported this issue, and it will be resolved in future versions of Airflow.

To overcome this limitation in local tests until the Airflow community resolves it, we introduced the AIRFLOW__COSMOS__ENABLE_DATASET_ALIAS configuration, which is True by default. If users want to run airflow dags test without seeing sqlalchemy.orm.exc.FlushError, they can set this configuration to False. It can also be set in the airflow.cfg file:

[cosmos]
enable_dataset_alias = False
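
Equivalently, the setting can be exported as an environment variable before invoking the test command (the DAG id below is hypothetical):

export AIRFLOW__COSMOS__ENABLE_DATASET_ALIAS=False
airflow dags test my_cosmos_dag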