Pipeline ETL

+

Architettura

La pipeline ETL è un'architettura per l'esecuzione di pipeline di elaborazione dei dati batch utilizzando il metodo di estrazione, trasformazione e caricamento. Questa architettura è composta dai seguenti componenti:

  • Google Cloud Storage per i dati dell'origine di destinazione
  • Dataflow per eseguire trasformazioni sui dati di origine
  • BigQuery come destinazione per i dati trasformati
  • Ambiente Cloud Composer per orchestrare il processo ETL

Inizia

Fai clic sul seguente link per una copia del codice sorgente in Cloud Shell. Una volta lì, un singolo comando avvierà una copia funzionante dell'applicazione nel tuo progetto.

Apri in Cloud Shell

Visualizza il codice sorgente su GitHub


Componenti della pipeline ETL

L'architettura della pipeline ETL utilizza diversi prodotti. Di seguito sono elencati i componenti, insieme a ulteriori informazioni su di essi, inclusi link a video correlati, documentazione del prodotto e walkthrough interattivi.
Video Documenti Procedure dettagliate
BigQuery BigQuery è un data warehouse serverless, economico e multi-cloud progettato per aiutarti a trasformare i big data in preziosi insight aziendali.
Cloud Composer Un servizio di orchestrazione del flusso di lavoro completamente gestito basato su Apache Airflow.
Cloud Storage Cloud Storage fornisce archiviazione di file e pubblicazione pubblica di immagini tramite http(s).

Script

Lo script di installazione utilizza un file eseguibile scritto in go e gli strumenti Terraform CLI per prendere un progetto vuoto e installarvi l'applicazione. L'output deve essere un'applicazione funzionante e un URL per l'indirizzo IP del bilanciamento del carico.

./main.tf

Attiva i servizi

I servizi Google Cloud sono disabilitati in un progetto per impostazione predefinita. Per utilizzare una delle soluzioni riportate di seguito, dobbiamo attivare quanto segue:

  • IAM: gestisce l'identità e l'accesso per le risorse Google Cloud
  • Archiviazione: servizio per archiviare i dati e accedervi su Google Cloud
  • Dataflow: servizio gestito per l'esecuzione di un'ampia varietà di pattern di elaborazione dati
  • BigQuery: piattaforma di dati per creare, gestire, condividere ed eseguire query sui dati
  • Composer: gestisce gli ambienti Apache Airflow su Google Cloud
  • Computing: macchine virtuali e servizi di rete (utilizzati da Composer)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

Crea account di servizio

Crea un account di servizio da utilizzare da Composer e Dataflow.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

Assegna i ruoli

Concedi i ruoli necessari all'account di servizio e il ruolo Estensione agente di servizio API Cloud Composer v2 all'agente di servizio Cloud Composer (obbligatorio per gli ambienti Composer 2).

variable "build_roles_list" { description = "The list of roles that Composer and Dataflow needs" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Crea l'ambiente Composer

Airflow dipende da molti microservizi per funzionare, quindi Cloud Composer esegue il provisioning dei componenti Google Cloud per eseguire i tuoi flussi di lavoro. Questi componenti sono noti collettivamente come ambiente Cloud Composer.

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

Creare un set di dati e una tabella BigQuery

Crea un set di dati e una tabella in BigQuery per contenere i dati elaborati e renderli disponibili per l'analisi.

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Crea un bucket Cloud Storage e aggiungi file

Crea un bucket di archiviazione per contenere i file necessari per la pipeline, inclusi i dati di origine (inputFile.txt), lo schema di destinazione (jsonSchema.json) e la funzione definita dall'utente per la trasformazione (transformCSCtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

Carica il file DAG

Innanzitutto utilizza un'origine dati per determinare il percorso del bucket Cloud Storage appropriato per l'aggiunta del file DAG, quindi aggiunge i file DAG al bucket. Il file DAG definisce i flussi di lavoro, le dipendenze e le pianificazioni per consentire ad Airflow di orchestrare la pipeline.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Conclusione

Al termine dell'esecuzione, dovresti avere configurato un ambiente Composer per eseguire job ETL sui dati presentati nell'esempio. Inoltre, dovresti disporre di tutto il codice necessario per modificare o estendere questa soluzione in base al tuo ambiente.