使用 Cloud Composer 运行 Dataproc 无服务器工作负载

Cloud Composer 1 | Cloud Composer 2

本页面介绍如何使用 Cloud Composer 2 在 Google Cloud 上运行 Dataproc 无服务器工作负载。

以下部分中的示例展示了如何使用运算符管理 Dataproc 无服务器批处理工作负载。您可以在创建、删除、列出和获取 Dataproc Serverless Spark 批量工作负载的 DAG 中使用以下运算符:

准备工作

  1. 启用 Dataproc API:

    控制台

    启用 Dataproc API。

    启用 API

    gcloud

    Enable the Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. 选择批量工作负载文件的位置。您可以使用以下任一选项:

    • 创建一个 Cloud Storage 存储桶来存储此文件。
    • 使用您的环境的存储桶。由于您无需将此文件与 Airflow 同步,因此您可以在 /dags/data 文件夹之外单独创建一个子文件夹。例如 /batches
    • 使用现有存储桶。

设置文件和 Airflow 变量

本部分演示了如何为本教程设置文件和配置 Airflow 变量。

将 Dataproc Serverless Spark ML 工作负载文件上传到存储桶

本教程中的工作负载运行 pyspark 脚本:

  1. 将任何 pyspark 脚本保存到名为 spark-job.py 的本地文件中。例如,您可以使用示例 pyspark 脚本

  2. 将此文件上传到您在准备工作中选择的位置。

设置 Airflow 变量

以下部分中的示例使用 Airflow 变量。您可以在 Airflow 中为这些变量设置值,然后您的 DAG 代码可以访问这些值。

本教程中的示例使用以下 Airflow 变量。您可以根据需要对其进行设置,具体取决于您使用的示例。

设置以下 Airflow 变量,以便在 DAG 代码中使用:

  • project_id项目 ID
  • bucket_name:工作负载的主 Python 文件 (spark-job.py) 所在的存储桶的 URI。您在准备工作部分选择了此位置。
  • phs_cluster:永久性历史记录服务器集群名称。您可在创建永久性历史记录服务器时设置此变量。
  • image_name:自定义容器映像 (image:tag) 的名称和标记。当您将自定义容器映像与 DataprocCreateBatchOperator 结合使用时,可以设置此变量。
  • metastore_cluster:Dataproc Metastore 服务名称。在将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 结合使用时,可以设置此变量。
  • region_name:Dataproc Metastore 服务所在的区域。在将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 结合使用时,可以设置此变量。

使用 Google Cloud 控制台和 Airflow 界面设置每个 Airflow 变量

  1. 在 Google Cloud 控制台中,前往环境页面。

    转到“环境”

  2. 在环境列表中,点击您的环境对应的 Airflow 链接。Airflow 界面随即会打开。

  3. 在 Airflow 界面中,依次选择管理 > 变量

  4. 点击添加新记录

  5. 字段中指定变量的名称,并在价值字段中设置变量值。

  6. 点击保存

创建永久性历史记录服务器

使用永久性历史记录服务器 (PHS) 查看批量工作负载的 Spark 历史记录文件:

  1. 创建永久性历史记录服务器
  2. 确保在 phs_cluster Airflow 变量中指定 PHS 集群的名称。

DataprocCreateBatchOperator

以下 DAG 将启动 Dataproc 无服务器批处理工作负载。

如需详细了解 DataprocCreateBatchOperator 参数,请参阅运算符的源代码

如需详细了解可在 DataprocCreateBatchOperatorbatch 参数中传递的属性,请参阅 Batch 类的说明


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

将自定义容器映像与 DataprocCreateBatchOperator 结合使用

以下示例展示了如何使用自定义容器映像运行工作负载。例如,您可以使用自定义容器来添加默认容器映像未提供的 Python 依赖项。

如需使用自定义容器映像,请执行以下操作:

  1. 创建自定义容器映像并将其上传到 Container Registry

  2. image_name Airflow 变量中指定映像。

  3. 将 DataprocCreateBatchOperator 与您的自定义映像搭配使用:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

将 Dataproc Metastore 服务与 DataprocCreateBatchOperator 搭配使用

如需从 DAG 使用 Dataproc Metastore 服务,请执行以下操作:

  1. 检查您的 Metastore 服务是否已启动。

    如需了解如何启动 Metastore 服务,请参阅启用和停用 Dataproc Metastore

    如需详细了解用于创建配置的批处理运算符,请参阅 PeripheralsConfig

  2. Metastore 服务启动并运行后,在 metastore_cluster 变量中指定其名称,在 region_name Airflow 变量中指定其区域。

  3. 在 DataprocCreateBatchOperator 中使用 Metastore 服务:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

您可以使用 DataprocDeleteBatchOperator 根据工作负载的批次 ID 删除批次。

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator 列出给定 project_id 和区域中存在的批次。

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator 可提取一个特定的批量工作负载。

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

后续步骤