
Starship Migration DAG

The StarshipAirflowMigrationDAG can be used to migrate Airflow Variables, Pools, Connections, and DAG History from one Airflow instance to another.

The StarshipAirflowMigrationDAG should be used when the source Airflow Webserver cannot correctly host a Plugin. The Target must still have a functioning Starship Plugin installed, run the same version of Airflow as the source, and have the same set of DAGs deployed.

The StarshipAirflowMigrationDAG should be used if migrating from a Google Cloud Composer 1 (with Airflow 2.x) or MWAA v2.0.2 environment. These environments do not support webserver plugins and will require using the StarshipAirflowMigrationDAG to migrate data.

Installation

Add the following line to your requirements.txt in your source environment:

astronomer-starship[provider]

Setup

Make a connection in Airflow with the following details:

- Conn ID: starship_default
- Conn Type: HTTP
- Host: the URL of the homepage of Airflow (excluding /home on the end of the URL)
  - For example, if your deployment URL is https://astronomer.astronomer.run/abcdt4ry/home, you'll use https://astronomer.astronomer.run/abcdt4ry
- Schema: https
- Extras: {"Authorization": "Bearer <token>"}
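The Host and Extras values can be derived mechanically from the deployment URL and a token. The following is a minimal sketch using only the Python standard library; the helper names `starship_host` and `starship_extras` are hypothetical and are not part of Starship.

```python
import json
from urllib.parse import urlsplit, urlunsplit


def starship_host(homepage_url: str) -> str:
    """Return the Airflow homepage URL with a trailing /home removed."""
    parts = urlsplit(homepage_url)
    path = parts.path.rstrip("/")
    if path.endswith("/home"):
        path = path[: -len("/home")]
    # Drop any query string or fragment; only scheme, host, and path remain.
    return urlunsplit((parts.scheme, parts.netloc, path, "", ""))


def starship_extras(token: str) -> str:
    """Build the Extras JSON for the starship_default connection."""
    return json.dumps({"Authorization": f"Bearer {token}"})


# Example values taken from the Setup section; "<token>" is a placeholder.
host = starship_host("https://astronomer.astronomer.run/abcdt4ry/home")
extras = starship_extras("<token>")
print(host)
print(extras)
```

Paste the printed values into the Host and Extras fields of the connection form.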

Usage

  1. Add the following DAG to your source environment:

    # dags/starship_airflow_migration_dag.py
    from astronomer_starship.providers.starship.operators.starship import StarshipAirflowMigrationDAG
    
    globals()['starship_airflow_migration_dag'] = StarshipAirflowMigrationDAG(http_conn_id="starship_default")
    
  2. Unpause the DAG in the Airflow UI

  3. Once the DAG runs successfully, your Variables, Pools, Connections, and DAG History should all be migrated to Astronomer

Configuration

The StarshipAirflowMigrationDAG can be configured as follows:

StarshipAirflowMigrationDAG(
    http_conn_id="starship_default",
    variables=None,  # None to migrate all, or ["var1", "var2"] to migrate specific items, or empty list to skip all
    pools=None,  # None to migrate all, or ["pool1", "pool2"] to migrate specific items, or empty list to skip all
    connections=None,  # None to migrate all, or ["conn1", "conn2"] to migrate specific items, or empty list to skip all
    dag_ids=None,  # None to migrate all, or ["dag1", "dag2"] to migrate specific items, or empty list to skip all
)

You can use this DAG to migrate all items, or specific items by providing a list of names.

You can skip migration by providing an empty list.
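The three cases for each argument (None, a list of names, or an empty list) can be read as a simple selection rule. The sketch below illustrates that rule in plain Python; `select_items` is a hypothetical helper, not Starship's implementation. It also assumes that requested names missing from the source are silently ignored, which the real DAG may handle differently.

```python
from typing import List, Optional


def select_items(available: List[str], requested: Optional[List[str]]) -> List[str]:
    """Pick the items to migrate according to the None / list / empty-list rule."""
    if requested is None:
        # None: migrate everything found in the source.
        return list(available)
    # A list: migrate only the named items; an empty list therefore selects nothing.
    wanted = set(requested)
    return [name for name in available if name in wanted]


source_variables = ["var1", "var2", "var3"]

migrate_all = select_items(source_variables, None)
migrate_some = select_items(source_variables, ["var1", "var2"])
migrate_none = select_items(source_variables, [])
```

The same rule applies independently to `variables`, `pools`, `connections`, and `dag_ids`, so you can, for example, migrate all Connections while skipping DAG History.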

Python API

Hooks

Hooks for interacting with Starship migrations

StarshipHttpHook

get_connections
get_connections()

Get all connections from the Target Starship instance.

get_dag_runs
get_dag_runs(
    dag_id: str, offset: int = 0, limit: int = 10
) -> dict

Get DAG runs from the Target Starship instance.

get_dags
get_dags() -> dict

Get all DAGs from the Target Starship instance.

get_pools
get_pools()

Get all pools from the Target Starship instance.

get_task_instances
get_task_instances(
    dag_id: str, offset: int = 0, limit: int = 10
)

Get task instances from the Target Starship instance.

get_variables
get_variables()

Get all variables from the Target Starship instance.

set_connection
set_connection(**kwargs)

Set a connection in the Target Starship instance.

set_dag_is_paused
set_dag_is_paused(dag_id: str, is_paused: bool)

Set the paused status of a DAG in the Target Starship instance.

set_dag_runs
set_dag_runs(dag_runs: List[dict]) -> dict

Set DAG runs in the Target Starship instance.

set_pool
set_pool(**kwargs)

Set a pool in the Target Starship instance.

set_task_instances
set_task_instances(task_instances: list[dict]) -> dict

Set task instances in the Target Starship instance.

set_variable
set_variable(**kwargs)

Set a variable in the Target Starship instance.

StarshipLocalHook

Hook to retrieve local Airflow data, which can then be sent to the Target Starship instance.

get_connections
get_connections()

Get all connections from the local Airflow instance.

get_dag_runs
get_dag_runs(
    dag_id: str, offset: int = 0, limit: int = 10
) -> dict

Get DAG runs from the local Airflow instance.
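Because `get_dag_runs` takes `offset` and `limit`, fetching a full history means paging through the results. The following is a minimal sketch of such a loop. It assumes the returned dict holds the runs under a `"dag_runs"` key, which this page does not confirm, and it uses a stand-in `FakeLocalHook` class so that it runs without Airflow installed.

```python
from typing import Callable, Dict, Iterator, List


def iter_dag_runs(
    get_dag_runs: Callable[..., Dict], dag_id: str, page_size: int = 10
) -> Iterator[dict]:
    """Yield every DAG run for dag_id by advancing offset one page at a time."""
    offset = 0
    while True:
        page = get_dag_runs(dag_id, offset=offset, limit=page_size)
        runs = page.get("dag_runs", [])  # ASSUMPTION: key name of the run list
        if not runs:
            break
        yield from runs
        offset += len(runs)


class FakeLocalHook:
    """Stand-in for a hook exposing get_dag_runs(dag_id, offset, limit)."""

    def __init__(self, runs: List[dict]) -> None:
        self._runs = runs

    def get_dag_runs(self, dag_id: str, offset: int = 0, limit: int = 10) -> dict:
        return {"dag_runs": self._runs[offset : offset + limit]}


hook = FakeLocalHook([{"dag_id": "dag1", "run_id": f"run_{i}"} for i in range(25)])
all_runs = list(iter_dag_runs(hook.get_dag_runs, "dag1", page_size=10))
```

With a real hook, pass its bound `get_dag_runs` method in place of the stand-in. The same loop shape would suit `get_task_instances`, which takes the same `offset` and `limit` parameters.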

get_dags
get_dags() -> dict

Get all DAGs from the local Airflow instance.

get_pools
get_pools()

Get all pools from the local Airflow instance.

get_task_instances
get_task_instances(
    dag_id: str, offset: int = 0, limit: int = 10
)

Get task instances from the local Airflow instance.

get_variables
get_variables()

Get all variables from the local Airflow instance.

set_dag_is_paused
set_dag_is_paused(dag_id: str, is_paused: bool)

Set the paused status of a DAG in the local Airflow instance.
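The two hooks pair naturally: read with a StarshipLocalHook, write with a StarshipHttpHook. The sketch below shows that pattern for Variables with duck-typed stand-ins, so it runs without Airflow. It assumes `get_variables()` returns a list of dicts that each contain a `"key"` entry and can be passed straight to `set_variable(**kwargs)`. The actual payload shape is not documented on this page, and `copy_variables` is a hypothetical helper.

```python
from typing import Iterable, List, Optional


def copy_variables(source_hook, target_hook, keys: Optional[Iterable[str]] = None) -> List[str]:
    """Copy variables from source_hook to target_hook and return the copied keys."""
    wanted = None if keys is None else set(keys)
    copied = []
    for variable in source_hook.get_variables():
        if wanted is not None and variable["key"] not in wanted:
            continue
        target_hook.set_variable(**variable)  # ASSUMPTION: dict fields map to kwargs
        copied.append(variable["key"])
    return copied


class FakeSource:
    """Stand-in for a local hook."""

    def get_variables(self):
        return [{"key": "var1", "val": "a"}, {"key": "var2", "val": "b"}]


class FakeTarget:
    """Stand-in for an HTTP hook; records what it was asked to set."""

    def __init__(self) -> None:
        self.received = []

    def set_variable(self, **kwargs):
        self.received.append(kwargs)


target = FakeTarget()
copied_keys = copy_variables(FakeSource(), target, keys=["var2"])
```

The provided operators and TaskGroups already implement this flow; a hand-written loop like this is only useful when you need custom filtering or transformation between the read and the write.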

Operators, TaskGroups, DAG

Operators, TaskGroups, and DAGs for interacting with the Starship migrations.

StarshipConnectionMigrationOperator

StarshipConnectionMigrationOperator(
    connection_id: Union[str, None] = None, **kwargs
)

Operator to migrate a single Connection from one Airflow instance to another.

StarshipDagHistoryMigrationOperator

StarshipDagHistoryMigrationOperator(
    target_dag_id: str,
    unpause_dag_in_target: bool = False,
    dag_run_limit: int = 10,
    **kwargs
)

Operator to migrate a single DAG from one Airflow instance to another, with its history.

StarshipPoolMigrationOperator

StarshipPoolMigrationOperator(
    pool_name: Union[str, None] = None, **kwargs
)

Operator to migrate a single Pool from one Airflow instance to another.

StarshipVariableMigrationOperator

StarshipVariableMigrationOperator(
    variable_key: Union[str, None] = None, **kwargs
)

Operator to migrate a single Variable from one Airflow instance to another.

StarshipAirflowMigrationDAG

StarshipAirflowMigrationDAG(
    http_conn_id: str,
    variables: List[str] = None,
    pools: List[str] = None,
    connections: List[str] = None,
    dag_ids: List[str] = None,
    **kwargs
)

DAG to fetch and migrate Variables, Pools, Connections, and DAGs with history from one Airflow instance to another.

starship_connections_migration

starship_connections_migration(
    connections: List[str] = None, **kwargs
)

TaskGroup to fetch and migrate Connections from one Airflow instance to another.

starship_dag_history_migration

starship_dag_history_migration(
    dag_ids: List[str] = None, **kwargs
)

TaskGroup to fetch and migrate DAGs with their history from one Airflow instance to another.

starship_pools_migration

starship_pools_migration(pools: List[str] = None, **kwargs)

TaskGroup to fetch and migrate Pools from one Airflow instance to another.

starship_variables_migration

starship_variables_migration(
    variables: List[str] = None, **kwargs
)

TaskGroup to fetch and migrate Variables from one Airflow instance to another.