使用 Cloud Composer 執行 Dataproc Serverless 工作負載

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

本頁說明如何使用 Cloud Composer 2 在Google Cloud上執行 Dataproc Serverless 工作負載。

以下各節的範例說明如何使用運算子管理 Dataproc Serverless 批次工作負載。您可以在 DAG 中使用這些運算子,建立、刪除、列出及取得 Dataproc Serverless Spark 批次工作負載:

事前準備

  1. 啟用 Dataproc API:

    主控台

    Enable the Dataproc API.

    Enable the API

    gcloud

    Enable the Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. 選取 Batch 工作負載檔案的位置。你可以使用下列任一選項:

    • 建立 Cloud Storage bucket,用來儲存這個檔案。
    • 使用環境的值區。由於您不需要將這個檔案與 Airflow 同步處理,因此可以在 /dags/data 資料夾外建立個別的子資料夾。例如:/batches
    • 使用現有 bucket。

設定檔案和 Airflow 變數

本節說明如何設定檔案,以及設定本教學課程的 Airflow 變數。

將 Dataproc Serverless Spark ML 工作負載檔案上傳至 bucket

本教學課程中的工作負載會執行 pyspark 指令碼:

  1. 將任何 pyspark 指令碼儲存至名為 spark-job.py 的本機檔案。 舉例來說,您可以使用範例 pyspark 指令碼

  2. 將檔案上傳至「開始前」選取的儲存位置。

設定 Airflow 變數

下列各節的範例會使用 Airflow 變數。您可以在 Airflow 中設定這些變數的值,然後 DAG 程式碼就能存取這些值。

本教學課程中的範例使用下列 Airflow 變數。您可以視需要設定這些值,具體取決於您使用的範例。

設定下列 Airflow 變數,供 DAG 程式碼使用:

  • project_id專案 ID
  • bucket_name:bucket 的 URI,其中包含工作負載的主要 Python 檔案 (spark-job.py)。您在「事前準備」中選取了這個位置。
  • phs_cluster:永久記錄伺服器叢集名稱。您會在建立永久記錄伺服器時設定這個變數。
  • image_name:自訂容器映像檔的名稱和標記 (image:tag)。使用 DataprocCreateBatchOperator 自訂容器映像檔時,您會設定這個變數。
  • metastore_cluster:Dataproc Metastore 服務名稱。使用 DataprocCreateBatchOperator 時,您會使用 Dataproc Metastore 服務,並設定這個變數。
  • region_name:Dataproc Metastore 服務所在的區域。使用 DataprocCreateBatchOperator 時,您會使用 Dataproc Metastore 服務,並設定這個變數。

使用 Google Cloud 控制台和 Airflow 使用者介面設定每個 Airflow 變數

  1. 前往 Google Cloud 控制台的「Environments」頁面。

    前往「環境」

  2. 在環境清單中,按一下環境的「Airflow」連結。Airflow UI 隨即開啟。

  3. 在 Airflow UI 中,依序選取「Admin」>「Variables」

  4. 按一下「新增記錄」

  5. 在「鍵」欄位中指定變數名稱,並在「值」欄位中設定變數值。

  6. 按一下 [儲存]

建立永久記錄伺服器

使用永久記錄伺服器 (PHS) 查看批次工作負載的 Spark 記錄檔:

  1. 建立永久記錄伺服器
  2. 請確認您已在 phs_cluster Airflow 變數中指定 PHS 叢集的名稱。

DataprocCreateBatchOperator

下列 DAG 會啟動 Dataproc Serverless 批次工作負載。

如要進一步瞭解 DataprocCreateBatchOperator 引數,請參閱運算子的原始碼

如要進一步瞭解可在 DataprocCreateBatchOperatorbatch 參數中傳遞的屬性,請參閱 Batch 類別的說明


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

使用 DataprocCreateBatchOperator 搭配自訂容器映像檔

下列範例說明如何使用自訂容器映像檔執行工作負載。舉例來說,您可以使用自訂容器,新增預設容器映像檔未提供的 Python 依附元件。

如要使用自訂容器映像檔,請按照下列步驟操作:

  1. 建立自訂容器映像檔並上傳至 Container Registry

  2. image_name Airflow 變數中指定映像檔。

  3. 使用 DataprocCreateBatchOperator 搭配自訂映像檔:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

使用 DataprocCreateBatchOperator 搭配 Dataproc Metastore 服務

如要從 DAG 使用 Dataproc Metastore 服務,請按照下列步驟操作:

  1. 確認 Metastore 服務是否已啟動。

    如要瞭解如何啟動 Metastore 服務,請參閱「啟用及停用 Dataproc Metastore」。

    如要進一步瞭解建立設定的批次運算子,請參閱「PeripheralsConfig」。

  2. Metastore 服務啟動並執行後,請在 metastore_cluster 變數中指定服務名稱,並在 region_name Airflow 變數中指定服務區域。

  3. 在 DataprocCreateBatchOperator 中使用中繼存放區服務:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

您可以使用 DataprocDeleteBatchOperator,根據工作負載的批次 ID 刪除批次。

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator 會列出特定 project_id 和區域內現有的批次。

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator 會擷取特定批次工作負載。

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

後續步驟