Lancer des pipelines Dataflow avec Cloud Composer

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Cette page explique comment utiliser l'opérateur DataflowTemplateOperator pour lancer des pipelines Dataflow à partir de Cloud Composer. Pipeline Cloud Storage Text-BigQuery est un pipeline de traitement par lot qui vous permet d'importer des fichiers texte Cloud Storage, les transformer à l'aide de la fonction JavaScript définie par l'utilisateur que vous fournissez, puis générer les résultats dans dans BigQuery.

une fonction définie par l'utilisateur, un fichier d'entrée et un schéma JSON sont importés
  dans un bucket Cloud Storage. Un DAG qui référence ces fichiers lance un pipeline par lot Dataflow, qui applique la fonction définie par l'utilisateur et le fichier de schéma JSON au fichier d'entrée. Ensuite, ce contenu sera importé dans une table BigQuery

Présentation

  • Avant de démarrer le workflow, vous allez créer les entités suivantes:

    • Une table BigQuery vide à partir d'un ensemble de données vide qui contient les colonnes d'informations suivantes : location, average_temperature, month et, éventuellement, inches_of_rain, is_current et latest_measurement.

    • Un fichier JSON qui normalise les données du fichier .txt au format approprié pour le schéma de la table BigQuery. L'objet JSON comporte un tableau BigQuery Schema, où chaque objet contiendra un nom de colonne, un type d'entrée et si mais il ne s'agit pas d'un champ obligatoire.

    • Un fichier d'entrée .txt contenant les données qui seront importées par lot dans la table BigQuery.

    • Une fonction définie par l'utilisateur écrite en JavaScript qui transforme chaque ligne du fichier .txt en variables pertinentes pour notre table.

    • Un fichier DAG Airflow qui pointe vers l'emplacement de ces .

  • Vous allez ensuite importer le fichier .txt, le fichier UDF .js et le schéma .json. vers un bucket Cloud Storage. Vous importerez également le DAG votre environnement Cloud Composer.

  • Une fois le DAG importé, Airflow exécute une tâche à partir de celui-ci. Cette tâche lancer un pipeline Dataflow qui appliquera définie par l'utilisateur dans le fichier .txt et formatez-le conformément au Schéma JSON.

  • Enfin, les données sont importées dans la table BigQuery que vous avez créé précédemment.

Avant de commencer

  • Ce guide nécessite une bonne connaissance de JavaScript pour écrire la fonction définie par l'utilisateur.
  • Ce guide suppose que vous disposez déjà d'un environnement Cloud Composer. Pour en créer un, consultez Créer un environnement. Vous pouvez utilisez n'importe quelle version de Cloud Composer avec ce guide.
  • Enable the Cloud Composer, Dataflow, Cloud Storage, BigQuery APIs.

    Enable the APIs

Créer une table BigQuery vide avec une définition de schéma

Créez une table BigQuery avec une définition de schéma. Toi utilisera cette définition de schéma plus loin dans ce guide. Ce La table BigQuery contient les résultats de l'importation groupée.

Pour créer une table vide avec une définition de schéma :

Console

  1. Dans la console Google Cloud, accédez à BigQuery :

    Accéder à BigQuery

  2. Dans le panneau de navigation, dans la section Ressources, développez votre projet.

  3. Dans le panneau des détails, cliquez sur Create dataset (Créer un ensemble de données).

    cliquez sur le bouton "Créer un ensemble de données".

  4. Dans la section ID de l'ensemble de données de la page "Créer un ensemble de données", attribuez un nom Ensemble de données average_weather. Conservez les paramètres par défaut pour tous les autres champs.

    Renseigner l'ID de l'ensemble de données avec le nom average_weather

  5. Cliquez sur Créer un ensemble de données.

  6. Revenez au panneau de navigation, dans la section Ressources, développez votre projet. Cliquez ensuite sur l'ensemble de données average_weather.

  7. Dans le panneau de détails, cliquez sur Create table (Créer une table).

    cliquez sur "Créer une table".

  8. Dans la section Source de la page Créer une table, sélectionnez Table vide.

  9. Dans la section Destination de la page Créer une table :

    • Dans le champ Dataset name (Nom de l'ensemble de données), sélectionnez l'ensemble de données average_weather.

      Choisissez l'ensemble de données average_weather.

    • Dans le champ Table name (Nom de la table), saisissez le nom average_weather.

    • Vérifiez que Type de table est défini sur Table native.

  10. Dans la section Schema (Schéma), saisissez la définition du schéma. Vous pouvez utiliser l'une des approches suivantes:

    • Saisissez les informations de schéma manuellement en activant l'option Modifier sous forme de texte, puis en saisissant le schéma de la table en tant que tableau JSON. Tapez ce qui suit :

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • Utilisez l'option Ajouter un champ pour saisir manuellement le schéma :

      Cliquez sur "Add field" (Ajouter un champ) pour saisir les champs.

  11. Pour Paramètres de partitionnement et de clustering, conservez la valeur par défaut (No partitioning).

  12. Dans la section Options avancées, pour Chiffrement, conservez la valeur par défaut, Google-managed key.

  13. Cliquez sur Créer une table.

bq

Utilisez la commande bq mk pour créer un ensemble de données vide et une table dans ce ensemble de données.

Exécutez la commande suivante pour créer un ensemble de données sur la météo moyenne mondiale:

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

Remplacez les éléments suivants :

  • LOCATION: région dans laquelle se trouve l'environnement.
  • PROJECT_ID : ID du projet.

Exécutez la commande suivante pour créer une table vide dans cet ensemble de données avec la définition du schéma:

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

Une fois la table créée, vous pouvez mettre à jour son délai d'expiration, sa description et ses libellés. Vous pouvez également modifier la définition du schéma.

Python

Enregistrer ce code sous dataflowtemplateoperator_create_dataset_and_table_helper.py et mettez à jour les variables qu'il contient pour refléter votre projet et votre emplacement, puis exécutez-le à l'aide de la commande suivante:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"


def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Créer un bucket Cloud Storage

Créez un bucket pour stocker tous les fichiers nécessaires au workflow. Le DAG que vous créerez plus loin dans ce guide référencera les fichiers que vous importerez dans ce bucket de stockage. Pour créer un bucket de stockage, procédez comme suit :

Console

  1. Ouvrez Cloud Storage dans la console Google Cloud.

    Accéder à Cloud Storage

  2. Cliquez sur Créer un bucket pour ouvrir le formulaire de création de bucket.

    1. Saisissez les informations concernant votre bucket et cliquez sur Continuer à chaque étape :

      • Spécifiez un nom encore jamais utilisé pour votre bucket. Ce guide utilise bucketName comme exemple.

      • Sélectionnez Régional comme type d'emplacement. Ensuite, sélectionnez un Emplacement où les données du bucket seront stockées.

      • Sélectionnez Standard comme classe de stockage par défaut pour vos données.

      • Sélectionnez Uniforme pour le contrôle d'accès à vos objets.

    2. Cliquez sur OK.

gcloud

Exécutez la commande gcloud storage buckets create :

gcloud storage buckets create gs://bucketName/

Remplacez les éléments suivants :

  • bucketName : nom du bucket que vous avez créé précédemment dans ce guide.

Exemples de code

C#

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud import storage


def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

Créer un schéma BigQuery au format JSON pour votre table de sortie

Créez un fichier de schéma BigQuery au format JSON correspondant à la table de sortie que vous avez créée précédemment. Notez que les noms, types et modes des champs doivent correspondre à ceux définis précédemment dans votre table BigQuery du schéma. Ce fichier normalisera les données de votre fichier .txt dans un format compatible avec votre schéma BigQuery. Nommer ce fichier jsonSchema.json

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

Créer un fichier JavaScript pour mettre en forme vos données

Dans ce fichier, vous définirez votre fonction définie par l'utilisateur qui fournit la logique permettant de transformer les lignes de texte de votre fichier d'entrée. Notez que cette prend chaque ligne de texte de votre fichier d'entrée comme son propre argument. la fonction s’exécutera une fois pour chaque ligne de votre fichier d’entrée. Nommez ce fichier transformCSVtoJSON.js.


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

Créer votre fichier d'entrée

Ce fichier contiendra les informations que vous souhaitez importer dans votre table BigQuery. Copiez ce fichier localement et nommez-le inputFile.txt

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

Importer des fichiers dans votre bucket

Importez les fichiers suivants dans le bucket Cloud Storage que vous avez créé plus tôt:

  • Schéma BigQuery au format JSON (.json)
  • Fonction définie par l'utilisateur JavaScript (transformCSVtoJSON.js)
  • Fichier d'entrée du texte à traiter (.txt)

Console

  1. Dans la console Google Cloud, accédez à la page Buckets Cloud Storage.

    Accéder à la page "Buckets"

  2. Dans la liste des buckets, cliquez sur le vôtre.

  3. Dans l'onglet Objets du bucket, effectuez l'une des opérations suivantes:

    • Effectuez un glisser-déposer des fichiers souhaités depuis votre bureau ou votre gestionnaire de fichiers vers le volet principal de la console Google Cloud.

    • Cliquez sur le bouton Importer des fichiers, sélectionnez les fichiers que vous souhaitez importer dans la boîte de dialogue qui s'affiche, puis cliquez sur Ouvrir.

gcloud

Exécutez la commande gcloud storage cp :

gcloud storage cp OBJECT_LOCATION gs://bucketName

Remplacez les éléments suivants :

  • bucketName : nom du bucket que vous avez créé précédemment dans ce guide.
  • OBJECT_LOCATION: chemin d'accès local à votre objet. Exemple : Desktop/transformCSVtoJSON.js.

Exemples de code

Python

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud import storage


def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

Configurer DataflowTemplateOperator

Avant d'exécuter le DAG, définissez les variables Airflow suivantes.

Variable Airflow Valeur
project_id ID du projet
gce_zone Zone Compute Engine dans laquelle le cluster Dataflow doit être créé
bucket_path L'emplacement du bucket Cloud Storage que vous avez créé plus tôt

Vous allez maintenant référencer les fichiers que vous avez créés précédemment pour créer un DAG qui lance le workflow Dataflow. Copiez ce DAG et enregistrez-le localement sous le nom composer-dataflow-dag.py.



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"


default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Importer le DAG dans Cloud Storage

Importez votre DAG dans le dossier /dags du dossier bucket. Une fois l'importation terminée, vous pouvez le voir en cliquant sur le lien Dossier DAGS sur la page Environnements Cloud Composer.

Le dossier des DAG de votre environnement contient votre DAG

Afficher l'état de la tâche

  1. Accédez à l'interface Web d'Airflow.
  2. Sur la page des DAG, cliquez sur le nom du DAG (par exemple, composerDataflowDAG).
  3. Sur la page "Détails des DAG", cliquez sur Graph View (Vue graphique).
  4. Vérifiez l'état :

    • Failed: la tâche est entourée d'un cadre rouge. Vous pouvez également placer le pointeur de la souris sur la tâche et rechercher la mention State: Failed (État : Échec).

    • Success: la tâche est entourée d'un cadre vert. Vous pouvez également placer le pointeur sur la tâche pour voir s'afficher la mention (État : Réussite).

Après quelques minutes, vous pouvez vérifier les résultats dans Dataflow et dans BigQuery.

Afficher votre tâche dans Dataflow

  1. Dans la console Google Cloud, accédez à la page Dataflow.

    Accéder à Dataflow

  2. Votre job est nommé dataflow_operator_transform_csv_to_bq avec un identifiant unique à la fin du nom par un trait d'union, comme ceci:

    La tâche Dataflow possède un identifiant unique

  3. Cliquez sur le nom pour afficher détails de l'offre d'emploi.

    voir tous les détails de la tâche ;

Afficher vos résultats dans BigQuery

  1. Dans la console Google Cloud, accédez à la page BigQuery.

    Accéder à BigQuery

  2. Vous pouvez envoyer des requêtes à l'aide du langage SQL standard. Exécutez la requête suivante pour afficher les lignes qui ont été ajoutées à votre table :

    SELECT * FROM projectId.average_weather.average_weather