Memicu DAG dengan Cloud Functions dan Pesan Pub/Sub

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Halaman ini memandu Anda membuat arsitektur push berbasis peristiwa dengan memicu DAG Cloud Composer sebagai respons terhadap Pub/Sub perubahan topik. Contoh dalam tutorial ini menunjukkan penanganan siklus penuh pengelolaan Pub/Sub, termasuk pengelolaan langganan, sebagai bagian dari proses DAG. Ini cocok untuk beberapa kasus penggunaan umum saat Anda perlu memicu DAG, tetapi tidak ingin menyiapkan izin akses tambahan.

Misalnya, pesan yang dikirim melalui Pub/Sub dapat digunakan sebagai solusi jika Anda tidak ingin memberikan akses langsung ke Cloud Composer lingkungan untuk alasan keamanan. Anda dapat mengonfigurasi Fungsi Cloud Run yang membuat pesan Pub/Sub dan memublikasikannya pada topik Pub/Sub. Anda kemudian dapat membuat DAG yang menarik pesan Pub/Sub dan kemudian menangani pesan ini.

Dalam contoh spesifik ini, Anda membuat fungsi Cloud Run dan men-deploy dua DAG. DAG pertama menarik pesan Pub/Sub dan memicu DAG kedua sesuai dengan konten pesan Pub/Sub.

Tutorial ini mengasumsikan Anda telah memahami Python dan Konsol Google Cloud.

Tujuan

Biaya

Tutorial ini menggunakan komponen Google Cloud yang dapat ditagih berikut:

Setelah menyelesaikan tutorial ini, Anda dapat menghindari penagihan lanjutan dengan menghapus untuk resource yang Anda buat. Lihat Pembersihan untuk mengetahui detail selengkapnya.

Sebelum memulai

Untuk tutorial ini, Anda memerlukan resource Google Cloud project Anda. Konfigurasikan project dengan cara berikut:

  1. Di konsol Google Cloud, pilih atau buat project:

    Buka Pemilih Project

  2. Pastikan penagihan diaktifkan untuk project Anda. Pelajari cara memeriksa apakah penagihan diaktifkan pada sebuah project.

  3. Pastikan pengguna project Google Cloud Anda memiliki peran berikut untuk membuat resource yang diperlukan:

    • Pengguna Akun Layanan (roles/iam.serviceAccountUser)
    • Editor Pub/Sub (roles/pubsub.editor)
    • Administrator Objek Penyimpanan dan Lingkungan (roles/composer.environmentAndStorageObjectAdmin)
    • Admin Cloud Run Functions (roles/cloudfunctions.admin)
    • Logs Viewer (roles/logging.viewer)
  4. Pastikan bahwa akun layanan yang menjalankan fungsi Cloud Run Anda memiliki izin yang memadai di project Anda untuk mengakses Pub/Sub. Menurut secara default, fungsi Cloud Run menggunakan Akun layanan default App Engine. Akun layanan ini memiliki peran Editor, yang memiliki izin untuk tutorial ini.

Mengaktifkan API untuk project Anda

Konsol

Aktifkan API Cloud Composer, Cloud Run functions, and Pub/Sub.

Mengaktifkan API

gcloud

Aktifkan API Cloud Composer, Cloud Run functions, and Pub/Sub:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Aktifkan Cloud Composer API di project Anda dengan menambahkan hal berikut ke skrip Terraform Anda:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
  check_if_service_has_usage_on_destroy = true
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

Ganti <PROJECT_ID> dengan Project ID dari proyek Anda. Misalnya, example-project.

Membuat lingkungan Cloud Composer

Membuat lingkungan Cloud Composer 2.

Sebagai bagian dari prosedur ini, Anda memberikan Ekstensi Agen Layanan Cloud Composer v2 API (roles/composer.ServiceAgentV2Ext) ke Composer Service Agent menggunakan akun layanan. Cloud Composer menggunakan akun ini untuk menjalankan operasi di project Google Cloud Anda.

Membuat topik Pub/Sub

Contoh ini memicu DAG sebagai respons terhadap pesan yang dikirim ke Topik Pub/Sub. Buat topik Pub/Sub untuk digunakan dalam contoh:

Konsol

  1. Di Konsol Google Cloud, buka halaman Pub/Sub Topics.

    Buka Topik Pub/Sub

  2. Klik Buat Topik.

  3. Di kolom Topic ID, masukkan dag-topic-trigger sebagai ID untuk topik.

  4. Biarkan opsi lain tetap default.

  5. Klik Buat Topik.

gcloud

Untuk membuat topik, jalankan gcloud pubsub topics create di Google Cloud CLI:

gcloud pubsub topics create dag-topic-trigger

Terraform

Tambahkan definisi resource berikut ke skrip Terraform Anda:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

Ganti <PROJECT_ID> dengan Project ID dari proyek Anda. Misalnya, example-project.

Mengupload DAG

Upload DAG ke lingkungan Anda:

  1. Simpan file DAG berikut di komputer lokal Anda.
  2. Ganti <PROJECT_ID> dengan Project ID dari proyek Anda. Misalnya, example-project.
  3. Upload file DAG yang telah diedit ke lingkungan Anda.
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

Kode contoh berisi dua DAG: trigger_dag dan target_dag.

DAG trigger_dag berlangganan topik Pub/Sub, menarik Pesan Pub/Sub, dan memicu DAG lain yang ditentukan dalam ID DAG dari data pesan Pub/Sub. Dalam contoh ini, trigger_dag memicu DAG target_dag, yang menghasilkan pesan ke log tugas.

DAG trigger_dag berisi tugas berikut:

  • subscribe_task: Berlangganan topik Pub/Sub.
  • pull_messages_operator: Membaca data pesan Pub/Sub dengan PubSubPullOperator.
  • trigger_target_dag: Memicu DAG lain (dalam contoh ini, target_dag) sesuai dengan data dalam pesan yang diambil dari Pub/Sub topik.

DAG target_dag hanya berisi satu tugas: output_to_logs. Tugas ini mencetak pesan ke log tugas dengan penundaan satu detik.

Men-deploy fungsi Cloud Run yang memublikasikan pesan pada topik Pub/Sub

Di bagian ini, Anda akan men-deploy fungsi Cloud Run yang memublikasikan pesan pada topik Pub/Sub.

Membuat fungsi Cloud Run dan menentukan konfigurasinya

Konsol

  1. Di konsol Google Cloud, buka halaman Cloud Run functions.

    Buka fungsi Cloud Run

  2. Klik Create function.

  3. Di kolom Environment, pilih 1st generasi.

  4. Di kolom Nama fungsi, masukkan nama untuk fungsi Anda: pubsub-publisher.

  5. Pada kolom Jenis pemicu, pilih HTTP.

  6. Di bagian Authentication, pilih Izinkan pemanggilan yang tidak diautentikasi. Opsi ini memberikan pengguna yang tidak diautentikasi kemampuan untuk memanggil fungsi HTTP.

  7. Klik Simpan.

  8. Klik Berikutnya untuk berpindah ke langkah Kode.

Terraform

Pertimbangkan untuk menggunakan konsol Google Cloud untuk langkah ini, karena tidak ada cara mudah untuk mengelola kode sumber fungsi dari Terraform.

Contoh ini menunjukkan cara mengupload fungsi Cloud Run dari file arsip zip lokal dengan membuat bucket Cloud Storage, menyimpan file tersebut di bucket ini, lalu menggunakan file dari bucket sebagai untuk fungsi Cloud Run. Jika Anda menggunakan pendekatan ini, Terraform tidak secara otomatis memperbarui kode sumber fungsi Anda, bahkan jika Anda membuat file arsip baru. Untuk mengupload ulang kode fungsi, Anda dapat mengubah nama file arsip.

  1. Download pubsub_publisher.py dan requirements.txt .
  2. Di file pubsub_publisher.py, ganti <PROJECT_ID> dengan Project ID project Anda. Sebagai contoh, example-project.
  3. Buat arsip zip bernama pubsub_function.zip dengan pbusub_publisner.py dan file requirements.txt.
  4. Simpan arsip zip ke direktori tempat skrip Terraform Anda disimpan.
  5. Tambahkan definisi resource berikut ke skrip Terraform Anda dan Ganti <PROJECT_ID> dengan Project ID project Anda.
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

Menentukan parameter kode fungsi Cloud Run

Konsol

  1. Di langkah Code, di kolom Runtime, pilih bahasa runtime yang digunakan fungsi Anda. Dalam contoh ini, pilih Python 3.10.

  2. Di kolom Titik entri, masukkan pubsub_publisher. Ini adalah kodenya yang dijalankan saat fungsi Cloud Run Anda dijalankan. Nilai dari flag ini harus berupa nama fungsi atau nama class yang sepenuhnya memenuhi syarat yang ada dalam kode sumber Anda.

Terraform

Lewati langkah ini. Parameter fungsi Cloud Run sudah ditentukan di resource google_cloudfunctions_function.

Mengupload kode fungsi Cloud Run

Konsol

Di kolom Source code, pilih opsi yang sesuai dengan cara Anda memberikan kode sumber fungsi. Dalam tutorial ini, tambahkan kode fungsi Anda menggunakan fungsi Cloud Run Editor Inline. Sebagai alternatif, Anda dapat mengunggah file ZIP, atau menggunakan dan Cloud Source Repositories.

  1. Masukkan contoh kode berikut ke dalam file main.py.
  2. Ganti <PROJECT_ID> dengan Project ID dari proyek Anda. Misalnya, example-project.
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

Lewati langkah ini. Parameter fungsi Cloud Run sudah ditentukan di resource google_cloudfunctions_function.

Menentukan dependensi fungsi Cloud Run Anda

Konsol

Tentukan dependensi fungsi dalam file metadata requirements.txt:

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.21.5

Saat Anda men-deploy fungsi, fungsi Cloud Run akan mendownload dan menginstal dependensi yang dideklarasikan dalam file requirements.txt, satu baris per paket. File ini harus berada di direktori yang sama seperti file main.py yang berisi kode fungsi Anda. Untuk detail selengkapnya, lihat File Persyaratan dalam dokumentasi pip.

Terraform

Lewati langkah ini. Dependensi fungsi Cloud Run didefinisikan dalam file requirements.txt dalam arsip pubsub_function.zip.

Men-deploy fungsi Cloud Run

Konsol

Klik Deploy. Saat deployment berhasil diselesaikan, fungsi akan muncul dengan tanda centang hijau di halaman Cloud Run functions di Konsol Google Cloud Anda.

Pastikan akun layanan yang menjalankan fungsi Cloud Run Anda memiliki izin yang memadai di project Anda untuk mengakses Pub/Sub.

Terraform

  1. Lakukan inisialisasi Terraform:

    terraform init
    
  2. Tinjau konfigurasi dan pastikan bahwa resource Terraform akan dibuat atau diperbarui sesuai dengan ekspektasi Anda:

    terraform plan
    
  3. Untuk memeriksa apakah konfigurasi Anda valid, jalankan perintah berikut berikut:

    terraform validate
    
  4. Terapkan konfigurasi Terraform dengan menjalankan perintah berikut dan memasukkan "yes" pada perintah:

    terraform apply
    

Tunggu hingga Terraform menampilkan pesan "Apply complete!".

Di Konsol Google Cloud, buka resource Anda di UI untuk membuat memastikan bahwa Terraform telah membuat atau memperbaruinya.

Menguji fungsi Cloud Run

Untuk memeriksa apakah fungsi Anda memublikasikan pesan pada topik Pub/Sub dan contoh DAG berfungsi sebagaimana mestinya:

  1. Pastikan DAG aktif:

    1. Di Konsol Google Cloud, buka halaman Environments.

      Buka Lingkungan

    2. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

    3. Buka tab DAGs.

    4. Periksa nilai di kolom State untuk DAG bernama trigger_dag dan target_dag. Kedua DAG harus dalam status Active.

  2. Mengirim pesan Pub/Sub pengujian. Anda bisa melakukannya dengan Cloud Shell:

    1. Di Konsol Google Cloud, buka halaman Functions.

      Buka fungsi Cloud Run

    2. Klik nama fungsi Anda, pubsub-publisher.

    3. Buka tab Pengujian.

    4. Di bagian Konfigurasi peristiwa pemicu, masukkan informasi berikut Nilai kunci JSON: {"message": "target_dag"}. Jangan ubah nilai kunci karena pesan ini akan memicu DAG pengujian.

    5. Di bagian Test Command, klik Test in Cloud Shell.

    6. Di Terminal Cloud Shell, tunggu hingga perintah muncul secara otomatis. Jalankan perintah ini dengan menekan Enter.

    7. Jika pesan Authorize Cloud Shell muncul, klik Authorize.

    8. Memeriksa apakah konten pesan sesuai dengan Pub/Sub untuk membuat pesan email baru. Dalam contoh ini, pesan {i>output<i} harus diawali dengan Message b'target_dag' with message_length 10 published to sebagai respons dari fungsi Anda.

  3. Pastikan target_dag telah dipicu:

    1. Tunggu minimal satu menit, agar proses DAG baru trigger_dag berjalan selesai.

    2. Di Konsol Google Cloud, buka halaman Environments.

      Buka Lingkungan

    3. Pada daftar lingkungan, klik nama lingkungan Anda. Halaman Detail lingkungan akan terbuka.

    4. Buka tab DAGs.

    5. Klik trigger_dag untuk membuka halaman detail DAG. Di Runs , daftar DAG yang dijalankan untuk DAG trigger_dag akan ditampilkan.

      DAG ini berjalan setiap menit dan memproses semua Pub/Sub pesan yang dikirim dari fungsi tersebut. Jika tidak ada pesan yang dikirim, maka Tugas trigger_target ditandai sebagai Skipped di log operasi DAG. Jika DAG dipicu, lalu tugas trigger_target ditandai sebagai Success.

    6. Lihat beberapa operasi DAG terbaru untuk menemukan proses DAG di mana semua tiga tugas (subscribe_task, pull_messages_operator, dan trigger_target) dalam status Success.

    7. Kembali ke tab DAGs dan periksa apakah tab Berhasil dijalankan untuk DAG target_dag mencantumkan satu operasi yang berhasil.

Ringkasan

Dalam tutorial ini, Anda telah mempelajari cara menggunakan fungsi Cloud Run untuk memublikasikan pada topik Pub/Sub dan men-deploy DAG yang berlangganan Topik Pub/Sub, mengambil pesan Pub/Sub, dan memicu DAG lain yang ditentukan dalam ID DAG data pesan.

Ada juga cara alternatif untuk membuat dan mengelola langganan Pub/Sub dan memicu DAG yang berada di luar dari cakupan tutorial ini. Sebagai contoh, Anda dapat menggunakan fungsi Cloud Run untuk memicu DAG Airflow saat terjadi peristiwa tertentu. Lihat tutorial kami untuk mencoba fitur Google Cloud untuk Anda sendiri.

Pembersihan

Agar tidak menimbulkan biaya ke akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut atau mempertahankan proyek dan menghapus sumber daya satu per satu.

Menghapus project

    Menghapus project Google Cloud:

    gcloud projects delete PROJECT_ID

Menghapus resource satu per satu

Jika Anda berencana mempelajari beberapa tutorial dan panduan memulai, menggunakan kembali project dapat membantu Anda agar tidak melampaui batas kuota project.

Konsol

  1. Hapus lingkungan Cloud Composer. Anda juga menghapus bucket lingkungan selama prosedur ini.
  2. Hapus topik Pub/Sub, dag-topic-trigger.
  3. Hapus fungsi Cloud Run.

    1. Di konsol Google Cloud, buka fungsi Cloud Run.

      Buka fungsi Cloud Run

    2. Klik kotak centang untuk fungsi yang ingin Anda hapus, pubsub-publisher.

    3. Klik Hapus, lalu ikuti petunjuknya.

Terraform

  1. Pastikan skrip Terraform Anda tidak berisi entri untuk resource yang masih dibutuhkan oleh project Anda. Sebagai contoh, Anda mungkin ingin tetap mengaktifkan beberapa API dan IAM izin akses masih ditetapkan (jika Anda menambahkan definisi tersebut ke skrip Terraform).
  2. Jalankan terraform destroy.
  3. Menghapus bucket lingkungan secara manual. Cloud Composer tidak menghapusnya secara otomatis. Anda bisa melakukannya dari konsol Google Cloud atau Google Cloud CLI.

Langkah selanjutnya