Menjalankan workload Dataproc Serverless dengan Cloud Composer

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Halaman ini menjelaskan cara menggunakan Cloud Composer 2 untuk menjalankan Workload Dataproc Serverless aktif Google Cloud.

Contoh di bagian berikut menunjukkan cara menggunakan operator untuk mengelola workload batch Dataproc Serverless. Anda menggunakan operator ini di DAG yang membuat, menghapus, mencantumkan, dan mendapatkan workload batch Dataproc Serverless Spark:

Sebelum memulai

  1. Aktifkan Dataproc API:

    Konsol

    Aktifkan API Dataproc.

    Mengaktifkan API

    gcloud

    Aktifkan API Dataproc:

    gcloud services enable dataproc.googleapis.com

  2. Pilih lokasi untuk file workload Batch Anda. Anda dapat menggunakan salah satu opsi berikut:

    • Membuat bucket Cloud Storage yang menyimpan file tersebut.
    • Menggunakan bucket lingkungan Anda. Karena Anda tidak perlu menyinkronkan file ini dengan Airflow, Anda dapat membuat subfolder terpisah di luar /dags atau /data. Misalnya, /batches.
    • Gunakan bucket yang sudah ada.

Menyiapkan file dan variabel Airflow

Bagian ini menunjukkan cara menyiapkan file dan mengonfigurasi variabel Airflow untuk tutorial ini.

Mengupload file workload Dataproc Serverless Spark ML ke bucket

Beban kerja dalam tutorial ini menjalankan skrip pyspark:

  1. Simpan skrip pyspark ke file lokal bernama spark-job.py. Misalnya, Anda dapat menggunakan contoh skrip pyspark.

  2. Upload file ke lokasi yang dipilih di bagian Sebelum memulai.

Menyetel variable Airflow

Contoh di bagian berikut menggunakan variabel Airflow. Anda menetapkan nilai untuk variabel ini di Airflow, kode DAG Anda dapat mengakses nilai tersebut.

Contoh dalam tutorial ini menggunakan variabel Airflow berikut. Anda dapat menyetelnya sesuai kebutuhan, tergantung pada contoh yang Anda gunakan.

Tetapkan variabel Airflow berikut untuk digunakan dalam kode DAG Anda:

Menggunakan Konsol Google Cloud dan UI Airflow untuk menetapkan setiap variabel Airflow

  1. Di Konsol Google Cloud, buka halaman Environments.

    Buka Lingkungan

  2. Dalam daftar lingkungan, klik link Airflow untuk lingkungan fleksibel App Engine. UI Airflow akan terbuka.

  3. Di UI Airflow, pilih Admin > Variabel.

  4. Klik Add a new record.

  5. Tentukan nama variabel di kolom Kunci, dan tetapkan nilai untuk di kolom Val.

  6. Klik Simpan.

Membuat Server Histori Persisten

Gunakan Persistent History Server (PHS) untuk melihat file histori Spark batch Anda workload:

  1. Membuat Server Histori Persisten.
  2. Pastikan Anda telah menentukan nama cluster PHS di phs_cluster Variabel Airflow.

DataprocCreateBatchOperator

DAG berikut memulai workload Dataproc Serverless Batch.

Untuk mengetahui informasi selengkapnya tentang argumen DataprocCreateBatchOperator, lihat kode sumber operator.

Untuk informasi selengkapnya tentang atribut yang dapat Anda teruskan dalam batch dari DataprocCreateBatchOperator, lihat deskripsi class Batch.


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

Menggunakan gambar container kustom dengan DataprocCreateBatchOperator

Contoh berikut menunjukkan cara menggunakan image container kustom untuk menjalankan sebagian besar workload standar dan berbasis cloud. Anda dapat menggunakan container kustom, misalnya, untuk menambahkan Python dependensi yang tidak disediakan oleh image container default.

Untuk menggunakan image container kustom:

  1. Buat image container kustom dan upload ke Container Registry.

  2. Tentukan gambar dalam variabel Airflow image_name.

  3. Gunakan DataprocCreateBatchOperator dengan gambar kustom Anda:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

Menggunakan layanan Metastore Dataproc dengan DataprocCreateBatchOperator

Untuk menggunakan layanan Dataproc Metastore dari DAG:

  1. Pastikan layanan metastore sudah dimulai.

    Untuk mempelajari cara memulai layanan metastore, lihat Aktifkan dan nonaktifkan Dataproc Metastore.

    Untuk informasi terperinci tentang operator batch untuk membuat konfigurasi, lihat PeripheralsConfig.

  2. Setelah layanan metastore aktif dan berjalan, tentukan namanya di variabel metastore_cluster dan regionnya dalam variabel Airflow region_name.

  3. Gunakan layanan metastore di DataprocCreateBatchOperator:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

Anda dapat menggunakan DataprocDeleteBatchOperator untuk menghapus batch berdasarkan ID batch dari beban kerja.

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator mencantumkan batch yang ada dalam project_id tertentu dan region.

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator mengambil satu workload batch tertentu.

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

Langkah selanjutnya