Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Halaman ini memandu Anda membuat arsitektur push berbasis peristiwa dengan memicu DAG Cloud Composer sebagai respons terhadap perubahan topik Pub/Sub. Contoh dalam tutorial ini menunjukkan penanganan siklus lengkap pengelolaan Pub/Sub, termasuk pengelolaan langganan, sebagai bagian dari proses DAG. Ini cocok untuk beberapa kasus penggunaan umum saat Anda perlu memicu DAG, tetapi tidak ingin menyiapkan izin akses tambahan.
Misalnya, pesan yang dikirim melalui Pub/Sub dapat digunakan sebagai solusi jika Anda tidak ingin memberikan akses langsung ke lingkungan Cloud Composer karena alasan keamanan. Anda dapat mengonfigurasi fungsi Cloud Run yang membuat pesan Pub/Sub dan memublikasikannya di topik Pub/Sub. Kemudian, Anda dapat membuat DAG yang mengambil pesan Pub/Sub, lalu menangani pesan ini.
Dalam contoh khusus ini, Anda akan membuat fungsi Cloud Run dan men-deploy dua DAG. DAG pertama mengambil pesan Pub/Sub dan memicu DAG kedua sesuai dengan konten pesan Pub/Sub.
Tutorial ini mengasumsikan bahwa Anda sudah memahami Python dan konsol Google Cloud.
Tujuan
Biaya
Tutorial ini menggunakan komponen Google Cloud yang dapat ditagih berikut:
- Cloud Composer (lihat juga biaya tambahan)
- Pub/Sub
- Cloud Run Functions
Setelah menyelesaikan tutorial ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Lihat Pembersihan untuk mengetahui detail selengkapnya.
Sebelum memulai
Untuk tutorial ini, Anda memerlukan project Google Cloud. Konfigurasikan project dengan cara berikut:
Di konsol Google Cloud, pilih atau buat project:
Pastikan penagihan diaktifkan untuk project Anda. Pelajari cara memeriksa apakah penagihan diaktifkan pada project.
Pastikan pengguna project Google Cloud Anda memiliki peran berikut untuk membuat resource yang diperlukan:
- Service Account User (
roles/iam.serviceAccountUser
) - Pub/Sub Editor (
roles/pubsub.editor
) - Environment and Storage Object Administrator
(
roles/composer.environmentAndStorageObjectAdmin
) - Cloud Run functions Admin (
roles/cloudfunctions.admin
) - Logs Viewer (
roles/logging.viewer
)
- Service Account User (
Pastikan bahwa akun layanan yang menjalankan fungsi Cloud Run Anda memiliki izin yang memadai di project Anda untuk mengakses Pub/Sub. Secara default, fungsi Cloud Run menggunakan akun layanan default App Engine. Akun layanan ini memiliki peran Editor, yang memiliki izin yang memadai untuk tutorial ini.
Mengaktifkan API untuk project Anda
Konsol
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Aktifkan Cloud Composer API di project Anda dengan menambahkan definisi resource berikut ke skrip Terraform:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Ganti <PROJECT_ID>
dengan Project ID project Anda. Misalnya, example-project
.
Membuat lingkungan Cloud Composer
Buat lingkungan Cloud Composer 2.
Sebagai bagian dari prosedur ini,
Anda memberikan peran Ekstensi Agen Layanan Cloud Composer v2 API
(roles/composer.ServiceAgentV2Ext
) ke akun Agen Layanan Composer. Cloud Composer menggunakan akun ini untuk melakukan operasi
di project Google Cloud Anda.
Membuat topik Pub/Sub
Contoh ini memicu DAG sebagai respons terhadap pesan yang dikirim ke topik Pub/Sub. Buat topik Pub/Sub untuk digunakan dalam contoh ini:
Konsol
Di konsol Google Cloud, buka halaman Pub/Sub Topics.
Klik Buat Topik.
Di kolom Topic ID, masukkan
dag-topic-trigger
sebagai ID untuk topik Anda.Biarkan opsi lain tetap pada setelan defaultnya.
Klik Buat Topik.
gcloud
Untuk membuat topik, jalankan perintah gcloud pubsub topics create di Google Cloud CLI:
gcloud pubsub topics create dag-topic-trigger
Terraform
Tambahkan definisi resource berikut ke skrip Terraform Anda:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Ganti <PROJECT_ID>
dengan Project ID project Anda. Misalnya, example-project
.
Mengupload DAG
Upload DAG ke lingkungan Anda:
- Simpan file DAG berikut di komputer lokal Anda.
- Ganti
<PROJECT_ID>
dengan Project ID project Anda. Misalnya,example-project
. - Upload file DAG yang telah diedit ke lingkungan Anda.
Kode contoh berisi dua DAG: trigger_dag
dan target_dag
.
DAG trigger_dag
berlangganan topik Pub/Sub, mengambil pesan Pub/Sub, dan memicu DAG lain yang ditentukan dalam ID DAG data pesan Pub/Sub. Dalam contoh ini, trigger_dag
memicu DAG target_dag
, yang menghasilkan pesan ke log tugas.
DAG trigger_dag
berisi tugas berikut:
subscribe_task
: Berlangganan topik Pub/Sub.pull_messages_operator
: Membaca data pesan Pub/Sub denganPubSubPullOperator
.trigger_target_dag
: Memicu DAG lain (dalam contoh ini,target_dag
) sesuai dengan data dalam pesan yang diambil dari topik Pub/Sub.
DAG target_dag
hanya berisi satu tugas: output_to_logs
. Tugas ini
mencetak pesan ke log tugas dengan penundaan satu detik.
Men-deploy fungsi Cloud Run yang memublikasikan pesan di topik Pub/Sub
Di bagian ini, Anda akan men-deploy fungsi Cloud Run yang memublikasikan pesan di topik Pub/Sub.
Membuat fungsi Cloud Run dan menentukan konfigurasinya
Konsol
Di konsol Google Cloud, buka halaman Cloud Run functions.
Klik Create function.
Di kolom Environment, pilih 1st gen.
Di kolom Nama fungsi, masukkan nama untuk fungsi Anda:
pubsub-publisher
.Pada kolom Jenis pemicu, pilih HTTP.
Di bagian Autentikasi, pilih Izinkan pemanggilan yang tidak diautentikasi. Opsi ini memberikan kemampuan kepada pengguna yang tidak diautentikasi untuk memanggil fungsi HTTP.
Klik Simpan.
Klik Berikutnya untuk berpindah ke langkah Kode.
Terraform
Pertimbangkan untuk menggunakan konsol Google Cloud untuk langkah ini, karena tidak ada cara yang mudah untuk mengelola kode sumber fungsi dari Terraform.
Contoh ini menunjukkan cara mengupload fungsi Cloud Run dari file arsip zip lokal dengan membuat bucket Cloud Storage, menyimpan file di bucket ini, lalu menggunakan file dari bucket sebagai sumber untuk fungsi Cloud Run. Jika Anda menggunakan pendekatan ini, Terraform tidak akan otomatis memperbarui kode sumber fungsi Anda, meskipun Anda membuat file arsip baru. Untuk mengupload ulang kode fungsi, Anda dapat mengubah nama file arsip.
- Download file
pubsub_publisher.py
danrequirements.txt
. - Dalam file
pubsub_publisher.py
, ganti<PROJECT_ID>
dengan Project ID project Anda. Misalnya,example-project
. - Buat arsip zip bernama
pubsub_function.zip
dengan filepbusub_publisner.py
danrequirements.txt
. - Simpan arsip zip ke direktori tempat skrip Terraform Anda disimpan.
- Tambahkan definisi resource berikut ke skrip Terraform Anda dan
ganti
<PROJECT_ID>
dengan Project ID project Anda.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Menentukan parameter kode fungsi Cloud Run
Konsol
Pada langkah Kode, di kolom Runtime, pilih runtime bahasa yang digunakan fungsi Anda. Dalam contoh ini, pilih Python 3.10.
Di kolom Entry point, masukkan
pubsub_publisher
. Ini adalah kode yang dijalankan saat fungsi Cloud Run Anda berjalan. Nilai flag ini harus berupa nama fungsi atau nama class yang sepenuhnya memenuhi syarat yang ada dalam kode sumber Anda.
Terraform
Lewati langkah ini. Parameter fungsi Cloud Run sudah ditentukan dalam resource google_cloudfunctions_function
.
Mengupload kode fungsi Cloud Run
Konsol
Di kolom Source code, pilih opsi yang sesuai untuk cara Anda menyediakan kode sumber fungsi. Dalam tutorial ini, tambahkan kode fungsi Anda menggunakan Editor Inline fungsi Cloud Run. Sebagai alternatif, Anda dapat mengupload file ZIP, atau menggunakan Cloud Source Repositories.
- Masukkan contoh kode berikut ke dalam file main.py.
- Ganti
<PROJECT_ID>
dengan Project ID project Anda. Misalnya,example-project
.
Terraform
Lewati langkah ini. Parameter fungsi Cloud Run sudah ditentukan dalam resource google_cloudfunctions_function
.
Menentukan dependensi fungsi Cloud Run
Konsol
Tentukan dependensi fungsi dalam file metadata requirements.txt:
Saat Anda men-deploy fungsi, fungsi Cloud Run akan mendownload dan menginstal dependensi yang dideklarasikan dalam file requirements.txt, satu baris per paket.
File ini harus berada di direktori yang sama dengan file main.py yang berisi
kode fungsi Anda. Untuk mengetahui detail selengkapnya, lihat
File Persyaratan
dalam dokumentasi pip
.
Terraform
Lewati langkah ini. Dependensi fungsi Cloud Run ditentukan dalam file requirements.txt
dalam arsip pubsub_function.zip
.
Men-deploy fungsi Cloud Run
Konsol
Klik Deploy. Setelah deployment berhasil, fungsi akan muncul dengan tanda centang hijau di halaman fungsi Cloud Run di konsol Google Cloud.
Pastikan akun layanan yang menjalankan fungsi Cloud Run Anda memiliki izin yang memadai di project Anda untuk mengakses Pub/Sub.
Terraform
Lakukan inisialisasi Terraform:
terraform init
Tinjau konfigurasi dan pastikan resource yang akan dibuat atau diupdate oleh Terraform sesuai dengan ekspektasi Anda:
terraform plan
Untuk memeriksa apakah konfigurasi Anda valid, jalankan perintah berikut:
terraform validate
Terapkan konfigurasi Terraform dengan menjalankan perintah berikut dan memasukkan yes pada prompt:
terraform apply
Tunggu hingga Terraform menampilkan pesan "Apply complete!".
Di konsol Google Cloud, buka resource Anda di UI untuk memastikan Terraform telah membuat atau memperbaruinya.
Menguji fungsi Cloud Run
Untuk memeriksa apakah fungsi Anda memublikasikan pesan di topik Pub/Sub dan contoh DAG berfungsi sebagaimana mestinya:
Pastikan DAG aktif:
Di konsol Google Cloud, buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab DAG.
Periksa nilai di kolom Status untuk DAG bernama
trigger_dag
dantarget_dag
. Kedua DAG harus dalam statusActive
.
Kirim pesan Pub/Sub pengujian. Anda dapat melakukannya di Cloud Shell:
Di konsol Google Cloud, buka halaman Functions.
Klik nama fungsi Anda,
pubsub-publisher
.Buka tab Pengujian.
Di bagian Configure triggering event, masukkan key-value JSON berikut:
{"message": "target_dag"}
. Jangan ubah pasangan nilai kunci, karena pesan ini akan memicu DAG pengujian nanti.Di bagian Test Command, klik Test in Cloud Shell.
Di Cloud Shell Terminal, tunggu hingga perintah muncul secara otomatis. Jalankan perintah ini dengan menekan
Enter
.Jika pesan Izinkan Cloud Shell muncul, klik Izinkan.
Pastikan konten pesan sesuai dengan pesan Pub/Sub. Dalam contoh ini, pesan output harus dimulai dengan
Message b'target_dag' with message_length 10 published to
sebagai respons dari fungsi Anda.
Pastikan
target_dag
telah dipicu:Tunggu setidaknya satu menit, sehingga DAG baru yang dijalankan
trigger_dag
selesai.Di konsol Google Cloud, buka halaman Environments.
Di daftar lingkungan, klik nama lingkungan Anda. Halaman Environment details akan terbuka.
Buka tab DAG.
Klik
trigger_dag
untuk membuka halaman Detail DAG. Di tab Runs, daftar operasi DAG untuk DAGtrigger_dag
akan ditampilkan.DAG ini berjalan setiap menit dan memproses semua pesan Pub/Sub yang dikirim dari fungsi. Jika tidak ada pesan yang dikirim, tugas
trigger_target
akan ditandai sebagaiSkipped
dalam log operasi DAG. Jika DAG dipicu, tugastrigger_target
akan ditandai sebagaiSuccess
.Lihat beberapa operasi DAG terbaru untuk menemukan operasi DAG dengan ketiga tugas (
subscribe_task
,pull_messages_operator
, dantrigger_target
) dalam statusSuccess
.Kembali ke tab DAG dan pastikan kolom Operasi yang berhasil untuk DAG
target_dag
mencantumkan satu operasi yang berhasil.
Ringkasan
Dalam tutorial ini, Anda telah mempelajari cara menggunakan fungsi Cloud Run untuk memublikasikan pesan di topik Pub/Sub dan men-deploy DAG yang berlangganan topik Pub/Sub, mengambil pesan Pub/Sub, dan memicu DAG lain yang ditentukan dalam ID DAG data pesan.
Ada juga cara alternatif untuk membuat dan mengelola langganan Pub/Sub dan memicu DAG yang berada di luar cakupan tutorial ini. Misalnya, Anda dapat menggunakan fungsi Cloud Run untuk memicu DAG Airflow saat peristiwa tertentu terjadi. Lihat tutorial kami untuk mencoba fitur Google Cloud lainnya.
Pembersihan
Agar tidak dikenai biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.
Menghapus project
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Menghapus resource satu per satu
Jika Anda berencana mempelajari beberapa tutorial dan panduan memulai, menggunakan kembali project dapat membantu Anda agar tidak melampaui batas kuota project.
Konsol
- Hapus lingkungan Cloud Composer. Anda juga akan menghapus bucket lingkungan selama prosedur ini.
- Hapus topik Pub/Sub,
dag-topic-trigger
. Hapus fungsi Cloud Run.
Di konsol Google Cloud, buka fungsi Cloud Run.
Klik kotak centang untuk fungsi yang ingin Anda hapus,
pubsub-publisher
.Klik Hapus, lalu ikuti petunjuknya.
Terraform
- Pastikan skrip Terraform Anda tidak berisi entri untuk resource yang masih diperlukan oleh project Anda. Misalnya, Anda mungkin ingin tetap mengaktifkan beberapa API dan izin IAM masih ditetapkan (jika Anda menambahkan definisi tersebut ke skrip Terraform).
- Jalankan
terraform destroy
. - Hapus bucket lingkungan secara manual. Cloud Composer tidak menghapusnya secara otomatis. Anda dapat melakukannya dari konsol Google Cloud atau Google Cloud CLI.
Langkah selanjutnya
- Menguji DAG
- Menguji Fungsi HTTP
- Men-deploy fungsi Cloud Run
- Coba sendiri fitur Google Cloud lainnya. Lihat tutorial kami.