La pipeline ETL è un'architettura per l'esecuzione di pipeline di elaborazione dei dati batch utilizzando il metodo di estrazione, trasformazione e caricamento. Questa architettura è composta dai seguenti componenti:
- Google Cloud Storage per i dati dell'origine di destinazione
- Dataflow per eseguire trasformazioni sui dati di origine
- BigQuery come destinazione per i dati trasformati
- Ambiente Cloud Composer per orchestrare il processo ETL
Inizia
Fai clic sul seguente link per una copia del codice sorgente in Cloud Shell. Una volta lì, un singolo comando avvierà una copia funzionante dell'applicazione nel tuo progetto.
Visualizza il codice sorgente su GitHub
Componenti della pipeline ETL
L'architettura della pipeline ETL utilizza diversi prodotti. Di seguito sono elencati i componenti, insieme a ulteriori informazioni su di essi, inclusi link a video correlati, documentazione del prodotto e walkthrough interattivi.Script
Lo script di installazione utilizza un file eseguibile scritto in go
e gli strumenti Terraform CLI per
prendere un progetto vuoto e installarvi l'applicazione. L'output deve essere un'applicazione funzionante e un URL per l'indirizzo IP del bilanciamento del carico.
./main.tf
Attiva i servizi
I servizi Google Cloud sono disabilitati in un progetto per impostazione predefinita. Per utilizzare una delle soluzioni riportate di seguito, dobbiamo attivare quanto segue:
- IAM: gestisce l'identità e l'accesso per le risorse Google Cloud
- Archiviazione: servizio per archiviare i dati e accedervi su Google Cloud
- Dataflow: servizio gestito per l'esecuzione di un'ampia varietà di pattern di elaborazione dati
- BigQuery: piattaforma di dati per creare, gestire, condividere ed eseguire query sui dati
- Composer: gestisce gli ambienti Apache Airflow su Google Cloud
- Computing: macchine virtuali e servizi di rete (utilizzati da Composer)
variable "gcp_service_list" {
description = "The list of apis necessary for the project"
type = list(string)
default = [
"dataflow.googleapis.com",
"compute.googleapis.com",
"composer.googleapis.com",
"storage.googleapis.com",
"bigquery.googleapis.com",
"iam.googleapis.com"
]
}
resource "google_project_service" "all" {
for_each = toset(var.gcp_service_list)
project = var.project_number
service = each.key
disable_on_destroy = false
}
Crea account di servizio
Crea un account di servizio da utilizzare da Composer e Dataflow.
resource "google_service_account" "etl" {
account_id = "etlpipeline"
display_name = "ETL SA"
description = "user-managed service account for Composer and Dataflow"
project = var.project_id
depends_on = [google_project_service.all]
}
Assegna i ruoli
Concedi i ruoli necessari all'account di servizio e il ruolo Estensione agente di servizio API Cloud Composer v2 all'agente di servizio Cloud Composer (obbligatorio per gli ambienti Composer 2).
variable "build_roles_list" { description = "The list of roles that Composer and Dataflow needs" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }
resource "google_project_iam_member" "allbuild" {
project = var.project_id
for_each = toset(var.build_roles_list)
role = each.key
member = "serviceAccount:${google_service_account.etl.email}"
depends_on = [google_project_service.all,google_service_account.etl]
}
resource "google_project_iam_member" "composerAgent" {
project = var.project_id
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
depends_on = [google_project_service.all]
}
Crea l'ambiente Composer
Airflow dipende da molti microservizi per funzionare, quindi Cloud Composer esegue il provisioning dei componenti Google Cloud per eseguire i tuoi flussi di lavoro. Questi componenti sono noti collettivamente come ambiente Cloud Composer.
# Create Composer environment
resource "google_composer_environment" "example" {
project = var.project_id
name = "example-environment"
region = var.region
config {
software_config {
image_version = "composer-2.0.12-airflow-2.2.3"
env_variables = {
AIRFLOW_VAR_PROJECT_ID = var.project_id
AIRFLOW_VAR_GCE_ZONE = var.zone
AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
}
}
node_config {
service_account = google_service_account.etl.name
}
}
depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}
Creare un set di dati e una tabella BigQuery
Crea un set di dati e una tabella in BigQuery per contenere i dati elaborati e renderli disponibili per l'analisi.
resource "google_bigquery_dataset" "weather_dataset" {
project = var.project_id
dataset_id = "average_weather"
location = "US"
depends_on = [google_project_service.all]
}
resource "google_bigquery_table" "weather_table" {
project = var.project_id
dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
table_id = "average_weather"
deletion_protection = false
schema = <<EOF
[
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC",
"mode": "NULLABLE"
},
{
"name": "is_current",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "latest_measurement",
"type": "DATE",
"mode": "NULLABLE"
}
]
EOF
depends_on = [google_bigquery_dataset.weather_dataset]
}
Crea un bucket Cloud Storage e aggiungi file
Crea un bucket di archiviazione per contenere i file necessari per la pipeline, inclusi i dati di origine (inputFile.txt), lo schema di destinazione (jsonSchema.json) e la funzione definita dall'utente per la trasformazione (transformCSCtoJSON.js).
# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
project = var.project_number
name = "${var.basename}-${var.project_id}-files"
location = "US"
force_destroy = true
depends_on = [google_project_service.all]
}
resource "google_storage_bucket_object" "json_schema" {
name = "jsonSchema.json"
source = "${path.module}/files/jsonSchema.json"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "input_file" {
name = "inputFile.txt"
source = "${path.module}/files/inputFile.txt"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "transform_CSVtoJSON" {
name = "transformCSVtoJSON.js"
source = "${path.module}/files/transformCSVtoJSON.js"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
Carica il file DAG
Innanzitutto utilizza un'origine dati per determinare il percorso del bucket Cloud Storage appropriato per l'aggiunta del file DAG, quindi aggiunge i file DAG al bucket. Il file DAG definisce i flussi di lavoro, le dipendenze e le pianificazioni per consentire ad Airflow di orchestrare la pipeline.
data "google_composer_environment" "example" {
project = var.project_id
region = var.region
name = google_composer_environment.example.name
depends_on = [google_composer_environment.example]
}
resource "google_storage_bucket_object" "dag_file" {
name = "dags/composer-dataflow-dag.py"
source = "${path.module}/files/composer-dataflow-dag.py"
bucket = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}
Conclusione
Al termine dell'esecuzione, dovresti avere configurato un ambiente Composer per eseguire job ETL sui dati presentati nell'esempio. Inoltre, dovresti disporre di tutto il codice necessario per modificare o estendere questa soluzione in base al tuo ambiente.