Lancer des pipelines Dataflow avec Cloud Composer

Cloud Composer 1 | Cloud Composer 2

Cette page explique comment utiliser DataflowTemplateOperator pour lancer des pipelines Dataflow à partir de Cloud Composer. Le pipeline Texte Cloud Storage vers BigQuery est un pipeline de traitement par lot qui vous permet d'importer des fichiers texte stockés dans Cloud Storage, de les transformer à l'aide de la fonction JavaScript définie par l'utilisateur (UDF) que vous fournissez, et de générer les résultats dans BigQuery.

une fonction définie par l'utilisateur, un fichier d'entrée et un schéma JSON sont importés dans un bucket Cloud Storage. Un DAG qui fait référence à ces fichiers lance un pipeline de traitement par lot Dataflow, qui applique la fonction définie par l'utilisateur et le fichier de schéma JSON au fichier d'entrée. Ce contenu sera ensuite importé dans une table BigQuery

Présentation

  • Avant de lancer le workflow, vous allez créer les entités suivantes:

    • Une table BigQuery vide à partir d'un ensemble de données vide qui contient les colonnes d'informations suivantes : location, average_temperature, month et, éventuellement, inches_of_rain, is_current et latest_measurement.

    • Fichier JSON qui normalisera les données du fichier .txt au format approprié pour le schéma de la table BigQuery. L'objet JSON comporte un tableau BigQuery Schema, où chaque objet contient un nom de colonne et un type d'entrée, et indique s'il s'agit ou non d'un champ obligatoire.

    • Un fichier .txt d'entrée contenant les données qui seront importées de manière groupée dans la table BigQuery.

    • Une fonction définie par l'utilisateur écrite en JavaScript qui transforme chaque ligne du fichier .txt en variables pertinentes pour notre table.

    • Un fichier DAG Airflow pointant vers l'emplacement de ces fichiers.

  • Vous allez ensuite importer les fichiers .txt, .js et de schéma .json dans un bucket Cloud Storage. Vous importerez aussi le DAG dans votre environnement Cloud Composer.

  • Une fois le DAG importé, Airflow exécute une tâche à partir de celui-ci. Cette tâche lance un pipeline Dataflow qui appliquera la fonction définie par l'utilisateur au fichier .txt et le formatera en fonction du schéma JSON.

  • Enfin, les données seront importées dans la table BigQuery que vous avez créée précédemment.

Avant de commencer

  • Ce guide nécessite une connaissance de JavaScript pour écrire la fonction définie par l'utilisateur.
  • Dans ce guide, nous partons du principe que vous disposez déjà d'un environnement Cloud Composer. Consultez Créer un environnement pour en créer un. Dans ce guide, vous pouvez utiliser n'importe quelle version de Cloud Composer.
  • Activer les API Cloud Composer, Dataflow, Cloud Storage, BigQuery.

    Activer les API

Créer une table BigQuery vide avec une définition de schéma

Créer une table BigQuery avec une définition de schéma Vous utiliserez cette définition de schéma plus loin dans ce guide. Cette table BigQuery contiendra les résultats de l'importation groupée.

Pour créer une table vide avec une définition de schéma :

Console

  1. Dans la console Google Cloud, accédez à la page BigQuery:

    Accéder à BigQuery

  2. Dans le panneau de navigation, dans la section Ressources, développez votre projet.

  3. Dans le panneau des détails, cliquez sur Create dataset (Créer un ensemble de données).

    cliquez sur le bouton "Créer un ensemble de données"

  4. Dans la section ID de l'ensemble de données de la page "Créer un ensemble de données", nommez votre ensemble de données average_weather. Conservez l'état par défaut de tous les autres champs.

    Renseigner l'ID de l'ensemble de données avec le nom average_weather

  5. Cliquez sur Créer un ensemble de données.

  6. Revenez au panneau de navigation. Dans la section Ressources, développez votre projet. Cliquez ensuite sur l'ensemble de données average_weather.

  7. Dans le panneau de détails, cliquez sur Create table (Créer une table).

    cliquer sur "Créer une table"

  8. Dans la section Source de la page Créer une table, sélectionnez Table vide.

  9. Dans la section Destination de la page Créer une table :

    • Dans le champ Dataset name (Nom de l'ensemble de données), sélectionnez l'ensemble de données average_weather.

      Choisissez l'ensemble de données average_weather.

    • Dans le champ Table name (Nom de la table), saisissez le nom average_weather.

    • Vérifiez que Type de table est défini sur Table native.

  10. Dans la section Schema (Schéma), saisissez la définition du schéma. Vous pouvez adopter l'une des approches suivantes:

    • Saisissez les informations de schéma manuellement en activant l'option Edit as text (Modifier sous forme de texte) et en saisissant le schéma de la table en tant que tableau JSON. Saisissez les informations dans les champs suivants:

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • Utilisez l'option Add field (Ajouter un champ) pour saisir manuellement le schéma:

      cliquez sur "Ajouter un champ" pour saisir les champs

  11. Sous Paramètres de partitionnement et de clustering, conservez la valeur par défaut, No partitioning.

  12. Dans la section Options avancées, pour Chiffrement, conservez la valeur par défaut, Google-managed key.

  13. Cliquez sur Créer la table.

bq

Exécutez la commande bq mk pour créer un ensemble de données vide et une table dans cet ensemble de données.

Exécutez la commande suivante pour créer un ensemble de données sur les conditions météorologiques mondiales moyennes:

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

Remplacez les éléments suivants :

  • LOCATION: région dans laquelle se trouve l'environnement.
  • PROJECT_ID: ID du projet

Exécutez la commande suivante pour créer une table vide dans cet ensemble de données avec la définition de schéma:

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

Une fois la table créée, vous pouvez mettre à jour son délai d'expiration, sa description et ses étiquettes. Vous pouvez également modifier la définition du schéma.

Python

Enregistrez ce code sous le nom dataflowtemplateoperator_create_dataset_and_table_helper.py et mettez à jour les variables qu'il contient pour refléter votre projet et votre emplacement, puis exécutez-le à l'aide de la commande suivante:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"

def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Créer un bucket Cloud Storage

Créez un bucket pour stocker tous les fichiers nécessaires au workflow. Le DAG que vous créerez plus loin dans ce guide fera référence aux fichiers que vous importez dans ce bucket de stockage. Pour créer un bucket de stockage, procédez comme suit :

Console

  1. Ouvrez Cloud Storage dans la console Google Cloud.

    Accéder à Cloud Storage

  2. Cliquez sur Créer un bucket pour ouvrir le formulaire de création de bucket.

    1. Saisissez les informations concernant votre bucket et cliquez sur Continuer à chaque étape :

      • Spécifiez un nom unique pour votre bucket. Ce guide utilise bucketName comme exemple.

      • Sélectionnez Régional comme type d'emplacement. Ensuite, sélectionnez l'emplacement où seront stockées les données du bucket.

      • Sélectionnez Standard comme classe de stockage par défaut pour vos données.

      • Sélectionnez Uniforme pour le contrôle d'accès à vos objets.

    2. Cliquez sur OK.

gsutil

Exécutez la commande gsutil mb :

gsutil mb gs://bucketName/

Remplacez les éléments suivants :

  • bucketName: nom du bucket que vous avez créé précédemment dans ce guide.

Exemples de code

C#

Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud import storage

def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

Créer un schéma BigQuery au format JSON pour votre table de sortie

Créez un fichier de schéma BigQuery au format JSON correspondant à la table de sortie que vous avez créée précédemment. Notez que les noms, types et modes de champs doivent correspondre à ceux définis précédemment dans votre schéma de table BigQuery. Ce fichier normalise les données de votre fichier .txt dans un format compatible avec votre schéma BigQuery. Nommez ce fichier jsonSchema.json.

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

Créer un fichier JavaScript pour mettre en forme vos données

Dans ce fichier, vous allez définir votre fonction définie par l'utilisateur (UDF) qui fournit la logique permettant de transformer les lignes de texte de votre fichier d'entrée. Notez que cette fonction utilise chaque ligne de texte de votre fichier d'entrée comme son propre argument. Elle sera donc exécutée une fois pour chaque ligne de votre fichier d'entrée. Nommez ce fichier transformCSVtoJSON.js.


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

Créer votre fichier d'entrée

Ce fichier contiendra les informations que vous souhaitez importer dans votre table BigQuery. Copiez ce fichier localement et nommez-le inputFile.txt.

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

Importer des fichiers dans votre bucket

Importez les fichiers suivants dans le bucket Cloud Storage que vous avez créé précédemment:

  • Schéma BigQuery au format JSON (.json)
  • Fonction définie par l'utilisateur JavaScript (transformCSVtoJSON.js)
  • Fichier d'entrée du texte à traiter (.txt)

Console

  1. Dans la console Google Cloud, accédez à la page Buckets Cloud Storage.

    Accéder à la page "Buckets"

  2. Dans la liste des buckets, cliquez sur votre bucket.

  3. Dans l'onglet Objets du bucket, effectuez l'une des opérations suivantes:

    • Effectuez un glisser-déposer des fichiers souhaités depuis votre bureau ou votre gestionnaire de fichiers vers le volet principal de la console Google Cloud.

    • Cliquez sur le bouton Upload Files (Importer des fichiers), sélectionnez les fichiers que vous souhaitez importer dans la boîte de dialogue qui s'affiche, puis cliquez sur Open (Ouvrir).

gsutil

Exécutez la commande gsutil cp :

gsutil cp OBJECT_LOCATION gs://bucketName

Remplacez les éléments suivants :

  • bucketName: nom du bucket que vous avez créé précédemment dans ce guide.
  • OBJECT_LOCATION: chemin d'accès local à votre objet. Exemple : Desktop/transformCSVtoJSON.js.

Exemples de code

Python

Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud import storage

def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

Configurer DataflowTemplateOperator

Avant d'exécuter le DAG, définissez les variables Airflow suivantes.

Variable Airflow Valeur
project_id L'ID du projet
gce_zone Zone Compute Engine dans laquelle le cluster Dataflow doit être créé
bucket_path L'emplacement du bucket Cloud Storage créé précédemment

Vous allez maintenant référencer les fichiers que vous avez créés précédemment pour créer un DAG qui lance le workflow Dataflow. Copiez ce DAG et enregistrez-le localement sous le nom composer-dataflow-dag.py.

Airflow 2



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Airflow 1



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"

default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Importer le DAG dans Cloud Storage

Importez votre DAG dans le dossier /dags du bucket de votre environnement. Une fois l'importation terminée, vous pouvez la voir en cliquant sur le lien Dossier des DAG sur la page des environnements Cloud Composer.

Le dossier des DAG de votre environnement contient votre DAG

Afficher l'état de la tâche

  1. Accédez à l'interface Web d'Airflow.
  2. Sur la page des DAG, cliquez sur le nom du DAG, par exemple composerDataflowDAG.
  3. Sur la page "Détails des DAG", cliquez sur Graph View (Vue graphique).
  4. Vérifiez l'état :

    • Failed: la tâche est encadrée en rouge. Vous pouvez également maintenir le pointeur sur la tâche et rechercher State: Failed (État : Échec).

    • Success: la tâche est encadrée en vert. Vous pouvez également maintenir le pointeur sur la tâche et vérifier l'état State: Success (État : Réussite).

Après quelques minutes, vous pouvez vérifier les résultats dans Dataflow et BigQuery.

Afficher votre tâche dans Dataflow

  1. Dans la console Google Cloud, accédez à la page Dataflow.

    Accéder à Dataflow

  2. Votre tâche est nommée dataflow_operator_transform_csv_to_bq avec un ID unique qui est ajouté à la fin du nom par un trait d'union, comme suit:

    La tâche Dataflow possède un identifiant unique

  3. Cliquez sur le nom du poste pour afficher les détails de l'offre.

    voir tous les détails de l'offre d'emploi

Afficher vos résultats dans BigQuery

  1. Dans la console Google Cloud, accédez à la page BigQuery.

    Accéder à BigQuery

  2. Vous pouvez envoyer des requêtes à l'aide du langage SQL standard. Exécutez la requête suivante pour afficher les lignes qui ont été ajoutées à votre table :

    SELECT * FROM projectId.average_weather.average_weather