La pipeline ETL è un'architettura per l'esecuzione di pipeline di elaborazione dati in batch mediante il metodo di estrazione, trasformazione e caricamento. Questa architettura è composta dai seguenti componenti:
- Google Cloud Storage per i dati di origine di destinazione.
- Dataflow per eseguire trasformazioni sui dati di origine
- BigQuery come destinazione per i dati trasformati
- Ambiente Cloud Composer per l'orchestrazione del processo ETL
Inizia
Fai clic sul seguente link che rimanda a una copia del codice sorgente in Cloud Shell. Una volta eseguito l'accesso, un singolo comando genererà una copia funzionante dell'applicazione nel tuo progetto.
Visualizza il codice sorgente su GitHub
Componenti della pipeline ETL
L'architettura di pipeline ETL utilizza diversi prodotti. Di seguito sono elencati i componenti, insieme a ulteriori informazioni sui componenti, inclusi i link ai video correlati, la documentazione del prodotto e le procedure dettagliate interattive.Script
Lo script di installazione utilizza un file eseguibile scritto in strumenti go
dell'interfaccia a riga di comando di Terraform e Terraform per eseguire un progetto vuoto e installare l'applicazione al suo interno. L'output dovrebbe essere un'applicazione funzionante e un URL per l'indirizzo IP di bilanciamento del carico.
./main.tf
Attiva i servizi
I servizi Google Cloud sono disabilitati per impostazione predefinita in un progetto. Per poter utilizzare una qualsiasi delle soluzioni qui riportate, dobbiamo attivare quanto segue:
- IAM: gestisce l'identità e l'accesso alle risorse Google Cloud
- Archiviazione: servizio di archiviazione e accesso ai dati su Google Cloud
- Dataflow: servizio gestito per l'esecuzione di un'ampia varietà di pattern di elaborazione dati
- BigQuery: piattaforma di dati per creare, gestire, condividere ed eseguire query sui dati
- Writer: gestisce gli ambienti Apache Airflow su Google Cloud
- Compute: macchine virtuali e servizi di rete (utilizzati da Composer)
variable "gcp_service_list" {
description = "The list of apis necessary for the project"
type = list(string)
default = [
"dataflow.googleapis.com",
"compute.googleapis.com",
"composer.googleapis.com",
"storage.googleapis.com",
"bigquery.googleapis.com",
"iam.googleapis.com"
]
}
resource "google_project_service" "all" {
for_each = toset(var.gcp_service_list)
project = var.project_number
service = each.key
disable_on_destroy = false
}
Crea un account di servizio
Crea un account di servizio che deve essere utilizzato da Composer e Dataflow.
resource "google_service_account" "etl" {
account_id = "etlpipeline"
display_name = "ETL SA"
description = "user-managed service account for Composer and Dataflow"
project = var.project_id
depends_on = [google_project_service.all]
}
Assegna i ruoli
Concede i ruoli necessari all'account di servizio e concede il ruolo Estensione agente di servizio API Cloud Composer v2 all'agente di servizio Cloud Composer (obbligatorio per gli ambienti Composer 2).
variabili "build_roles_list" { description = "L'elenco dei ruoli di cui Composer e Dataflow hanno bisogno.
resource "google_project_iam_member" "allbuild" {
project = var.project_id
for_each = toset(var.build_roles_list)
role = each.key
member = "serviceAccount:${google_service_account.etl.email}"
depends_on = [google_project_service.all,google_service_account.etl]
}
resource "google_project_iam_member" "composerAgent" {
project = var.project_id
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
depends_on = [google_project_service.all]
}
Crea ambiente Composer
Airflow dipende da molti microservizi da eseguire, quindi Cloud Composer esegue il provisioning dei componenti Google Cloud per eseguire i tuoi flussi di lavoro. Questi componenti sono noti collettivamente come ambiente Cloud Composer.
# Create Composer environment
resource "google_composer_environment" "example" {
project = var.project_id
name = "example-environment"
region = var.region
config {
software_config {
image_version = "composer-2.0.12-airflow-2.2.3"
env_variables = {
AIRFLOW_VAR_PROJECT_ID = var.project_id
AIRFLOW_VAR_GCE_ZONE = var.zone
AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
}
}
node_config {
service_account = google_service_account.etl.name
}
}
depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}
Crea set di dati e tabella BigQuery
Crea un set di dati e una tabella in BigQuery per conservare i dati elaborati e renderli disponibili per l'analisi.
resource "google_bigquery_dataset" "weather_dataset" {
project = var.project_id
dataset_id = "average_weather"
location = "US"
depends_on = [google_project_service.all]
}
resource "google_bigquery_table" "weather_table" {
project = var.project_id
dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
table_id = "average_weather"
deletion_protection = false
schema = <<EOF
[
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC",
"mode": "NULLABLE"
},
{
"name": "is_current",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "latest_measurement",
"type": "DATE",
"mode": "NULLABLE"
}
]
EOF
depends_on = [google_bigquery_dataset.weather_dataset]
}
Crea un bucket Cloud Storage e aggiungi file
Crea un bucket di archiviazione per contenere i file necessari per la pipeline, inclusi i dati di origine (inputFile.txt), lo schema di destinazione (jsonSchema.json) e la funzione definita dall'utente per la trasformazione (TransformCSCtoJSON.js).
# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
project = var.project_number
name = "${var.basename}-${var.project_id}-files"
location = "US"
force_destroy = true
depends_on = [google_project_service.all]
}
resource "google_storage_bucket_object" "json_schema" {
name = "jsonSchema.json"
source = "${path.module}/files/jsonSchema.json"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "input_file" {
name = "inputFile.txt"
source = "${path.module}/files/inputFile.txt"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "transform_CSVtoJSON" {
name = "transformCSVtoJSON.js"
source = "${path.module}/files/transformCSVtoJSON.js"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
Carica file DAG
Innanzitutto, utilizza un'origine dati per determinare il percorso del bucket Cloud Storage appropriato per aggiungere il file DAG, quindi aggiunge i file DAG al bucket. Il file DAG definisce i flussi di lavoro, le dipendenze e le pianificazioni per Airflow per orchestrare la pipeline.
data "google_composer_environment" "example" {
project = var.project_id
region = var.region
name = google_composer_environment.example.name
depends_on = [google_composer_environment.example]
}
resource "google_storage_bucket_object" "dag_file" {
name = "dags/composer-dataflow-dag.py"
source = "${path.module}/files/composer-dataflow-dag.py"
bucket = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}
Conclusione
Una volta eseguito, dovresti avere un ambiente Composer configurato per eseguire job ETL sui dati presentati nell'esempio. Inoltre, dovresti avere tutto il codice per modificare o estendere questa soluzione in base all'ambiente.