Le pipeline ETL est une architecture permettant d'exécuter des pipelines de traitement de données par lot à l'aide de la méthode d'extraction, de transformation et de chargement. Cette architecture comprend les composants suivants:
- Google Cloud Storage pour les données sources
- Dataflow pour effectuer des transformations sur les données sources
- BigQuery comme destination des données transformées
- Environnement Cloud Composer pour orchestrer le processus ETL
Get Started
Cliquez sur le lien suivant pour accéder à une copie du code source dans Cloud Shell. Une seule commande suffit ensuite pour lancer une copie de travail de l'application dans votre projet.
Afficher le code source sur GitHub
Composants du pipeline ETL
L'architecture du pipeline ETL utilise plusieurs produits. Vous trouverez ci-dessous la liste des composants, ainsi que des informations complémentaires, y compris des liens vers des vidéos similaires, la documentation du produit et des tutoriels interactifs.Scripts
Le script d'installation utilise un exécutable écrit dans go
et les outils CLI Terraform pour effectuer un projet vide et y installer l'application. Le résultat doit être une application opérationnelle et une URL pour l'adresse IP d'équilibrage de charge.
./main.tf
Activer les services
Les services Google Cloud sont désactivés par défaut dans un projet. Pour utiliser l'une des solutions, vous devez activer les éléments suivants:
- IAM : gère l'identité et les accès pour les ressources Google Cloud.
- Stockage : service de stockage et d'accès à vos données sur Google Cloud
- Dataflow : service géré permettant d'exécuter une grande variété de modèles de traitement des données
- BigQuery : plate-forme de données pour créer, gérer, partager et interroger des données
- Composer : gère les environnements Apache Airflow sur Google Cloud
- Calcul : machines virtuelles et services de mise en réseau (utilisés par Composer)
variable "gcp_service_list" {
description = "The list of apis necessary for the project"
type = list(string)
default = [
"dataflow.googleapis.com",
"compute.googleapis.com",
"composer.googleapis.com",
"storage.googleapis.com",
"bigquery.googleapis.com",
"iam.googleapis.com"
]
}
resource "google_project_service" "all" {
for_each = toset(var.gcp_service_list)
project = var.project_number
service = each.key
disable_on_destroy = false
}
Créer un compte de service
Crée un compte de service à utiliser par Composer et Dataflow.
resource "google_service_account" "etl" {
account_id = "etlpipeline"
display_name = "ETL SA"
description = "user-managed service account for Composer and Dataflow"
project = var.project_id
depends_on = [google_project_service.all]
}
Attribuer des rôles
Accorde les rôles requis au compte de service et le rôle d'extension d'agent de service de l'API Cloud Composer v2 à l'agent de service Cloud Composer (nécessaire pour les environnements Composer 2).
15
resource "google_project_iam_member" "allbuild" {
project = var.project_id
for_each = toset(var.build_roles_list)
role = each.key
member = "serviceAccount:${google_service_account.etl.email}"
depends_on = [google_project_service.all,google_service_account.etl]
}
resource "google_project_iam_member" "composerAgent" {
project = var.project_id
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
depends_on = [google_project_service.all]
}
Créer un environnement Composer
Airflow nécessite l'exécution de nombreux microservices. Cloud Composer provisionne donc les composants Google Cloud pour exécuter vos workflows. Ces composants sont collectivement appelés "environnement Cloud Composer".
# Create Composer environment
resource "google_composer_environment" "example" {
project = var.project_id
name = "example-environment"
region = var.region
config {
software_config {
image_version = "composer-2.0.12-airflow-2.2.3"
env_variables = {
AIRFLOW_VAR_PROJECT_ID = var.project_id
AIRFLOW_VAR_GCE_ZONE = var.zone
AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
}
}
node_config {
service_account = google_service_account.etl.name
}
}
depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}
Créer un ensemble de données et une table BigQuery
crée un ensemble de données et une table dans BigQuery pour contenir les données traitées et les rendre disponibles pour l'analyse ;
resource "google_bigquery_dataset" "weather_dataset" {
project = var.project_id
dataset_id = "average_weather"
location = "US"
depends_on = [google_project_service.all]
}
resource "google_bigquery_table" "weather_table" {
project = var.project_id
dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
table_id = "average_weather"
deletion_protection = false
schema = <<EOF
[
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC",
"mode": "NULLABLE"
},
{
"name": "is_current",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "latest_measurement",
"type": "DATE",
"mode": "NULLABLE"
}
]
EOF
depends_on = [google_bigquery_dataset.weather_dataset]
}
Créer un bucket Cloud Storage et ajouter des fichiers
Crée un bucket de stockage pour stocker les fichiers nécessaires au pipeline, y compris les données sources (inputFile.txt), le schéma cible (jsonSchema.json) et la fonction définie par l'utilisateur pour la transformation (transformCSCtoJSON.js).
# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
project = var.project_number
name = "${var.basename}-${var.project_id}-files"
location = "US"
force_destroy = true
depends_on = [google_project_service.all]
}
resource "google_storage_bucket_object" "json_schema" {
name = "jsonSchema.json"
source = "${path.module}/files/jsonSchema.json"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "input_file" {
name = "inputFile.txt"
source = "${path.module}/files/inputFile.txt"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "transform_CSVtoJSON" {
name = "transformCSVtoJSON.js"
source = "${path.module}/files/transformCSVtoJSON.js"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
Importer un fichier DAG
Elle utilise d'abord une source de données afin de déterminer le chemin d'accès approprié au bucket Cloud Storage pour ajouter le fichier DAG, puis ajoute les fichiers DAG au bucket. Le fichier DAG définit les workflows, les dépendances et les planifications pour qu'Airflow orchestre votre pipeline.
data "google_composer_environment" "example" {
project = var.project_id
region = var.region
name = google_composer_environment.example.name
depends_on = [google_composer_environment.example]
}
resource "google_storage_bucket_object" "dag_file" {
name = "dags/composer-dataflow-dag.py"
source = "${path.module}/files/composer-dataflow-dag.py"
bucket = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}
Conclusion
Une fois l'exécution terminée, vous devez disposer d'un environnement Composer configuré pour exécuter des tâches ETL sur les données présentées dans l'exemple. De plus, vous devez disposer de tout le code nécessaire pour modifier ou étendre cette solution en fonction de votre environnement.