Pipeline ETL

+

Arsitektur

Pipeline ETL adalah arsitektur untuk menjalankan pipeline pemrosesan data batch menggunakan metode ekstrak, transformasi, pemuatan. Arsitektur ini terdiri dari komponen berikut:

  • Google Cloud Storage untuk data sumber landing
  • Dataflow untuk melakukan transformasi pada data sumber
  • BigQuery sebagai tujuan untuk data yang ditransformasi
  • Lingkungan Cloud Composer untuk mengatur proses ETL

Mulai

Klik link berikut untuk melihat salinan kode sumber di Cloud Shell. Setelah di sana, satu perintah akan membuat salinan aplikasi yang berfungsi di project Anda.

Buka di Cloud Shell

Melihat kode sumber di GitHub


Komponen Pipeline ETL

Arsitektur Pipeline ETL menggunakan beberapa produk. Berikut adalah daftar komponen, beserta informasi selengkapnya tentang komponen tersebut, termasuk link ke video terkait, dokumentasi produk, dan panduan interaktif.
Video Dokumen Panduan
BigQuery BigQuery adalah data warehouse serverless, hemat biaya, dan multicloud yang dirancang untuk membantu Anda mengubah big data menjadi insight bisnis yang berharga.
Cloud Composer Layanan orkestrasi alur kerja terkelola sepenuhnya yang memanfaatkan Apache Airflow.
Cloud Storage Cloud Storage menyediakan penyimpanan file dan penayangan gambar secara publik melalui http(s).

Skrip

Skrip penginstalan menggunakan file yang dapat dieksekusi yang ditulis di go dan alat Terraform CLI untuk mengambil project kosong dan menginstal aplikasi di dalamnya. Output-nya harus berupa aplikasi yang berfungsi dan URL untuk alamat IP load balancing.

./main.tf

Aktifkan Layanan

Layanan Google Cloud dinonaktifkan di project secara default. Untuk menggunakan solusi apa pun di sini, kita harus mengaktifkan hal berikut:

  • IAM — Mengelola identitas dan akses untuk resource Google Cloud
  • Penyimpanan — Layanan untuk menyimpan dan mengakses data Anda di Google Cloud
  • Dataflow — Layanan terkelola untuk menjalankan berbagai macam pola pemrosesan data
  • BigQuery — Platform data untuk membuat, mengelola, membagikan, dan membuat kueri data
  • Composer — Mengelola lingkungan Apache Airflow di Google Cloud
  • Compute — Virtual machine dan Layanan Jaringan (digunakan oleh Composer)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

Membuat akun layanan

Membuat akun layanan yang akan digunakan oleh Composer dan Dataflow.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

Tetapkan peran

Memberikan peran yang diperlukan ke akun layanan, dan memberikan peran Ekstensi Agen Layanan Cloud Composer v2 API ke Agen Layanan Cloud Composer (diperlukan untuk lingkungan Composer 2).

variable "build_roles_list" { description = "The list of roles that Composer and Dataflow needs" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Membuat lingkungan Composer

Airflow bergantung pada banyak microservice untuk dijalankan, sehingga Cloud Composer menyediakan komponen Google Cloud untuk menjalankan alur kerja Anda. Komponen ini secara kolektif dikenal sebagai lingkungan Cloud Composer.

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

Membuat set data dan tabel BigQuery

Membuat set data dan tabel di BigQuery untuk menyimpan data yang diproses dan menyediakannya untuk analisis.

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Membuat bucket Cloud Storage dan menambahkan file

Membuat bucket penyimpanan untuk menyimpan file yang diperlukan untuk pipeline, termasuk data sumber (inputFile.txt), skema target (jsonSchema.json), dan fungsi yang ditentukan pengguna untuk transformasi (transformCSCtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

Mengupload file DAG

Pertama-tama, gunakan sumber data untuk menentukan jalur bucket Cloud Storage yang sesuai untuk menambahkan file DAG, lalu tambahkan file DAG ke bucket. File DAG menentukan alur kerja, dependensi, dan jadwal untuk Airflow guna mengatur pipeline Anda.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Kesimpulan

Setelah dijalankan, Anda kini harus menyiapkan Lingkungan Composer untuk menjalankan tugas ETL pada data yang ditampilkan dalam contoh. Selain itu, Anda harus memiliki semua kode untuk memodifikasi atau memperluas solusi ini agar sesuai dengan lingkungan Anda.