Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Cette page explique comment utiliser DataflowTemplateOperator
pour lancer des pipelines Dataflow à partir de Cloud Composer.
Le pipeline de texte Cloud Storage vers BigQuery est un pipeline par lot qui vous permet d'importer des fichiers texte stockés dans Cloud Storage, de les transformer à l'aide de la fonction JavaScript définie par l&#UDF;utilisateur que vous fournissez, et de générer les résultats dans BigQuery.
Présentation
Avant de démarrer le workflow, vous allez créer les entités suivantes:
Une table BigQuery vide à partir d'un ensemble de données vide qui contient les colonnes d'informations suivantes :
location
,average_temperature
,month
et, éventuellement,inches_of_rain
,is_current
etlatest_measurement
.Fichier JSON qui normalisera les données du fichier
.txt
au format approprié pour le schéma de la table BigQuery. L'objet JSON comportera un tableauBigQuery Schema
, où chaque objet contiendra un nom de colonne, un type d'entrée et une indication s'il s'agit d'un champ obligatoire ou non.Un fichier d'entrée
.txt
qui contiendra les données à importer par lot dans la table BigQueryUne fonction définie par l'utilisateur écrite en JavaScript qui transforme chaque ligne du fichier
.txt
en variables pertinentes pour notre table.Fichier DAG Airflow qui pointe vers l'emplacement de ces fichiers.
Vous allez ensuite importer les fichiers
.txt
, le fichier UDF.js
et le fichier de schéma.json
dans un bucket Cloud Storage. Vous importerez également le DAG dans votre environnement Cloud Composer.Une fois le DAG importé, Airflow exécute une tâche à partir de celui-ci. Cette tâche lance un pipeline Dataflow qui applique la fonction définie par l'utilisateur au fichier
.txt
et la met en forme conformément au schéma JSON.Enfin, les données sont importées dans la table BigQuery que vous avez créée précédemment.
Avant de commencer
- Ce guide requiert une bonne connaissance de JavaScript pour écrire la fonction définie par l'utilisateur.
- Dans ce guide, nous partons du principe que vous disposez déjà d'un environnement Cloud Composer. Consultez Créer un environnement pour en créer un. Vous pouvez utiliser n'importe quelle version de Cloud Composer dans ce guide.
-
Activer les API Cloud Composer, Dataflow, Cloud Storage, BigQuery.
Créer une table BigQuery vide avec une définition de schéma
Créer une table BigQuery avec une définition de schéma Vous utiliserez cette définition de schéma plus loin dans ce guide. Cette table BigQuery contient les résultats de l'importation groupée.
Pour créer une table vide avec une définition de schéma :
Console
Dans la console Google Cloud, accédez à la page BigQuery:
Dans le panneau de navigation, dans la section Ressources, développez votre projet.
Dans le panneau des détails, cliquez sur Create dataset (Créer un ensemble de données).
Dans la section ID de l'ensemble de données de la page "Créer un ensemble de données", nommez l'ensemble de données
average_weather
. Conservez l'état par défaut de tous les autres champs.Cliquez sur Créer un ensemble de données.
Revenez au panneau de navigation, dans la section Ressources, développez votre projet. Cliquez ensuite sur l'ensemble de données
average_weather
.Dans le panneau de détails, cliquez sur Create table (Créer une table).
Dans la section Source de la page Créer une table, sélectionnez Table vide.
Dans la section Destination de la page Créer une table :
Dans le champ Dataset name (Nom de l'ensemble de données), sélectionnez l'ensemble de données
average_weather
.Dans le champ Table name (Nom de la table), saisissez le nom
average_weather
.Vérifiez que Type de table est défini sur Table native.
Dans la section Schema (Schéma), saisissez la définition du schéma. Vous pouvez utiliser l'une des approches suivantes:
Saisissez les informations de schéma manuellement en activant l'option Modifier sous forme de texte et en saisissant le schéma de la table sous forme de tableau JSON. Saisissez-le dans les champs suivants:
[ { "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" }, { "name": "average_temperature", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "month", "type": "STRING", "mode": "REQUIRED" }, { "name": "inches_of_rain", "type": "NUMERIC" }, { "name": "is_current", "type": "BOOLEAN" }, { "name": "latest_measurement", "type": "DATE" } ]
Utilisez l'option Ajouter un champ pour saisir manuellement le schéma:
Dans le champ Paramètres de partitionnement et de clustering, conservez la valeur par défaut :
No partitioning
.Dans la section Options avancées, pour Chiffrement, conservez la valeur par défaut
Google-managed key
.Cliquez sur Créer une table.
bq
Utilisez la commande bq mk
pour créer un ensemble de données vide et une table dans cet ensemble de données.
Exécutez la commande suivante pour créer un ensemble de données sur la météo moyenne mondiale:
bq --location=LOCATION mk \
--dataset PROJECT_ID:average_weather
Remplacez les éléments suivants :
LOCATION
: région dans laquelle se trouve l'environnement.PROJECT_ID
: ID du projet
Exécutez la commande suivante pour créer une table vide dans cet ensemble de données avec la définition de schéma:
bq mk --table \
PROJECT_ID:average_weather.average_weather \
location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE
Une fois la table créée, vous pouvez mettre à jour son délai d'expiration, sa description et ses étiquettes. Vous pouvez également modifier la définition du schéma.
Python
Enregistrez ce code sous le nom dataflowtemplateoperator_create_dataset_and_table_helper.py
et mettez à jour les variables qu'il contient pour refléter votre projet et votre emplacement, puis exécutez-le à l'aide de la commande suivante:
python dataflowtemplateoperator_create_dataset_and_table_helper.py
Python
Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Créer un bucket Cloud Storage
Créez un bucket pour stocker tous les fichiers nécessaires au workflow. Le DAG que vous créerez plus loin dans ce guide référencera les fichiers que vous importerez dans ce bucket de stockage. Pour créer un bucket de stockage, procédez comme suit :
Console
Ouvrez Cloud Storage dans la console Google Cloud.
Cliquez sur Créer un bucket pour ouvrir le formulaire de création de bucket.
Saisissez les informations concernant votre bucket et cliquez sur Continuer à chaque étape :
Spécifiez un nom unique pour le bucket. Ce guide utilise
bucketName
comme exemple.Sélectionnez Régional comme type d'emplacement. Ensuite, sélectionnez l'emplacement où stocker les données du bucket.
Sélectionnez Standard comme classe de stockage par défaut pour vos données.
Sélectionnez Uniforme pour le contrôle d'accès à vos objets.
Cliquez sur OK.
gsutil
Exécutez la commande gsutil mb
:
gsutil mb gs://bucketName/
Remplacez les éléments suivants :
bucketName
: nom du bucket que vous avez créé précédemment dans ce guide.
Exemples de code
C#
Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Go
Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Java
Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Python
Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Ruby
Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Créer un schéma BigQuery au format JSON pour votre table de sortie
Créez un fichier de schéma BigQuery au format JSON correspondant à la table de sortie que vous avez créée précédemment. Notez que les noms, types et modes de champs doivent correspondre à ceux définis précédemment dans votre schéma de table BigQuery. Ce fichier normalisera les données de votre fichier .txt
dans un format compatible avec votre schéma BigQuery. Nommez ce fichier jsonSchema.json
.
{
"BigQuery Schema": [
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC"
},
{
"name": "is_current",
"type": "BOOLEAN"
},
{
"name": "latest_measurement",
"type": "DATE"
}]
}
Créer un fichier JavaScript pour mettre en forme vos données
Dans ce fichier, vous allez définir votre fonction définie par l'utilisateur (UDF), qui fournit la logique pour transformer les lignes de texte de votre fichier d'entrée. Notez que cette fonction utilise chaque ligne de texte de votre fichier d'entrée comme son propre argument. Elle sera donc exécutée une fois pour chaque ligne de votre fichier d'entrée. Nommez ce fichier transformCSVtoJSON.js
.
Créer votre fichier d'entrée
Ce fichier contiendra les informations que vous souhaitez importer dans votre table BigQuery. Copiez ce fichier localement et nommez-le inputFile.txt
.
POINT(40.7128 74.006),45,'July',null,true,2020-02-16
POINT(41.8781 87.6298),23,'October',13,false,2015-02-13
POINT(48.8566 2.3522),80,'December',null,true,null
POINT(6.5244 3.3792),15,'March',14,true,null
Importer des fichiers dans votre bucket
Importez les fichiers suivants dans le bucket Cloud Storage que vous avez créé précédemment:
- Schéma BigQuery au format JSON (
.json
) - Fonction définie par l'utilisateur JavaScript (
transformCSVtoJSON.js
) Fichier d'entrée du texte à traiter (
.txt
)
Console
- Dans la console Google Cloud, accédez à la page Buckets Cloud Storage.
Dans la liste des buckets, cliquez sur votre bucket.
Dans l'onglet Objets du bucket, effectuez l'une des opérations suivantes:
Effectuez un glisser-déposer des fichiers souhaités depuis votre bureau ou votre gestionnaire de fichiers vers le volet principal de la console Google Cloud.
Cliquez sur le bouton Importer des fichiers, sélectionnez les fichiers que vous souhaitez importer dans la boîte de dialogue qui s'affiche, puis cliquez sur Ouvrir.
gsutil
Exécutez la commande gsutil cp
:
gsutil cp OBJECT_LOCATION gs://bucketName
Remplacez les éléments suivants :
bucketName
: nom du bucket que vous avez créé précédemment dans ce guide.OBJECT_LOCATION
: chemin d'accès local à votre objet. Exemple :Desktop/transformCSVtoJSON.js
.
Exemples de code
Python
Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Ruby
Pour vous authentifier auprès de Cloud Composer, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.
Configurer DataflowTemplateOperator
Avant d'exécuter le DAG, définissez les variables Airflow suivantes.
Variable Airflow | Valeur |
---|---|
project_id
|
L'ID du projet |
gce_zone
|
Zone Compute Engine dans laquelle le cluster Dataflow doit être créé |
bucket_path
|
L'emplacement du bucket Cloud Storage que vous avez créé précédemment |
Vous allez maintenant référencer les fichiers que vous avez créés précédemment pour créer un DAG qui lance le workflow Dataflow. Copiez ce DAG et enregistrez-le localement sous le nom composer-dataflow-dag.py
.
Airflow 2
Airflow 1
Importer le DAG dans Cloud Storage
Importez votre DAG dans le dossier /dags
du bucket de votre environnement. Une fois l'importation terminée, vous pouvez la voir en cliquant sur le lien Dossier des DAG de la page "Environnements Cloud Composer".
Afficher l'état de la tâche
- Accédez à l'interface Web d'Airflow.
- Sur la page des DAG, cliquez sur le nom du DAG (par exemple,
composerDataflowDAG
). - Sur la page "Détails des DAG", cliquez sur Graph View (Vue graphique).
Vérifiez l'état :
Failed
: la tâche est entourée d'un cadre rouge. Vous pouvez également placer le pointeur de la souris sur la tâche et rechercher la mention State: Failed (État : Échec).Success
: la tâche est entourée d'un cadre vert. Vous pouvez également maintenir le pointeur sur la tâche et vérifier si l'état State: Success (État : Réussite) apparaît.
Après quelques minutes, vous pouvez vérifier les résultats dans Dataflow et BigQuery.
Afficher votre tâche dans Dataflow
Dans la console Google Cloud, accédez à la page Dataflow.
Votre tâche est nommée
dataflow_operator_transform_csv_to_bq
avec un ID unique joint à la fin du nom par un trait d'union, comme ceci:Cliquez sur le nom pour afficher les détails de la tâche.
Afficher vos résultats dans BigQuery
Dans la console Google Cloud, accédez à la page BigQuery.
Vous pouvez envoyer des requêtes à l'aide du langage SQL standard. Exécutez la requête suivante pour afficher les lignes qui ont été ajoutées à votre table :
SELECT * FROM projectId.average_weather.average_weather