Esegui carichi di lavoro serverless di Dataproc con Cloud Composer

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa pagina descrive come utilizzare Cloud Composer 2 per eseguire Carichi di lavoro Dataproc Serverless on in Google Cloud.

Gli esempi nelle seguenti sezioni mostrano come utilizzare operatori per la gestione dei carichi di lavoro batch serverless di Dataproc. Questi operatori vengono utilizzati nei DAG che creano, elimina, elenca e recupera un carico di lavoro batch Spark serverless di Dataproc:

Prima di iniziare

  1. Abilita l'API Dataproc:

    Console

    Attiva l'API Dataproc.

    Abilita l'API

    gcloud

    Attiva l'API Dataproc.

    gcloud services enable dataproc.googleapis.com

  2. Seleziona la località per il file del carico di lavoro batch. Puoi utilizzare uno qualsiasi dei le seguenti opzioni:

    • Crea un bucket Cloud Storage che archivia questo file.
    • Utilizza il bucket del tuo ambiente. Perché non devi sincronizzare questo file con Airflow, puoi creare una sottocartella separata all'esterno di /dags o /data cartelle. Ad esempio: /batches.
    • Utilizza un bucket esistente.

configura file e variabili Airflow

Questa sezione illustra come impostare i file e configurare le variabili Airflow per questo tutorial.

Carica un file del carico di lavoro Spark ML serverless Dataproc in un bucket

Il carico di lavoro in questo tutorial esegue uno script pyspark:

  1. Salva qualsiasi script pyspark in un file locale denominato spark-job.py. Ad esempio, puoi utilizzare script pyspark di esempio.

  2. Carica il file nel percorso selezionato. in Prima di iniziare.

Imposta variabili Airflow

Gli esempi nelle sezioni seguenti utilizzano le variabili Airflow. Imposti i valori per queste variabili in Airflow, il codice DAG può accedere a questi valori.

Gli esempi in questo tutorial utilizzano le seguenti variabili Airflow. Puoi impostarle secondo necessità, a seconda dell'esempio utilizzato.

Imposta le seguenti variabili Airflow da utilizzare nel codice DAG:

Usa la console Google Cloud e la UI di Airflow per impostare ogni variabile Airflow

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, fai clic sul link Airflow per il tuo completamente gestito di Google Cloud. Si apre la UI di Airflow.

  3. Nella UI di Airflow, seleziona Amministratore > Variabili.

  4. Fai clic su Add a new record (Aggiungi un nuovo record).

  5. Specifica il nome della variabile nel campo Chiave e imposta il valore per nel campo Val (Valore).

  6. Fai clic su Salva.

Crea un server di cronologia permanente

Usa un server di cronologia permanente (PHS) per visualizzare i file di cronologia di Spark del tuo batch carichi di lavoro standard:

  1. Crea un server di cronologia permanente.
  2. Assicurati di aver specificato il nome del cluster PHS nel phs_cluster Variabile Airflow.

DataprocCreateBatchOperator

Il DAG seguente avvia un carico di lavoro batch serverless di Dataproc.

Per ulteriori informazioni sugli argomenti DataprocCreateBatchOperator, vedi codice sorgente dell'operatore.

Per ulteriori informazioni sugli attributi che puoi trasmettere nel batch di DataprocCreateBatchOperator, consulta le descrizione della classe Batch.


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

Usa l'immagine container personalizzata con DataprocCreateBatchOperator

L'esempio seguente mostra come utilizzare un'immagine container personalizzata per eseguire carichi di lavoro con scale out impegnativi. Puoi utilizzare un container personalizzato, ad esempio, per aggiungere Python le dipendenze non fornite dall'immagine container predefinita.

Per utilizzare un'immagine container personalizzata:

  1. Crea un'immagine container personalizzata e caricala in Container Registry.

  2. Specifica l'immagine nella variabile Airflow image_name.

  3. Utilizza DataprocCreateBatchOperator con la tua immagine personalizzata:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

Utilizzo del servizio Dataproc Metastore con DataprocCreateBatchOperator

Per utilizzare un servizio Dataproc Metastore da un DAG:

  1. Verifica che il servizio metastore sia già avviato.

    Per scoprire come avviare un servizio metastore, vedi Abilita e disabilita Dataproc Metastore.

    Per informazioni dettagliate sull'operatore batch per la creazione per la configurazione, consulta PeripheralsConfig.

  2. Quando il servizio metastore è attivo e in esecuzione, specifica il suo nome in la variabile metastore_cluster e la rispettiva regione nella variabile Airflow region_name.

  3. Usa il servizio metastore in DataprocCreateBatchOperator:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

Puoi utilizzare DataprocDeleteBatchOperator per eliminare un batch in base al suo ID del carico di lavoro.

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator elenca i batch esistenti all'interno di un dato project_id e regione.

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator recupera un determinato carico di lavoro batch.

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

Passaggi successivi