コールバックと Eventarc トリガーを使用してイベントを待機する


ワークフローでは外部プロセスを待機する必要がある場合があります。HTTP コールバックを使用して、別のサービスがコールバック エンドポイントにリクエストを行うことを待ち、そのリクエストによってワークフローの実行を再開できます。ポーリングの使用を待機することもできます。

このチュートリアルでは、ポーリングを使用する代わりに、HTTP コールバックと Eventarc トリガーを使用してイベントや Pub/Sub メッセージを待機する方法を説明します。イベントまたは Pub/Sub メッセージでワークフローをトリガーすることもできますが、続行する前にその実行を停止して別のイベントを待つことをおすすめします。たとえば、イベントによってワークフローがトリガーされてプロセスが開始されますが、ワークフローはプロセスの完了を通知する別のイベントを待機する必要があります。これは、あるワークフローで別のワークフローにコールバックすることで実装できます。

目標

このチュートリアルでは、次のことが発生します。

  1. イベントを待機する必要があるプライマリ ワークフローがデプロイされ、実行されます。イベントが発生するのを待つ必要があるため、セカンダリ ワークフローが詳細を取得できるように、コールバックの詳細を Firestore データベースに保存します。その後、プライマリ ワークフローは HTTP 呼び出しを待機します。

  2. セカンダリ ワークフローがイベントによってトリガーされ、イベントが生成されたときに Firestore データベースからコールバックの詳細を取得します。その後、セカンダリ ワークフローは、実行を再開するプライマリ ワークフローにコールバックします。

以下にプロセス全体の概要を示します。

コールバック、プライマリとセカンダリのワークフローを使用してイベントを待機する

プライマリ ワークフロー:

  1. callback-event-sample ワークフローは、2 つのイベントソース(Pub/Sub トピックと Cloud Storage バケット)のコールバック エンドポイントを作成します。
  2. このワークフローでは、両方のコールバック エンドポイントを Firestore ドキュメントに保存します。
  3. このワークフローは実行を停止し、HTTP リクエストがコールバック エンドポイントに到着するまで待機します。

イベント:

  1. イベントが発生します。メッセージが Pub/Sub トピックにパブリッシュされ、ファイルが Cloud Storage バケットにアップロードされます。

セカンダリ ワークフロー:

  1. Eventarc はイベントを callback-event-listener ワークフローにルーティングし、その実行をトリガーします。
  2. このワークフローは、Firestore ドキュメントから適切なコールバック エンドポイント URL を取得します。
  3. このワークフローは、サンプル ワークフローの適切なエンドポイントに対するコールバックを実行します。

プライマリ ワークフロー:

  1. callback-event-sample ワークフローはコールバック エンドポイントでイベントを受信し、実行を再開します。
  2. このワークフローは、Firestore ドキュメントからコールバック URL を削除し、実行を完了します。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

Google Cloud コンソールで次のコマンドを実行するか、ターミナルまたは Cloud Shell で Google Cloud CLI を使用できます。

組織で定義されているセキュリティの制約により、次の手順を完了できない場合があります。トラブルシューティング情報については、制約のある Google Cloud 環境でアプリケーションを開発するをご覧ください。

コンソール

  1. Google Cloud コンソールのプロジェクト セレクタ ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  2. Google Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  3. App Engine、Eventarc、Firestore、Pub/Sub、Workflows API を有効にします。

    API を有効にする

  4. 他の Google Cloud サービスでの認証に使用するワークフローのサービス アカウントを作成し、適切なロールを付与します。

    1. Google Cloud コンソールで、[サービス アカウント] ページに移動します。

      [サービス アカウント] に移動

    2. [サービス アカウントの作成] ページに移動するには、プロジェクトを選択します。

    3. [サービス アカウント名] フィールドに名前を入力します。Google Cloud コンソールでは、この名前に基づいて [サービス アカウント ID] フィールドの値が設定されます。

      [サービス アカウントの説明] フィールドに説明を入力します。例: Service account for tutorial

    4. [作成して続行] をクリックします。

    5. [ロールを選択] リストで、次のロールをフィルタして、前の手順で作成したユーザー管理のサービス アカウントに付与します。

      • Cloud Datastore ユーザー: Datastore モード(Datastore)のデータで Firestore にアクセスします。
      • Eventarc イベント受信者: イベント プロバイダからイベントを受信します。
      • ログ書き込み: ログを書き込みます。
      • ワークフロー起動元: ワークフローを実行し、実行内容を管理します。

      ロールを追加するには、[別のロールを追加] をクリックして各ロールを追加します。

    6. [続行] をクリックします。

    7. アカウントの作成を完了するには、[完了] をクリックします。

  5. Cloud Storage からイベントをルーティングする Eventarc トリガーを作成するには、Pub/Sub パブリッシャーのロールを Cloud Storage サービス エージェントに付与します。通常、これは service-PROJECT_NUMBER@gs-project-accounts.iam.gserviceaccount.com です。これにより、Cloud Storage サービス エージェントのメールアドレスを取得できます。

    1. Google Cloud コンソールの [IAM] ページに移動します。

      [IAM] に移動

    2. Cloud Storage サービス エージェントの行で、プリンシパルを編集します」をクリックします(サービス エージェントがリストにない場合は、次のステップに進みます)。権限の編集ペインが開きます。

      1. [別のロールを追加] をクリックして、Pub/Sub パブリッシャーのロールを検索します。
      2. ロールを選択します。
      3. [保存] をクリックします。
    3. サービス エージェントがリストにない場合は、[アクセス権を付与] をクリックします。アクセス権の付与ペインが開きます。

      1. [新しいプリンシパル] フィールドに、サービス エージェントのメールアドレスを入力します。
      2. [ロールを選択] リストで、Pub/Sub パブリッシャー ロールを検索します。
      3. ロールを選択します。
      4. [保存] をクリックします。

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Google Cloud プロジェクトの課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  3. App Engine、Eventarc、Firestore、Pub/Sub、Workflows API を有効にします。

    gcloud services enable \
        appengine.googleapis.com \
        eventarc.googleapis.com \
        firestore.googleapis.com \
        pubsub.googleapis.com \
        workflows.googleapis.com
  4. 他の Google Cloud サービスでの認証に使用するワークフローのサービス アカウントを作成し、適切なロールを付与します。

    1. サービス アカウントを作成します。

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME をサービス アカウントの名前に置き換えます。

    2. 前のステップで作成したユーザー管理のサービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。

      • roles/datastore.user: Datastore モード(Datastore)のデータで Firestore にアクセスします。
      • roles/eventarc.eventReceiver: イベント プロバイダからイベントを受信します。
      • roles/logging.logWriter: ログを書き込みます。
      • roles/workflows.invoker: ワークフローを実行し、実行内容を管理します。
      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
          --role=ROLE

      以下を置き換えます。

      • PROJECT_ID: サービス アカウントを作成したプロジェクト ID
      • ROLE: 付与するロール
  5. Cloud Storage からイベントをルーティングする Eventarc トリガーを作成するには、Pub/Sub パブリッシャーのロールを Cloud Storage サービス エージェントに付与します。通常、これは service-PROJECT_NUMBER@gs-project-accounts.iam.gserviceaccount.com です。 最初に gcloud storage service-agent を使用して、Cloud Storage のサービス エージェントを取得します。

    SERVICE_ACCOUNT_STORAGE="$(gcloud storage service-agent --project=PROJECT_ID)"
    
    gcloud projects add-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:$SERVICE_ACCOUNT_STORAGE \
        --role=roles/pubsub.publisher

Firestore データベースを作成する

Firestore は、値に対応するフィールドを含むドキュメントにデータを保存します。これらのドキュメントはコレクションに格納されます。コレクションは、データの編成とクエリの作成に使用できるドキュメントのコンテナです。Firestore の詳細を確認する

各 Google Cloud プロジェクトは、Firestore データベースを 1 つに制限されていることに注意してください。新しいデータベースを作成する必要がある場合は、次の手順を行います。

コンソール

  1. Google Cloud コンソールで、Firestore の [スタートガイド] ページに移動します。

    [スタートガイド] に移動

  2. [ネイティブ モードを選択] をクリックします。

    データベースの選択に関するガイドと各機能の比較については、ネイティブ モードと Datastore モードからの選択をご覧ください。

  3. [ロケーションを選択] リストで、[nam5(United States)] を選択します。

    このロケーションは、Google Cloud プロジェクトの Firestore データベースと App Engine アプリケーションの両方に適用されます。 データベースを作成した後に、そのロケーションを変更することはできません。

  4. [データベースを作成] をクリックします。

gcloud

Firestore データベースを作成するには、App Engine アプリケーションを作成してから gcloud firestore databases create コマンドを実行する必要があります。

gcloud app create --region=us-central
gcloud firestore databases create --region=us-central

us-central is not a valid Firestore location 警告は無視してかまいません。 App Engine と Firestore は同じロケーションをサポートしていますが、App Engine の us-central(アイオワ)リージョンは Firestore nam5(米国)マルチリージョンにマッピングされます。

Pub/Sub トピックの作成

このチュートリアルでは、イベントソースとして Pub/Sub を使用します。Pub/Sub トピックを作成して、メッセージをパブリッシュできるようにします。 詳細については、トピックの作成と管理をご覧ください。

コンソール

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. [トピックを作成] をクリックします。

  3. [トピック ID] フィールドに「topic-callback」と入力します。

  4. 他の値はデフォルトを使用します。

  5. [トピックを作成] をクリックします。

gcloud

トピックを作成するには、gcloud pubsub topics create コマンドを実行します。

gcloud pubsub topics create topic-callback

Cloud Storage バケットを作成する

このチュートリアルでは、イベントソースとして Cloud Storage を使用します。Cloud Storage バケットを作成して、ファイルをアップロードできるようにします。 ストレージ バケットの作成方法を学習する

コンソール

  1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

    [Cloud Storage] に移動

  2. [ 作成] をクリックします。

  3. バケットの [名前] に「PROJECT_ID-bucket-callback」と入力します。

    プロジェクト ID は、callback-event-sample ワークフローでバケットを識別するために使用されます。

  4. [続行] をクリックします。

  5. [ロケーション タイプ] で [リージョン] を選択し、[us-central1(アイオワ)] を選択します。

  6. 他の値はデフォルトを使用します。

  7. [作成] をクリックします。

gcloud

トピックを作成するには、gcloud storage buckets create コマンドを実行します。

gcloud storage buckets create gs://PROJECT_ID-bucket-callback \
    --location=us-central1

プロジェクト ID は、callback-event-sample ワークフローでバケットを識別するために使用されます。

イベントソースの作成後、イベント レシーバ ワークフローをデプロイできます。

イベントをリッスンするワークフローをデプロイする

callback-event-listener ワークフローは、メッセージが Pub/Sub トピックに公開されるか、ファイルが Cloud Storage バケットにアップロードされたときにトリガーされます。ワークフローはイベントを受け取り、Firestore データベースから適切なコールバックの詳細を取得して、HTTP リクエストをコールバック エンドポイントに送信します。

Console

  1. Google Cloud コンソールの [ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ 作成] をクリックします。

  3. 新しいフィールドの名前を入力します: callback-event-listener

  4. [リージョン] リストで [us-central1] を選択します。

  5. 先ほど作成した [サービス アカウント] を選択します。

  6. [次へ] をクリックします。

  7. ワークフロー エディタで、次のワークフローの定義を入力します。

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}
  8. [デプロイ] をクリックします。

gcloud

  1. ワークフローのソースコード ファイルを作成します。

    touch callback-event-listener.yaml
  2. テキスト エディタで、次のワークフローをソースコード ファイルにコピーします。

    main:
      params: [event]
      steps:
        - log_event:
            call: sys.log
            args:
              text: ${event}
              severity: INFO
        - init:
            assign:
              - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
              - event_source_tokens: ${text.split(event.source, "/")}
              - event_source_len: ${len(event_source_tokens)}
              - event_source: ${event_source_tokens[event_source_len - 1]}
              - doc_name: ${database_root + event_source}
        - get_document_for_event_source:
            try:
              call: googleapis.firestore.v1.projects.databases.documents.get
              args:
                name: ${doc_name}
              result: document
            except:
                as: e
                steps:
                    - known_errors:
                        switch:
                        - condition: ${e.code == 404}
                          return: ${"No callbacks for event source " + event_source}
                    - unhandled_exception:
                        raise: ${e}
        - process_callback_urls:
            steps:
              - check_fields_exist:
                  switch:
                  - condition: ${not("fields" in document)}
                    return: ${"No callbacks for event source " + event_source}
                  - condition: true
                    next: processFields
              - processFields:
                  for:
                      value: key
                      in: ${keys(document.fields)}
                      steps:
                          - extract_callback_url:
                              assign:
                                  - callback_url: ${document.fields[key]["stringValue"]}
                          - log_callback_url:
                              call: sys.log
                              args:
                                text: ${"Calling back url " + callback_url}
                                severity: INFO
                          - http_post:
                              call: http.post
                              args:
                                  url: ${callback_url}
                                  auth:
                                      type: OAuth2
                                  body:
                                      event: ${event}
  3. 次のコマンドを入力してワークフローをデプロイします。

    gcloud workflows deploy callback-event-listener \
        --source=callback-event-listener.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    SERVICE_ACCOUNT_NAME は、先ほど作成したサービス アカウントの名前に置き換えます。

イベントを待機するワークフローをデプロイする

callback-event-sample ワークフローは、コールバックの詳細を Firestore データベースに保存し、実行を停止して、特定のイベントが発生するのを待ちます。

Console

  1. Google Cloud コンソールの [ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ 作成] をクリックします。

  3. 新しいフィールドの名前を入力します: callback-event-sample

  4. [リージョン] リストで [us-central1] を選択します。

  5. 先ほど作成した [サービス アカウント] を選択します。

  6. [次へ] をクリックします。

  7. ワークフロー エディタで、次のワークフローの定義を入力します。

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}
  8. [デプロイ] をクリックします。

gcloud

  1. ワークフローのソースコード ファイルを作成します。

    touch callback-event-sample.yaml
  2. テキスト エディタで、次のワークフローをソースコード ファイルにコピーします。

    main:
      steps:
        - init:
            assign:
              - pubsub_topic: topic-callback
              - storage_bucket: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "-bucket-callback"}
        - await_pubsub_message:
            call: await_callback_event
            args:
              event_source: ${pubsub_topic}
            result: pubsub_event
        - await_storage_bucket:
            call: await_callback_event
            args:
              event_source: ${storage_bucket}
            result: storage_event
        - return_events:
            return:
                pubsub_event: ${pubsub_event}
                storage_event: ${storage_event}
    
    await_callback_event:
        params: [event_source]
        steps:
            - init:
                assign:
                  - database_root: ${"projects/" + sys.get_env("GOOGLE_CLOUD_PROJECT_ID") + "/databases/(default)/documents/callbacks/"}
                  - doc_name: ${database_root + event_source}
                  - execution_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
                  - firestore_key: ${"exec_" + text.split(execution_id, "-")[0]}
            - create_callback:
                call: events.create_callback_endpoint
                args:
                  http_callback_method: POST
                result: callback_details
            - save_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
                  body:
                    fields:
                      ${firestore_key}:
                        stringValue: ${callback_details.url}
            - log_and_await_callback:
                try:
                  steps:
                    - log_await_start:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Started waiting 1hr for an event from source " + event_source}
                    - await_callback:
                        call: events.await_callback
                        args:
                          callback: ${callback_details}
                          timeout: 3600
                        result: callback_request
                    - log_await_stop:
                        call: sys.log
                        args:
                          severity: INFO
                          data: ${"Stopped waiting for an event from source " + event_source}
                except:
                    as: e
                    steps:
                        - log_error:
                            call: sys.log
                            args:
                                severity: "ERROR"
                                text: ${"Received error " + e.message}
            - delete_callback_url:
                call: googleapis.firestore.v1.projects.databases.documents.patch
                args:
                  name: ${doc_name}
                  updateMask:
                    fieldPaths: ["${firestore_key}"]
            - check_null_event:
                switch:
                  - condition: ${callback_request == null}
                    return: null
            - log_await_result:
                call: sys.log
                args:
                  severity: INFO
                  data: ${callback_request.http_request.body.event}
            - return_event:
                return: ${callback_request.http_request.body.event}
  3. 次のコマンドを入力してワークフローをデプロイします。

    gcloud workflows deploy callback-event-sample \
        --source=callback-event-sample.yaml \
        --location=us-central1 \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    SERVICE_ACCOUNT_NAME は、先ほど作成したサービス アカウントの名前に置き換えます。

Pub/Sub イベントをルーティングする Eventarc トリガーを作成する

Eventarc トリガーを使用すると、イベントソースとターゲット ワークフローを含むトリガーのフィルタを指定して、イベントをルーティングできます。 Pub/Sub トピックにメッセージをパブリッシュした結果として callback-event-listener ワークフローを実行する Eventarc トリガーを作成します。 ワークフローのトリガーの詳細を確認する

コンソール

  1. Google Cloud コンソールで、[Eventarc] ページに移動します。

    [Eventarc] に移動

  2. [トリガーを作成] をクリックします。

  3. トリガー名を入力します。

    例: trigger-pubsub-events-listener

  4. [イベント プロバイダ] リストで、[Cloud Pub/Sub] を選択します。

  5. [イベント] リストの [カスタム] で、[google.cloud.pubsub.topic.v1.messagePublished] を選択します。

  6. [Cloud Pub/Sub トピックを選択] リストで、前に作成したトピックを選択します。

  7. [リージョン] リストで [us-central1 (Iowa)] を選択します。

  8. プロンプトが表示されたら、Pub/Sub サービス アカウントに iam.serviceAccountTokenCreator ロールを付与します。

  9. 先ほど作成した [サービス アカウント] を選択します。

  10. [イベントの宛先] リストで、[ワークフロー] を選択します。

  11. [ワークフローの選択] リストで、[callback-event-listener] ワークフローを選択します。

  12. [作成] をクリックします。

gcloud

トリガーを作成するには、gcloud eventarc triggers create コマンドを実行します。

gcloud eventarc triggers create trigger-pubsub-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=topic-callback \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

イベントは変換され、ランタイム引数としてワークフロー実行に渡されます。 新しいトリガーが有効になるまでに、最長で 2 分ほどかかる場合があります。

Cloud Storage イベントをルーティングする Eventarc トリガーを作成する

Eventarc トリガーを使用すると、イベントソースとターゲット ワークフローを含むトリガーのフィルタを指定して、イベントをルーティングできます。 Cloud Storage バケットにファイルをアップロードした結果として callback-event-listener ワークフローを実行する Eventarc トリガーを作成します。ワークフローのトリガーの詳細を確認する

コンソール

  1. Google Cloud コンソールで、[Eventarc] ページに移動します。

    [Eventarc] に移動

  2. [トリガーを作成] をクリックします。

  3. トリガー名を入力します。

    例: trigger-storage-events-listener

  4. [イベント プロバイダ] リストで、[Cloud Storage] を選択します。

  5. [イベント] リストの [直接] で、google.cloud.storage.object.v1.finald を選択します。

  6. [バケット] リストで、前に作成したバケットを参照して選択します。

  7. [リージョン] リストで、Cloud Storage バケットに基づいて、デフォルト値の [us-central1(アイオワ)] を受け入れます。

  8. プロンプトが表示されたら、Pub/Sub サービス アカウントに iam.serviceAccountTokenCreator ロールを付与します。

  9. 先ほど作成した [サービス アカウント] を選択します。

  10. [イベントの宛先] リストで、[ワークフロー] を選択します。

  11. [ワークフローの選択] リストで、[callback-event-listener] ワークフローを選択します。

  12. [作成] をクリックします。

gcloud

トリガーを作成するには、gcloud eventarc triggers create コマンドを実行します。

gcloud eventarc triggers create trigger-storage-events-listener \
    --location=us-central1 \
    --destination-workflow=callback-event-listener \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.storage.object.v1.finalized" \
    --event-filters="bucket=PROJECT_ID-bucket-callback" \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

イベントは変換され、ランタイム引数としてワークフロー実行に渡されます。 新しいトリガーが有効になるまでに、最長で 2 分ほどかかる場合があります。

プライマリ ワークフローを実行する

ワークフローを実行すると、そのワークフローに関連付けられた現在のワークフロー定義が実行されます。 callback-event-sample ワークフローを実行します。これはプライマリ ワークフローで、特定のイベントが発生するのを待ちます。セカンダリ ワークフローが適切なコールバック リクエストを行った場合にのみ、実行が再開されます。

コンソール

  1. Google Cloud コンソールの [ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ワークフロー] ページで、[callback-event-sample] ワークフローをクリックして詳細ページに移動します。

  3. [ワークフローの詳細] ページで [ 実行] を選択します。

  4. もう一度 [Execute] をクリックします。

    ワークフローの実行が開始されます。実行中は、Running実行状態と、次のようなログエントリが表示されます: Started waiting 1hr for an event from source topic-callback

gcloud

ワークフローを実行するには、gcloud workflows run コマンドを実行します。

gcloud workflows run callback-event-sample \
    --location=us-central1

ワークフローの実行が開始されます。実行中は、次のような実行状態が表示されます。

Waiting for execution [a848a164-268a-449c-b2fe-396f32f2ed66] to complete...working...

イベントを生成し、実行ステータスを確認する

結果が想定どおりであることを確認するには、イベントの生成、ログエントリの表示、ワークフローの実行ステータスの確認を行います。

メッセージの公開

前に作成した Pub/Sub トピックにメッセージをパブリッシュします。

コンソール

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. [topic-callback] をクリックします。

  3. [メッセージ] タブをクリックします。

  4. [メッセージをパブリッシュ] をクリックします。

  5. [メッセージ本文] フィールドに「Hello World」と入力します。

  6. [公開] をクリックします。

gcloud

メッセージをパブリッシュするには、gcloud pubsub topics publish コマンドを使用します。

gcloud pubsub topics publish topic-callback \
    --message="Hello World"

オブジェクトのアップロード

前に作成した Cloud Storage バケットにファイルをアップロードします。

コンソール

  1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  2. 以前に作成したバケットの名前をクリックします。

  3. [オブジェクト] タブで、次のいずれかの操作を行います。

    • デスクトップまたはファイル マネージャーから目的のファイルを Google Cloud コンソールのメインペインにドラッグ&ドロップします。

    • [ファイルをアップロード] をクリックし、アップロードするファイルを選択してから、[開く] をクリックします。

gcloud

ファイルをアップロードするには、gcloud storage cp コマンドを実行します。

gcloud storage cp OBJECT_LOCATION gs://PROJECT_ID-bucket-callback/

OBJECT_LOCATION は、オブジェクトへのローカルパスに置き換えます。例: random.txt

ログエントリと実行ステータスを表示する

callback-event-sample ワークフローが正常に完了したことを確認します。

コンソール

  1. Google Cloud コンソールの [ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ワークフロー] ページで、[callback-event-sample] ワークフローをクリックして詳細ページに移動します。

  3. [ワークフローの詳細] ページで特定の実行の詳細を取得するには、該当する実行 ID をクリックします。

    [実行状態] は [成功] になり、出力ペインに受信した Pub/Sub イベントと Cloud Storage イベントが表示されます。

gcloud

  1. ログエントリをフィルタして、JSON 形式で出力を返します。

    gcloud logging read "resource.type=workflows.googleapis.com/Workflow AND textPayload:calling OR textPayload:waiting" \
        --format=json
  2. 次のようなログエントリを探します。

    "textPayload": "Stopped waiting for an event from source..."
    "textPayload": "Calling back url https://workflowexecutions.googleapis.com/v1/projects/..."
    "textPayload": "Started waiting 1hr for an event from source..."
    
  3. 最後の実行の試行のステータスを確認します。

    gcloud workflows executions wait-last

    結果の例を以下に示します。

    Using cached execution name: projects/1085953646031/locations/us-central1/workflows/callback-event-sample/executions/79929e4e-82c1-4da1-b068-f828034c01b7
    Waiting for execution [79929e4e-82c1-4da1-b068-f828034c01b7] to complete...done.
    [...]
    state: SUCCEEDED
    

クリーンアップ

このチュートリアル用に新規プロジェクトを作成した場合は、そのプロジェクトを削除します。既存のプロジェクトを使用し、このチュートリアルで変更を加えずに残す場合は、チュートリアル用に作成したリソースを削除します。

プロジェクトを削除する

課金をなくす最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

プロジェクトを削除するには:

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

このチュートリアルで作成したリソースを削除する

  1. Firestore からデータを削除する

  2. Pub/Sub トピックを削除する

  3. Cloud Storage バケットを削除する

  4. Eventarc トリガーを削除する

  5. ワークフローを削除する

次のステップ