Un pipeline ETL est une architecture permettant d'exécuter des pipelines de traitement des données par lot à l'aide de la méthode d'extraction, de transformation et de chargement. Cette architecture se compose des composants suivants:
- Google Cloud Storage pour les données de source de destination
- Dataflow pour effectuer des transformations sur les données sources
- BigQuery comme destination des données transformées
- Environnement Cloud Composer pour orchestrer le processus ETL
Premiers pas
Cliquez sur le lien suivant pour obtenir une copie du code source dans Cloud Shell. Une seule commande permet de démarrer une copie fonctionnelle de l'application dans votre projet.
Afficher le code source sur GitHub
Composants du pipeline ETL
L'architecture du pipeline ETL utilise plusieurs produits. Vous trouverez ci-dessous la liste des composants, ainsi que des informations supplémentaires à leur sujet, y compris des liens vers des vidéos, des documentations produit et des tutoriels interactifs associés.Scripts
Le script d'installation utilise un exécutable écrit en go
et des outils de la CLI Terraform pour prendre un projet vide et y installer l'application. La sortie doit être une application fonctionnelle et une URL pour l'adresse IP d'équilibrage de charge.
./main.tf
Activer les services
Les services Google Cloud sont désactivés par défaut dans un projet. Pour utiliser l'une des solutions ci-dessous, vous devez activer les éléments suivants:
- IAM : gère l'identité et les accès aux ressources Google Cloud
- Storage : service permettant de stocker vos données sur Google Cloud et d'y accéder
- Dataflow : service géré permettant d'exécuter une grande variété de modèles de traitement des données
- BigQuery : plate-forme de données permettant de créer, de gérer, de partager et d'interroger des données
- Composer : gère les environnements Apache Airflow sur Google Cloud
- Compute : machines virtuelles et services réseau (utilisés par Composer)
variable "gcp_service_list" {
description = "The list of apis necessary for the project"
type = list(string)
default = [
"dataflow.googleapis.com",
"compute.googleapis.com",
"composer.googleapis.com",
"storage.googleapis.com",
"bigquery.googleapis.com",
"iam.googleapis.com"
]
}
resource "google_project_service" "all" {
for_each = toset(var.gcp_service_list)
project = var.project_number
service = each.key
disable_on_destroy = false
}
Créer un compte de service
Crée un compte de service à utiliser par Composer et Dataflow.
resource "google_service_account" "etl" {
account_id = "etlpipeline"
display_name = "ETL SA"
description = "user-managed service account for Composer and Dataflow"
project = var.project_id
depends_on = [google_project_service.all]
}
Attribuer des rôles
Accorde les rôles nécessaires au compte de service et accorde le rôle Extension de l'agent de service de l'API Cloud Composer v2 à l'agent de service Cloud Composer (requis pour les environnements Composer 2).
variable "build_roles_list" { description = "La liste des rôles dont Composer et Dataflow ont besoin" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }
resource "google_project_iam_member" "allbuild" {
project = var.project_id
for_each = toset(var.build_roles_list)
role = each.key
member = "serviceAccount:${google_service_account.etl.email}"
depends_on = [google_project_service.all,google_service_account.etl]
}
resource "google_project_iam_member" "composerAgent" {
project = var.project_id
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
depends_on = [google_project_service.all]
}
Créer un environnement Composer
Airflow dépend de l'exécution de nombreux microservices. Par conséquent, Cloud Composer provisionne les composants Google Cloud pour exécuter vos workflows. Ces composants sont collectivement désignés sous le nom d'environnement Cloud Composer.
# Create Composer environment
resource "google_composer_environment" "example" {
project = var.project_id
name = "example-environment"
region = var.region
config {
software_config {
image_version = "composer-2.0.12-airflow-2.2.3"
env_variables = {
AIRFLOW_VAR_PROJECT_ID = var.project_id
AIRFLOW_VAR_GCE_ZONE = var.zone
AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
}
}
node_config {
service_account = google_service_account.etl.name
}
}
depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}
Créer un ensemble de données et une table BigQuery
Crée un ensemble de données et une table dans BigQuery pour stocker les données traitées et les rendre disponibles pour l'analyse.
resource "google_bigquery_dataset" "weather_dataset" {
project = var.project_id
dataset_id = "average_weather"
location = "US"
depends_on = [google_project_service.all]
}
resource "google_bigquery_table" "weather_table" {
project = var.project_id
dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
table_id = "average_weather"
deletion_protection = false
schema = <<EOF
[
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC",
"mode": "NULLABLE"
},
{
"name": "is_current",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "latest_measurement",
"type": "DATE",
"mode": "NULLABLE"
}
]
EOF
depends_on = [google_bigquery_dataset.weather_dataset]
}
Créer un bucket Cloud Storage et ajouter des fichiers
Crée un bucket de stockage pour contenir les fichiers nécessaires au pipeline, y compris les données sources (inputFile.txt), le schéma cible (jsonSchema.json) et la fonction définie par l'utilisateur pour la transformation (transformCSCtoJSON.js).
# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
project = var.project_number
name = "${var.basename}-${var.project_id}-files"
location = "US"
force_destroy = true
depends_on = [google_project_service.all]
}
resource "google_storage_bucket_object" "json_schema" {
name = "jsonSchema.json"
source = "${path.module}/files/jsonSchema.json"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "input_file" {
name = "inputFile.txt"
source = "${path.module}/files/inputFile.txt"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "transform_CSVtoJSON" {
name = "transformCSVtoJSON.js"
source = "${path.module}/files/transformCSVtoJSON.js"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
Importer le fichier DAG
Il utilise d'abord une source de données pour déterminer le chemin d'accès au bucket Cloud Storage approprié pour ajouter le fichier DAG, puis ajoute les fichiers DAG au bucket. Le fichier DAG définit les workflows, les dépendances et les planifications permettant à Airflow d'orchestrer votre pipeline.
data "google_composer_environment" "example" {
project = var.project_id
region = var.region
name = google_composer_environment.example.name
depends_on = [google_composer_environment.example]
}
resource "google_storage_bucket_object" "dag_file" {
name = "dags/composer-dataflow-dag.py"
source = "${path.module}/files/composer-dataflow-dag.py"
bucket = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}
Conclusion
Une fois l'exécution terminée, vous devriez disposer d'un environnement Composer configuré pour exécuter des tâches ETL sur les données présentées dans l'exemple. De plus, vous devez disposer de tout le code nécessaire pour modifier ou étendre cette solution en fonction de votre environnement.