Lancer des pipelines Dataflow avec Cloud Composer

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Cette page explique comment utiliser DataflowTemplateOperator pour lancer Pipelines Dataflow de Cloud Composer. Pipeline Cloud Storage Text-BigQuery est un pipeline de traitement par lot qui vous permet d'importer des fichiers texte Cloud Storage, les transformer à l'aide de la fonction JavaScript définie par l'utilisateur que vous fournissez, puis générer les résultats dans dans BigQuery.

une fonction définie par l'utilisateur, un fichier d'entrée et un schéma JSON sont importés
  dans un bucket Cloud Storage. Un DAG faisant référence à ces fichiers lancera un pipeline de traitement par lot Dataflow, qui appliquera la fonction définie par l'utilisateur et le fichier de schéma JSON au fichier d'entrée. Ensuite, ce contenu sera importé dans une table BigQuery

Présentation

  • Avant de démarrer le workflow, vous allez créer les entités suivantes:

    • Une table BigQuery vide à partir d'un ensemble de données vide qui contient les colonnes d'informations suivantes : location, average_temperature, month et, éventuellement, inches_of_rain, is_current et latest_measurement.

    • Un fichier JSON qui normalisera les données de .txt au format approprié pour le table BigQuery du schéma. L'objet JSON comporte un tableau BigQuery Schema, où chaque objet contiendra un nom de colonne, un type d'entrée et si mais il ne s'agit pas d'un champ obligatoire.

    • Un fichier d'entrée .txt qui contiendra les données qui seront importées de manière groupée à la table BigQuery.

    • Une fonction définie par l'utilisateur écrite en JavaScript qui transforme chaque ligne du fichier .txt en variables pertinentes pour notre table.

    • Un fichier DAG Airflow qui pointe vers l'emplacement de ces .

  • Vous allez ensuite importer le fichier .txt, le fichier UDF .js et le schéma .json. dans un bucket Cloud Storage. Vous importerez également le DAG votre environnement Cloud Composer.

  • Une fois le DAG importé, Airflow exécute une tâche à partir de celui-ci. Cette tâche lancer un pipeline Dataflow qui appliquera définie par l'utilisateur dans le fichier .txt et mettez-le en forme conformément au Schéma JSON.

  • Enfin, les données sont importées dans la table BigQuery que vous avez créé précédemment.

Avant de commencer

  • Ce guide requiert une bonne connaissance de JavaScript pour écrire la fonction définie par l'utilisateur.
  • Dans ce guide, nous partons du principe que vous disposez déjà environnement. Consultez Créer un environnement pour en créer un. Vous pouvez utilisez n'importe quelle version de Cloud Composer avec ce guide.
  • Activer les API Cloud Composer, Dataflow, Cloud Storage, BigQuery.

    Activer les API

Créer une table BigQuery vide avec une définition de schéma

Créer une table BigQuery avec une définition de schéma Toi utilisera cette définition de schéma plus loin dans ce guide. Ce La table BigQuery contient les résultats de l'importation groupée.

Pour créer une table vide avec une définition de schéma :

Console

  1. Dans la console Google Cloud, accédez à BigQuery :

    Accéder à BigQuery

  2. Dans le panneau de navigation, dans la section Ressources, développez votre projet.

  3. Dans le panneau des détails, cliquez sur Create dataset (Créer un ensemble de données).

    cliquez sur le bouton "Créer un ensemble de données"

  4. Dans la section ID de l'ensemble de données de la page "Créer un ensemble de données", attribuez un nom Ensemble de données average_weather. Conservez les valeurs par défaut de tous les autres champs. state.

    Renseigner l'ID de l'ensemble de données avec le nom average_weather

  5. Cliquez sur Créer un ensemble de données.

  6. Revenez au panneau de navigation, dans la section Ressources, développez votre projet. Cliquez ensuite sur l'ensemble de données average_weather.

  7. Dans le panneau de détails, cliquez sur Create table (Créer une table).

    cliquez sur "Créer une table"

  8. Dans la section Source de la page Créer une table, sélectionnez Table vide.

  9. Dans la section Destination de la page Créer une table :

    • Dans le champ Dataset name (Nom de l'ensemble de données), sélectionnez l'ensemble de données average_weather.

      Choisissez l'ensemble de données average_weather.

    • Dans le champ Table name (Nom de la table), saisissez le nom average_weather.

    • Vérifiez que Type de table est défini sur Table native.

  10. Dans la section Schema (Schéma), saisissez la définition du schéma. Vous pouvez utiliser l'une des approches suivantes:

    • Saisissez les informations de schéma manuellement en activant l'option Modifier sous forme de texte, puis en saisissant le schéma de la table en tant que tableau JSON. Saisissez ce qui suit :

      [
          {
              "name": "location",
              "type": "GEOGRAPHY",
              "mode": "REQUIRED"
          },
          {
              "name": "average_temperature",
              "type": "INTEGER",
              "mode": "REQUIRED"
          },
          {
              "name": "month",
              "type": "STRING",
              "mode": "REQUIRED"
          },
          {
              "name": "inches_of_rain",
              "type": "NUMERIC"
          },
          {
              "name": "is_current",
              "type": "BOOLEAN"
          },
          {
              "name": "latest_measurement",
              "type": "DATE"
          }
      ]
      
    • Utilisez l'option Ajouter un champ pour saisir manuellement le schéma:

      cliquez sur "Add field" (Ajouter un champ) pour saisir les champs

  11. Pour Paramètres de partitionnement et de clustering, conservez la valeur par défaut (No partitioning).

  12. Dans la section Options avancées, pour Chiffrement, laissez la la valeur par défaut, Google-managed key.

  13. Cliquez sur Créer une table.

<ph type="x-smartling-placeholder">
</ph>

bq

Utilisez la commande bq mk pour créer un ensemble de données vide et une table dans ce ensemble de données.

Exécutez la commande suivante pour créer un ensemble de données sur la météo moyenne mondiale:

bq --location=LOCATION mk \
    --dataset PROJECT_ID:average_weather

Remplacez les éléments suivants :

  • LOCATION: région dans laquelle se trouve l'environnement.
  • PROJECT_ID: ID du projet

Exécutez la commande suivante pour créer une table vide dans cet ensemble de données avec la définition du schéma:

bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE

Une fois la table créée, vous pouvez mise à jour l'expiration, la description et les étiquettes de la table. Vous pouvez également modifier la définition du schéma.

Python

Enregistrer ce code sous dataflowtemplateoperator_create_dataset_and_table_helper.py et mettez à jour les variables qu'il contient pour refléter votre projet et votre emplacement, puis exécutez-le à l'aide de la commande suivante:

python dataflowtemplateoperator_create_dataset_and_table_helper.py

Python

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


# Make sure to follow the quickstart setup instructions beforehand.
# See instructions here:
# https://cloud.google.com/bigquery/docs/quickstarts/quickstart-client-libraries

# Before running the sample, be sure to install the bigquery library
# in your local environment by running pip install google.cloud.bigquery

from google.cloud import bigquery

# TODO(developer): Replace with your values
project = "your-project"  # Your GCP Project
location = "US"  # the location where you want your BigQuery data to reside. For more info on possible locations see https://cloud.google.com/bigquery/docs/locations
dataset_name = "average_weather"


def create_dataset_and_table(project, location, dataset_name):
    # Construct a BigQuery client object.
    client = bigquery.Client(project)

    dataset_id = f"{project}.{dataset_name}"

    # Construct a full Dataset object to send to the API.
    dataset = bigquery.Dataset(dataset_id)

    # Set the location to your desired location for the dataset.
    # For more information, see this link:
    # https://cloud.google.com/bigquery/docs/locations
    dataset.location = location

    # Send the dataset to the API for creation.
    # Raises google.api_core.exceptions.Conflict if the Dataset already
    # exists within the project.
    dataset = client.create_dataset(dataset)  # Make an API request.

    print(f"Created dataset {client.project}.{dataset.dataset_id}")

    # Create a table from this dataset.

    table_id = f"{client.project}.{dataset_name}.average_weather"

    schema = [
        bigquery.SchemaField("location", "GEOGRAPHY", mode="REQUIRED"),
        bigquery.SchemaField("average_temperature", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("month", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("inches_of_rain", "NUMERIC", mode="NULLABLE"),
        bigquery.SchemaField("is_current", "BOOLEAN", mode="NULLABLE"),
        bigquery.SchemaField("latest_measurement", "DATE", mode="NULLABLE"),
    ]

    table = bigquery.Table(table_id, schema=schema)
    table = client.create_table(table)  # Make an API request.
    print(f"Created table {table.project}.{table.dataset_id}.{table.table_id}")

Créer un bucket Cloud Storage

Créez un bucket pour stocker tous les fichiers nécessaires au workflow. Le DAG que vous créer plus loin dans ce guide se réfèrent aux fichiers que vous importez dans ce bucket de stockage. Pour créer un bucket de stockage, procédez comme suit :

Console

  1. Ouvrez Cloud Storage dans la console Google Cloud.

    Accéder à Cloud Storage

  2. Cliquez sur Créer un bucket pour ouvrir le formulaire de création de bucket.

    1. Saisissez les informations concernant votre bucket et cliquez sur Continuer à chaque étape :

      • Spécifiez un nom unique pour le bucket. Ce guide utilise bucketName à titre d'exemple.

      • Sélectionnez Régional comme type d'emplacement. Ensuite, sélectionnez un Emplacement où les données du bucket seront stockées.

      • Sélectionnez Standard comme classe de stockage par défaut pour vos données.

      • Sélectionnez Uniforme pour le contrôle d'accès à vos objets.

    2. Cliquez sur OK.

gsutil

Exécutez la commande gsutil mb :

gsutil mb gs://bucketName/

Remplacez les éléments suivants :

  • bucketName: nom du bucket que vous avez créé précédemment dans .

Exemples de code

C#

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


using Google.Apis.Storage.v1.Data;
using Google.Cloud.Storage.V1;
using System;

public class CreateBucketSample
{
    public Bucket CreateBucket(
        string projectId = "your-project-id",
        string bucketName = "your-unique-bucket-name")
    {
        var storage = StorageClient.Create();
        var bucket = storage.CreateBucket(projectId, bucketName);
        Console.WriteLine($"Created {bucketName}.");
        return bucket;
    }
}

Go

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/storage"
)

// createBucket creates a new bucket in the project.
func createBucket(w io.Writer, projectID, bucketName string) error {
	// projectID := "my-project-id"
	// bucketName := "bucket-name"
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		return fmt.Errorf("storage.NewClient: %w", err)
	}
	defer client.Close()

	ctx, cancel := context.WithTimeout(ctx, time.Second*30)
	defer cancel()

	bucket := client.Bucket(bucketName)
	if err := bucket.Create(ctx, projectID, nil); err != nil {
		return fmt.Errorf("Bucket(%q).Create: %w", bucketName, err)
	}
	fmt.Fprintf(w, "Bucket %v created\n", bucketName)
	return nil
}

Java

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class CreateBucket {
  public static void createBucket(String projectId, String bucketName) {
    // The ID of your GCP project
    // String projectId = "your-project-id";

    // The ID to give your GCS bucket
    // String bucketName = "your-unique-bucket-name";

    Storage storage = StorageOptions.newBuilder().setProjectId(projectId).build().getService();

    Bucket bucket = storage.create(BucketInfo.newBuilder(bucketName).build());

    System.out.println("Created bucket " + bucket.getName());
  }
}

Python

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud import storage


def create_bucket(bucket_name):
    """Creates a new bucket."""
    # bucket_name = "your-new-bucket-name"

    storage_client = storage.Client()

    bucket = storage_client.create_bucket(bucket_name)

    print(f"Bucket {bucket.name} created")

Ruby

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

def create_bucket bucket_name:
  # The ID to give your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.create_bucket bucket_name

  puts "Created bucket: #{bucket.name}"
end

Créer un schéma BigQuery au format JSON pour votre table de sortie

Créez un fichier de schéma BigQuery au format JSON correspondant à la table de sortie que vous avez créée précédemment. Notez que les noms, types et modes des champs doivent correspondre à ceux définis précédemment dans votre table BigQuery du schéma. Ce fichier normalisera les données de votre fichier .txt dans un format compatibles avec votre schéma BigQuery. Nommer ce fichier jsonSchema.json

{
    "BigQuery Schema": [
    {
        "name": "location",
        "type": "GEOGRAPHY",
        "mode": "REQUIRED"
    },
    {
        "name": "average_temperature",
        "type": "INTEGER",
        "mode": "REQUIRED"
    },
    {
        "name": "month",
        "type": "STRING",
        "mode": "REQUIRED"
    },
    {
        "name": "inches_of_rain",
        "type": "NUMERIC"
    },
    {
        "name": "is_current",
        "type": "BOOLEAN"
    },
    {
        "name": "latest_measurement",
        "type": "DATE"
    }]
}

Créer un fichier JavaScript pour mettre en forme vos données

Dans ce fichier, vous allez définir votre fonction définie par l'utilisateur (UDF) qui fournit la logique pour transformer les lignes de texte dans votre fichier d'entrée. Notez que cette prend chaque ligne de texte de votre fichier d'entrée comme son propre argument. la fonction s’exécutera une fois pour chaque ligne de votre fichier d’entrée. Nommer ce fichier transformCSVtoJSON.js


function transformCSVtoJSON(line) {
  var values = line.split(',');
  var properties = [
    'location',
    'average_temperature',
    'month',
    'inches_of_rain',
    'is_current',
    'latest_measurement',
  ];
  var weatherInCity = {};

  for (var count = 0; count < values.length; count++) {
    if (values[count] !== 'null') {
      weatherInCity[properties[count]] = values[count];
    }
  }

  return JSON.stringify(weatherInCity);
}

Créer votre fichier d'entrée

Ce fichier contiendra les informations que vous souhaitez importer dans votre table BigQuery. Copiez ce fichier localement et nommez-le inputFile.txt

POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null

Importer des fichiers dans votre bucket

Importez les fichiers suivants dans le bucket Cloud Storage que vous avez créé plus tôt:

  • Schéma BigQuery au format JSON (.json)
  • Fonction définie par l'utilisateur JavaScript (transformCSVtoJSON.js)
  • Fichier d'entrée du texte à traiter (.txt)

Console

  1. Dans la console Google Cloud, accédez à la page Buckets Cloud Storage.

    Accéder à la page "Buckets"

  2. Dans la liste des buckets, cliquez sur votre bucket.

  3. Dans l'onglet Objets du bucket, effectuez l'une des opérations suivantes:

    • Effectuez un glisser-déposer des fichiers souhaités depuis votre bureau ou votre gestionnaire de fichiers vers le volet principal de la console Google Cloud.

    • Cliquez sur le bouton Importer des fichiers, puis sélectionnez les fichiers à importer. dans la boîte de dialogue qui s'affiche, puis cliquez sur Ouvrir.

gsutil

Exécutez la commande gsutil cp :

gsutil cp OBJECT_LOCATION gs://bucketName

Remplacez les éléments suivants :

  • bucketName: nom du bucket que vous avez créé précédemment dans ce guide.
  • OBJECT_LOCATION: chemin d'accès local à votre objet. Exemple : Desktop/transformCSVtoJSON.js.

Exemples de code

Python

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud import storage


def upload_blob(bucket_name, source_file_name, destination_blob_name):
    """Uploads a file to the bucket."""
    # The ID of your GCS bucket
    # bucket_name = "your-bucket-name"
    # The path to your file to upload
    # source_file_name = "local/path/to/file"
    # The ID of your GCS object
    # destination_blob_name = "storage-object-name"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    # Optional: set a generation-match precondition to avoid potential race conditions
    # and data corruptions. The request to upload is aborted if the object's
    # generation number does not match your precondition. For a destination
    # object that does not yet exist, set the if_generation_match precondition to 0.
    # If the destination object already exists in your bucket, set instead a
    # generation-match precondition using its generation number.
    generation_match_precondition = 0

    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)

    print(
        f"File {source_file_name} uploaded to {destination_blob_name}."
    )

Ruby

Pour vous authentifier auprès de Cloud Composer, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

def upload_file bucket_name:, local_file_path:, file_name: nil
  # The ID of your GCS bucket
  # bucket_name = "your-unique-bucket-name"

  # The path to your file to upload
  # local_file_path = "/local/path/to/file.txt"

  # The ID of your GCS object
  # file_name = "your-file-name"

  require "google/cloud/storage"

  storage = Google::Cloud::Storage.new
  bucket  = storage.bucket bucket_name, skip_lookup: true

  file = bucket.create_file local_file_path, file_name

  puts "Uploaded #{local_file_path} as #{file.name} in bucket #{bucket_name}"
end

Configurer DataflowTemplateOperator

Avant d'exécuter le DAG, définissez les variables Airflow suivantes.

Variable Airflow Valeur
project_id ID du projet
gce_zone Zone Compute Engine dans laquelle le cluster Dataflow doit être créé
bucket_path L'emplacement du bucket Cloud Storage que vous avez créé plus tôt

Vous allez maintenant référencer les fichiers que vous avez créés précédemment pour créer un DAG qui lance le workflow Dataflow. Copier ce DAG et l'enregistrer localement en tant que composer-dataflow-dag.py.

Airflow 2



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
For more info on zones where Dataflow is available see:
https://cloud.google.com/dataflow/docs/resources/locations
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowTemplatedJobStartOperator,
)
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"


default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplatedJobStartOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Airflow 1



"""Example Airflow DAG that creates a Cloud Dataflow workflow which takes a
text file and adds the rows to a BigQuery table.

This DAG relies on four Airflow variables
https://airflow.apache.org/docs/apache-airflow/stable/concepts/variables.html
* project_id - Google Cloud Project ID to use for the Cloud Dataflow cluster.
* gce_zone - Google Compute Engine zone where Cloud Dataflow cluster should be
  created.
  created.
Learn more about the difference between the two here:
https://cloud.google.com/compute/docs/regions-zones
* bucket_path - Google Cloud Storage bucket where you've stored the User Defined
Function (.js), the input file (.txt), and the JSON schema (.json).
"""

import datetime

from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from airflow.utils.dates import days_ago

bucket_path = "{{var.value.bucket_path}}"
project_id = "{{var.value.project_id}}"
gce_zone = "{{var.value.gce_zone}}"


default_args = {
    # Tell airflow to start one day ago, so that it runs as soon as you upload it
    "start_date": days_ago(1),
    "dataflow_default_options": {
        "project": project_id,
        # Set to your zone
        "zone": gce_zone,
        # This is a subfolder for storing temporary files, like the staged pipeline job.
        "tempLocation": bucket_path + "/tmp/",
    },
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    # The id you will see in the DAG airflow page
    "composer_dataflow_dag",
    default_args=default_args,
    # The interval with which to schedule the DAG
    schedule_interval=datetime.timedelta(days=1),  # Override to match your needs
) as dag:
    start_template_job = DataflowTemplateOperator(
        # The task id of your job
        task_id="dataflow_operator_transform_csv_to_bq",
        # The name of the template that you're using.
        # Below is a list of all the templates you can use.
        # For versions in non-production environments, use the subfolder 'latest'
        # https://cloud.google.com/dataflow/docs/guides/templates/provided-batch#gcstexttobigquery
        template="gs://dataflow-templates/latest/GCS_Text_to_BigQuery",
        # Use the link above to specify the correct parameters for your template.
        parameters={
            "javascriptTextTransformFunctionName": "transformCSVtoJSON",
            "JSONPath": bucket_path + "/jsonSchema.json",
            "javascriptTextTransformGcsPath": bucket_path + "/transformCSVtoJSON.js",
            "inputFilePattern": bucket_path + "/inputFile.txt",
            "outputTable": project_id + ":average_weather.average_weather",
            "bigQueryLoadingTemporaryDirectory": bucket_path + "/tmp/",
        },
    )

Importer le DAG dans Cloud Storage

Importez votre DAG dans le dossier /dags du dossier bucket. Une fois l'importation terminée, vous pouvez la consulter en cliquant sur le lien Dossier des DAG dans Cloud Composer. Environnements.

Le dossier de DAG de votre environnement contient votre DAG

Afficher l'état de la tâche

  1. Accédez à l'interface Web d'Airflow.
  2. Sur la page des DAG, cliquez sur le nom du DAG (par exemple, composerDataflowDAG).
  3. Sur la page "Détails des DAG", cliquez sur Graph View (Vue graphique).
  4. Vérifiez l'état :

    • Failed: la tâche est entourée d'un cadre rouge. Vous pouvez également placer le pointeur de la souris sur la tâche et rechercher la mention State: Failed (État : Échec).

    • Success: la tâche est entourée d'un cadre vert. Vous pouvez également garder le pointeur de la souris sur la tâche et vérifier État: Réussite.

Après quelques minutes, vous pouvez vérifier les résultats dans Dataflow et dans BigQuery.

Afficher votre tâche dans Dataflow

  1. Dans la console Google Cloud, accédez à la page Dataflow.

    Accéder à Dataflow

  2. Votre job est nommé dataflow_operator_transform_csv_to_bq avec un identifiant unique à la fin du nom par un trait d'union, comme ceci:

    La tâche Dataflow possède un identifiant unique

  3. Cliquez sur le nom pour afficher détails de l'offre d'emploi.

    voir tous les détails du job

Afficher vos résultats dans BigQuery

  1. Dans la console Google Cloud, accédez à la page BigQuery.

    Accéder à BigQuery

  2. Vous pouvez envoyer des requêtes à l'aide du langage SQL standard. Exécutez la requête suivante pour afficher les lignes qui ont été ajoutées à votre table :

    SELECT * FROM projectId.average_weather.average_weather