Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
This page describes how to use Cloud Composer 2 to run Dataproc Serverless workloads on Google Cloud.
Examples in the following sections show you how to use operators for managing Dataproc Serverless batch workloads. You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload:
Create DAGs for operators that work with Dataproc Serverless Batch workloads:
Create DAGs that use custom containers, and Dataproc Metastore.
Configure Persistent History Server for these DAGs.
Before you begin
Enable the Dataproc API:
Console
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
Select the location for your Batch workload file. You can use any of the following options:
- Create a Cloud Storage bucket that stores this file.
- Use your environment's bucket. Because you do not need to sync this file
with Airflow, you can create a separate subfolder outside the
/dags
or/data
folders. For example,/batches
. - Use an existing bucket.
Set up files and Airflow variables
This section demonstrates how to set up files and configure Airflow variables for this tutorial.
Upload a Dataproc Serverless Spark ML workload file to a bucket
The workload in this tutorial runs a pyspark script:
Save any pyspark script to a local file named
spark-job.py
. For example, you can use the sample pyspark script.Upload the file to the location that you selected in Before you begin.
Set Airflow variables
Examples in the following sections use Airflow variables. You set values for these variables in Airflow, then your DAG code can access these values.
Examples in this tutorial use the following Airflow variables. You can set them as needed, depending on the example you use.
Set the following Airflow variables for use in your DAG code:
project_id
: Project ID.bucket_name
: URI of a bucket where the main python file of the workload (spark-job.py
) is located. You selected this location in Before you begin.phs_cluster
: Persistent History Server cluster name. You set this variable when you Create a Persistent History Server.image_name
: name and tag of the custom container image (image:tag
). You set this variable when you use custom container image with DataprocCreateBatchOperator.metastore_cluster
: Dataproc Metastore service name. You set this variable when you use Dataproc Metastore service with DataprocCreateBatchOperator.region_name
: region where the Dataproc Metastore service is located. You set this variable when you use Dataproc Metastore service with DataprocCreateBatchOperator.
Use the Google Cloud console and Airflow UI to set each Airflow variable
In Google Cloud console, go to the Environments page.
In the list of environments, click the Airflow link for your environment. Airflow UI opens.
In Airflow UI, select Admin > Variables.
Click Add a new record.
Specify the name of the variable in the Key field, and set the value for it in the Val field.
Click Save.
Create a Persistent History Server
Use a Persistent History Server (PHS) to view Spark history files of your batch workloads:
- Create a Persistent History Server.
- Make sure that you specified the name of the PHS cluster in the
phs_cluster
Airflow variable.
DataprocCreateBatchOperator
The following DAG starts a Dataproc Serverless Batch workload.
For more information about DataprocCreateBatchOperator
arguments, see
operator's source code.
For more information about attributes that you can pass in the batch
parameter of DataprocCreateBatchOperator
, see the
description of the Batch class.
Use custom container image with DataprocCreateBatchOperator
The following example shows how to use a custom container image to run your workloads. You can use a custom container, for example, to add Python dependencies not provided by the default container image.
To use a custom container image:
Create a custom container image and upload it to Container Registry.
Specify the image in the
image_name
Airflow variable.Use DataprocCreateBatchOperator with your custom image:
Use Dataproc Metastore service with DataprocCreateBatchOperator
To use a Dataproc Metastore service from a DAG:
Check that your metastore service is already started.
To learn about starting a metastore service, see Enable and disable Dataproc Metastore.
For detailed information about the batch operator for creating the configuration, see PeripheralsConfig.
Once the metastore service is up and running, specify its name in the
metastore_cluster
variable and its region in theregion_name
Airflow variable.Use the metastore service in DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
You can use DataprocDeleteBatchOperator to delete a batch based on the batch id of the workload.
DataprocListBatchesOperator
DataprocDeleteBatchOperator lists batches that exist within a given project_id and region.
DataprocGetBatchOperator
DataprocGetBatchOperator fetches one particular batch workload.