Lancer des pipelines Dataflow avec Cloud Composer

Cette page explique comment utiliser l'opérateur DataflowTemplateOperator pour lancer des pipelines Dataflow à partir de Cloud Composer. Le pipeline Texte Cloud Storage vers BigQuery est un pipeline par lot qui vous permet d'importer des fichiers texte stockés dans Cloud Storage, de les transformer à l'aide d'une fonction JavaScript définie par l'utilisateur (UDF) et de générer les résultats dans BigQuery.

Une fonction, un fichier d'entrée et un schéma JSON définis par l'utilisateur sont importés dans un bucket Cloud Storage. Un DAG qui référence ces fichiers lance un pipeline par lot Dataflow, qui applique la fonction définie par l'utilisateur et le fichier de schéma JSON à notre fichier d'entrée. Ensuite, ce contenu est importé dans une table BigQuery.

  • Avant de lancer le processus, nous devons créer les entités suivantes :

    • Une table BigQuery vide à partir d'un ensemble de données vide qui contient les colonnes d'informations suivantes : location, average_temperature, month et, éventuellement, inches_of_rain, is_current et latest_measurement.

    • Un fichier JSON qui normalise les données du fichier .txt au format approprié pour le schéma de notre table BigQuery. L'objet JSON comporte un tableau de BigQuery Schema, où chaque objet contient un nom de colonne, un type d'entrée et l'indication de champ obligatoire ou pas.

    • Un fichier d'entrée .txt contenant les données que nous souhaitons importer par lot dans notre table BigQuery.

    • Une fonction définie par l'utilisateur écrite en JavaScript qui transforme chaque ligne du fichier .txt en variables pertinentes pour notre table.

    • Un fichier de graphe orienté acrylique (DAG) qui pointe vers l'emplacement des fichiers mentionnés ci-dessus.

  • Ensuite, nous importons le fichier .txt, le fichier de fonction définie par l'utilisateur .js et le fichier de schéma .json dans un bucket Cloud Storage. Nous importons également le DAG dans notre environnement Cloud Composer.

  • Une fois le DAG importé, une tâche Airflow démarre. La tâche lance un pipeline Cloud Dataflow qui applique la fonction définie par l'utilisateur à notre fichier .txt et la met en forme en fonction du schéma JSON.

  • Enfin, les données sont importées dans la table BigQuery que nous avons créée précédemment.

Coûts

Ce tutoriel utilise les composants facturables de Google Cloud, dont :

  • Cloud Composer
  • Dataflow
  • Cloud Storage
  • BigQuery

Prérequis

Configurer votre environnement

Créer une table BigQuery vide avec une définition de schéma

Tout d'abord, vous allez créer une table BigQuery avec une définition de schéma. Vous utiliserez cette définition de schéma plus loin dans ce tutoriel. Cette table BigQuery contiendra les résultats de l'importation groupée.

Pour créer une table vide avec une définition de schéma :

Console

  1. Ouvrez l'UI Web de BigQuery dans Cloud Console.
    Accéder à l'UI Web de BigQuery

  2. Dans le panneau de navigation, dans la section Ressources, développez votre projet.

  3. Dans le panneau des détails situé à droite, cliquez sur Créer un ensemble de données.

cliquez sur le bouton CREATE DATASET (Créer un ensemble de données) à droite de la fenêtre.

  1. Sur la page "Create dataset", dans la section Dataset ID (ID de l'ensemble de données), nommez l'ensemble de données average_weather. Conservez les paramètres par défaut pour tous les autres champs.

Renseigner l'ID de l'ensemble de données avec le nom average_weather

  1. Cliquez sur Créer un ensemble de données.

  2. Revenez au panneau de navigation, puis, dans la section Ressources, développez votre projet. Cliquez ensuite sur l'ensemble de données average_weather.

  3. À droite de la fenêtre, dans le panneau de détails, cliquez sur Create table (Créer une table).

Cliquez sur

  1. Dans la section Source de la page Créer une table, sélectionnez Table vide.

  2. Dans la section Destination de la page Créer une table :

    • Dans le champ Dataset name (Nom de l'ensemble de données), sélectionnez l'ensemble de données average_weather.

      Choisissez l'ensemble de données average_weather.

    • Dans le champ Table name (Nom de la table), saisissez le nom average_weather.

    • Vérifiez que Type de table est défini sur Table native.

  3. Dans la section Schéma, saisissez la définition du schéma.

    • Indiquez manuellement les informations de schéma de l'une des manières suivantes :

      • Activez l'option Modifier sous forme de texte et saisissez le schéma de la table sous forme de tableau JSON. Saisissez les champs suivants pour cette option :

        [
        {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
        },
        {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
        },
        {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
        },
        {
        "name": "inches_of_rain",
        "type": "NUMERIC"
        },
        {
        "name": "is_current",
        "type": "BOOLEAN"
        },
        {
        "name": "latest_measurement",
        "type": "DATE"
        }
        ]
        

      • Utilisez l'option Ajouter un champ pour saisir manuellement le schéma.

      Cliquez sur

  4. Sous Paramètres de partitionnement et de clustering, conservez la valeur par défaut : No partitioning.

  5. Dans la section Options avancées, pour Chiffrement, conservez la valeur par défaut : Google-managed key. Par défaut, Compute Engine chiffre les contenus client stockés au repos.

  6. Cliquez sur Create table.

CLI

Utilisez la commande bq mk avec l'option --location pour créer un ensemble de données vide. Remplacez PROJECT_ID par votre ID de projet et LOCATION par l'emplacement de votre choix. Nous vous recommandons de choisir la même région que votre environnement Composer pour réduire la latence.

Copiez la commande suivante pour créer un ensemble de données des valeurs météo moyennes mondiales :

bq --location=LOCATION mk \
--dataset \
PROJECT_ID:average_weather

Pour créer une table vide dans cet ensemble de données avec notre définition de schéma, remplacez PROJECT_ID par votre ID de projet dans la commande ci-dessous, puis saisissez-le dans le terminal :

bq mk \
--table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

Une fois la table créée, vous pouvez mettre à jour son délai d'expiration, sa description et ses libellés. Vous pouvez également modifier la définition du schéma.

Python

Avant de lancer l'exemple, veillez à exécuter la commande suivante pour installer la bibliothèque dans votre environnement :

pip install google.cloud.bigquery

Enregistrez ce code sous dataflowtemplateoperator_create_dataset_and_table_helper.py et mettez à jour les variables pour correspondre à votre projet et à votre emplacement, puis exécutez le fichier à l'aide de la commande suivante :

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Avant de tester cet exemple, suivez les instructions de configuration de Python dans le guide de démarrage rapide de Compute Engine – Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Compute Engine en langage Python.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = 'your-project'  # Your GCP Project
location = 'US'  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = 'average_weather'

def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Créer un bucket de stockage

Ensuite, vous devez créer un bucket de stockage pour stocker tous les fichiers nécessaires au workflow. Le DAG que vous créerez à l'avenir référencera les fichiers que vous avez importés dans ce bucket de stockage. Pour créer un bucket de stockage, procédez comme suit :

Console

  1. Accédez à Cloud Storage dans Cloud Console.

    Ouvrir Cloud Storage

  2. Cliquez sur Créer un bucket pour ouvrir le formulaire de création de bucket.

  3. Saisissez les informations relatives à votre bucket et cliquez sur Continuer à chaque étape :

    • Spécifiez un nom encore jamais utilisé pour votre bucket (il sera référencé en tant que bucketName pour la suite du tutoriel).

    • Sélectionnez Régional comme type d'emplacement. Ensuite, sélectionnez l'(emplacement) où les données du bucket seront stockées de manière permanente.

    • Sélectionnez Standard comme classe de stockage par défaut pour vos données.

    • Sélectionnez Uniforme pour le contrôle d'accès à vos objets.

  4. Cliquez sur Terminé.

gsutil

  1. Utilisez la commande gsutil mb :
    gsutil mb gs://bucketName/
    

Python

Python

Avant de tester cet exemple, suivez les instructions de configuration de Python dans le guide de démarrage rapide de Compute Engine – Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Compute Engine en langage Python.

from google.cloud import storage

def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print("Bucket {} created".format(bucket.name))

Créer un schéma BigQuery au format JSON pour votre table de sortie

Créez un fichier de schéma BigQuery au format JSON correspondant à la table de sortie que vous avez créée précédemment. Notez que les noms, types et modes des champs doivent correspondre à ceux définis précédemment dans le schéma de votre table BigQuery. Ce fichier normalisera les données de votre fichier .txt dans un format compatible avec votre schéma BigQuery. Nommez ce fichier jsonSchema.json.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

Créer un fichier JavaScript (.js) pour mettre en forme vos données

Dans ce fichier, vous définissez votre fonction définie par l'utilisateur qui fournit la logique permettant de transformer les lignes de texte de votre fichier d'entrée. Notez que cette fonction prend chaque ligne de texte de votre fichier d'entrée comme son propre argument. Par conséquent, elle s'exécute une fois pour chaque ligne de votre fichier d'entrée. Nommez ce fichier transformCSVtoJSON.js.

Python


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  const weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  var jsonString = JSON.stringify(weatherInCity);
  return jsonString;
}

Créer votre fichier d'entrée

Ce fichier contiendra les informations que vous souhaitez importer dans votre table BigQuery. Copiez ce fichier localement et nommez-le inputFile.txt.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

Importer vos fichiers dans un bucket Cloud Storage et créer un dossier de préproduction

Importez vos fichiers dans le bucket Cloud Storage que vous avez créé précédemment :

  • Schéma BigQuery au format JSON (.json)
  • Fonction définie par l'utilisateur JavaScript (transformCSVtoJSON.js)
  • Fichier d'entrée du texte à traiter (.txt)

Console

  1. Ouvrez le navigateur Cloud Storage dans Google Cloud Console.
    Ouvrir le navigateur Cloud Storage
  2. Dans la liste des buckets, cliquez sur le bucket bucketName.

  3. Dans l'onglet Objets du bucket, exécutez l'une des actions suivantes :

    • Effectuez un glisser-déposer des fichiers souhaités depuis votre bureau ou votre gestionnaire de fichiers vers le volet principal de Cloud Console.

    • Cliquez sur le bouton Importer des fichiers, sélectionnez les fichiers que vous souhaitez importer dans la boîte de dialogue qui s'affiche, puis cliquez sur Ouvrir.

gsutil

Exécutez la commande [gsutil cp] :

gsutil cp [OBJECT_LOCATION] gs://bucketName

Où :

  • [OBJECT_LOCATION] correspond au chemin d'accès local à votre objet. Exemple : Desktop/dog.png.

  • [bucketName] est le nom de bucket encore jamais utilisé que vous avez créé précédemment.

Si l'opération réussit, la réponse se présente comme suit :

Operation completed over 1 objects/58.8 KiB.

Python

Python

Avant de tester cet exemple, suivez les instructions de configuration Python du guide de démarrage rapide de Compute Engine – Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Compute Engine en langage Python.

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # bucket_name = "your-bucket-name"
    # source_file_name = "local/path/to/file"
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(
        "File {} uploaded to {}.".format(
            source_file_name, destination_blob_name
        )
    )

Configuration de DataflowTemplateOperator

Avant d'exécuter l'exemple, assurez-vous de définir les variables d'environnement appropriées. Pour ce faire, vous pouvez utiliser gcloud ou l'interface utilisateur d'Airflow :

gcloud

Saisissez les commandes suivantes :

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set project_id PROJECT_ID

où :

  • ENVIRONMENT correspond au nom de l'environnement Cloud Composer.
  • LOCATION correspond à la région où se trouve l'environnement Cloud Composer
  • PROJECT_ID correspond à votre ID de projet Google Cloud.
gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set gce_region GCE_REGION

où :

  • GCE_REGION correspond à la région de votre région Compute Engine.
gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set gce_zone GCE_ZONE

où :

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set bucket_path BUCKET_PATH

où :

  • BUCKET_PATH est l'emplacement du bucket Cloud Storage que vous avez créé précédemment.

Interface utilisateur d'Airflow

  1. Dans la barre d'outils, cliquez sur Admin > Variables (Administration > Variables).

  2. Cliquez sur Create (Créer).

  3. Saisissez les informations suivantes :

    • Key (Clé) : project_id
    • Val (Valeur) : PROJECT_ID est votre ID de projet Google Cloud.
  4. Cliquez sur Save and Add Another (Enregistrer et ajouter) Option .

  5. Saisissez les informations suivantes :

    • Key (Clé) : bucket_path
    • Val (Valeur) : BUCKET_PATH correspondant à l'emplacement de votre bucket Cloud Storage (par exemple, "gs://my-bucket").
  6. Cliquez sur Enregistrer et ajouter.

  7. Saisissez les informations suivantes :

    • Key (Clé) : gce_region
    • Val (Valeur) : GCE_REGION correspondant à la région de votre région Compute Engine.
  8. Cliquez sur Enregistrer et ajouter.

  9. Saisissez les informations suivantes :

  10. Cliquez sur Save.

Vous allez maintenant référencer les fichiers que vous avez créés précédemment pour créer un DAG qui lance le workflow Dataflow. Copiez ce DAG et enregistrez-le localement sous le nom composer-dataflow-dag.py.

Python

Avant de tester cet exemple, suivez les instructions de configuration Python du guide de démarrage rapide de Compute Engine – Utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Compute Engine en langage Python.



"""Example Airflow DAG that creates a Cloud Dataproc cluster, runs the Hadoop
wordcount example, and deletes the cluster.

This DAG relies on three Airflow variables
https://airflow.apache.org/concepts.html#variables
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
* gce_region - Google Compute Engine region where Cloud Dataflow cluster should be
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = models.Variable.get("bucket_path")
project_id = models.Variable.get("project_id")
gce_zone = models.Variable.get("gce_zone")
gce_region = models.Variable.get("gce_region")

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your region
        "region": gce_region,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "temp_location": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:

    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Importation du DAG dans Cloud Storage

Importez votre DAG dans le dossier de votre environnement. Une fois l'importation terminée, vous devriez le voir apparaître en cliquant sur le lien Dossier DAGS sur la page Environnements Cloud Composer.

Le dossier DAGS de votre environnement contient votre DAG

Afficher l'état d'une tâche

  1. Accédez à l'interface Web d'Airflow.
  2. Sur la page DAG, cliquez sur le nom du DAG, par exemple composerDataflowDAG.
  3. Sur la page "Détails des DAG", cliquez sur Graph View (Vue graphique).
  4. Vérifiez l'état :

    • Échec : la tâche apparaît dans un encadré rouge. Vous pouvez également placer le pointeur sur la tâche pour voir s'afficher la mention State: Failed (État : Échec).La tâche apparaît dans un encadré rouge, indiquant qu'elle a échoué

Après quelques minutes, vos résultats devraient s'afficher dans Dataflow et BigQuery.

Afficher votre tâche dans Dataflow

  1. Accédez à l'interface utilisateur Web de Dataflow. Accéder à l'interface utilisateur Web de Dataflow

  2. Votre tâche est nommée dataflow_operator_transform_csv_to_bq avec un ID unique rattaché à la fin du nom par un tiret, comme ceci : La tâche Dataflow possède un identifiant unique

  3. Cliquez sur le nom pour afficher les détails de la tâche. En savoir plus sur les détails de la tâche Dataflow Tous les détails de la tâche sont indiqués ci-dessous

Afficher vos résultats dans BigQuery

  1. Accédez à l'interface utilisateur Web de BigQuery. Accéder à l'interface utilisateur Web de BigQuery

  2. Vous pouvez envoyer des requêtes à l'aide du langage SQL standard. Exécutez la requête suivante pour afficher les lignes qui ont été ajoutées à votre table :

    SELECT * FROM projectId.average_weather
    

Nettoyer

Afin d'éviter que des frais ne soient facturés sur votre compte Google Cloud Platform pour les ressources utilisées dans ce tutoriel, procédez comme suit :

  1. Supprimez l'environnement Cloud Composer.
  2. Supprimez le bucket Cloud Storage pour l'environnement Cloud Composer. La suppression de l'environnement Cloud Composer ne supprime pas son bucket.
  3. Arrêtez la tâche Dataflow.
  4. Supprimez la table BigQuery et l'ensemble de données BigQuery.