Eine ETL-Pipeline ist eine Architektur zum Ausführen von Batch-Datenverarbeitungspipelines mit der Methode „Extrahieren, Transformieren, Laden“. Diese Architektur besteht aus den folgenden Komponenten:
- Google Cloud Storage für das Bereitstellen von Quelldaten
- Dataflow zum Ausführen von Transformationen an den Quelldaten
- BigQuery als Ziel für die umgewandelten Daten
- Cloud Composer-Umgebung für die Orchestrierung des ETL-Prozesses
Jetzt starten
Klicken Sie auf den folgenden Link, um eine Kopie des Quellcodes in Cloud Shell aufzurufen. Dort können Sie mit einem einzigen Befehl eine funktionierende Kopie der Anwendung in Ihrem Projekt erstellen.
ETL-Pipeline-Komponenten
Die ETL-Pipeline-Architektur nutzt mehrere Produkte. Im Folgenden finden Sie eine Liste der Komponenten sowie weitere Informationen zu den Komponenten, einschließlich Links zu ähnlichen Videos, Produktdokumentationen und interaktiven Schritt-für-Schritt-Anleitungen.Skripts
Das Installationsskript verwendet eine in go
geschriebene ausführbare Datei und Terraform-Befehlszeilentools, um ein leeres Projekt zu erstellen und die Anwendung darin zu installieren. Die Ausgabe sollte eine funktionierende Anwendung und eine URL für die Load Balancing-IP-Adresse sein.
./main.tf
Dienste aktivieren
Google Cloud-Dienste sind in einem Projekt standardmäßig deaktiviert. Damit Sie eine der folgenden Lösungen verwenden können, müssen Sie Folgendes aktivieren:
- IAM: Verwaltet Identitäten und Zugriff für Google Cloud-Ressourcen
- Speicher: Dienst zum Speichern und Abrufen von Daten in Google Cloud
- Dataflow: Verwalteter Dienst zur Ausführung einer Vielzahl von Datenverarbeitungsmustern
- BigQuery: Datenplattform zum Erstellen, Verwalten, Freigeben und Abfragen von Daten
- Composer: Verwaltet Apache Airflow-Umgebungen in Google Cloud
- Compute: Virtuelle Maschinen und Netzwerkdienste (von Composer verwendet)
variable "gcp_service_list" {
description = "The list of apis necessary for the project"
type = list(string)
default = [
"dataflow.googleapis.com",
"compute.googleapis.com",
"composer.googleapis.com",
"storage.googleapis.com",
"bigquery.googleapis.com",
"iam.googleapis.com"
]
}
resource "google_project_service" "all" {
for_each = toset(var.gcp_service_list)
project = var.project_number
service = each.key
disable_on_destroy = false
}
Dienstkonto erstellen
Erstellt ein Dienstkonto, das von Composer und Dataflow verwendet wird.
resource "google_service_account" "etl" {
account_id = "etlpipeline"
display_name = "ETL SA"
description = "user-managed service account for Composer and Dataflow"
project = var.project_id
depends_on = [google_project_service.all]
}
Rollen zuweisen
Erteilt dem Dienstkonto die erforderlichen Rollen und dem Cloud Composer-Dienst-Agent die Rolle „Cloud Composer v2 API Service Agent Extension“ (erforderlich für Composer 2-Umgebungen).
variable "build_roles_list" { description = "Die Liste der Rollen, die für Composer und Dataflow erforderlich sind" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }
resource "google_project_iam_member" "allbuild" {
project = var.project_id
for_each = toset(var.build_roles_list)
role = each.key
member = "serviceAccount:${google_service_account.etl.email}"
depends_on = [google_project_service.all,google_service_account.etl]
}
resource "google_project_iam_member" "composerAgent" {
project = var.project_id
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
depends_on = [google_project_service.all]
}
Composer-Umgebung erstellen
Airflow hängt von der Anzahl der ausgeführten Mikrodienste ab. Cloud Composer stellt Google Cloud-Komponenten zum Ausführen Ihrer Workflows bereit. Diese Komponenten werden allgemein als Cloud Composer-Umgebung bezeichnet.
# Create Composer environment
resource "google_composer_environment" "example" {
project = var.project_id
name = "example-environment"
region = var.region
config {
software_config {
image_version = "composer-2.0.12-airflow-2.2.3"
env_variables = {
AIRFLOW_VAR_PROJECT_ID = var.project_id
AIRFLOW_VAR_GCE_ZONE = var.zone
AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
}
}
node_config {
service_account = google_service_account.etl.name
}
}
depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}
BigQuery-Dataset und ‑Tabelle erstellen
Erstellt ein Dataset und eine Tabelle in BigQuery, um die verarbeiteten Daten zu speichern und für Analysen verfügbar zu machen.
resource "google_bigquery_dataset" "weather_dataset" {
project = var.project_id
dataset_id = "average_weather"
location = "US"
depends_on = [google_project_service.all]
}
resource "google_bigquery_table" "weather_table" {
project = var.project_id
dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
table_id = "average_weather"
deletion_protection = false
schema = <<EOF
[
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC",
"mode": "NULLABLE"
},
{
"name": "is_current",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "latest_measurement",
"type": "DATE",
"mode": "NULLABLE"
}
]
EOF
depends_on = [google_bigquery_dataset.weather_dataset]
}
Cloud Storage-Bucket erstellen und Dateien hinzufügen
Erstellt einen Speicher-Bucket für die für die Pipeline erforderlichen Dateien, einschließlich der Quelldaten (inputFile.txt), des Zielschemas (jsonSchema.json) und der benutzerdefinierten Funktion für die Transformation (transformCSCtoJSON.js).
# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
project = var.project_number
name = "${var.basename}-${var.project_id}-files"
location = "US"
force_destroy = true
depends_on = [google_project_service.all]
}
resource "google_storage_bucket_object" "json_schema" {
name = "jsonSchema.json"
source = "${path.module}/files/jsonSchema.json"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "input_file" {
name = "inputFile.txt"
source = "${path.module}/files/inputFile.txt"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "transform_CSVtoJSON" {
name = "transformCSVtoJSON.js"
source = "${path.module}/files/transformCSVtoJSON.js"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
DAG-Datei hochladen
Zuerst wird anhand einer Datenquelle der richtige Cloud Storage-Bucket-Pfad zum Hinzufügen der DAG-Datei ermittelt und dann werden die DAG-Dateien dem Bucket hinzugefügt. In der DAG-Datei werden die Workflows, Abhängigkeiten und Zeitpläne für Airflow definiert, um Ihre Pipeline zu orchestrieren.
data "google_composer_environment" "example" {
project = var.project_id
region = var.region
name = google_composer_environment.example.name
depends_on = [google_composer_environment.example]
}
resource "google_storage_bucket_object" "dag_file" {
name = "dags/composer-dataflow-dag.py"
source = "${path.module}/files/composer-dataflow-dag.py"
bucket = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}
Fazit
Danach sollten Sie eine Composer-Umgebung eingerichtet haben, in der ETL-Jobs für die im Beispiel dargestellten Daten ausgeführt werden können. Außerdem sollten Sie den gesamten Code haben, um diese Lösung an Ihre Umgebung anzupassen oder zu erweitern.