Pipeline ETL

Architettura

La pipeline ETL è un'architettura per l'esecuzione di pipeline di elaborazione dati in batch con il metodo di estrazione, trasformazione e caricamento. Questa architettura è composta dai seguenti componenti:

  • Google Cloud Storage per i dati dell'origine di destinazione
  • Dataflow per l'esecuzione di trasformazioni sui dati di origine
  • BigQuery come destinazione per i dati trasformati
  • Ambiente Cloud Composer per l'orchestrazione del processo ETL

Inizia

Fai clic sul link seguente a una copia del codice sorgente in Cloud Shell. Da lì, un singolo comando avvierà una copia funzionante dell'applicazione nel tuo progetto.

Apri in Cloud Shell

Visualizza il codice sorgente su GitHub


Componenti della pipeline ETL

L'architettura della pipeline ETL utilizza diversi prodotti. Di seguito sono elencati i componenti, insieme a ulteriori informazioni sui componenti, tra cui link a video correlati, documentazione del prodotto e procedure dettagliate interattive.
Video Documenti Procedure dettagliate
BigQuery BigQuery è un data warehouse serverless, economico e multi-cloud progettato per aiutarti a trasformare i big data in preziosi insight aziendali.
Cloud Composer Un servizio di orchestrazione del flusso di lavoro completamente gestito basato su Apache Airflow.
Cloud Storage Cloud Storage offre archiviazione di file e pubblicazione pubblica di immagini su http.

Script

Lo script di installazione utilizza un eseguibile scritto in go e negli strumenti dell'interfaccia a riga di comando di Terraform per acquisire un progetto vuoto e installare l'applicazione al suo interno. L'output deve essere un'applicazione funzionante e un URL per l'indirizzo IP di bilanciamento del carico.

./main.tf

Attiva i servizi

I servizi Google Cloud sono disabilitati in un progetto per impostazione predefinita. Per utilizzare una qualsiasi delle soluzioni qui riportate, dobbiamo attivare quanto segue:

  • IAM: gestisce identità e accesso per le risorse Google Cloud.
  • Archiviazione: servizio per l'archiviazione e l'accesso ai dati su Google Cloud.
  • Dataflow: servizio gestito per l'esecuzione di un'ampia varietà di pattern di elaborazione dati.
  • BigQuery: piattaforma dati per creare, gestire, condividere ed eseguire query sui dati
  • Composer: gestisce gli ambienti Apache Airflow su Google Cloud
  • Computing: macchine virtuali e servizi di rete (utilizzati da Composer)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

Crea un account di servizio

Crea l'account di servizio che può essere utilizzato da Composer e Dataflow.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

Assegna ruoli

Concede i ruoli necessari all'account di servizio e il ruolo per l'estensione agente di servizio API Cloud Composer v2 all'agente di servizio Cloud Composer (obbligatorio per gli ambienti Composer 2).

variabile "build_roles_list" { description = "L'elenco dei ruoli necessari a Composer e Dataflow" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflows/Agent2", "roles/dataflows/Agent2",

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Crea ambiente Composer

Airflow dipende da molti microservizi da eseguire, quindi Cloud Composer esegue il provisioning dei componenti di Google Cloud per eseguire i flussi di lavoro. Questi componenti sono noti collettivamente come ambiente Cloud Composer.

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

Crea set di dati e tabella BigQuery

Crea un set di dati e una tabella in BigQuery per contenere i dati elaborati e renderli disponibili per l'analisi.

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Crea un bucket Cloud Storage e aggiungi file

Crea un bucket di archiviazione per conservare i file necessari per la pipeline, inclusi i dati di origine (inputFile.txt), lo schema di destinazione (jsonSchema.json) e la funzione definita dall'utente#39;utente per la trasformazione (transformCSCtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

Carica file DAG

Innanzitutto utilizza un'origine dati per determinare il percorso del bucket Cloud Storage appropriato per aggiungere il file DAG, quindi aggiunge i file DAG al bucket. Il file DAG definisce i flussi di lavoro, le dipendenze e le pianificazioni di Airflow per orchestrare la pipeline.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Conclusione

Dopo l'esecuzione, ora dovresti aver configurato un ambiente Composer per eseguire job ETL sui dati presentati nell'esempio. Inoltre, devi disporre di tutto il codice per modificare o estendere questa soluzione in base al tuo ambiente.