Pipeline ETL

+

Architecture

Un pipeline ETL est une architecture permettant d'exécuter des pipelines de traitement des données par lot à l'aide de la méthode d'extraction, de transformation et de chargement. Cette architecture se compose des composants suivants:

  • Google Cloud Storage pour les données de source de destination
  • Dataflow pour effectuer des transformations sur les données sources
  • BigQuery comme destination des données transformées
  • Environnement Cloud Composer pour orchestrer le processus ETL

Premiers pas

Cliquez sur le lien suivant pour obtenir une copie du code source dans Cloud Shell. Une seule commande permet de démarrer une copie fonctionnelle de l'application dans votre projet.

Ouvrir dans Cloud Shell

Afficher le code source sur GitHub


Composants du pipeline ETL

L'architecture du pipeline ETL utilise plusieurs produits. Vous trouverez ci-dessous la liste des composants, ainsi que des informations supplémentaires à leur sujet, y compris des liens vers des vidéos, des documentations produit et des tutoriels interactifs associés.
Vidéo Docs Tutoriels
BigQuery BigQuery est un entrepôt de données sans serveur, économique et multicloud, conçu pour vous aider à transformer le big data en de précieux insights commerciaux.
Cloud Composer Service d'orchestration de flux de travail entièrement géré conçu à partir d'Apache Airflow
Cloud Storage Cloud Storage permet de stocker des fichiers et de diffuser des images publiquement via HTTP(S).

Scripts

Le script d'installation utilise un exécutable écrit en go et des outils de la CLI Terraform pour prendre un projet vide et y installer l'application. La sortie doit être une application fonctionnelle et une URL pour l'adresse IP d'équilibrage de charge.

./main.tf

Activer les services

Les services Google Cloud sont désactivés par défaut dans un projet. Pour utiliser l'une des solutions ci-dessous, vous devez activer les éléments suivants:

  • IAM : gère l'identité et les accès aux ressources Google Cloud
  • Storage : service permettant de stocker vos données sur Google Cloud et d'y accéder
  • Dataflow : service géré permettant d'exécuter une grande variété de modèles de traitement des données
  • BigQuery : plate-forme de données permettant de créer, de gérer, de partager et d'interroger des données
  • Composer : gère les environnements Apache Airflow sur Google Cloud
  • Compute : machines virtuelles et services réseau (utilisés par Composer)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

Créer un compte de service

Crée un compte de service à utiliser par Composer et Dataflow.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

Attribuer des rôles

Accorde les rôles nécessaires au compte de service et accorde le rôle Extension de l'agent de service de l'API Cloud Composer v2 à l'agent de service Cloud Composer (requis pour les environnements Composer 2).

variable "build_roles_list" { description = "La liste des rôles dont Composer et Dataflow ont besoin" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Créer un environnement Composer

Airflow dépend de l'exécution de nombreux microservices. Par conséquent, Cloud Composer provisionne les composants Google Cloud pour exécuter vos workflows. Ces composants sont collectivement désignés sous le nom d'environnement Cloud Composer.

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

Créer un ensemble de données et une table BigQuery

Crée un ensemble de données et une table dans BigQuery pour stocker les données traitées et les rendre disponibles pour l'analyse.

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Créer un bucket Cloud Storage et ajouter des fichiers

Crée un bucket de stockage pour contenir les fichiers nécessaires au pipeline, y compris les données sources (inputFile.txt), le schéma cible (jsonSchema.json) et la fonction définie par l'utilisateur pour la transformation (transformCSCtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

Importer le fichier DAG

Il utilise d'abord une source de données pour déterminer le chemin d'accès au bucket Cloud Storage approprié pour ajouter le fichier DAG, puis ajoute les fichiers DAG au bucket. Le fichier DAG définit les workflows, les dépendances et les planifications permettant à Airflow d'orchestrer votre pipeline.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Conclusion

Une fois l'exécution terminée, vous devriez disposer d'un environnement Composer configuré pour exécuter des tâches ETL sur les données présentées dans l'exemple. De plus, vous devez disposer de tout le code nécessaire pour modifier ou étendre cette solution en fonction de votre environnement.