使用 Cloud Composer 运行 Dataproc Serverless 工作负载

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

本页介绍如何使用 Cloud Composer 2 来 Dataproc 无服务器工作负载 Google Cloud

以下部分中的示例展示了如何使用 用于管理 Dataproc 无服务器批量工作负载的操作工具。您可以在 DAG 中使用这些运算符创建 删除、列出和获取 Dataproc Serverless Spark 批量工作负载:

准备工作

  1. 启用 Dataproc API:

    控制台

    Enable the Dataproc API.

    Enable the API

    gcloud

    Enable the Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. 选择批处理工作负载文件的位置。您可以使用以下任一选项:

    • 创建 Cloud Storage 存储桶, 存储此文件。
    • 使用环境的存储桶。因为您无需同步此文件 使用 Airflow 时,您可以在 /dags 之外创建一个单独的子文件夹 或 /data 文件夹中。例如 /batches
    • 使用现有存储桶。

设置文件和 Airflow 变量

本部分将演示如何为本教程设置文件并配置 Airflow 变量。

将 Dataproc Serverless Spark 机器学习工作负载文件上传到存储桶

本教程中的工作负载运行 pyspark 脚本:

  1. 将任何 pyspark 脚本保存到名为 spark-job.py 的本地文件中。 例如,您可以使用 pyspark 脚本示例

  2. 将文件上传到您选择的位置 在准备工作部分。

设置 Airflow 变量

以下部分中的示例使用 Airflow 变量。您可以在 Airflow 中为这些变量设置值,然后 DAG 代码就可以访问这些值。

本教程中的示例使用以下 Airflow 变量。您可以设置 具体取决于您使用的示例。

设置以下 Airflow 变量,以便在 DAG 代码中使用:

  • project_id项目 ID
  • bucket_name:工作负载的主要 Python 文件 (spark-job.py) 所在的存储桶的 URI。您选择了此地点 准备工作
  • phs_cluster:永久性历史记录服务器集群名称。您设置此变量 创建永久性历史记录服务器时。
  • image_name:自定义容器映像的名称和标记 (image:tag)。在将 DataprocCreateBatchOperator 与使用自定义容器映像时,您需要设置此变量。
  • metastore_cluster:Dataproc Metastore 服务名称。 您在设置此变量时 将 Dataproc Metastore 服务与 DataprocCreateBatchOperator.
  • region_name:Dataproc Metastore 服务所在的区域 。您在设置此变量时 将 Dataproc Metastore 服务与 DataprocCreateBatchOperator.

使用 Google Cloud 控制台和 Airflow 界面设置每个 Airflow 变量

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击与您的环境对应的 Airflow 链接。系统会打开 Airflow 界面。

  3. 在 Airflow 界面中,依次选择 Admin > Variables

  4. 点击添加新记录

  5. 字段中指定变量的名称,并在 Val 字段中为其设置值。

  6. 点击保存

创建永久性历史记录服务器

使用 Persistent History Server (PHS) 查看批处理工作负载的 Spark 历史记录文件:

  1. 创建永久性历史记录服务器
  2. 确保您已在 phs_cluster Airflow 变量

DataprocCreateBatchOperator

以下 DAG 会启动一个 Dataproc Serverless Batch 工作负载。

如需详细了解 DataprocCreateBatchOperator 参数,请参阅 运算符的源代码

如需详细了解可传入 batch 的属性,请执行以下操作: 参数DataprocCreateBatchOperator的详情,请参阅 Batch 类的说明


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

将自定义容器映像与 DataprocCreateBatchOperator 结合使用

以下示例展示了如何使用自定义容器映像运行工作负载。例如,您可以使用自定义容器添加默认容器映像未提供的 Python 依赖项。

如需使用自定义容器映像,请执行以下操作:

  1. 创建自定义容器映像并将其上传到 Container Registry

  2. image_name Airflow 变量中指定映像。

  3. 将 DataprocCreateBatchOperator 与自定义映像搭配使用:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 搭配使用

使用 Dataproc Metastore 服务 从 DAG 导入:

  1. 检查元存储空间服务是否已启动。

    如需了解如何启动 Metastore 服务,请参阅 启用和停用 Dataproc Metastore

    如需详细了解用于创建配置的批处理运算符,请参阅 PeripheralsConfig

  2. Metastore 服务启动并运行后,在 region_name Airflow 变量中的 metastore_cluster 变量及其区域。

  3. 在 DataprocCreateBatchOperator 中使用 Metastore 服务:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

您可以使用 DataprocDeleteBatchOperator 基于工作负载的批处理 ID 删除批处理。

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator 会列出给定 project_id 和区域中存在的批处理。

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator 用于提取一个特定的批处理工作负载。

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

后续步骤