使用 Cloud Composer 运行 Dataproc Serverless 工作负载

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本页介绍了如何使用 Cloud Composer 2 在Google Cloud上运行 Dataproc Serverless 工作负载。

以下部分中的示例介绍了如何使用运算符管理 Dataproc Serverless 批处理工作负载。您可以在用于创建、删除、列出和获取 Dataproc Serverless Spark 批处理工作负载的 DAG 中使用以下运算符:

准备工作

  1. 启用 Dataproc API:

    控制台

    Enable the Dataproc API.

    Enable the API

    gcloud

    Enable the Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. 选择批处理工作负载文件的位置。您可以使用以下任一选项:

设置文件和 Airflow 变量

本部分将演示如何为本教程设置文件并配置 Airflow 变量。

将 Dataproc Serverless Spark 机器学习工作负载文件上传到存储分区

本教程中的工作负载会运行一个 pyspark 脚本:

  1. 将任何 pyspark 脚本保存到名为 spark-job.py 的本地文件中。 例如,您可以使用 PySpark 示例脚本

  2. 将文件上传到您在开始前准备工作部分中选择的位置。

设置 Airflow 变量

以下部分中的示例使用 Airflow 变量。您可以在 Airflow 中为这些变量设置值,然后 DAG 代码就可以访问这些值。

本教程中的示例使用以下 Airflow 变量。您可以根据需要进行设置,具体取决于您使用的示例。

设置以下 Airflow 变量,以便在 DAG 代码中使用:

  • project_id项目 ID
  • bucket_name:工作负载的主要 Python 文件 (spark-job.py) 所在的存储分区的 URI。您已在准备工作中选择了此位置。
  • phs_cluster:永久性历史记录服务器集群名称。您可以在创建永久性历史记录服务器时设置此变量。
  • image_name:自定义容器映像的名称和标记 (image:tag)。您在将 DataprocCreateBatchOperator 与自定义容器映像搭配使用时设置此变量。
  • metastore_cluster:Dataproc Metastore 服务名称。 在将 DataprocCreateBatchOperator 与 Dataproc Metastore 服务搭配使用时,您需要设置此变量。
  • region_name:Dataproc Metastore 服务所在的区域。在将 DataprocCreateBatchOperator 与 Dataproc Metastore 服务搭配使用时,您需要设置此变量。

使用 Google Cloud 控制台和 Airflow 界面设置每个 Airflow 变量

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击与您的环境对应的 Airflow 链接。Airflow 界面随即打开。

  3. 在 Airflow 界面中,依次选择 Admin > Variables

  4. 点击添加新记录

  5. 字段中指定变量的名称,并在 Val 字段中为其设置值。

  6. 点击保存

创建永久性历史记录服务器

使用 Persistent History Server (PHS) 查看批处理工作负载的 Spark 历史记录文件:

  1. 创建永久性历史记录服务器
  2. 确保您已在 phs_cluster Airflow 变量中指定 PHS 集群的名称。

DataprocCreateBatchOperator

以下 DAG 会启动 Dataproc Serverless 批处理工作负载。

如需详细了解 DataprocCreateBatchOperator 参数,请参阅运算符的源代码

如需详细了解您可以在 DataprocCreateBatchOperatorbatch 参数中传入的属性,请参阅 Batch 类的说明


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

将自定义容器映像与 DataprocCreateBatchOperator 搭配使用

以下示例展示了如何使用自定义容器映像运行工作负载。例如,您可以使用自定义容器添加默认容器映像未提供的 Python 依赖项。

如需使用自定义容器映像,请执行以下操作:

  1. 创建自定义容器映像并将其上传到 Container Registry

  2. image_name Airflow 变量中指定映像。

  3. 将 DataprocCreateBatchOperator 与自定义映像搭配使用:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 搭配使用

如需从 DAG 使用 Dataproc Metastore 服务,请执行以下操作:

  1. 检查元存储空间服务是否已启动。

    如需了解如何启动 Metastore 服务,请参阅启用和停用 Dataproc Metastore

    如需详细了解用于创建配置的批处理运算符,请参阅 PeripheralsConfig

  2. Metastore 服务启动并运行后,在 metastore_cluster 变量中指定其名称,并在 region_name Airflow 变量中指定其区域。

  3. 在 DataprocCreateBatchOperator 中使用 Metastore 服务:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

您可以使用 DataprocDeleteBatchOperator 基于工作负载的批处理 ID 删除批处理。

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator 会列出给定 project_id 和区域中存在的批处理。

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator 用于提取一个特定的批处理工作负载。

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

后续步骤