Exécuter des charges de travail Dataproc sans serveur avec Cloud Composer

Cloud Composer 1 | Cloud Composer 2

Cette page explique comment utiliser Cloud Composer 2 pour exécuter des charges de travail Dataproc sans serveur sur Google Cloud.

Les exemples figurant dans les sections suivantes vous expliquent comment utiliser les opérateurs pour gérer les charges de travail par lot Dataproc sans serveur. Vous utilisez ces opérateurs dans les DAG qui créent, suppriment, répertorient et obtiennent une charge de travail par lot Spark sans serveur Dataproc:

Avant de commencer

  1. Activez l'API Dataproc:

    Console

    Activez l'API Dataproc

    Activer l'API

    gcloud

    Activer l'API Dataproc :

    gcloud services enable dataproc.googleapis.com

  2. Sélectionnez l'emplacement du fichier de votre charge de travail Batch. Vous pouvez utiliser l'une des options suivantes:

    • Créez un bucket Cloud Storage dans lequel stocker ce fichier.
    • Utilisez le bucket de votre environnement. Étant donné que vous n'avez pas besoin de synchroniser ce fichier avec Airflow, vous pouvez créer un sous-dossier distinct en dehors des dossiers /dags ou /data. Exemple :/batches
    • Utiliser un bucket existant

Configurer des fichiers et des variables Airflow

Cette section explique comment configurer des fichiers et des variables Airflow pour ce tutoriel.

Importer un fichier de charge de travail Spark ML sans serveur Dataproc dans un bucket

La charge de travail de ce tutoriel exécute un script pyspark:

  1. Enregistrez le script pyspark dans un fichier local nommé spark-job.py. Vous pouvez utiliser l'exemple de script pyspark.

  2. Importez le fichier à l'emplacement que vous avez sélectionné à la section Avant de commencer.

Définir les variables Airflow

Les exemples des sections suivantes utilisent des variables Airflow. Vous définissez des valeurs pour ces variables dans Airflow, puis votre code DAG peut accéder à ces valeurs.

Les exemples de ce tutoriel utilisent les variables Airflow suivantes. Vous pouvez les définir si nécessaire, selon l'exemple que vous utilisez.

Définissez les variables Airflow suivantes à utiliser dans le code de votre DAG:

Définir chaque variable Airflow à l'aide de la console Google Cloud et de l'interface utilisateur Airflow

  1. Dans la console Google Cloud, accédez à la page Environnements.

    Accéder à la page Environnements

  2. Dans la liste des environnements, cliquez sur le lien Airflow correspondant à votre environnement. L'interface utilisateur Airflow s'ouvre.

  3. Dans l'interface utilisateur Airflow, sélectionnez Admin > Variables.

  4. Cliquez sur Ajouter un enregistrement.

  5. Indiquez le nom de la variable dans le champ Key (Clé) et définissez sa valeur dans le champ Val.

  6. Cliquez sur Enregistrer.

Créer un serveur d'historique persistant

Utilisez un serveur d'historique persistant (PHS) pour afficher les fichiers d'historique Spark de vos charges de travail par lot:

  1. Créez un serveur d'historique persistant.
  2. Assurez-vous d'avoir spécifié le nom du cluster PHS dans la variable Airflow phs_cluster.

DataprocCreateBatchOperator

Le DAG suivant démarre une charge de travail par lot Dataproc sans serveur.

Pour en savoir plus sur les arguments DataprocCreateBatchOperator, consultez le code source de l'opérateur.

Pour en savoir plus sur les attributs que vous pouvez transmettre dans le paramètre batch de DataprocCreateBatchOperator, consultez la description de la classe Batch.


"""
Examples below show how to use operators for managing Dataproc Serverless batch workloads.
 You use these operators in DAGs that create, delete, list, and get a Dataproc Serverless Spark batch workload.
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id is the Google Cloud Project ID to use for the Cloud Dataproc Serverless.
* bucket_name is the URI of a bucket where the main python file of the workload (spark-job.py) is located.
* phs_cluster is the Persistent History Server cluster name.
* image_name is the name and tag of the custom container image (image:tag).
* metastore_cluster is the Dataproc Metastore service name.
* region_name is the region where the Dataproc Metastore service is located.
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateBatchOperator,
    DataprocDeleteBatchOperator,
    DataprocGetBatchOperator,
    DataprocListBatchesOperator,
)
from airflow.utils.dates import days_ago

PROJECT_ID = "{{ var.value.project_id }}"
REGION = "{{ var.value.region_name}}"
BUCKET = "{{ var.value.bucket_name }}"
PHS_CLUSTER = "{{ var.value.phs_cluster }}"
METASTORE_CLUSTER = "{{var.value.metastore_cluster}}"
DOCKER_IMAGE = "{{var.value.image_name}}"

PYTHON_FILE_LOCATION = "gs://{{var.value.bucket_name }}/spark-job.py"
# for e.g.  "gs//my-bucket/spark-job.py"
# Start a single node Dataproc Cluster for viewing Persistent History of Spark jobs
PHS_CLUSTER_PATH = "projects/{{ var.value.project_id }}/regions/{{ var.value.region_name}}/clusters/{{ var.value.phs_cluster }}"
# for e.g. projects/my-project/regions/my-region/clusters/my-cluster"
SPARK_BIGQUERY_JAR_FILE = "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
# use this for those pyspark jobs that need a spark-bigquery connector
# https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example
# Start a Dataproc MetaStore Cluster
METASTORE_SERVICE_LOCATION = "projects/{{var.value.project_id}}/locations/{{var.value.region_name}}/services/{{var.value.metastore_cluster }}"
# for e.g. projects/my-project/locations/my-region/services/my-cluster
CUSTOM_CONTAINER = "us.gcr.io/{{var.value.project_id}}/{{ var.value.image_name}}"
# for e.g. "us.gcr.io/my-project/quickstart-image",

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "project_id": PROJECT_ID,
    "region": REGION,
}
with models.DAG(
    "dataproc_batch_operators",  # The id you will see in the DAG airflow page
    default_args=default_args,  # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    create_batch = DataprocCreateBatchOperator(
        task_id="batch_create",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": PYTHON_FILE_LOCATION,
                "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
            },
            "environment_config": {
                "peripherals_config": {
                    "spark_history_server_config": {
                        "dataproc_cluster": PHS_CLUSTER_PATH,
                    },
                },
            },
        },
        batch_id="batch-create-phs",
    )
    list_batches = DataprocListBatchesOperator(
        task_id="list-all-batches",
    )

    get_batch = DataprocGetBatchOperator(
        task_id="get_batch",
        batch_id="batch-create-phs",
    )
    delete_batch = DataprocDeleteBatchOperator(
        task_id="delete_batch",
        batch_id="batch-create-phs",
    )
    create_batch >> list_batches >> get_batch >> delete_batch

Utiliser une image de conteneur personnalisé avec DataprocCreateBatchOperator

L'exemple suivant montre comment exécuter vos charges de travail à l'aide d'une image de conteneur personnalisé. Vous pouvez utiliser un conteneur personnalisé, par exemple, pour ajouter des dépendances Python non fournies par l'image de conteneur par défaut.

Pour utiliser une image de conteneur personnalisé:

  1. Créez une image de conteneur personnalisé et importez-la dans Container Registry.

  2. Spécifiez l'image dans la variable Airflow image_name.

  3. Utilisez DataprocCreateBatchOperator avec votre image personnalisée:

create_batch_with_custom_container = DataprocCreateBatchOperator(
    task_id="dataproc_custom_container",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "container_image": CUSTOM_CONTAINER,
        },
    },
    batch_id="batch-custom-container",
)
get_batch_custom = DataprocGetBatchOperator(
    task_id="get_batch_custom",
    batch_id="batch-custom-container",
)
delete_batch_custom = DataprocDeleteBatchOperator(
    task_id="delete_batch_custom",
    batch_id="batch-custom-container",
)
create_batch_with_custom_container >> get_batch_custom >> delete_batch_custom

Utiliser le service Dataproc Metastore avec DataprocCreateBatchOperator

Pour utiliser un service Dataproc Metastore à partir d'un DAG, procédez comme suit:

  1. Vérifiez que votre service de métastore est déjà démarré.

    Pour en savoir plus sur le démarrage d'un service de métastore, consultez la page Activer et désactiver Dataproc Metastore.

    Pour en savoir plus sur l'opérateur de traitement par lot pour la création de la configuration, consultez PeripheralsConfig.

  2. Une fois le service de métastore opérationnel, spécifiez son nom dans la variable metastore_cluster et sa région dans la variable Airflow region_name.

  3. Utilisez le service de métastore dans DataprocCreateBatchOperator:

create_batch_with_metastore = DataprocCreateBatchOperator(
    task_id="dataproc_metastore",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
            "jar_file_uris": [SPARK_BIGQUERY_JAR_FILE],
        },
        "environment_config": {
            "peripherals_config": {
                "metastore_service": METASTORE_SERVICE_LOCATION,
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
    },
    batch_id="dataproc-metastore",
)
get_batch_metastore = DataprocGetBatchOperator(
    task_id="get_batch_metatstore",
    batch_id="dataproc-metastore",
)
delete_batch_metastore = DataprocDeleteBatchOperator(
    task_id="delete_batch_metastore",
    batch_id="dataproc-metastore",
)

create_batch_with_metastore >> get_batch_metastore >> delete_batch_metastore

DataprocDeleteBatchOperator

Vous pouvez utiliser DataprocDeleteBatchOperator pour supprimer un lot en fonction de l'ID de lot de la charge de travail.

delete_batch = DataprocDeleteBatchOperator(
    task_id="delete_batch",
    batch_id="batch-create-phs",
)

DataprocListBatchesOperator

DataprocDeleteBatchOperator répertorie les lots qui existent dans un ID de projet et une région donnés.

list_batches = DataprocListBatchesOperator(
    task_id="list-all-batches",
)

DataprocGetBatchOperator

DataprocGetBatchOperator récupère une charge de travail par lot particulière.

get_batch = DataprocGetBatchOperator(
    task_id="get_batch",
    batch_id="batch-create-phs",
)

Étapes suivantes