ETL-Pipeline

Mit Sammlungen den Überblick behalten Sie können Inhalte basierend auf Ihren Einstellungen speichern und kategorisieren.

Architektur

ETL Pipeline ist eine Architektur zum Ausführen von Batch-Datenverarbeitungspipelines, die die Methode Extrahieren, Transformieren und Laden verwenden. Diese Architektur besteht aus den folgenden Komponenten:

  • Google Cloud Storage für Quelldaten in der Landingpage
  • Dataflow zum Ausführen der Transformationen an den Quelldaten
  • BigQuery als Ziel für die transformierten Daten
  • Cloud Composer-Umgebung zum Orchestrieren des ETL-Prozesses

Jetzt starten

Klicken Sie auf den folgenden Link, um eine Kopie des Quellcodes in Cloud Shell zu erhalten. Dort wird mit einem einzigen Befehl eine funktionierende Kopie der Anwendung in Ihrem Projekt erstellt.

In Cloud Shell öffnen

Quellcode auf GitHub ansehen


ETL-Pipelinekomponenten

In der ETL-Pipeline-Architektur werden mehrere Produkte verwendet. Im Folgenden sind die Komponenten und weitere Informationen zu den Komponenten aufgeführt, darunter Links zu ähnlichen Videos, Produktdokumentationen und interaktiven Schritt-für-Schritt-Anleitungen.
Video Docs Schritt-für-Schritt-Anleitungen
BigQuery BigQuery ist ein serverloses, kosteneffizientes Multi-Cloud-Data-Warehouse, mit dem Sie wertvolle Geschäftsinformationen aus Big Data gewinnen können.
Cloud Composer Vollständig verwalteter, auf Apache Airflow basierender Dienst für die Workflow-Orchestrierung
Cloud Storage Cloud Storage bietet Dateispeicher und öffentliche Bereitstellung von Bildern über HTTP(s).

Skripts

Das Installationsskript verwendet eine ausführbare Datei, die in go und den Terraform-Befehlszeilentools geschrieben wurde, um ein leeres Projekt zu übernehmen und die Anwendung darin zu installieren. Die Ausgabe sollte eine funktionierende Anwendung und eine URL für die IP-Adresse des Load-Balancings sein.

./main.tf

Dienste aktivieren

Google Cloud-Dienste sind in einem Projekt standardmäßig deaktiviert. Zur Verwendung einer dieser Lösungen müssen wir Folgendes aktivieren:

  • IAM – verwaltet die Identität und den Zugriff für Google Cloud-Ressourcen
  • Speicher: Dienst zum Speichern und Abrufen Ihrer Daten in Google Cloud
  • Dataflow: Verwalteter Dienst zum Ausführen einer Vielzahl von Datenverarbeitungsmustern
  • BigQuery: Datenplattform zum Erstellen, Verwalten, Freigeben und Abfragen von Daten
  • Composer – Verwaltet Apache Airflow-Umgebungen in Google Cloud
  • Computing: Virtuelle Maschinen und Netzwerkdienste (von Composer verwendet)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

Dienstkonto erstellen

Erstellt ein Dienstkonto, das von Composer und Dataflow verwendet werden soll.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

Rollen zuweisen

Gewährt dem Dienstkonto erforderliche Rollen und dem Cloud Composer-Dienst-Agent die Rolle „Dienst-Agent“ von Cloud Composer v2 (erforderlich für Composer 2-Umgebungen).

variable "build_roles_list" { description = " die Liste der Rollen, die Composer und Dataflow benötigt" type = list(string) default = [ "roles/composer.worker", "roles/Dataflow.admin", "roles/Dataflow.quot;quo;quot.quo und Quot&quot.quo;quot;quot&quot

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Composer-Umgebung erstellen

Airflow ist für die Ausführung von vielen Mikrodiensten erforderlich. Cloud Composer stellt daher die Google Cloud-Komponenten zum Ausführen Ihrer Workflows bereit. Diese Komponenten werden zusammen als Cloud Composer-Umgebung bezeichnet.

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

BigQuery-Dataset und -Tabelle erstellen

Erstellt ein Dataset und eine Tabelle in BigQuery, um die verarbeiteten Daten zu speichern und für Analysen verfügbar zu machen.

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Cloud Storage-Bucket erstellen und Dateien hinzufügen

Erstellt einen Storage-Bucket, in dem die für die Pipeline erforderlichen Dateien gespeichert werden, einschließlich der Quelldaten (inputFile.txt), des Zielschemas (jsonSchema.json) und der benutzerdefinierten Funktion für die Transformation (transformCSCtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

DAG-Datei hochladen

Zuerst wird anhand einer Datenquelle der entsprechende Cloud Storage-Bucket-Pfad zum Hinzufügen der DAG-Datei ermittelt. Anschließend werden die DAG-Dateien dem Bucket hinzugefügt. Die DAG-Datei definiert die Workflows, Abhängigkeiten und Zeitpläne für Airflow zur Orchestrierung Ihrer Pipeline.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Fazit

Nach der Ausführung sollten Sie eine Composer-Umgebung eingerichtet haben, um ETL-Jobs für die im Beispiel gezeigten Daten auszuführen. Außerdem sollten Sie den gesamten Code haben, um diese Lösung an Ihre Umgebung anzupassen.