Code Samples

Index

Example 1: Ray jobs on an existing cluster

If you already have a Ray cluster set up, you can use the SubmitRayJob operator or the @ray.task decorator to submit jobs to it directly.

In the example below (ray_taskflow_example_existing_cluster.py), the @ray.task decorator is used to define a task that will be executed on the Ray cluster:

Important

Set the Ray Dashboard URL connection parameter, or the RAY_ADDRESS environment variable on your Airflow worker, to connect to your cluster.

 1from datetime import datetime
 2from pathlib import Path
 3
 4from airflow.decorators import dag, task
 5
 6from ray_provider.decorators.ray import ray
 7
 8CONN_ID = "ray_job"
 9FOLDER_PATH = Path(__file__).parent / "ray_scripts"
10RAY_TASK_CONFIG = {
11    "conn_id": CONN_ID,
12    "runtime_env": {"working_dir": str(FOLDER_PATH), "pip": ["numpy"]},
13    "num_cpus": 1,
14    "num_gpus": 0,
15    "memory": 0,
16    "poll_interval": 5,
17}
18
19
20@dag(
21    dag_id="Ray_Taskflow_Example_Existing_Cluster",
22    start_date=datetime(2023, 1, 1),
23    schedule=None,
24    catchup=False,
25    tags=["ray", "example"],
26)
27def ray_taskflow_dag():
28
29    @task
30    def generate_data():
31        return [1, 2, 3]
32
33    @ray.task(config=RAY_TASK_CONFIG)
34    def process_data_with_ray(data):
35        import numpy as np
36        import ray
37
38        @ray.remote
39        def square(x):
40            return x**2
41
42        ray.init()
43        data = np.array(data)
44        futures = [square.remote(x) for x in data]
45        results = ray.get(futures)
46        mean = np.mean(results)
47        print(f"Mean of this population is {mean}")
48        return mean
49
50    data = generate_data()
51    process_data_with_ray(data)
52
53
54ray_example_dag = ray_taskflow_dag()

Ray Cluster Sample Spec (YAML)

Important

spec.headGroupSpec.serviceType must be set to ‘LoadBalancer’ to spin up a service that exposes your dashboard externally.

Save this file in a location accessible to your Airflow installation, and reference it in your DAG code.

# Sample RayCluster spec: one head group plus one worker group
# ("small-group") that may scale between 1 and 2 replicas.
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: raycluster-complete
spec:
  # NOTE(review): the container images below use the "latest" tag; confirm
  # that the Ray version inside them matches this rayVersion.
  rayVersion: "2.10.0"
  enableInTreeAutoscaling: true
  headGroupSpec:
    # Must be LoadBalancer so that the service exposes the dashboard externally.
    serviceType: LoadBalancer
    rayStartParams:
      dashboard-host: "0.0.0.0"
      block: "true"
    template:
      metadata:
        labels:
          ray-node-type: head
      spec:
        containers:
        - name: ray-head
          image: rayproject/ray-ml:latest
          # Requests equal limits for both CPU and memory.
          resources:
            limits:
              cpu: 1
              memory: 3Gi
            requests:
              cpu: 1
              memory: 3Gi
          lifecycle:
            # Run "ray stop" before the container is terminated.
            preStop:
              exec:
                command: ["/bin/sh","-c","ray stop"]
          ports:
          - containerPort: 6379
            name: gcs
          - containerPort: 8265
            name: dashboard
          - containerPort: 10001
            name: client
          - containerPort: 8000
            name: serve
          - containerPort: 8080
            name: metrics
  workerGroupSpecs:
  - groupName: small-group
    # Starts with 1 replica; bounded by minReplicas/maxReplicas below.
    replicas: 1
    minReplicas: 1
    maxReplicas: 2
    rayStartParams:
      block: "true"
    template:
      # No worker pod metadata (labels/annotations) is set here.
      metadata:
      spec:
        containers:
        - name: machine-learning
          image: rayproject/ray-ml:latest
          resources:
            limits:
              cpu: 1
              memory: 1Gi
            requests:
              cpu: 1
              memory: 1Gi

Example 2: Using @ray.task for job lifecycle

The example below shows how to use the @ray.task decorator to manage the full lifecycle of a Ray cluster: setup, job execution, and teardown.

This approach is ideal for jobs that require a dedicated, short-lived cluster, optimizing resource usage by cleaning up after task completion.

 1from datetime import datetime
 2from pathlib import Path
 3
 4from airflow.decorators import dag, task
 5
 6from ray_provider.decorators.ray import ray
 7
 8CONN_ID = "ray_conn"
 9RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml"
10FOLDER_PATH = Path(__file__).parent / "ray_scripts"
11RAY_TASK_CONFIG = {
12    "conn_id": CONN_ID,
13    "runtime_env": {"working_dir": str(FOLDER_PATH), "pip": ["numpy"]},
14    "num_cpus": 1,
15    "num_gpus": 0,
16    "memory": 0,
17    "poll_interval": 5,
18    "ray_cluster_yaml": str(RAY_SPEC),
19    "xcom_task_key": "dashboard",
20}
21
22
23@dag(
24    dag_id="Ray_Taskflow_Example",
25    start_date=datetime(2023, 1, 1),
26    schedule=None,
27    catchup=False,
28    tags=["ray", "example"],
29)
30def ray_taskflow_dag():
31
32    @task
33    def generate_data():
34        return [1, 2, 3]
35
36    @ray.task(config=RAY_TASK_CONFIG)
37    def process_data_with_ray(data):
38        import numpy as np
39        import ray
40
41        @ray.remote
42        def square(x):
43            return x**2
44
45        ray.init()
46        data = np.array(data)
47        futures = [square.remote(x) for x in data]
48        results = ray.get(futures)
49        mean = np.mean(results)
50        print(f"Mean of this population is {mean}")
51        return mean
52
53    data = generate_data()
54    process_data_with_ray(data)
55
56
57ray_example_dag = ray_taskflow_dag()

Example 3: Using SubmitRayJob operator for job lifecycle

This example demonstrates how to use the SubmitRayJob operator to manage the full lifecycle of a Ray cluster and job execution.

This operator provides a more declarative way to define your Ray job within an Airflow DAG.

 1from datetime import datetime
 2from pathlib import Path
 3
 4from airflow import DAG
 5
 6from ray_provider.operators.ray import SubmitRayJob
 7
 8CONN_ID = "ray_conn"
 9RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml"
10FOLDER_PATH = Path(__file__).parent / "ray_scripts"
11RAY_RUNTIME_ENV = {"working_dir": str(FOLDER_PATH)}
12
13dag = DAG(
14    "Ray_Single_Operator",
15    start_date=datetime(2023, 1, 1),
16    schedule=None,
17    catchup=False,
18    tags=["ray", "example"],
19)
20
21submit_ray_job = SubmitRayJob(
22    task_id="SubmitRayJob",
23    conn_id=CONN_ID,
24    entrypoint="python script.py",
25    runtime_env=RAY_RUNTIME_ENV,
26    num_cpus=1,
27    num_gpus=0,
28    memory=0,
29    resources={},
30    xcom_task_key="SubmitRayJob.dashboard",
31    ray_cluster_yaml=str(RAY_SPEC),
32    fetch_logs=True,
33    wait_for_completion=True,
34    job_timeout_seconds=600,
35    poll_interval=5,
36    dag=dag,
37)
38
39
40# Create ray cluster and submit ray job
41submit_ray_job

Example 4: SetupRayCluster, SubmitRayJob & DeleteRayCluster

This example shows how to use separate operators for cluster setup, job submission, and teardown, providing more granular control over the process.

This approach allows for more complex workflows involving Ray clusters.

Key Points:

  • Uses SetupRayCluster, SubmitRayJob, and DeleteRayCluster operators separately.

  • Allows for multiple jobs to be submitted to the same cluster before deletion.

  • Demonstrates how to pass cluster information between tasks using XCom.

This method is ideal for scenarios where you need fine-grained control over the cluster lifecycle, such as running multiple jobs on the same cluster or keeping the cluster alive for a certain period.

Important

The SubmitRayJob operator uses the xcom_task_key parameter “SetupRayCluster.dashboard” to retrieve the Ray dashboard URL. This URL, stored as an XCom variable by the SetupRayCluster task, is necessary for job submission.

 1from datetime import datetime
 2from pathlib import Path
 3
 4from airflow import DAG
 5
 6from ray_provider.operators.ray import DeleteRayCluster, SetupRayCluster, SubmitRayJob
 7
 8CONN_ID = "ray_conn"
 9RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml"
10FOLDER_PATH = Path(__file__).parent / "ray_scripts"
11
12with DAG(
13    "Setup_Teardown",
14    start_date=datetime(2023, 1, 1),
15    schedule=None,
16    catchup=False,
17    tags=["ray", "example"],
18):
19
20    setup_cluster = SetupRayCluster(
21        task_id="SetupRayCluster", conn_id=CONN_ID, ray_cluster_yaml=str(RAY_SPEC), update_if_exists=False
22    )
23
24    submit_ray_job = SubmitRayJob(
25        task_id="SubmitRayJob",
26        conn_id=CONN_ID,
27        entrypoint="python script.py",
28        runtime_env={"working_dir": str(FOLDER_PATH)},
29        num_cpus=1,
30        num_gpus=0,
31        memory=0,
32        resources={},
33        fetch_logs=True,
34        wait_for_completion=True,
35        job_timeout_seconds=600,
36        xcom_task_key="SetupRayCluster.dashboard",
37        poll_interval=5,
38    )
39
40    delete_cluster = DeleteRayCluster(task_id="DeleteRayCluster", conn_id=CONN_ID, ray_cluster_yaml=str(RAY_SPEC))
41
42    # Create ray cluster and submit ray job
43    setup_cluster.as_setup() >> submit_ray_job >> delete_cluster.as_teardown()
44    setup_cluster >> delete_cluster