Cloud Composer を使用して Dataproc サーバーレス ワークロードを実行する

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、Cloud Composer 2 を使用して Google Cloud で Dataproc サーバーレス ワークロードを実行する方法について説明します。

次のセクションの例では、Dataproc サーバーレスのバッチ ワークロードを管理する演算子を使用する方法を示しています。これらの演算子は、Dataproc サーバーレス Spark バッチ ワークロードの作成、削除、一覧表示、取得を行う DAG で使用します。

始める前に

  1. Dataproc API を有効にします。

    コンソール

    Enable the Dataproc API.

    Enable the API

    gcloud

    Enable the Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. Batch ワークロード ファイルの場所を選択します。次のいずれかのオプションを使用できます。

    • このファイルを格納する Cloud Storage バケットを作成する。
    • 環境のバケットを使用する。このファイルを Airflow と同期する必要がないため、/dags フォルダまたは /data フォルダの外に別のサブフォルダを作成できます。例: /batches
    • 既存のバケットを使用します。

ファイルと Airflow 変数を設定する

このセクションでは、このチュートリアルで使用するファイルを設定し、Airflow 変数を構成する方法について説明します。

Dataproc Serverless Spark ML ワークロード ファイルをバケットにアップロードする

このチュートリアルのワークロードは、pyspark スクリプトを実行します。

  1. pyspark スクリプトを spark-job.py という名前のローカル ファイルに保存します。たとえば、サンプルの pyspark スクリプトを使用できます。

  2. 始める前にで選択した場所にファイルをアップロードします。

Airflow 変数を設定する

次のセクションの例では、Airflow 変数を使用しています。これらの変数の値を Airflow で設定すると、DAG コードがこれらの値にアクセスできるようになります。

このチュートリアルの例では、次の Airflow 変数を使用します。使用する例によっては、必要に応じて設定できます。

DAG コードで使用する次の Airflow 変数を設定します。

Google Cloud コンソールと Airflow UI を使用して各 Airflow 変数を設定する

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、使用中の環境の [Airflow] リンクをクリックします。Airflow UI が開きます。

  3. Airflow UI で、[管理] > [変数] を選択します。

  4. [新しいレコードの追加] をクリックします。

  5. [キー] フィールドに変数の名前を指定し、[] フィールドにその変数の値を設定します。

  6. [保存] をクリックします。

永続履歴サーバーを作成する

永続的履歴サーバー(PHS)を使用して、バッチ ワークロードの Spark 履歴ファイルを表示します。

  1. 永続履歴サーバーを作成します
  2. phs_cluster Airflow 変数で PHS クラスタの名前を指定していることを確認します。

DataprocCreateBatchOperator

次の DAG は、Dataproc Serverless Batch ワークロードを開始します。

DataprocCreateBatchOperator 引数の詳細については、演算子のソースコードをご覧ください。

DataprocCreateBatchOperatorbatch パラメータで渡すことができる属性の詳細については、Batch クラスの説明をご覧ください。


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

DataprocCreateBatchOperator でカスタム コンテナ イメージを使用する

次の例は、カスタム コンテナ イメージを使用してワークロードを実行する方法を示しています。カスタム コンテナを使用すると、デフォルトのコンテナ イメージで提供されていない Python の依存関係を追加できます。

カスタム コンテナ イメージを使用するには:

  1. カスタム コンテナ イメージを作成して Container Registry にアップロードします。

  2. image_name Airflow 変数でイメージを指定します。

  3. カスタム イメージで DataprocCreateBatchOperator を使用します。

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

DataprocCreateBatchOperator で Dataproc Metastore サービスを使用する

DAG から Dataproc Metastore サービスを使用するには:

  1. メタストア サービスがすでに起動していることを確認します。

    メタストア サービスの起動については、Dataproc Metastore を有効または無効にするをご覧ください。

    構成を作成するためのバッチ演算子の詳細については、PeripheralsConfig をご覧ください。

  2. メタストア サービスが稼働したら、metastore_cluster 変数でその名前を指定し、region_name Airflow 変数でリージョンを指定します。

  3. DataprocCreateBatchOperator で metastore サービスを使用するには:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

DataprocDeleteBatchOperator を使用すると、ワークロードのバッチ ID に基づいてバッチを削除できます。

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator は、指定された project_id とリージョン内に存在するバッチを一覧表示します。

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator は、特定のバッチ ワークロードをフェッチします。

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

次のステップ