Getting Started on MWAA#
Users can face Python dependency issues when trying to use the Cosmos Local Execution Mode in Amazon Managed Workflows for Apache Airflow® (MWAA).
This step-by-step guide illustrates how to use the Local Execution Mode together with MWAA's startup script and the dbt_executable_path argument.
Create a Startup Script#
MWAA allows users to run a startup script before the scheduler and webserver are started. This is a great place to install dbt into a virtual environment.
To do so:
Initialize a startup script as outlined in MWAA’s documentation here
Add the following to your startup script (be sure to replace <your-dbt-adapter> with the actual adapter you need, e.g. dbt-redshift, dbt-snowflake, etc.):
#!/bin/sh
export DBT_VENV_PATH="${AIRFLOW_HOME}/dbt_venv"

# Disable user-site installs so pip installs into the virtual environment
export PIP_USER=false

# Create the virtual environment and install your dbt adapter into it
python3 -m venv "${DBT_VENV_PATH}"
${DBT_VENV_PATH}/bin/pip install <your-dbt-adapter>

# Restore MWAA's default pip behavior
export PIP_USER=true
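If you want to confirm that the startup script worked, a quick sanity check like the sketch below (run, for example, from a one-off PythonOperator task or a worker shell) prints the dbt version installed in the virtual environment. It assumes the DBT_VENV_PATH set in the script above was not changed.

import os
import subprocess

# Path created by the startup script above (assumes the default DBT_VENV_PATH)
dbt_path = os.path.join(os.environ["AIRFLOW_HOME"], "dbt_venv", "bin", "dbt")

if os.path.exists(dbt_path):
    # Print the dbt Core and adapter versions installed in the venv
    result = subprocess.run([dbt_path, "--version"], capture_output=True, text=True)
    print(result.stdout)
else:
    print(f"dbt executable not found at {dbt_path}; check the startup script logs in CloudWatch")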
Install Cosmos#
Add the following to your base project requirements.txt:
astronomer-cosmos
Move your dbt project into the DAGs directory#
Make a new folder, dbt, inside your local dags folder. Then, copy your dbt project into the directory and create a file called my_cosmos_dag.py in the root of your DAGs directory. Your folder structure should look like this:
├── dags/
│   ├── dbt/
│   │   └── my_dbt_project/
│   │       ├── dbt_project.yml
│   │       ├── models/
│   │       │   ├── my_model.sql
│   │       │   └── my_other_model.sql
│   │       └── macros/
│   │           ├── my_macro.sql
│   │           └── my_other_macro.sql
│   └── my_cosmos_dag.py
└── ...
Note: your dbt projects can go anywhere that Airflow can access. By default, Cosmos looks in the /usr/local/airflow/dags/dbt directory, but you can change this by setting the dbt_project_path argument in ProjectConfig when you create your DAG instance.
For example, if you wanted to put your dbt project in the /usr/local/airflow/dags/my_dbt_project directory, you would do:
from cosmos import DbtDag, ProjectConfig

my_cosmos_dag = DbtDag(
    project_config=ProjectConfig(
        dbt_project_path="/usr/local/airflow/dags/my_dbt_project",
    ),
    # ...,
)
Create your DAG#
In your my_cosmos_dag.py file, import the DbtDag class from Cosmos and create a new DAG instance. Make sure to use the dbt_executable_path argument to point to the virtual environment you created in step 1.
import os
from datetime import datetime

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import PostgresUserPasswordProfileMapping

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="airflow_db",
        profile_args={"schema": "public"},
    ),
)

execution_config = ExecutionConfig(
    dbt_executable_path=f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt",
)

my_cosmos_dag = DbtDag(
    project_config=ProjectConfig(
        "<my_dbt_project>",  # path to your dbt project, e.g. /usr/local/airflow/dags/dbt/my_dbt_project
    ),
    profile_config=profile_config,
    execution_config=execution_config,
    # normal dag parameters
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="my_cosmos_dag",
    default_args={"retries": 2},
)
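If your dbt project already ships with a profiles.yml, you can point Cosmos at it instead of using a profile mapping. The sketch below is only an example: the file path is a placeholder and should match wherever your profiles.yml actually lives inside the DAGs directory.

from cosmos import ProfileConfig

# Alternative: reuse an existing profiles.yml instead of a profile mapping
# (the path below is a placeholder; adjust it to your project layout)
profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath="/usr/local/airflow/dags/dbt/my_dbt_project/profiles.yml",
)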
Note: In some cases, especially in larger dbt projects, you might run into a DagBag import timeout error. This error can be resolved by increasing the value of the Airflow configuration core.dagbag_import_timeout.
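On MWAA, Airflow configuration overrides are applied as "Airflow configuration options" on the environment, either in the console or via the UpdateEnvironment API. Below is a rough sketch using boto3; the environment name and the 300-second value are placeholders, so adjust them to your setup.

import boto3

# Raise the DagBag import timeout on an MWAA environment
# ("MyMwaaEnvironment" and "300" are placeholder values)
mwaa = boto3.client("mwaa")
mwaa.update_environment(
    Name="MyMwaaEnvironment",
    AirflowConfigurationOptions={"core.dagbag_import_timeout": "300"},
)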