Cloud Functions と Pub/Sub メッセージで DAG をトリガーする

Cloud Composer 1 | Cloud Composer 2

このページでは、Pub/Sub トピックの変更に応じて Cloud Composer DAG をトリガーして、イベントベースの push アーキテクチャを作成する方法について説明します。このチュートリアルの例では、DAG プロセスの一部として、サブスクリプションの管理を含む Pub/Sub 管理の全サイクルを処理する方法を示します。DAG をトリガーする必要があるが、追加のアクセス権限を設定したくない一般的なユースケースに適しています。

たとえば、Pub/Sub を介して送信されるメッセージは、セキュリティ上の理由から、Cloud Composer 環境に直接アクセスしたくない場合のソリューションとして使用できます。Pub/Sub メッセージを作成し、Pub/Sub トピックで公開する Cloud Functions を構成できます。その後、Pub/Sub メッセージを pull し、それらのメッセージを処理する DAG を作成できます。

この特定の例では、Cloud Functions の関数を作成し、2 つの DAG をデプロイします。最初の DAG は Pub/Sub メッセージを pull し、Pub/Sub メッセージ コンテンツに沿って 2 番目の DAG をトリガーします。

このチュートリアルは、Python と Google Cloud コンソールに精通していることを前提としています。

目標

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

準備

このチュートリアルでは、Google Cloud プロジェクトが必要です。プロジェクトは、次のように構成します:

  1. Google Cloud コンソールで、プロジェクトを選択または作成します:

    プロジェクト セレクタに移動

  2. プロジェクトに対して課金が有効になっていることを確認します。、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  3. Google Cloud プロジェクトのユーザーが、必要なリソースを作成するための次のロールを持っていることを確認します。

    • サービス アカウント ユーザー (roles/iam.serviceAccountUser)
    • Pub/Sub 編集者roles/pubsub.editor
    • 環境とストレージ オブジェクトの管理者 (roles/composer.environmentAndStorageObjectAdmin)
    • Cloud Functions 管理者 (roles/cloudfunctions.admin)
    • ログビューア (roles/logging.viewer)
  4. Cloud Functions を実行するサービス アカウントに、プロジェクトで Pub/Sub にアクセスするための十分な権限が付与されていることを確認します。デフォルトでは、Cloud Functions は App Engine のデフォルトのサービス アカウントを使用します。 このサービス アカウントには、このチュートリアルで十分な権限がある編集者のロールがあります。

プロジェクトでAPI を有効にする

コンソール

Enable the Cloud Composer, Cloud Functions, and Pub/Sub APIs.

Enable the APIs

gcloud

Enable the Cloud Composer, Cloud Functions, and Pub/Sub APIs:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Terraform スクリプトに次のリソース定義を追加して、プロジェクトで Cloud Composer API を有効にします。

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

<PROJECT_ID> は、プロジェクトのプロジェクト ID に置き換えます。例: example-project

Cloud Composer 環境を作成する

Cloud Composer 2 環境を作成する.

この手順の一環としてCloud Composer v2 API サービス エージェント拡張機能roles/composer.ServiceAgentV2Ext)ロールを Composer サービス エージェント アカウントに付与します。Cloud Composer では、このアカウントを使用して Google Cloud プロジェクトでオペレーションを実行します。

Pub/Sub トピックの作成

この例では、Pub/Sub トピックに push されたメッセージに応答して DAG をトリガーします。この例で使用する Pub/Sub トピックを作成します。

コンソール

  1. Google Cloud コンソールで、Pub/Sub トピック ページに移動します。

    Pub/Sub トピックに移動

  2. [トピックを作成] をクリックします。

  3. [トピック ID] フィールドに、トピックの ID として dag-topic-trigger を入力します。

  4. 他のオプションはデフォルトのままにします。

  5. [トピックを作成] をクリックします。

gcloud

トピックを作成するには、Google Cloud CLI で gcloud pubsub topics create コマンドを実行します。

gcloud pubsub topics create dag-topic-trigger

Terraform

Terraform スクリプトに次のリソース定義を追加します。

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

<PROJECT_ID> は、プロジェクトのプロジェクト ID に置き換えます。例: example-project

DAG をアップロードする

DAG をお使いの環境にアップロードします。

  1. 次の DAG ファイルをローカル コンピュータに保存します。
  2. <PROJECT_ID> は、プロジェクトのプロジェクト ID に置き換えます。例: example-project
  3. 編集済み DAG ファイルをご利用の環境にアップロードします。
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"

def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids

# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)

def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")

# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

サンプルコードには 2 つの DAG(trigger_dagtarget_dag)が含まれています。

trigger_dag DAG は Pub/Sub トピックに登録し、Pub/Sub メッセージを pull し、Pub/Sub メッセージ データの DAG ID で指定された別の DAG をトリガーします。この例では、trigger_dag によって target_dag DAG がトリガーされ、タスクログにメッセージが出力されます。

trigger_dag DAG には次のタスクが含まれます。

  • subscribe_task: Pub/Sub トピックへのサブスクライブ
  • pull_messages_operator: PubSubPullOperator を使用して Pub/Sub メッセージ データを読み取ります。
  • trigger_target_dag: Pub/Sub トピックから pull されたメッセージのデータに従って、別の DAG(この例では target_dag)をトリガーします。

target_dag DAG にはタスク output_to_logs が 1 つだけ含まれます。このタスクは、1 秒間遅延してタスクログにメッセージを出力します。

Pub/Sub トピックにメッセージを公開する Cloud Functions をデプロイする

このセクションでは、Pub/Sub トピックにメッセージを公開する Cloud Functions をデプロイします。

Cloud Functions を作成し、その構成を指定する

コンソール

  1. Google Cloud コンソールで、[Cloud Functions] ページに移動します。

    Cloud Functions に移動

  2. [関数の作成] をクリックします。

  3. [環境] フィールドで [第 1 世代] を選択します。

  4. [関数名] フィールドに、関数の名前(pubsub-publisher)を入力します。

  5. [トリガー] フィールドで、[HTTP] を選択します。

  6. [認証] セクションで、[未認証の呼び出しを許可] を選択します。このオプションを使用すると、認証されていないユーザーに HTTP 関数の呼び出しが許可されます。

  7. [保存] をクリックする。

  8. [次へ] をクリックして、[コード] ステップに進みます。

Terraform

この手順では Google Cloud コンソールの使用を検討してください。Terraform から関数のソースコードを管理する簡単な方法がないためです。

この例では、Cloud Storage バケットを作成し、このバケットにファイルを保存して、バケットからのファイルを Cloud Functions のソースとして使用することで、Cloud Functions をローカルの zip アーカイブ ファイルからアップロードする方法を示します。この方法を使用した場合、新しいアーカイブ ファイルを作成しても、Terraform では関数のソースコードが自動的に更新されません。関数コードを再アップロードするには、アーカイブのファイル名を変更します。

  1. pubsub_publisher.py ファイルと requirements.txt ファイルをダウンロードします。
  2. pubsub_publisher.py ファイルで、<PROJECT_ID> をプロジェクトのプロジェクト ID に置き換えます。例: example-project
  3. pbusub_publisner.py ファイルと requirements.txt ファイルを含む pubsub_function.zip という名前の zip アーカイブを作成します。
  4. Terraform スクリプトが保存されているディレクトリに zip アーカイブを保存します。
  5. 次のリソース定義を Terraform スクリプトに追加し、<PROJECT_ID> をプロジェクトのプロジェクト ID に置き換えます。
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

Cloud Functions のコード パラメータを指定する

コンソール

  1. [コード] ステップの [ランタイム] フィールドで、関数で使用する言語ランタイムを選択します。この例では、Python 3.10 を選択します。

  2. [エントリ ポイント] フィールドに、「pubsub_publisher」と入力します。これは、Cloud Functions の関数の実行時に実行されるコードです。このフラグの値は、ソースコード内に存在する関数名または完全修飾クラス名である必要があります。

Terraform

このステップをスキップします。Cloud Functions のパラメータは、すでに google_cloudfunctions_function リソースで定義されています。

Cloud Functions のコードをアップロードする

コンソール

[ソースコード] フィールドで、関数のソースコードの提供方法に適したオプションを選択します。このチュートリアルでは、Cloud Functions インライン エディタを使用して関数コードを追加します。または、ZIP ファイルをアップロードするか、Cloud Source Repositories を使用することもできます。

  1. main.py ファイルに次のサンプルコードを入力します。
  2. <PROJECT_ID> は、プロジェクトのプロジェクト ID に置き換えます。例: example-project
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"

def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

このステップをスキップします。Cloud Functions のパラメータは、すでに google_cloudfunctions_function リソースで定義されています。

Cloud Functions の依存関係を指定する

コンソール

requirements.txt メタデータ ファイルで関数の依存関係を指定します。

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.17.0

関数をデプロイすると、Cloud Functions は requirements.txt ファイル内で宣言されている依存関係を、パッケージごとに 1 行ずつダウンロードしてインストールします。 このファイルは、関数コードを含む main.py ファイルと同じディレクトリに配置する必要があります。詳細については、pip ドキュメントの要件ファイルをご覧ください。

Terraform

このステップをスキップします。Cloud Functions の依存関係は、pubsub_function.zip アーカイブの requirements.txt ファイルで定義されます。

Cloud Functions の関数のデプロイ

コンソール

デプロイ] をクリックします。デプロイが正常に完了すると、Google Cloud コンソールの [Cloud Functions] ページで、関数に緑色のチェックマークが表示されます。

Cloud Functions を実行するサービス アカウントには、プロジェクト内に Pub/Sub にアクセスするための十分な権限があることを確認してください。

Terraform

  1. Terraform を初期化します。

    terraform init
    
  2. 構成を確認して、Terraform が作成または更新するリソースが想定どおりであることを確認します。

    terraform plan
    
  3. 次のコマンドを実行して、構成が有効かどうかを確認します。

    terraform validate
    
  4. 次のコマンドを実行し、プロンプトで「yes」と入力して、Terraform 構成を適用します。

    terraform apply
    

Terraform に「Apply complete!」のメッセージが表示されるまで待ちます。

Google Cloud コンソールで、UI のリソースに移動して、Terraform によって作成または更新されたことを確認します。

Cloud Functions の関数をテストする

関数が Pub/Sub トピックでメッセージをパブリッシュし、サンプルの DAG が意図したとおりに機能することを確認するには:

  1. DAG がアクティブであることを確認します。

    1. Google Cloud コンソールで [環境] ページに移動します。

      [環境] に移動

    2. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

    3. [DAG] タブに移動します。

    4. trigger_dagtarget_dag という名前の DAG の [State] 列の値を確認します。両方の DAG が Active 状態になっている必要があります。

  2. テスト用の Pub/Sub メッセージを push します。環境変数にキーを保存するには、Cloud Shell で次のように入力します。

    1. Google Cloud コンソールで、[Functions] ページに移動します。

      Cloud Functions に移動

    2. 関数の名前をクリックします。pubsub-publisher

    3. [テスト] タブに移動します。

    4. [トリガー イベントの構成] セクションで、次の JSON の Key-Value を入力します。{"message": "target_dag"}このメッセージは後でテスト DAG をトリガーするため、Key-Value ペアを変更しないでください。

    5. [テストコマンド] セクションで、[Cloud Shell でテスト] をクリックします。

    6. [Cloud Shell ターミナル] で、コマンドが自動的に表示されるまで待ちます。Enter を押して、このコマンドを実行します。

    7. [Cloud Shell の承認] メッセージが表示されたら、[承認] をクリックします。

    8. メッセージの内容が Pub/Sub メッセージに対応していることを確認します。この例では、出力メッセージは、関数からのレスポンスとして Message b'target_dag' with message_length 10 published to で始まる必要があります。

  3. target_dag がトリガーされたことを確認します。

    1. trigger_dag の新しい DAG 実行が完了するまで、少なくとも 1 分間待ちます。

    2. Google Cloud コンソールで [環境] ページに移動します。

      [環境] に移動

    3. 環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。

    4. [DAG] タブに移動します。

    5. trigger_dag をクリックすると DAG の詳細ページに移動します。[実行] タブに、trigger_dag DAG の DAG 実行のリストが表示されます。

      この DAG は 1 分ごとに実行され、関数から送信されたすべての Pub/Sub メッセージを処理します。メッセージが送信されていない場合は、DAG 実行ログで trigger_target タスクが Skipped とマークされます。DAG がトリガーされた場合、trigger_target タスクは Success としてマークされます。

    6. 最近の DAG 実行をいくつか調べて、3 つのタスク(subscribe_taskpull_messages_operatortrigger_target)すべてが Success ステータスになっている DAG 実行を探します。

    7. [DAG] タブに戻り、target_dag DAG の [成功した実行] 列に成功した実行が 1 つリストされていることを確認します。

Summary

このチュートリアルでは、Cloud Functions を使用して Pub/Sub トピックでメッセージをパブリッシュし、Pub/Sub トピックにサブスクライブして Pub/Sub メッセージを pull し、メッセージ データの DAG ID で指定された別の DAG をトリガーする DAG をデプロイする方法を学びました。

また、Pub/Sub サブスクリプションの作成と管理、および DAG のトリガーには、このチュートリアルの範囲外である別の方法も存在します。たとえば、指定されたイベントが発生したときに、Cloud Functions を使用して Airflow DAG をトリガーできます。 チュートリアルを参照し、Google Cloud の他の機能を試します。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトを削除する

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

リソースを個別に削除する

複数のチュートリアルとクイックスタートを実施する予定がある場合は、プロジェクトを再利用すると、プロジェクトの割り当て上限を超えないようにできます。

コンソール

  1. Cloud Composer 環境を削除します。この手順で環境のバケットも削除します。
  2. Pub/Sub トピックを削除しますdag-topic-trigger
  3. Cloud Functions 関数を削除する。

    1. Google Cloud コンソールで、[Cloud Functions] に移動します。

      Cloud Functions に移動

    2. 削除する関数のチェックボックスをクリックします、pubsub-publisher

    3. [削除] をクリックして、手順どおりに実行します。

Terraform

  1. Terraform スクリプトに、プロジェクトで必要なリソースのエントリが含まれていないことを確認します。たとえば、一部の API を有効にしたまま、IAM 権限をまだ割り当てておくことができます(Terraform スクリプトにこのような定義を追加した場合)。
  2. terraform destroy を実行します。
  3. 環境のバケットを手動で削除します。Cloud Composer では自動的には削除されません。これは Google Cloud コンソールまたは Google Cloud CLI から行うことができます。

次のステップ