Serverlose Dataproc-Arbeitslasten mit Cloud Composer ausführen

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Auf dieser Seite wird beschrieben, wie Sie Cloud Composer 2 zur Ausführung von Dataproc Serverless-Arbeitslasten auf Google Cloud

Beispiele in den folgenden Abschnitten zeigen, wie Sie Operatoren zur Verwaltung serverloser Dataproc-Batcharbeitslasten. Sie verwenden diese Operatoren in DAGs, die Löschen, Auflisten und Abrufen einer serverlosen Spark-Batcharbeitslast von Dataproc:

Hinweise

  1. Aktivieren Sie die Dataproc API:

    Console

    Dataproc API aktivieren.

    Aktivieren Sie die API

    gcloud

    Aktivieren Sie die Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. Wählen Sie den Speicherort für die Batch-Arbeitslastdatei aus. Sie können eine der folgende Optionen:

    • Cloud Storage-Bucket erstellen, der speichert diese Datei.
    • Verwenden Sie den Bucket Ihrer Umgebung. Weil Sie diese Datei nicht synchronisieren müssen können Sie mit Airflow einen separaten Unterordner außerhalb von /dags erstellen. oder /data Ordner. Beispiel: /batches
    • Verwenden Sie einen vorhandenen Bucket.

Dateien und Airflow-Variablen einrichten

In diesem Abschnitt wird gezeigt, wie Sie Dateien einrichten und Airflow-Variablen für diese Anleitung konfigurieren.

Serverlose Spark ML-Arbeitslastdatei von Dataproc in einen Bucket hochladen

Die Arbeitslast in dieser Anleitung führt ein PySpark-Skript aus:

  1. Speichern Sie ein beliebiges Pyspark-Skript in einer lokalen Datei mit dem Namen spark-job.py. Sie können beispielsweise die Methode Pyspark-Beispielskript

  2. Laden Sie die Datei in den ausgewählten Speicherort hoch. unter Hinweise.

Airflow-Variablen festlegen

In den Beispielen in den folgenden Abschnitten werden Airflow-Variablen verwendet. Sie legen Werte für können Sie mit Ihrem DAG-Code auf diese Werte zugreifen.

In den Beispielen dieser Anleitung werden die folgenden Airflow-Variablen verwendet. Sie können sie festlegen je nach verwendetem Beispiel.

Legen Sie die folgenden Airflow-Variablen für die Verwendung in Ihrem DAG-Code fest:

  • project_id: Projekt-ID.
  • bucket_name: URI eines Buckets, in dem die Python-Hauptdatei von sich die Arbeitslast (spark-job.py) befindet. Sie haben diesen Standort ausgewählt in Hinweise
  • phs_cluster: Name des Persistent History Server-Clusters. Sie legen diese Variable fest wenn Sie einen Persistent History Server erstellen.
  • image_name: Name und Tag des benutzerdefinierten Container-Images (image:tag). Ich diese Variable festlegen, Benutzerdefiniertes Container-Image verwenden mit DataprocCreateBatchOperator.
  • metastore_cluster: Name des Dataproc Metastore-Dienstes. Sie legen diese Variable fest, Dataproc Metastore-Dienst verwenden mit DataprocCreateBatchOperator.
  • region_name: Region, in der der Dataproc Metastore-Dienst ausgeführt wird befindet. Sie legen diese Variable fest, Dataproc Metastore-Dienst verwenden mit DataprocCreateBatchOperator.

Mit der Google Cloud Console und der Airflow-UI die einzelnen Airflow-Variablen festlegen

  1. Rufen Sie in der Google Cloud Console die Seite Umgebungen auf.

    Zur Seite Umgebungen

  2. Klicken Sie in der Liste der Umgebungen auf den Link Airflow für Ihre zu verbessern. Die Airflow-UI wird geöffnet.

  3. Wählen Sie in der Airflow-Benutzeroberfläche Admin > Variablen aus.

  4. Klicken Sie auf Neuen Eintrag hinzufügen.

  5. Geben Sie im Feld Schlüssel den Namen der Variablen an und legen Sie den Wert für es in das Feld Val ein.

  6. Klicken Sie auf Speichern.

Persistent History Server erstellen

Einen Persistent History Server (PHS) verwenden, um die Spark-Verlaufsdateien Ihres Batches anzusehen Arbeitslasten:

  1. Erstellen Sie einen Persistent History Server.
  2. Achten Sie darauf, dass Sie den Namen des PHS-Clusters im phs_cluster Airflow-Variable.

DataprocCreateBatchOperator

Der folgende DAG startet eine serverlose Dataproc-Batcharbeitslast.

Weitere Informationen zu DataprocCreateBatchOperator-Argumenten finden Sie unter Quellcode des Operators.

Weitere Informationen zu Attributen, die Sie im Feld batch übergeben können von DataprocCreateBatchOperator, siehe Beschreibung der Batch-Klasse


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

Benutzerdefiniertes Container-Image mit DataprocCreateBatchOperator verwenden

Das folgende Beispiel zeigt, wie Sie ein benutzerdefiniertes Container-Image zum Ausführen Ihrer Arbeitsbelastungen. Sie können beispielsweise einen benutzerdefinierten Container verwenden, um Python hinzuzufügen. Abhängigkeiten, die nicht vom Standard-Container-Image bereitgestellt werden.

So verwenden Sie ein benutzerdefiniertes Container-Image:

  1. Benutzerdefiniertes Container-Image erstellen und in Container Registry hochladen

  2. Geben Sie das Image in der Airflow-Variable image_name an.

  3. Verwenden Sie DataprocCreateBatchOperator mit Ihrem benutzerdefinierten Image:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

Dataproc Metastore-Dienst mit DataprocCreateBatchOperator verwenden

Dataproc Metastore-Dienst verwenden aus einem DAG:

  1. Prüfen Sie, ob der Metastore-Dienst bereits gestartet wurde.

    Informationen zum Starten eines Metastore-Dienstes finden Sie unter Dataproc Metastore aktivieren und deaktivieren

    Ausführliche Informationen zum Batch-Operator zum Erstellen der Konfiguration finden Sie unter PeripheralsConfig

  2. Sobald der Metastore-Dienst betriebsbereit ist, geben Sie seinen Namen in die Variable metastore_cluster und ihre Region in der Airflow-Variable region_name.

  3. Verwenden Sie den Metastore-Dienst in DataprocCreateBatchOperator:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

Mit DataprocDeleteBatchOperator können Sie einen Batch auf Grundlage der Batch-ID löschen der Arbeitslast.

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator listet Batches auf, die in einer bestimmten Projekt-ID vorhanden sind und Region.

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator ruft eine bestimmte Batcharbeitslast ab.

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

Nächste Schritte