Ejecutar cargas de trabajo de Dataproc sin servidor con Cloud Composer

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página se describe cómo usar Cloud Composer 2 para ejecutar cargas de trabajo de Dataproc Serverless enGoogle Cloud.

En los ejemplos de las siguientes secciones se muestra cómo usar operadores para gestionar cargas de trabajo por lotes de Dataproc Serverless. Estos operadores se usan en DAGs que crean, eliminan, enumeran y obtienen una carga de trabajo por lotes de Dataproc Serverless Spark:

Antes de empezar

  1. Habilita la API de Dataproc:

    Consola

    Enable the Dataproc API.

    Enable the API

    gcloud

    Enable the Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. Selecciona la ubicación del archivo de carga de trabajo de Batch. Puedes usar cualquiera de las siguientes opciones:

    • Crea un segmento de Cloud Storage que almacene este archivo.
    • Usa el segmento de tu entorno. Como no necesitas sincronizar este archivo con Airflow, puedes crear una subcarpeta independiente fuera de las carpetas /dags o /data. Por ejemplo, /batches.
    • Usa un segmento que ya tengas.

Configurar archivos y variables de Airflow

En esta sección se muestra cómo configurar los archivos y las variables de Airflow para este tutorial.

Subir un archivo de carga de trabajo de aprendizaje automático de Spark de Dataproc Serverless a un segmento

La carga de trabajo de este tutorial ejecuta una secuencia de comandos de pyspark:

  1. Guarda cualquier secuencia de comandos de pyspark en un archivo local llamado spark-job.py. Por ejemplo, puedes usar el script de pyspark de ejemplo.

  2. Sube el archivo a la ubicación que hayas seleccionado en la sección Antes de empezar.

Definir variables de Airflow

En los ejemplos de las siguientes secciones se usan variables de Airflow. Puedes definir los valores de estas variables en Airflow y, después, el código de tu DAG podrá acceder a ellos.

En los ejemplos de este tutorial se usan las siguientes variables de Airflow. Puedes configurarlos según sea necesario, en función del ejemplo que utilices.

Define las siguientes variables de Airflow para usarlas en el código de tu DAG:

Usa la consola Google Cloud y la interfaz de usuario de Airflow para definir cada variable de Airflow

  1. En la Google Cloud consola, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, haga clic en el enlace Airflow del entorno. Se abrirá la interfaz de usuario de Airflow.

  3. En la interfaz de Airflow, seleccione Administrar > Variables.

  4. Haz clic en Add a new record (Añadir un registro nuevo).

  5. Especifica el nombre de la variable en el campo Clave y asigna el valor correspondiente en el campo Valor.

  6. Haz clic en Guardar.

Crear un servidor de historial persistente

Usa un servidor de historial persistente (PHS) para ver los archivos de historial de Spark de tus cargas de trabajo por lotes:

  1. Crea un servidor de historial persistente.
  2. Asegúrate de haber especificado el nombre del clúster de PHS en la phs_cluster variable de Airflow.

DataprocCreateBatchOperator

El siguiente DAG inicia una carga de trabajo de Dataproc Serverless Batch.

Para obtener más información sobre los argumentos de DataprocCreateBatchOperator, consulta el código fuente del operador.

Para obtener más información sobre los atributos que puede incluir en el parámetro batch de DataprocCreateBatchOperator, consulte la descripción de la clase Batch.


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

Usar una imagen de contenedor personalizada con DataprocCreateBatchOperator

En el siguiente ejemplo se muestra cómo usar una imagen de contenedor personalizada para ejecutar tus cargas de trabajo. Puedes usar un contenedor personalizado, por ejemplo, para añadir dependencias de Python que no proporciona la imagen de contenedor predeterminada.

Para usar una imagen de contenedor personalizada, sigue estos pasos:

  1. Crea una imagen de contenedor personalizada y súbela a Container Registry.

  2. Especifique la imagen en la image_name variable de Airflow.

  3. Usa DataprocCreateBatchOperator con tu imagen personalizada:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

Usar el servicio Dataproc Metastore con DataprocCreateBatchOperator

Para usar un servicio Dataproc Metastore desde un DAG, sigue estos pasos:

  1. Comprueba que el servicio metastore ya se haya iniciado.

    Para obtener información sobre cómo iniciar un servicio de metastore, consulta Habilitar e inhabilitar Dataproc Metastore.

    Para obtener información detallada sobre el operador de lote para crear la configuración, consulta PeripheralsConfig.

  2. Una vez que el servicio del almacén de metadatos esté en funcionamiento, especifica su nombre en la variable metastore_cluster y su región en la variable region_name Airflow.

  3. Usa el servicio metastore en DataprocCreateBatchOperator:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

Puede usar DataprocDeleteBatchOperator para eliminar un lote en función del ID del lote de la carga de trabajo.

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator muestra las tareas por lotes que existen en un project_id y una región determinados.

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator obtiene una carga de trabajo por lotes concreta.

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

Siguientes pasos