Ejecuta cargas de trabajo de Dataproc Serverless con Cloud Composer

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta página, se describe cómo usar Cloud Composer 2 para ejecutar Dataproc sin servidores en en Google Cloud.

Los ejemplos de las siguientes secciones muestran cómo usar operadores para administrar las cargas de trabajo por lotes de Dataproc Serverless. Usas estos operadores en DAG que crean, borrar, enumerar y obtener una carga de trabajo por lotes de Spark sin servidores de Dataproc:

Antes de comenzar

  1. Habilita la API de Dataproc:

    Console

    Enable the Dataproc API.

    Enable the API

    gcloud

    Enable the Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. Selecciona la ubicación del archivo de carga de trabajo por lotes. Puedes usar cualquiera de los las siguientes opciones:

    • Crea un bucket de Cloud Storage que almacene este archivo.
    • Usa el bucket de tu entorno. Porque no necesitas sincronizar este archivo Con Airflow, puedes crear una subcarpeta separada fuera de /dags o /data. Por ejemplo, /batches.
    • Usa un bucket existente.

Configura archivos y variables de Airflow

En esta sección, se muestra cómo configurar archivos y variables de Airflow para este instructivo.

Sube un archivo de carga de trabajo de Spark ML sin servidores de Dataproc a un bucket

La carga de trabajo de este instructivo ejecuta una secuencia de comandos pyspark:

  1. Guarda cualquier secuencia de comandos pyspark en un archivo local llamado spark-job.py. Por ejemplo, puedes usar la secuencia de comandos de pyspark de muestra.

  2. Sube el archivo a la ubicación que seleccionaste en Antes de comenzar.

Configura variables de Airflow

En los ejemplos de las siguientes secciones, se usan variables de Airflow. Estableciste valores para estas variables en Airflow, tu código DAG podrá acceder a estos valores.

En los ejemplos de este instructivo, se usan las siguientes variables de Airflow. Puedes establecer según sea necesario, según el ejemplo que uses.

Configura las siguientes variables de Airflow para usarlas en tu código de DAG:

  • project_id: ID del proyecto.
  • bucket_name: URI de un bucket en el que se encuentra el archivo principal de Python de se encuentra la carga de trabajo (spark-job.py). Seleccionaste esta ubicación en Antes de comenzar.
  • phs_cluster: Es el nombre del clúster del servidor de historial persistente. Estableces esta variable cuando creas un servidor de historial persistente.
  • image_name: Es el nombre y la etiqueta de la imagen del contenedor personalizado (image:tag). Tú establece esta variable cuando usar una imagen de contenedor personalizada con DataprocCreateBatchOperator.
  • metastore_cluster: Es el nombre del servicio de Dataproc Metastore. Puedes establecer esta variable cuando usar el servicio de Dataproc Metastore con DataprocCreateBatchOperator.
  • region_name: Es la región en la que se encuentra el servicio de Dataproc Metastore. el código fuente. Estableces esta variable cuando usas el servicio de Dataproc Metastore con DataprocCreateBatchOperator.

Usa la consola de Google Cloud y la IU de Airflow para establecer cada variable de Airflow

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haz clic en el vínculo Airflow para tu en un entorno de nube. Se abrirá la IU de Airflow.

  3. En la IU de Airflow, selecciona Administrador > Variables.

  4. Haz clic en Agregar un registro nuevo.

  5. Especifica el nombre de la variable en el campo Clave y establece su valor en el campo Val.

  6. Haz clic en Guardar.

Crea un servidor de historial persistente

Usa un servidor de historial persistente (PHS) para ver los archivos de historial de Spark de tu lote cargas de trabajo:

  1. Crea un servidor de historial persistente.
  2. Asegúrate de haber especificado el nombre del clúster de PHS en el phs_cluster Variable de Airflow.

DataprocCreateBatchOperator

El siguiente DAG inicia una carga de trabajo por lotes de Dataproc Serverless.

Para obtener más información sobre los argumentos DataprocCreateBatchOperator, consulta código fuente del operador.

Para obtener más información sobre los atributos que puedes pasar en el archivo batch parámetro de DataprocCreateBatchOperator, consulta la descripción de la clase Batch.


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

Usa una imagen de contenedor personalizada con DataprocCreateBatchOperator

En el siguiente ejemplo, se muestra cómo usar una imagen de contenedor personalizado para ejecutar tus cargas de trabajo. Puedes usar un contenedor personalizado, por ejemplo, para agregar dependencias de Python que no proporcionó la imagen de contenedor predeterminada.

Para usar una imagen de contenedor personalizada, sigue estos pasos:

  1. Crea una imagen de contenedor personalizada y súbela a Container Registry.

  2. Especifica la imagen en la variable de Airflow image_name.

  3. Usa DataprocCreateBatchOperator con tu imagen personalizada:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

Usa el servicio de Dataproc Metastore con DataprocCreateBatchOperator

Para usar un servicio de Dataproc Metastore desde un DAG, haz lo siguiente:

  1. Verifica que el servicio de metastore ya se haya iniciado.

    Para obtener información sobre cómo iniciar un servicio de almacén de metadatos, consulta Habilita o inhabilita Dataproc Metastore.

    Si deseas obtener información detallada sobre el operador de lotes para crear la configuración, consulta PeripheralsConfig

  2. Una vez que el servicio de almacén de metadatos esté en funcionamiento, especifica su nombre en La variable metastore_cluster y su región en la variable de Airflow region_name

  3. Usa el servicio de metastore en DataprocCreateBatchOperator:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

Puedes usar DataprocDeleteBatchOperator para borrar un lote según el ID del lote de la carga de trabajo.

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator enumera los lotes que existen dentro de un project_id determinado. y región.

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator recupera una carga de trabajo por lotes particular.

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

¿Qué sigue?