Code Samples
Example 1: Ray jobs on an existing cluster
If you already have a Ray cluster set up, you can use the SubmitRayJob operator or the ray.task() decorator to submit jobs to it directly.
In the example below (ray_taskflow_example_existing_cluster.py), the @ray.task decorator defines a task that is executed on the Ray cluster:
Important
To connect Airflow to your cluster, set one of the following: the Ray Dashboard URL parameter of the Airflow connection (ray_conn here), or the RAY_ADDRESS environment variable on your Airflow worker.
from datetime import datetime
from pathlib import Path

from airflow.decorators import dag, task

from ray_provider.decorators import ray

CONN_ID = "ray_conn"
FOLDER_PATH = Path(__file__).parent / "ray_scripts"
# Connection and resource settings for @ray.task. There is no ray_cluster_yaml here,
# so the job runs on the existing cluster reached through CONN_ID.
RAY_TASK_CONFIG = {
    "conn_id": CONN_ID,
    "runtime_env": {"working_dir": str(FOLDER_PATH), "pip": ["numpy"]},
    "num_cpus": 1,
    "num_gpus": 0,
    "memory": 0,
    "poll_interval": 5,
}


@dag(
    dag_id="Ray_Taskflow_Example_Existing_Cluster",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=["ray", "example"],
)
def ray_taskflow_dag():

    @task
    def generate_data():
        return [1, 2, 3]

    @ray.task(config=RAY_TASK_CONFIG)
    def process_data_with_ray(data):
        # This body runs as a Ray job on the cluster. Inside it, `ray` is the Ray
        # library itself, which shadows the provider's `ray` decorator.
        import numpy as np
        import ray

        @ray.remote
        def square(x):
            return x**2

        # Connects to the cluster the job was submitted to.
        ray.init()
        data = np.array(data)
        # Fan out one remote task per element, then gather the results.
        futures = [square.remote(x) for x in data]
        results = ray.get(futures)
        mean = np.mean(results)
        print(f"Mean of this population is {mean}")
        return mean

    data = generate_data()
    process_data_with_ray(data)


ray_example_dag = ray_taskflow_dag()
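process_data_with_ray squares each input in a Ray remote task and then averages the results. The plain-Python sketch below uses neither Ray nor Airflow, and the function name mean_of_squares is hypothetical. It computes the same value locally, which is handy for checking what the task should log for the input [1, 2, 3]: (1 + 4 + 9) / 3, about 4.67.

```python
from statistics import fmean


def mean_of_squares(data):
    """Local stand-in for process_data_with_ray: square each value, then average."""
    # In the DAG, each square is produced by a separate square.remote() call and
    # gathered with ray.get(); here the squares are simply computed in a list.
    results = [x**2 for x in data]
    return fmean(results)


mean = mean_of_squares([1, 2, 3])
print(f"Mean of this population is {mean}")
```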
Ray Cluster Sample Spec (YAML)
Important
spec.headGroupSpec.serviceType must be set to LoadBalancer so that the head service exposes your Ray dashboard outside the Kubernetes cluster.
Save this file in a location accessible to your Airflow installation and reference its path in your DAG code.
apiVersion: ray.io/v1
kind: RayCluster
metadata:
  name: airflow-raycluster
spec:
  rayVersion: "2.20.0"  # should match the Ray version in the container images below
  enableInTreeAutoscaling: true
  headGroupSpec:
    serviceType: LoadBalancer
    rayStartParams:
      dashboard-host: "0.0.0.0"
      block: "true"
    template:
      metadata:
        labels:
          ray-node-type: head
      spec:
        imagePullSecrets:
          - name: my-registry-secret
        containers:
          - name: ray-head
            image: rayproject/ray:2.20.0-aarch64
            resources:
              limits:
                cpu: 1
                memory: 3Gi
              requests:
                cpu: 1
                memory: 3Gi
            lifecycle:
              preStop:
                exec:
                  command: ["/bin/sh","-c","ray stop"]
            ports:
              - containerPort: 6379
                name: gcs
              - containerPort: 8265
                name: dashboard
              - containerPort: 10001
                name: client
              - containerPort: 8000
                name: serve
              - containerPort: 8080
                name: metrics
  workerGroupSpecs:
    - groupName: small-group
      replicas: 1
      minReplicas: 1
      maxReplicas: 2
      rayStartParams:
        block: "true"
      template:
        metadata:
        spec:
          imagePullSecrets:
            - name: my-registry-secret
          containers:
            - name: machine-learning
              image: rayproject/ray:2.20.0-aarch64
              resources:
                limits:
                  cpu: 1
                  memory: 1Gi
                requests:
                  cpu: 1
                  memory: 1Gi
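Before pointing a DAG at this spec, it can help to sanity-check the fields the examples depend on. The sketch below works on the spec as a plain Python dict, which is the shape yaml.safe_load would return; loading the file is left out so the example has no dependencies. The helper names and the specific checks are illustrative assumptions and are not part of the provider. The image-tag parsing in particular assumes tags of the form rayproject/ray:<version>[-<suffix>].

```python
def container_images(pod_template):
    """Return the container images listed in a pod template dict."""
    containers = pod_template.get("spec", {}).get("containers", [])
    return [container.get("image", "") for container in containers]


def check_ray_cluster_spec(spec):
    """Return a list of human-readable problems found in a RayCluster spec dict."""
    problems = []
    cluster = spec.get("spec", {})
    head = cluster.get("headGroupSpec", {})
    workers = cluster.get("workerGroupSpecs", [])

    # Airflow reaches the dashboard through the head service, so it must be external.
    if head.get("serviceType") != "LoadBalancer":
        problems.append("headGroupSpec.serviceType should be 'LoadBalancer'")

    # Each worker group's initial replica count should sit inside its autoscaling bounds.
    for group in workers:
        low = group.get("minReplicas", 0)
        count = group.get("replicas", 0)
        high = group.get("maxReplicas", 0)
        if not low <= count <= high:
            problems.append(f"{group.get('groupName')}: replicas={count} outside [{low}, {high}]")

    # Assumed tag format rayproject/ray:<version>[-<suffix>]; <version> should equal rayVersion.
    ray_version = cluster.get("rayVersion")
    templates = [head.get("template", {})] + [group.get("template", {}) for group in workers]
    for template in templates:
        for image in container_images(template):
            tag_version = image.rpartition(":")[2].split("-")[0]
            if tag_version != ray_version:
                problems.append(f"image {image} does not match rayVersion {ray_version}")
    return problems


# A trimmed-down dict version of a spec like the one above.
sample = {
    "spec": {
        "rayVersion": "2.20.0",
        "headGroupSpec": {
            "serviceType": "LoadBalancer",
            "template": {"spec": {"containers": [{"image": "rayproject/ray:2.20.0-aarch64"}]}},
        },
        "workerGroupSpecs": [
            {
                "groupName": "small-group",
                "replicas": 1,
                "minReplicas": 1,
                "maxReplicas": 2,
                "template": {"spec": {"containers": [{"image": "rayproject/ray:2.20.0-aarch64"}]}},
            }
        ],
    }
}
print(check_ray_cluster_spec(sample))  # an empty list means no problems were found
```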
Example 2: Using @ray.task for job lifecycle
The example below shows how to use the @ray.task decorator to manage the full lifecycle of a Ray cluster: setup, job execution, and teardown.
This approach suits jobs that need a dedicated, short-lived cluster. Because the cluster is cleaned up once the task completes, no resources are left running between runs.
import os
from datetime import datetime
from pathlib import Path

from airflow.decorators import dag, task

from ray_provider.decorators import ray

CONN_ID = "ray_conn"
# The cluster spec file name can be overridden with the RAY_SPEC_FILENAME environment variable.
RAY_SPEC_FILENAME = os.getenv("RAY_SPEC_FILENAME", "ray.yaml")
RAY_SPEC = Path(__file__).parent / "scripts" / RAY_SPEC_FILENAME

FOLDER_PATH = Path(__file__).parent / "ray_scripts"
# Unlike the existing-cluster example, this config sets ray_cluster_yaml, which gives
# the task a dedicated cluster: set up before the job and torn down afterwards.
RAY_TASK_CONFIG = {
    "conn_id": CONN_ID,
    "runtime_env": {"working_dir": str(FOLDER_PATH), "pip": ["numpy"]},
    "num_cpus": 1,
    "num_gpus": 0,
    "memory": 0,
    "poll_interval": 5,
    "ray_cluster_yaml": str(RAY_SPEC),
    "xcom_task_key": "dashboard",
}


@dag(
    dag_id="Ray_Taskflow_Example",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=["ray", "example"],
)
def ray_taskflow_dag():

    @task
    def generate_data():
        return [1, 2, 3]

    @ray.task(config=RAY_TASK_CONFIG)
    def process_data_with_ray(data):
        # Runs as a Ray job; `ray` here is the Ray library, not the provider decorator.
        import numpy as np
        import ray

        @ray.remote
        def square(x):
            return x**2

        ray.init()
        data = np.array(data)
        futures = [square.remote(x) for x in data]
        results = ray.get(futures)
        mean = np.mean(results)
        print(f"Mean of this population is {mean}")
        return mean

    data = generate_data()
    process_data_with_ray(data)


ray_example_dag = ray_taskflow_dag()
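The lifecycle that the decorator manages here follows the usual setup, run, teardown pattern. The sketch below models that pattern in plain Python and assumes the cluster should be removed whether or not the job succeeds. create_cluster, run_job and delete_cluster are hypothetical stand-ins, not provider functions, and the sketch does not describe how ray_provider implements the lifecycle internally.

```python
def run_on_ephemeral_cluster(create_cluster, run_job, delete_cluster):
    """Create a cluster, run one job on it, and always delete the cluster afterwards."""
    cluster = create_cluster()  # setup: e.g. apply the RayCluster spec
    try:
        return run_job(cluster)  # job execution against the new cluster
    finally:
        delete_cluster(cluster)  # teardown runs on success and on failure alike


events = []


def create():
    events.append("create")
    return "airflow-raycluster"


def job(name):
    events.append(f"job on {name}")
    return 14 / 3


def delete(name):
    events.append(f"delete {name}")


print(run_on_ephemeral_cluster(create, job, delete))
print(events)
```

The try/finally block carries the design choice: even if run_job raises, the finally clause deletes the cluster before the exception propagates to the caller.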
Example 3: Using SubmitRayJob operator for job lifecycle
This example demonstrates how to use the SubmitRayJob operator to manage the full lifecycle of a Ray cluster and job execution.
This operator provides a more declarative way to define your Ray job within an Airflow DAG.
from datetime import datetime
from pathlib import Path

from airflow import DAG

from ray_provider.operators import SubmitRayJob

CONN_ID = "ray_conn"
RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml"
FOLDER_PATH = Path(__file__).parent / "ray_scripts"
RAY_RUNTIME_ENV = {"working_dir": str(FOLDER_PATH)}

dag = DAG(
    "Ray_Single_Operator",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=["ray", "example"],
)

submit_ray_job = SubmitRayJob(
    task_id="SubmitRayJob",
    conn_id=CONN_ID,
    entrypoint="python script.py",
    runtime_env=RAY_RUNTIME_ENV,
    num_cpus=1,
    num_gpus=0,
    memory=0,
    resources={},
    xcom_task_key="SubmitRayJob.dashboard",
    ray_cluster_yaml=str(RAY_SPEC),
    fetch_logs=True,
    wait_for_completion=True,
    job_timeout_seconds=600,
    poll_interval=5,
    dag=dag,
)


# A single task creates the Ray cluster from RAY_SPEC, submits the job, and manages its lifecycle
submit_ray_job
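The wait_for_completion, poll_interval and job_timeout_seconds arguments describe a poll-until-done loop: check the job status every poll_interval seconds and give up after job_timeout_seconds. With the values above, that is at most 600 / 5 = 120 waits. The sketch below models such a loop in plain Python. The get_status callable, the use of Ray's job status names as plain strings, and raising TimeoutError are all assumptions for illustration; they are not the operator's actual internals.

```python
import time

# Terminal Ray job states, used here as plain strings.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED"}


def wait_for_job(
    get_status,
    poll_interval=5,
    job_timeout_seconds=600,
    sleep=time.sleep,
    clock=time.monotonic,
):
    """Poll get_status() until it reports a terminal state or the timeout elapses."""
    deadline = clock() + job_timeout_seconds
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job still {status} after {job_timeout_seconds}s")
        sleep(poll_interval)


# Usage with a fake status source; sleeping is skipped so the example returns immediately.
statuses = iter(["PENDING", "RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_job(lambda: next(statuses), sleep=lambda seconds: None))
```

Passing sleep and clock as parameters keeps the loop testable: a fake clock can drive it to the timeout without actually waiting ten minutes.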
Example 4: SetupRayCluster, SubmitRayJob & DeleteRayCluster
This example uses separate operators for cluster setup, job submission, and teardown. This gives you more granular control over the process and supports more complex workflows built around a single Ray cluster.
Key Points:
- Uses the SetupRayCluster, SubmitRayJob, and DeleteRayCluster operators separately.
- Allows multiple jobs to be submitted to the same cluster before it is deleted.
- Demonstrates how to pass cluster information between tasks using XCom.
This method is ideal for scenarios where you need fine-grained control over the cluster lifecycle, such as running multiple jobs on the same cluster or keeping the cluster alive for a certain period.
Important
The SubmitRayJob operator's xcom_task_key parameter is set to "SetupRayCluster.dashboard" so that it can retrieve the Ray dashboard URL, which the SetupRayCluster task stores as an XCom value. The URL is required to submit the job.
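The value "SetupRayCluster.dashboard" pairs a task ID with an XCom key: it reads the dashboard value pushed by the SetupRayCluster task. The sketch below models that lookup, with a plain dict standing in for Airflow's XCom store. The helper name, the split-on-last-dot rule and the URL are hypothetical and chosen for illustration only.

```python
def resolve_xcom_task_key(xcom_task_key, xcom_store):
    """Look up "<task_id>.<key>" in a {(task_id, key): value} mapping (hypothetical helper)."""
    # Split on the last dot so task IDs that themselves contain dots stay intact.
    task_id, sep, key = xcom_task_key.rpartition(".")
    if not sep:
        raise ValueError(f"expected '<task_id>.<key>', got {xcom_task_key!r}")
    return xcom_store[(task_id, key)]


# What the SetupRayCluster task might have pushed (the address is made up).
xcom_store = {("SetupRayCluster", "dashboard"): "http://203.0.113.10:8265"}
print(resolve_xcom_task_key("SetupRayCluster.dashboard", xcom_store))
```

Splitting on the last dot is a deliberate choice in this sketch. Task IDs inside Airflow TaskGroups are prefixed with the group ID and a dot, so splitting on the first dot would break them. In a real Airflow task, the lookup itself corresponds to ti.xcom_pull(task_ids="SetupRayCluster", key="dashboard").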
from datetime import datetime
from pathlib import Path

from airflow import DAG

from ray_provider.operators import DeleteRayCluster, SetupRayCluster, SubmitRayJob

CONN_ID = "ray_conn"
RAY_SPEC = Path(__file__).parent / "scripts/ray.yaml"
FOLDER_PATH = Path(__file__).parent / "ray_scripts"

with DAG(
    "Setup_Teardown",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=["ray", "example"],
):

    setup_cluster = SetupRayCluster(
        task_id="SetupRayCluster", conn_id=CONN_ID, ray_cluster_yaml=str(RAY_SPEC), update_if_exists=False
    )

    submit_ray_job = SubmitRayJob(
        task_id="SubmitRayJob",
        conn_id=CONN_ID,
        entrypoint="python script.py",
        runtime_env={"working_dir": str(FOLDER_PATH)},
        num_cpus=1,
        num_gpus=0,
        memory=0,
        resources={},
        fetch_logs=True,
        wait_for_completion=True,
        job_timeout_seconds=600,
        # Reads the dashboard URL that SetupRayCluster pushed to XCom.
        xcom_task_key="SetupRayCluster.dashboard",
        poll_interval=5,
    )

    delete_cluster = DeleteRayCluster(task_id="DeleteRayCluster", conn_id=CONN_ID, ray_cluster_yaml=str(RAY_SPEC))

    # Create the Ray cluster, submit the job, then delete the cluster
    setup_cluster.as_setup() >> submit_ray_job >> delete_cluster.as_teardown()
    # Ties the teardown directly to its setup task
    setup_cluster >> delete_cluster
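The last two lines of the DAG wire the tasks as a setup/teardown pair. DeleteRayCluster is meant to run once everything upstream of it has finished, even if SubmitRayJob failed, provided SetupRayCluster succeeded. The extra setup_cluster >> delete_cluster edge makes the setup a direct upstream of the teardown. The function below is a deliberately simplified, plain-Python model of that rule, which Airflow expresses as the all_done_setup_success trigger rule. It is not Airflow's scheduler logic: the real check considers more task states than the handful listed here.

```python
# States treated as "finished" in this simplified model.
DONE_STATES = {"success", "failed", "skipped", "upstream_failed"}


def teardown_can_run(upstream_states, setup_task_ids):
    """Simplified teardown rule: every direct upstream task has finished and at
    least one setup task among them succeeded."""
    all_done = all(state in DONE_STATES for state in upstream_states.values())
    setup_ok = any(upstream_states.get(task_id) == "success" for task_id in setup_task_ids)
    return all_done and setup_ok


setups = ["SetupRayCluster"]
# The job failed after the cluster came up: the cluster is still deleted.
print(teardown_can_run({"SetupRayCluster": "success", "SubmitRayJob": "failed"}, setups))
# Cluster creation failed: there is nothing to delete.
print(teardown_can_run({"SetupRayCluster": "failed", "SubmitRayJob": "upstream_failed"}, setups))
# In this model, without the setup_cluster >> delete_cluster edge the setup is not a
# direct upstream of the teardown, so its success is never seen.
print(teardown_can_run({"SubmitRayJob": "failed"}, setups))
```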