Pipeline ETL

Arsitektur

Pipeline ETL adalah arsitektur untuk menjalankan pipeline pemrosesan data batch menggunakan metode ekstrak, transformasi, dan pemuatan. Arsitektur ini terdiri dari komponen berikut:

  • Google Cloud Storage untuk data sumber landing
  • Dataflow untuk melakukan transformasi pada data sumber
  • BigQuery sebagai tujuan untuk data yang ditransformasi
  • Lingkungan Cloud Composer untuk mengorkestrasi proses ETL

Mulai

Klik link berikut untuk mendapatkan salinan kode sumber di Cloud Shell. Setelah ada, satu perintah akan menjalankan salinan aplikasi yang berfungsi di project Anda..

Buka di Cloud Shell

Lihat kode sumber di GitHub


Komponen Pipeline ETL

Arsitektur Pipeline ETL memanfaatkan beberapa produk. Berikut ini daftar komponen, beserta informasi selengkapnya tentang komponen, termasuk link ke video terkait, dokumentasi produk, dan panduan interaktif.
Video Dokumen Panduan
BigQuery BigQuery adalah data warehouse serverless, hemat biaya, dan multicloud yang dirancang untuk membantu Anda mengubah big data menjadi insight bisnis yang berharga.
Cloud Composer Layanan orkestrasi alur kerja terkelola sepenuhnya yang memanfaatkan Apache Airflow.
Cloud Storage Cloud Storage menyediakan penyimpanan file dan penyajian gambar secara publik melalui http.

Skrip

Skrip penginstalan menggunakan file yang dapat dieksekusi yang ditulis di alat CLI Terraform dan go untuk mengambil project kosong dan menginstal aplikasi di dalamnya. Output harus berupa aplikasi yang berfungsi dan URL untuk alamat IP load balancing.

./main.tf

Aktifkan Layanan

Layanan Google Cloud dinonaktifkan dalam project secara default. Untuk menggunakan salah satu solusi di sini, kita harus mengaktifkan hal berikut:

  • IAM — Mengelola identitas dan akses untuk resource Google Cloud
  • Storage — Layanan untuk menyimpan dan mengakses data Anda di Google Cloud
  • Dataflow — Layanan terkelola untuk menjalankan berbagai pola pemrosesan data
  • BigQuery — Platform data untuk membuat, mengelola, membagikan, dan mengkueri data
  • Composer — Mengelola lingkungan Apache Airflow di Google Cloud
  • Compute — Virtual machine dan Layanan Jaringan (digunakan oleh Composer)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

Buat akun layanan

Membuat akun layanan yang akan digunakan oleh Composer dan Dataflow.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

Tetapkan peran

Memberikan peran yang diperlukan ke akun layanan, dan memberikan peran Ekstensi Agen Layanan Cloud Composer v2 ke Agen Layanan Cloud Composer (diperlukan untuk lingkungan Composer 2).

variabel "build_roles_list" { description = "Daftar peran yang diperlukan Composer dan Dataflow" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflowsAgent2"

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Membuat lingkungan Composer

Airflow bergantung pada banyak layanan mikro untuk dijalankan, sehingga Cloud Composer menyediakan komponen Google Cloud untuk menjalankan alur kerja Anda. Komponen-komponen ini secara kolektif dikenal sebagai lingkungan Cloud Composer.

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

Membuat set data dan tabel BigQuery

Membuat set data dan tabel di BigQuery untuk menyimpan data yang diproses dan menyediakannya untuk analisis.

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Membuat bucket Cloud Storage dan menambahkan file

Membuat bucket penyimpanan untuk menyimpan file yang diperlukan untuk pipeline, termasuk data sumber (inputFile.txt), skema target (jsonSchema.json), dan fungsi yang ditentukan pengguna untuk transformasi (transformCSCtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

Upload file DAG

Pertama-tama, menggunakan sumber data untuk menentukan jalur bucket Cloud Storage yang sesuai guna menambahkan file DAG, lalu menambahkan file DAG ke bucket. File DAG menentukan alur kerja, dependensi, dan jadwal untuk Airflow guna mengorkestrasi pipeline Anda.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Kesimpulan

Setelah dijalankan, sekarang Anda akan menyiapkan Lingkungan Composer untuk menjalankan tugas ETL pada data yang ditampilkan dalam contoh. Selain itu, Anda harus memiliki semua kode untuk dimodifikasi atau memperluas solusi ini agar sesuai dengan lingkungan Anda.