Execute cargas de trabalho do Dataproc sem servidor com o Cloud Composer

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página descreve como usar o Cloud Composer 2 para executar cargas de trabalho do Dataproc sem servidor no Google Cloud.

Os exemplos nas secções seguintes mostram como usar operadores para gerir cargas de trabalho em lote sem servidor do Dataproc. Use estes operadores em DAGs que criam, eliminam, listam e obtêm uma carga de trabalho em lote do Dataproc Serverless Spark:

Antes de começar

  1. Ative a API Dataproc:

    Consola

    Enable the Dataproc API.

    Enable the API

    gcloud

    Enable the Dataproc API:

    gcloud services enable dataproc.googleapis.com

  2. Selecione a localização do ficheiro de carga de trabalho em lote. Pode usar qualquer uma das seguintes opções:

    • Crie um contentor do Cloud Storage que armazene este ficheiro.
    • Use o contentor do seu ambiente. Uma vez que não precisa de sincronizar este ficheiro com o Airflow, pode criar uma subpasta separada fora das pastas /dags ou /data. Por exemplo, /batches.
    • Use um contentor existente.

Configure ficheiros e variáveis do Airflow

Esta secção demonstra como configurar ficheiros e variáveis do Airflow para este tutorial.

Carregue um ficheiro de carga de trabalho de ML do Dataproc Serverless Spark para um contentor

A carga de trabalho neste tutorial executa um script pyspark:

  1. Guarde qualquer script pyspark num ficheiro local com o nome spark-job.py. Por exemplo, pode usar o script pyspark de exemplo.

  2. Carregue o ficheiro para a localização que selecionou em Antes de começar.

Defina variáveis do Airflow

Os exemplos nas secções seguintes usam variáveis do Airflow. Define valores para estas variáveis no Airflow e, em seguida, o código DAG pode aceder a estes valores.

Os exemplos neste tutorial usam as seguintes variáveis do Airflow. Pode defini-los conforme necessário, consoante o exemplo que usar.

Defina as seguintes variáveis do Airflow para utilização no código do DAG:

Use a Google Cloud consola e a IU do Airflow para definir cada variável do Airflow

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, clique no link Airflow para o seu ambiente. A IU do Airflow é aberta.

  3. Na IU do Airflow, selecione Admin > Variáveis.

  4. Clique em Adicionar um novo registo.

  5. Especifique o nome da variável no campo Chave e defina o valor para esta no campo Valor.

  6. Clique em Guardar.

Crie um servidor de histórico persistente

Use um servidor de histórico persistente (PHS) para ver os ficheiros de histórico do Spark das suas cargas de trabalho em lote:

  1. Crie um servidor de histórico persistente.
  2. Certifique-se de que especificou o nome do cluster do PHS na phs_cluster variável do Airflow.

DataprocCreateBatchOperator

O seguinte DAG inicia uma carga de trabalho em lote do Dataproc Serverless.

Para mais informações sobre os argumentos DataprocCreateBatchOperator, consulte o código fonte do operador.

Para mais informações sobre os atributos que pode transmitir no parâmetro batch de DataprocCreateBatchOperator, consulte a descrição da classe Batch.


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

Use uma imagem de contentor personalizada com o DataprocCreateBatchOperator

O exemplo seguinte mostra como usar uma imagem de contentor personalizada para executar as suas cargas de trabalho. Pode usar um contentor personalizado, por exemplo, para adicionar dependências do Python que não são fornecidas pela imagem do contentor predefinida.

Para usar uma imagem de contentor personalizada:

  1. Crie uma imagem de contentor personalizada e carregue-a para o Container Registry.

  2. Especifique a imagem na image_name variável do Airflow.

  3. Use o DataprocCreateBatchOperator com a sua imagem personalizada:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

Use o serviço Dataproc Metastore com o DataprocCreateBatchOperator

Para usar um serviço Dataproc Metastore a partir de um DAG:

  1. Verifique se o serviço de metastore já foi iniciado.

    Para saber como iniciar um serviço de metastore, consulte o artigo Ative e desative o Dataproc Metastore.

    Para obter informações detalhadas sobre o operador de lote para criar a configuração, consulte PeripheralsConfig.

  2. Assim que o serviço de metastore estiver em funcionamento, especifique o respetivo nome na variável metastore_cluster e a respetiva região na region_name variável do Airflow.

  3. Use o serviço de metastore em DataprocCreateBatchOperator:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

Pode usar o DataprocDeleteBatchOperator para eliminar um lote com base no ID do lote da carga de trabalho.

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator apresenta as tarefas em lote existentes num determinado project_id e região.

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

O DataprocGetBatchOperator obtém uma carga de trabalho em lote específica.

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

O que se segue?