Pipeline ETL

Mantieni tutto organizzato con le raccolte Salva e classifica i contenuti in base alle tue preferenze.

Architettura

La pipeline ETL è un'architettura per l'esecuzione di pipeline di elaborazione dati in batch mediante il metodo di estrazione, trasformazione e caricamento. Questa architettura è composta dai seguenti componenti:

  • Google Cloud Storage per i dati di origine di destinazione.
  • Dataflow per eseguire trasformazioni sui dati di origine
  • BigQuery come destinazione per i dati trasformati
  • Ambiente Cloud Composer per l'orchestrazione del processo ETL

Inizia

Fai clic sul seguente link che rimanda a una copia del codice sorgente in Cloud Shell. Una volta eseguito l'accesso, un singolo comando genererà una copia funzionante dell'applicazione nel tuo progetto.

Apri in Cloud Shell

Visualizza il codice sorgente su GitHub


Componenti della pipeline ETL

L'architettura di pipeline ETL utilizza diversi prodotti. Di seguito sono elencati i componenti, insieme a ulteriori informazioni sui componenti, inclusi i link ai video correlati, la documentazione del prodotto e le procedure dettagliate interattive.
Video Documenti Procedure dettagliate
BigQuery BigQuery è un data warehouse serverless, economico e multi-cloud progettato per aiutarti a trasformare i big data in preziosi insight aziendali.
Cloud Composer Un servizio di orchestrazione del flusso di lavoro completamente gestito basato su Apache Airflow.
Cloud Storage Cloud Storage offre archiviazione di file e pubblicazione pubblica di immagini su http(s).

Script

Lo script di installazione utilizza un file eseguibile scritto in strumenti go dell'interfaccia a riga di comando di Terraform e Terraform per eseguire un progetto vuoto e installare l'applicazione al suo interno. L'output dovrebbe essere un'applicazione funzionante e un URL per l'indirizzo IP di bilanciamento del carico.

./main.tf

Attiva i servizi

I servizi Google Cloud sono disabilitati per impostazione predefinita in un progetto. Per poter utilizzare una qualsiasi delle soluzioni qui riportate, dobbiamo attivare quanto segue:

  • IAM: gestisce l'identità e l'accesso alle risorse Google Cloud
  • Archiviazione: servizio di archiviazione e accesso ai dati su Google Cloud
  • Dataflow: servizio gestito per l'esecuzione di un'ampia varietà di pattern di elaborazione dati
  • BigQuery: piattaforma di dati per creare, gestire, condividere ed eseguire query sui dati
  • Writer: gestisce gli ambienti Apache Airflow su Google Cloud
  • Compute: macchine virtuali e servizi di rete (utilizzati da Composer)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

Crea un account di servizio

Crea un account di servizio che deve essere utilizzato da Composer e Dataflow.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

Assegna i ruoli

Concede i ruoli necessari all'account di servizio e concede il ruolo Estensione agente di servizio API Cloud Composer v2 all'agente di servizio Cloud Composer (obbligatorio per gli ambienti Composer 2).

variabili "build_roles_list" { description = "L'elenco dei ruoli di cui Composer e Dataflow hanno bisogno.

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Crea ambiente Composer

Airflow dipende da molti microservizi da eseguire, quindi Cloud Composer esegue il provisioning dei componenti Google Cloud per eseguire i tuoi flussi di lavoro. Questi componenti sono noti collettivamente come ambiente Cloud Composer.

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

Crea set di dati e tabella BigQuery

Crea un set di dati e una tabella in BigQuery per conservare i dati elaborati e renderli disponibili per l'analisi.

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Crea un bucket Cloud Storage e aggiungi file

Crea un bucket di archiviazione per contenere i file necessari per la pipeline, inclusi i dati di origine (inputFile.txt), lo schema di destinazione (jsonSchema.json) e la funzione definita dall'utente per la trasformazione (TransformCSCtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

Carica file DAG

Innanzitutto, utilizza un'origine dati per determinare il percorso del bucket Cloud Storage appropriato per aggiungere il file DAG, quindi aggiunge i file DAG al bucket. Il file DAG definisce i flussi di lavoro, le dipendenze e le pianificazioni per Airflow per orchestrare la pipeline.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Conclusione

Una volta eseguito, dovresti avere un ambiente Composer configurato per eseguire job ETL sui dati presentati nell'esempio. Inoltre, dovresti avere tutto il codice per modificare o estendere questa soluzione in base all'ambiente.