Cloud Functions と Pub/Sub メッセージを使用して DAG をトリガーする

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、Pub/Sub トピックの変更に応じて Cloud Composer DAG をトリガーすることで、イベントベースのプッシュ アーキテクチャを作成する方法について説明します。このチュートリアルの例では、DAG プロセスの一部として、サブスクリプション管理など、Pub/Sub 管理の全サイクルを処理します。これは、DAG をトリガーする必要があるものの、追加のアクセス権限を設定したくない一般的なユースケースに適しています。

たとえば、セキュリティ上の理由から Cloud Composer 環境への直接アクセスを提供しない場合は、Pub/Sub を介して送信されるメッセージをソリューションとして使用できます。Pub/Sub メッセージを作成し、Pub/Sub トピックで公開する Cloud Run functions の関数を構成できます。次に、Pub/Sub メッセージを pull してこれらのメッセージを処理する DAG を作成できます。

この例では、Cloud Run functions の関数を作成し、2 つの DAG をデプロイします。最初の DAG は Pub/Sub メッセージを pull し、Pub/Sub メッセージ コンテンツに沿って 2 番目の DAG をトリガーします。

このチュートリアルは、Python と Google Cloud コンソールに精通していることを前提としています。

目標

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

準備

このチュートリアルでは、Google Cloud プロジェクトが必要です。プロジェクトは、次のように構成します:

  1. Google Cloud コンソールで、プロジェクトを選択または作成します:

    プロジェクト セレクタに移動

  2. プロジェクトに対して課金が有効になっていることを確認します。、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  3. Google Cloud プロジェクトのユーザーが、必要なリソースを作成するための次のロールを持っていることを確認します。

    • サービス アカウント ユーザー (roles/iam.serviceAccountUser)
    • Pub/Sub 編集者roles/pubsub.editor
    • 環境とストレージ オブジェクトの管理者 (roles/composer.environmentAndStorageObjectAdmin)
    • Cloud Run Functions 管理者roles/cloudfunctions.admin
    • ログビューア (roles/logging.viewer)
  4. Cloud Run functions の関数を実行するサービス アカウントに、プロジェクトで Pub/Sub にアクセスするための十分な権限が付与されていることを確認します。デフォルトでは、Cloud Run functions は App Engine のデフォルトのサービス アカウントを使用します。このサービス アカウントには編集者のロールがあり、このチュートリアルに必要な権限が付与されています。

プロジェクトでAPI を有効にする

コンソール

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.

Enable the APIs

gcloud

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Terraform スクリプトに次のリソース定義を追加して、プロジェクトで Cloud Composer API を有効にします。

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
  check_if_service_has_usage_on_destroy = true
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

<PROJECT_ID> は、プロジェクトのプロジェクト ID に置き換えます。例: example-project

Cloud Composer 環境を作成する

Cloud Composer 2 環境を作成する.

この手順の一環としてCloud Composer v2 API サービス エージェント拡張機能roles/composer.ServiceAgentV2Ext)ロールを Composer サービス エージェント アカウントに付与します。Cloud Composer では、このアカウントを使用して Google Cloud プロジェクトでオペレーションを実行します。

Pub/Sub トピックの作成

この例では、Pub/Sub トピックに push されたメッセージに応答して DAG をトリガーします。この例で使用する Pub/Sub トピックを作成します。

コンソール

  1. Google Cloud コンソールで、Pub/Sub トピック ページに移動します。

    Pub/Sub トピックに移動

  2. [トピックを作成] をクリックします。

  3. [トピック ID] フィールドに、トピックの ID として dag-topic-trigger を入力します。

  4. 他のオプションはデフォルトのままにします。

  5. [トピックを作成] をクリックします。

gcloud

トピックを作成するには、Google Cloud CLI で gcloud pubsub topics create コマンドを実行します。

gcloud pubsub topics create dag-topic-trigger

Terraform

Terraform スクリプトに次のリソース定義を追加します。

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

<PROJECT_ID> は、プロジェクトのプロジェクト ID に置き換えます。例: example-project

DAG をアップロードする

DAG を環境にアップロードします。

  1. 次の DAG ファイルをローカル コンピュータに保存します。
  2. <PROJECT_ID> は、プロジェクトのプロジェクト ID に置き換えます。例: example-project
  3. 編集済み DAG ファイルをご利用の環境にアップロードします。
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

サンプルコードには、trigger_dagtarget_dag の 2 つの DAG が含まれています。

trigger_dag DAG は Pub/Sub トピックに登録し、Pub/Sub メッセージを pull し、Pub/Sub メッセージ データの DAG ID で指定された別の DAG をトリガーします。この例では、trigger_dagtarget_dag DAG をトリガーし、タスクログにメッセージを出力します。

trigger_dag DAG には次のタスクが含まれています。

  • subscribe_task: Pub/Sub トピックへのサブスクライブ
  • pull_messages_operator: PubSubPullOperator を使用して Pub/Sub メッセージ データを読み取ります。
  • trigger_target_dag: Pub/Sub トピックから pull されたメッセージのデータに応じて、別の DAG(この例では target_dag)をトリガーします。

target_dag DAG には、output_to_logs という 1 つのタスクのみが含まれています。このタスクは、1 秒の遅延でタスクログにメッセージを出力します。

Pub/Sub トピックにメッセージを公開する Cloud Run functions の関数をデプロイする

このセクションでは、Pub/Sub トピックにメッセージを公開する Cloud Run functions の関数をデプロイします。

Cloud Run functions の関数を作成し、その構成を指定する

コンソール

  1. Google Cloud コンソールで、[Cloud Run 関数] ページに移動します。

    Cloud Run 関数に移動します

  2. [関数を作成] をクリックします。

  3. [環境] フィールドで [第 1 世代] を選択します。

  4. [関数名] フィールドに、関数の名前(pubsub-publisher)を入力します。

  5. [トリガー] フィールドで、[HTTP] を選択します。

  6. [認証] セクションで、[未認証の呼び出しを許可] を選択します。このオプションを使用すると、未認証のユーザーに HTTP 関数の呼び出しを許可できます。

  7. [保存] をクリックします。

  8. [次へ] をクリックして、[コード] ステップに進みます。

Terraform

Terraform から関数のソースコードを管理する簡単な方法がないため、この手順では Google Cloud コンソールの使用を検討してください。

この例では、Cloud Storage バケットを作成し、このバケットにファイルを保存して、バケットからのファイルを Cloud Run functions の関数のソースとして使用することで、Cloud Run functions の関数をローカルの zip アーカイブ ファイルからアップロードする方法を示します。この方法を使用する場合、新しいアーカイブ ファイルを作成しても、Terraform は関数のソースコードを自動的に更新しません。関数コードを再アップロードするには、アーカイブのファイル名を変更します。

  1. pubsub_publisher.py ファイルと requirements.txt ファイルをダウンロードします。
  2. pubsub_publisher.py ファイルで、<PROJECT_ID> をプロジェクトのプロジェクト ID に置き換えます。例: example-project
  3. pbusub_publisner.py ファイルと requirements.txt ファイルを含む pubsub_function.zip という名前の ZIP アーカイブを作成します。
  4. zip アーカイブを、Terraform スクリプトが保存されているディレクトリに保存します。
  5. Terraform スクリプトに次のリソース定義を追加し、<PROJECT_ID> をプロジェクトのプロジェクト ID に置き換えます。
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

Cloud Run functions 関数のコードのパラメータを指定する

コンソール

  1. [コード] ステップの [ランタイム] フィールドで、関数が使用する言語ランタイムを選択します。この例では、[Python 3.10] を選択します。

  2. [エントリ ポイント] フィールドに、「pubsub_publisher」と入力します。これは、Cloud Run functions の関数の実行時に実行されるコードです。このフラグの値は、ソースコード内に存在する関数名または完全修飾クラス名である必要があります。

Terraform

このステップをスキップします。Cloud Run functions の関数のパラメータは、すでに google_cloudfunctions_function リソースで定義されています。

Cloud Run functions の関数のコードをアップロードする

コンソール

[ソースコード] フィールドで、関数のソースコードの提供方法に適したオプションを選択します。このチュートリアルでは、Cloud Run functions の関数のインライン エディタを使用して関数コードを追加します。または、ZIP ファイルをアップロードするか、Cloud Source Repositories を使用することもできます。

  1. 次のコードサンプルを main.py ファイルに配置します。
  2. <PROJECT_ID> は、プロジェクトのプロジェクト ID に置き換えます。例: example-project
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

このステップをスキップします。Cloud Run functions の関数のパラメータは、すでに google_cloudfunctions_function リソースで定義されています。

Cloud Run functions の関数の依存関係を指定する

コンソール

requirements.txt メタデータ ファイルで関数の依存関係を指定します。

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.21.5

関数をデプロイすると、Cloud Run functions は requirements.txt ファイル内で宣言されている依存関係を、パッケージごとに 1 行ずつダウンロードしてインストールします。このファイルは、関数コードを含む main.py ファイルと同じディレクトリに配置する必要があります。詳細については、pip ドキュメントの要件ファイルをご覧ください。

Terraform

このステップをスキップします。Cloud Run functions の関数の依存関係は、pubsub_function.zip アーカイブの requirements.txt ファイルで定義されます。

Cloud Run functions の関数をデプロイする

コンソール

[デプロイ] をクリックします。デプロイが正常に完了すると、Google Cloud コンソールの [Cloud Run functions] ページに関数と緑色のチェックマークが表示されます。

Cloud Run functions の関数を実行するサービス アカウントには、プロジェクト内に Pub/Sub にアクセスするための十分な権限があることを確認してください。

Terraform

  1. Terraform を初期化します。

    terraform init
    
  2. 構成を確認して、Terraform が作成または更新するリソースが想定どおりであることを確認します。

    terraform plan
    
  3. 次のコマンドを実行して、構成が有効かどうかを確認します。

    terraform validate
    
  4. 次のコマンドを実行し、プロンプトで「yes」と入力して、Terraform 構成を適用します。

    terraform apply
    

Terraform に「Apply complete!」のメッセージが表示されるまで待ちます。

Google Cloud コンソールで、UI のリソースに移動して、Terraform によって作成または更新されたことを確認します。

Cloud Run functions の関数をテストする

関数が Pub/Sub トピックでメッセージをパブリッシュし、サンプルの DAG が意図したとおりに機能することを確認するには:

  1. DAG がアクティブであることを確認します。

    1. Google Cloud コンソールで [環境] ページに移動します。

      [環境] に移動

    2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

    3. [DAG] タブに移動します。

    4. trigger_dag という名前の DAG と target_dag という名前の DAG の [State] 列の値を確認します。両方の DAG が Active 状態である必要があります。

  2. テスト用の Pub/Sub メッセージを push します。環境変数にキーを保存するには、Cloud Shell で次のように入力します。

    1. Google Cloud コンソールで、[Functions] ページに移動します。

      Cloud Run 関数に移動します

    2. 関数の名前をクリックします。pubsub-publisher

    3. [テスト] タブに移動します。

    4. [トリガー イベントの構成] セクションで、次の JSON の Key-Value を入力します。{"message": "target_dag"}このメッセージは後でテスト DAG をトリガーするため、キー値ペアを変更しないでください。

    5. [テストコマンド] セクションで、[Cloud Shell でテストする] をクリックします。

    6. [Cloud Shell ターミナル] で、コマンドが自動的に表示されるまで待ちます。Enter キーを押して、このコマンドを実行します。

    7. [Cloud Shell の承認] メッセージが表示されたら、[承認] をクリックします。

    8. メッセージの内容が Pub/Sub メッセージに対応していることを確認します。この例では、出力メッセージは、関数からのレスポンスとして Message b'target_dag' with message_length 10 published to で始まる必要があります。

  3. target_dag がトリガーされたことを確認します。

    1. trigger_dag の新しい DAG 実行が完了するまで 1 分以上待ちます。

    2. Google Cloud コンソールで [環境] ページに移動します。

      [環境] に移動

    3. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

    4. [DAG] タブに移動します。

    5. trigger_dag をクリックすると DAG の詳細ページに移動します。[実行] タブに、trigger_dag DAG の DAG 実行のリストが表示されます。

      この DAG は 1 分ごとに実行され、関数から送信されたすべての Pub/Sub メッセージを処理します。メッセージが送信されなかった場合、DAG 実行ログで trigger_target タスクは Skipped としてマークされます。DAG がトリガーされた場合、trigger_target タスクは Success としてマークされます。

    6. 最近の DAG 実行をいくつか調べて、3 つのタスク(subscribe_taskpull_messages_operatortrigger_target)すべてが Success ステータスになっている DAG 実行を見つけます。

    7. [DAG] タブに戻り、target_dag DAG の [成功した実行] 列に成功した実行が 1 つリストされていることを確認します。

概要

このチュートリアルでは、Cloud Run functions を使用して Pub/Sub トピックでメッセージをパブリッシュし、Pub/Sub トピックにサブスクライブして Pub/Sub メッセージを pull し、メッセージ データの DAG ID で指定された別の DAG をトリガーする DAG をデプロイする方法を学びました。

また、Pub/Sub サブスクリプションの作成と管理、および DAG のトリガーには、このチュートリアルの範囲外である別の方法も存在します。たとえば、指定されたイベントが発生したときに、Cloud Run functions を使用して Airflow DAG をトリガーできます。チュートリアルを参照し、Google Cloud の他の機能を試します。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトを削除する

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

リソースを個別に削除する

複数のチュートリアルとクイックスタートを実施する予定がある場合は、プロジェクトを再利用すると、プロジェクトの割り当て上限を超えないようにできます。

コンソール

  1. Cloud Composer 環境を削除します。この手順で環境のバケットも削除します。
  2. Pub/Sub トピックを削除しますdag-topic-trigger
  3. Cloud Run functions の関数を削除する

    1. Google Cloud コンソールで、[Cloud Run functions] に移動します。

      Cloud Run 関数に移動します

    2. 削除する関数のチェックボックスをクリックします、pubsub-publisher

    3. [削除] をクリックして、手順どおりに実行します。

Terraform

  1. Terraform スクリプトに、プロジェクトでまだ必要なリソースのエントリが含まれていないことを確認します。たとえば、一部の API を有効にしたまま、IAM 権限をまだ割り当てておくことができます(Terraform スクリプトにこのような定義を追加した場合)。
  2. terraform destroy を実行します。
  3. 環境のバケットを手動で削除します。Cloud Composer では自動的に削除されません。これは Google Cloud コンソールまたは Google Cloud CLI から行うことができます。

次のステップ