Die ETL-Pipeline ist eine Architektur zum Ausführen von Batch-Datenverarbeitungspipelines mit der Methode zum Extrahieren, Transformieren und Laden. Diese Architektur besteht aus den folgenden Komponenten:
- Google Cloud Storage zum Abrufen von Quelldaten
- Dataflow zum Durchführen von Transformationen der Quelldaten
- BigQuery als Ziel für die transformierten Daten
- Cloud Composer-Umgebung zur Orchestrierung des ETL-Prozesses
Jetzt starten
Klicken Sie auf den folgenden Link, um den Quellcode in Cloud Shell zu kopieren. Dort wird mit einem einzigen Befehl eine funktionierende Kopie der Anwendung in Ihrem Projekt erstellt.
ETL-Pipeline-Komponenten
Die ETL-Pipeline-Architektur nutzt mehrere Produkte. Im Folgenden finden Sie eine Liste der Komponenten sowie weitere Informationen zu den Komponenten, einschließlich Links zu ähnlichen Videos, Produktdokumentationen und interaktiven Schritt-für-Schritt-Anleitungen.Skripts
Das Installationsskript verwendet eine ausführbare Datei, die in go
und Terraform-Befehlszeilentools geschrieben ist, um ein leeres Projekt zu erstellen und die Anwendung darin zu installieren. Die Ausgabe sollte eine funktionierende Anwendung und eine URL für die Load-Balancing-IP-Adresse sein.
./main.tf
Dienste aktivieren
Google Cloud-Dienste sind in einem Projekt standardmäßig deaktiviert. Damit Sie eine der hier aufgeführten Lösungen verwenden können, müssen Sie Folgendes aktivieren:
- IAM: Verwaltet die Identität und den Zugriff für Google Cloud-Ressourcen
- Speicher: Dienst zum Speichern Ihrer Daten in Google Cloud und zum Zugriff auf diese Daten
- Dataflow – verwalteter Dienst zum Ausführen eines breiten Spektrums an Datenverarbeitungsmustern
- BigQuery: Datenplattform zum Erstellen, Verwalten, Freigeben und Abfragen von Daten
- Composer: Verwaltet Apache Airflow-Umgebungen in Google Cloud
- Computing: Virtuelle Maschinen und Netzwerkdienste (von Composer verwendet)
variable "gcp_service_list" {
description = "The list of apis necessary for the project"
type = list(string)
default = [
"dataflow.googleapis.com",
"compute.googleapis.com",
"composer.googleapis.com",
"storage.googleapis.com",
"bigquery.googleapis.com",
"iam.googleapis.com"
]
}
resource "google_project_service" "all" {
for_each = toset(var.gcp_service_list)
project = var.project_number
service = each.key
disable_on_destroy = false
}
Dienstkonto erstellen
Erstellt ein Dienstkonto, das von Composer und Dataflow verwendet werden soll.
resource "google_service_account" "etl" {
account_id = "etlpipeline"
display_name = "ETL SA"
description = "user-managed service account for Composer and Dataflow"
project = var.project_id
depends_on = [google_project_service.all]
}
Rollen zuweisen
Gewährt dem Dienstkonto erforderliche Rollen und dem Cloud Composer-Dienst-Agent die Rolle „Cloud Composer v2 API-Dienst-Agent-Erweiterung“ (erforderlich für Composer 2-Umgebungen).
variable "build_roles_list" { description = "Die Liste der von Composer und Dataflow benötigten Rollen" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/Agent.serviceAgentt.Service" "roles/dataflow.serviceAgentt.Service" "roles/dataflow.serviceAgentt.Service"
resource "google_project_iam_member" "allbuild" {
project = var.project_id
for_each = toset(var.build_roles_list)
role = each.key
member = "serviceAccount:${google_service_account.etl.email}"
depends_on = [google_project_service.all,google_service_account.etl]
}
resource "google_project_iam_member" "composerAgent" {
project = var.project_id
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
depends_on = [google_project_service.all]
}
Composer-Umgebung erstellen
Airflow hängt davon ab, dass viele Mikrodienste ausgeführt werden können. Daher stellt Cloud Composer die Google Cloud-Komponenten zum Ausführen Ihrer Workflows bereit. Diese Komponenten werden als Cloud Composer-Umgebung bezeichnet.
# Create Composer environment
resource "google_composer_environment" "example" {
project = var.project_id
name = "example-environment"
region = var.region
config {
software_config {
image_version = "composer-2.0.12-airflow-2.2.3"
env_variables = {
AIRFLOW_VAR_PROJECT_ID = var.project_id
AIRFLOW_VAR_GCE_ZONE = var.zone
AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
}
}
node_config {
service_account = google_service_account.etl.name
}
}
depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}
BigQuery-Dataset und -Tabelle erstellen
Erstellt ein Dataset und eine Tabelle in BigQuery, um die verarbeiteten Daten zu speichern und für Analysen verfügbar zu machen.
resource "google_bigquery_dataset" "weather_dataset" {
project = var.project_id
dataset_id = "average_weather"
location = "US"
depends_on = [google_project_service.all]
}
resource "google_bigquery_table" "weather_table" {
project = var.project_id
dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
table_id = "average_weather"
deletion_protection = false
schema = <<EOF
[
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC",
"mode": "NULLABLE"
},
{
"name": "is_current",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "latest_measurement",
"type": "DATE",
"mode": "NULLABLE"
}
]
EOF
depends_on = [google_bigquery_dataset.weather_dataset]
}
Cloud Storage-Bucket erstellen und Dateien hinzufügen
Erstellt einen Storage-Bucket zum Speichern der für die Pipeline erforderlichen Dateien, einschließlich der Quelldaten (inputFile.txt), des Zielschemas (jsonSchema.json) und der benutzerdefinierten Funktion für die Transformation (transformCSCtoJSON.js).
# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
project = var.project_number
name = "${var.basename}-${var.project_id}-files"
location = "US"
force_destroy = true
depends_on = [google_project_service.all]
}
resource "google_storage_bucket_object" "json_schema" {
name = "jsonSchema.json"
source = "${path.module}/files/jsonSchema.json"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "input_file" {
name = "inputFile.txt"
source = "${path.module}/files/inputFile.txt"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "transform_CSVtoJSON" {
name = "transformCSVtoJSON.js"
source = "${path.module}/files/transformCSVtoJSON.js"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
DAG-Datei hochladen
Verwendet zuerst eine Datenquelle, um den geeigneten Cloud Storage-Bucket-Pfad zum Hinzufügen der DAG-Datei zu ermitteln, und fügt dann die DAG-Dateien dem Bucket hinzu. Die DAG-Datei definiert die Workflows, Abhängigkeiten und Zeitpläne für Airflow, um Ihre Pipeline zu orchestrieren.
data "google_composer_environment" "example" {
project = var.project_id
region = var.region
name = google_composer_environment.example.name
depends_on = [google_composer_environment.example]
}
resource "google_storage_bucket_object" "dag_file" {
name = "dags/composer-dataflow-dag.py"
source = "${path.module}/files/composer-dataflow-dag.py"
bucket = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}
Fazit
Nach der Ausführung sollte eine Composer-Umgebung eingerichtet sein, um ETL-Jobs für die im Beispiel dargestellten Daten auszuführen. Außerdem sollten Sie über den gesamten Code verfügen, um diese Lösung an Ihre Umgebung anzupassen oder zu erweitern.