使用 Cloud Composer 运行 Dataproc 无服务器工作负载

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

本页面介绍了如何使用 Cloud Composer 2 在 Google Cloud 上运行 Dataproc 无服务器工作负载。

以下部分中的示例向您展示了如何使用运算符管理 Dataproc Serverless 批处理工作负载。您可以在用于创建、删除、列出和获取 Dataproc Serverless Spark 批量工作负载的 DAG 中使用以下运算符:

准备工作

  1. 启用 Dataproc API:

    控制台

    启用 Dataproc API。

    启用 API

    gcloud

    Enable the Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. 选择批处理工作负载文件的位置。您可以使用以下任一选项:

    • 创建一个 Cloud Storage 存储桶来存储此文件。
    • 使用您环境的存储桶。由于您不需要将此文件与 Airflow 同步,因此您可以在 /dags/data 文件夹之外创建一个单独的子文件夹。例如 /batches
    • 使用现有存储桶。

设置文件和 Airflow 变量

本部分将演示如何为本教程设置文件和配置 Airflow 变量。

将 Dataproc Serverless Spark ML 工作负载文件上传到存储桶

本教程中的工作负载运行 pyspark 脚本:

  1. 将任何 pyspark 脚本保存到名为 spark-job.py 的本地文件中。例如,您可以使用示例 pyspark 脚本

  2. 将文件上传到您在准备工作中选择的位置。

设置 Airflow 变量

以下部分中的示例使用 Airflow 变量。您可以在 Airflow 中为这些变量设置值,然后 DAG 代码便可以访问这些值。

本教程中的示例使用以下 Airflow 变量。您可以根据需要对其进行设置,具体取决于您使用的示例。

设置以下 Airflow 变量,以便在 DAG 代码中使用:

  • project_id项目 ID
  • bucket_name:工作负载 (spark-job.py) 的主 Python 文件所在的存储桶的 URI。您在准备工作中选择了此位置。
  • phs_cluster:永久性历史记录服务器集群名称。您应在创建永久性历史记录服务器时设置此变量。
  • image_name:自定义容器映像 (image:tag) 的名称和标记。在通过 DataprocCreateBatchOperator 使用自定义容器映像时,您可以设置此变量。
  • metastore_cluster:Dataproc Metastore 服务名称。在将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 结合使用时,您可以设置此变量。
  • region_name:Dataproc Metastore 服务所在的区域。在将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 结合使用时,您可以设置此变量。

使用 Google Cloud 控制台和 Airflow 界面设置每个 Airflow 变量

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境对应的 Airflow 链接。系统会打开 Airflow 界面。

  3. 在 Airflow 界面中,依次选择 Admin > Variables

  4. 点击添加新记录

  5. 字段中指定变量的名称,并在 Val 字段中设置此变量的值。

  6. 点击保存

创建永久性历史记录服务器

使用永久性历史记录服务器 (PHS) 查看批量工作负载的 Spark 历史记录文件:

  1. 创建一个永久性历史记录服务器
  2. 确保您在 phs_cluster Airflow 变量中指定了 PHS 集群的名称。

DataprocCreateBatchOperator

以下 DAG 会启动一个 Dataproc Serverless Batch 工作负载。

如需详细了解 DataprocCreateBatchOperator 参数,请参阅运算符的源代码

如需详细了解可传入 DataprocCreateBatchOperatorbatch 参数的属性,请参阅 Batch 类说明


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

将自定义容器映像与 DataprocCreateBatchOperator 结合使用

以下示例展示了如何使用自定义容器映像运行工作负载。例如,您可以使用自定义容器添加默认容器映像未提供的 Python 依赖项。

如需使用自定义容器映像,请执行以下操作:

  1. 创建自定义容器映像并将其上传到 Container Registry

  2. image_name Airflow 变量中指定映像。

  3. 将 DataprocCreateBatchOperator 用于您的自定义映像:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 搭配使用

如需从 DAG 使用 Dataproc Metastore 服务,请执行以下操作:

  1. 检查您的 Metastore 服务是否已启动。

    如需了解如何启动 Metastore 服务,请参阅启用和停用 Dataproc Metastore

    如需详细了解用于创建配置的批处理运算符,请参阅 PeripheralsConfig

  2. Metastore 服务启动并运行后,请在 metastore_cluster 变量中指定其名称,在 region_name Airflow 变量中指定其区域。

  3. 在 DataprocCreateBatchOperator 中使用 Metastore 服务:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

您可以使用 DataprocDeleteBatchOperator 根据工作负载的批次 ID 删除批次。

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator 会列出给定 project_id 和区域中存在的批次。

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator 提取一个特定的批量工作负载。

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

后续步骤