ETL-Pipeline

Architektur

Die ETL-Pipeline ist eine Architektur zum Ausführen von Batch-Datenverarbeitungspipelines mit der Methode zum Extrahieren, Transformieren und Laden. Diese Architektur besteht aus den folgenden Komponenten:

  • Google Cloud Storage zum Abrufen von Quelldaten
  • Dataflow zum Durchführen von Transformationen der Quelldaten
  • BigQuery als Ziel für die transformierten Daten
  • Cloud Composer-Umgebung zur Orchestrierung des ETL-Prozesses

Jetzt starten

Klicken Sie auf den folgenden Link, um den Quellcode in Cloud Shell zu kopieren. Dort wird mit einem einzigen Befehl eine funktionierende Kopie der Anwendung in Ihrem Projekt erstellt.

In Cloud Shell öffnen

Quellcode auf GitHub ansehen


ETL-Pipeline-Komponenten

Die ETL-Pipeline-Architektur nutzt mehrere Produkte. Im Folgenden finden Sie eine Liste der Komponenten sowie weitere Informationen zu den Komponenten, einschließlich Links zu ähnlichen Videos, Produktdokumentationen und interaktiven Schritt-für-Schritt-Anleitungen.
Video Docs Schritt-für-Schritt-Anleitungen
BigQuery BigQuery ist ein serverloses, kosteneffizientes Multi-Cloud-Data-Warehouse, mit dem Sie aus Big Data wertvolle Insights für Ihr Geschäft gewinnen können.
Cloud Composer Vollständig verwalteter, auf Apache Airflow basierender Dienst für die Workflow-Orchestrierung
Cloud Storage Cloud Storage bietet Dateispeicher und öffentliche Bereitstellung von Bildern über HTTP(s).

Skripts

Das Installationsskript verwendet eine ausführbare Datei, die in go und Terraform-Befehlszeilentools geschrieben ist, um ein leeres Projekt zu erstellen und die Anwendung darin zu installieren. Die Ausgabe sollte eine funktionierende Anwendung und eine URL für die Load-Balancing-IP-Adresse sein.

./main.tf

Dienste aktivieren

Google Cloud-Dienste sind in einem Projekt standardmäßig deaktiviert. Damit Sie eine der hier aufgeführten Lösungen verwenden können, müssen Sie Folgendes aktivieren:

  • IAM: Verwaltet die Identität und den Zugriff für Google Cloud-Ressourcen
  • Speicher: Dienst zum Speichern Ihrer Daten in Google Cloud und zum Zugriff auf diese Daten
  • Dataflow – verwalteter Dienst zum Ausführen eines breiten Spektrums an Datenverarbeitungsmustern
  • BigQuery: Datenplattform zum Erstellen, Verwalten, Freigeben und Abfragen von Daten
  • Composer: Verwaltet Apache Airflow-Umgebungen in Google Cloud
  • Computing: Virtuelle Maschinen und Netzwerkdienste (von Composer verwendet)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

Dienstkonto erstellen

Erstellt ein Dienstkonto, das von Composer und Dataflow verwendet werden soll.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

Rollen zuweisen

Gewährt dem Dienstkonto erforderliche Rollen und dem Cloud Composer-Dienst-Agent die Rolle „Cloud Composer v2 API-Dienst-Agent-Erweiterung“ (erforderlich für Composer 2-Umgebungen).

variable "build_roles_list" { description = "Die Liste der von Composer und Dataflow benötigten Rollen" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/Agent.serviceAgentt.Service" "roles/dataflow.serviceAgentt.Service" "roles/dataflow.serviceAgentt.Service"

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Composer-Umgebung erstellen

Airflow hängt davon ab, dass viele Mikrodienste ausgeführt werden können. Daher stellt Cloud Composer die Google Cloud-Komponenten zum Ausführen Ihrer Workflows bereit. Diese Komponenten werden als Cloud Composer-Umgebung bezeichnet.

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

BigQuery-Dataset und -Tabelle erstellen

Erstellt ein Dataset und eine Tabelle in BigQuery, um die verarbeiteten Daten zu speichern und für Analysen verfügbar zu machen.

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Cloud Storage-Bucket erstellen und Dateien hinzufügen

Erstellt einen Storage-Bucket zum Speichern der für die Pipeline erforderlichen Dateien, einschließlich der Quelldaten (inputFile.txt), des Zielschemas (jsonSchema.json) und der benutzerdefinierten Funktion für die Transformation (transformCSCtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

DAG-Datei hochladen

Verwendet zuerst eine Datenquelle, um den geeigneten Cloud Storage-Bucket-Pfad zum Hinzufügen der DAG-Datei zu ermitteln, und fügt dann die DAG-Dateien dem Bucket hinzu. Die DAG-Datei definiert die Workflows, Abhängigkeiten und Zeitpläne für Airflow, um Ihre Pipeline zu orchestrieren.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Fazit

Nach der Ausführung sollte eine Composer-Umgebung eingerichtet sein, um ETL-Jobs für die im Beispiel dargestellten Daten auszuführen. Außerdem sollten Sie über den gesamten Code verfügen, um diese Lösung an Ihre Umgebung anzupassen oder zu erweitern.