Pipeline ETL adalah arsitektur untuk menjalankan pipeline pemrosesan data batch menggunakan metode ekstrak, transformasi, pemuatan. Arsitektur ini terdiri dari komponen berikut:
- Google Cloud Storage untuk data sumber landing
- Dataflow untuk melakukan transformasi pada data sumber
- BigQuery sebagai tujuan untuk data yang ditransformasi
- Lingkungan Cloud Composer untuk mengatur proses ETL
Mulai
Klik link berikut untuk melihat salinan kode sumber di Cloud Shell. Setelah di sana, satu perintah akan membuat salinan aplikasi yang berfungsi di project Anda.
Komponen Pipeline ETL
Arsitektur Pipeline ETL menggunakan beberapa produk. Berikut adalah daftar komponen, beserta informasi selengkapnya tentang komponen tersebut, termasuk link ke video terkait, dokumentasi produk, dan panduan interaktif.Skrip
Skrip penginstalan menggunakan file yang dapat dieksekusi yang ditulis di go
dan alat Terraform CLI untuk mengambil project kosong dan menginstal aplikasi di dalamnya. Output-nya harus berupa aplikasi yang berfungsi dan URL untuk alamat IP load balancing.
./main.tf
Aktifkan Layanan
Layanan Google Cloud dinonaktifkan di project secara default. Untuk menggunakan solusi apa pun di sini, kita harus mengaktifkan hal berikut:
- IAM — Mengelola identitas dan akses untuk resource Google Cloud
- Penyimpanan — Layanan untuk menyimpan dan mengakses data Anda di Google Cloud
- Dataflow — Layanan terkelola untuk menjalankan berbagai macam pola pemrosesan data
- BigQuery — Platform data untuk membuat, mengelola, membagikan, dan membuat kueri data
- Composer — Mengelola lingkungan Apache Airflow di Google Cloud
- Compute — Virtual machine dan Layanan Jaringan (digunakan oleh Composer)
variable "gcp_service_list" {
description = "The list of apis necessary for the project"
type = list(string)
default = [
"dataflow.googleapis.com",
"compute.googleapis.com",
"composer.googleapis.com",
"storage.googleapis.com",
"bigquery.googleapis.com",
"iam.googleapis.com"
]
}
resource "google_project_service" "all" {
for_each = toset(var.gcp_service_list)
project = var.project_number
service = each.key
disable_on_destroy = false
}
Membuat akun layanan
Membuat akun layanan yang akan digunakan oleh Composer dan Dataflow.
resource "google_service_account" "etl" {
account_id = "etlpipeline"
display_name = "ETL SA"
description = "user-managed service account for Composer and Dataflow"
project = var.project_id
depends_on = [google_project_service.all]
}
Tetapkan peran
Memberikan peran yang diperlukan ke akun layanan, dan memberikan peran Ekstensi Agen Layanan Cloud Composer v2 API ke Agen Layanan Cloud Composer (diperlukan untuk lingkungan Composer 2).
variable "build_roles_list" { description = "The list of roles that Composer and Dataflow needs" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }
resource "google_project_iam_member" "allbuild" {
project = var.project_id
for_each = toset(var.build_roles_list)
role = each.key
member = "serviceAccount:${google_service_account.etl.email}"
depends_on = [google_project_service.all,google_service_account.etl]
}
resource "google_project_iam_member" "composerAgent" {
project = var.project_id
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
depends_on = [google_project_service.all]
}
Membuat lingkungan Composer
Airflow bergantung pada banyak microservice untuk dijalankan, sehingga Cloud Composer menyediakan komponen Google Cloud untuk menjalankan alur kerja Anda. Komponen ini secara kolektif dikenal sebagai lingkungan Cloud Composer.
# Create Composer environment
resource "google_composer_environment" "example" {
project = var.project_id
name = "example-environment"
region = var.region
config {
software_config {
image_version = "composer-2.0.12-airflow-2.2.3"
env_variables = {
AIRFLOW_VAR_PROJECT_ID = var.project_id
AIRFLOW_VAR_GCE_ZONE = var.zone
AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
}
}
node_config {
service_account = google_service_account.etl.name
}
}
depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}
Membuat set data dan tabel BigQuery
Membuat set data dan tabel di BigQuery untuk menyimpan data yang diproses dan menyediakannya untuk analisis.
resource "google_bigquery_dataset" "weather_dataset" {
project = var.project_id
dataset_id = "average_weather"
location = "US"
depends_on = [google_project_service.all]
}
resource "google_bigquery_table" "weather_table" {
project = var.project_id
dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
table_id = "average_weather"
deletion_protection = false
schema = <<EOF
[
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC",
"mode": "NULLABLE"
},
{
"name": "is_current",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "latest_measurement",
"type": "DATE",
"mode": "NULLABLE"
}
]
EOF
depends_on = [google_bigquery_dataset.weather_dataset]
}
Membuat bucket Cloud Storage dan menambahkan file
Membuat bucket penyimpanan untuk menyimpan file yang diperlukan untuk pipeline, termasuk data sumber (inputFile.txt), skema target (jsonSchema.json), dan fungsi yang ditentukan pengguna untuk transformasi (transformCSCtoJSON.js).
# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
project = var.project_number
name = "${var.basename}-${var.project_id}-files"
location = "US"
force_destroy = true
depends_on = [google_project_service.all]
}
resource "google_storage_bucket_object" "json_schema" {
name = "jsonSchema.json"
source = "${path.module}/files/jsonSchema.json"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "input_file" {
name = "inputFile.txt"
source = "${path.module}/files/inputFile.txt"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "transform_CSVtoJSON" {
name = "transformCSVtoJSON.js"
source = "${path.module}/files/transformCSVtoJSON.js"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
Mengupload file DAG
Pertama-tama, gunakan sumber data untuk menentukan jalur bucket Cloud Storage yang sesuai untuk menambahkan file DAG, lalu tambahkan file DAG ke bucket. File DAG menentukan alur kerja, dependensi, dan jadwal untuk Airflow guna mengatur pipeline Anda.
data "google_composer_environment" "example" {
project = var.project_id
region = var.region
name = google_composer_environment.example.name
depends_on = [google_composer_environment.example]
}
resource "google_storage_bucket_object" "dag_file" {
name = "dags/composer-dataflow-dag.py"
source = "${path.module}/files/composer-dataflow-dag.py"
bucket = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}
Kesimpulan
Setelah dijalankan, Anda kini harus menyiapkan Lingkungan Composer untuk menjalankan tugas ETL pada data yang ditampilkan dalam contoh. Selain itu, Anda harus memiliki semua kode untuk memodifikasi atau memperluas solusi ini agar sesuai dengan lingkungan Anda.