Pipeline ETL

Restez organisé à l'aide des collections Enregistrez et classez les contenus selon vos préférences.

Architecture

Le pipeline ETL est une architecture permettant d'exécuter des pipelines de traitement de données par lot à l'aide de la méthode d'extraction, de transformation et de chargement. Cette architecture comprend les composants suivants:

  • Google Cloud Storage pour les données sources
  • Dataflow pour effectuer des transformations sur les données sources
  • BigQuery comme destination des données transformées
  • Environnement Cloud Composer pour orchestrer le processus ETL

Get Started

Cliquez sur le lien suivant pour accéder à une copie du code source dans Cloud Shell. Une seule commande suffit ensuite pour lancer une copie de travail de l'application dans votre projet.

Ouvrir dans Cloud Shell

Afficher le code source sur GitHub


Composants du pipeline ETL

L'architecture du pipeline ETL utilise plusieurs produits. Vous trouverez ci-dessous la liste des composants, ainsi que des informations complémentaires, y compris des liens vers des vidéos similaires, la documentation du produit et des tutoriels interactifs.
Vidéo Docs Tutoriels
BigQuery BigQuery est un entrepôt de données sans serveur, économique et multicloud, conçu pour vous aider à transformer le big data en de précieux insights commerciaux.
Cloud Composer Service d'orchestration de flux de travail entièrement géré conçu à partir d'Apache Airflow
Cloud Storage Cloud Storage permet de stocker des fichiers et de diffuser des images via HTTP(S).

Scripts

Le script d'installation utilise un exécutable écrit dans go et les outils CLI Terraform pour effectuer un projet vide et y installer l'application. Le résultat doit être une application opérationnelle et une URL pour l'adresse IP d'équilibrage de charge.

./main.tf

Activer les services

Les services Google Cloud sont désactivés par défaut dans un projet. Pour utiliser l'une des solutions, vous devez activer les éléments suivants:

  • IAM : gère l'identité et les accès pour les ressources Google Cloud.
  • Stockage : service de stockage et d'accès à vos données sur Google Cloud
  • Dataflow : service géré permettant d'exécuter une grande variété de modèles de traitement des données
  • BigQuery : plate-forme de données pour créer, gérer, partager et interroger des données
  • Composer : gère les environnements Apache Airflow sur Google Cloud
  • Calcul : machines virtuelles et services de mise en réseau (utilisés par Composer)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

Créer un compte de service

Crée un compte de service à utiliser par Composer et Dataflow.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

Attribuer des rôles

Accorde les rôles requis au compte de service et le rôle d'extension d'agent de service de l'API Cloud Composer v2 à l'agent de service Cloud Composer (nécessaire pour les environnements Composer 2).

15

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Créer un environnement Composer

Airflow nécessite l'exécution de nombreux microservices. Cloud Composer provisionne donc les composants Google Cloud pour exécuter vos workflows. Ces composants sont collectivement appelés "environnement Cloud Composer".

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

Créer un ensemble de données et une table BigQuery

crée un ensemble de données et une table dans BigQuery pour contenir les données traitées et les rendre disponibles pour l'analyse ;

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Créer un bucket Cloud Storage et ajouter des fichiers

Crée un bucket de stockage pour stocker les fichiers nécessaires au pipeline, y compris les données sources (inputFile.txt), le schéma cible (jsonSchema.json) et la fonction définie par l'utilisateur pour la transformation (transformCSCtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

Importer un fichier DAG

Elle utilise d'abord une source de données afin de déterminer le chemin d'accès approprié au bucket Cloud Storage pour ajouter le fichier DAG, puis ajoute les fichiers DAG au bucket. Le fichier DAG définit les workflows, les dépendances et les planifications pour qu'Airflow orchestre votre pipeline.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Conclusion

Une fois l'exécution terminée, vous devez disposer d'un environnement Composer configuré pour exécuter des tâches ETL sur les données présentées dans l'exemple. De plus, vous devez disposer de tout le code nécessaire pour modifier ou étendre cette solution en fonction de votre environnement.