Pub/Sub の push によるトリガー

このページでは、Pub/Sub を使用してメッセージを Cloud Run サービスのエンドポイントに push する方法を説明します。このエンドポイントでは、メッセージが HTTP リクエストとしてコンテナに配信されます。このページでは、同じ Google Cloud プロジェクトの Pub/Sub サブスクリプションから push されたメッセージを、サービスで安全に処理する方法について説明します。

サービス アカウントと IAM 権限を利用することで、Cloud Run サービスを公開することなく、Cloud Run で Pub/Sub を非公開かつ安全に使用できます。設定した Pub/Sub サブスクリプションのみがサービスを呼び出せます。

Cloud Run の確認応答期限

Pub/Sub サブスクリプションの確認応答期限(ackDeadlineSeconds)を 600 秒以下にしてください。Cloud Run サービスは 600 秒以内にレスポンスを返すことで Pub/Sub メッセージに応答する必要があります。応答しなかった場合、Pub/Sub はメッセージを再配信するため、Cloud Run サービスで重複トリガーが発生します。

ユースケース

考えられるユースケースには、次のようなものがあります。

統合の概要

サービスを Pub/Sub と統合するには:

  • Pub/Sub トピックを作成します。
  • 作成したトピックに送信された Pub/Sub メッセージに応答するコードを、Cloud Run サービスに追加します。
  • 必要な権限を持つサービス アカウントを作成します。
  • Pub/Sub サブスクリプションを作成し、サービス アカウントに関連付けます。このサブスクリプションは、トピックにパブリッシュされているすべてのメッセージをサービスに送信します。

始める前に

  • Cloud Run の設定ページの説明に従って環境を設定します(まだ設定していない場合)。
  • このガイドでは、すでに Cloud Run サービスが存在していて、それを Pub/Sub と統合するコードを追加する場合を前提としています。このようなサービスがない場合は、このページではなく、Pub/Sub の Cloud Run チュートリアルをご覧ください。

Pub/Sub からのメッセージを処理するコードを追加する

既存のサービスコードを編集して、Pub/Sub のサポートに必要なコードを追加します。サービスはリクエストからメッセージを抽出し、所定の成功コードを返す必要があります。以下の代表的な言語のスニペット(任意の言語を使用可能)では、単純な Hello World メッセージを出力する方法を示します。

Node.js

app.post('/', (req, res) => {
  if (!req.body) {
    const msg = 'no Pub/Sub message received';
    console.error(`error: ${msg}`);
    res.status(400).send(`Bad Request: ${msg}`);
    return;
  }
  if (!req.body.message) {
    const msg = 'invalid Pub/Sub message format';
    console.error(`error: ${msg}`);
    res.status(400).send(`Bad Request: ${msg}`);
    return;
  }

  const pubSubMessage = req.body.message;
  const name = pubSubMessage.data
    ? Buffer.from(pubSubMessage.data, 'base64').toString().trim()
    : 'World';

  console.log(`Hello ${name}!`);
  res.status(204).send();
});

Python

@app.route("/", methods=["POST"])
def index():
    """Receive and parse Pub/Sub messages."""
    envelope = request.get_json()
    if not envelope:
        msg = "no Pub/Sub message received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    if not isinstance(envelope, dict) or "message" not in envelope:
        msg = "invalid Pub/Sub message format"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    pubsub_message = envelope["message"]

    name = "World"
    if isinstance(pubsub_message, dict) and "data" in pubsub_message:
        name = base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()

    print(f"Hello {name}!")

    return ("", 204)

Go


// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// HelloPubSub receives and processes a Pub/Sub push message.
func HelloPubSub(w http.ResponseWriter, r *http.Request) {
	var m PubSubMessage
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Printf("ioutil.ReadAll: %v", err)
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}
	// byte slice unmarshalling handles base64 decoding.
	if err := json.Unmarshal(body, &m); err != nil {
		log.Printf("json.Unmarshal: %v", err)
		http.Error(w, "Bad Request", http.StatusBadRequest)
		return
	}

	name := string(m.Message.Data)
	if name == "" {
		name = "World"
	}
	log.Printf("Hello %s!", name)
}

Java

import com.example.cloudrun.Body;
import java.util.Base64;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

// PubsubController consumes a Pub/Sub message.
@RestController
public class PubSubController {
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(@RequestBody Body body) {
    // Get PubSub message from request body.
    Body.Message message = body.getMessage();
    if (message == null) {
      String msg = "Bad Request: invalid Pub/Sub message format";
      System.out.println(msg);
      return new ResponseEntity<>(msg, HttpStatus.BAD_REQUEST);
    }

    String data = message.getData();
    String target =
        !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
    String msg = "Hello " + target + "!";

    System.out.println(msg);
    return new ResponseEntity<>(msg, HttpStatus.OK);
  }
}

正確な HTTP レスポンス コードを返すようにサービスをコーディングする必要があります。HTTP 200204 などの成功コードは、Pub/Sub メッセージの処理の完了を意味します。HTTP 400 や HTTP 500 などのエラーコードは、push を使用したメッセージの受信で説明されているとおり、メッセージが再試行されることを示します。

上記の Pub/Sub コードで更新した後、Cloud Run サービスをビルドしてデプロイします。

サブスクリプション用のサービス アカウントを作成する

Pub/Sub サブスクリプションに関連付けるサービス アカウントを作成し、Cloud Run サービスの呼び出し権限を付与する必要があります。Cloud Run サービスに push された Pub/Sub メッセージには、このサービス アカウントの ID が含まれています。

既存のサービス アカウントを使用して Pub/Sub サブスクリプション ID とするか、新しいサービス アカウントを作成することが可能です。

新しいサービス アカウントを作成し、Cloud Run サービスを呼び出す権限を付与するには:

コンソール

  1. Google Cloud コンソールで、[サービス アカウント] ページに移動します。

    [サービス アカウント] に移動

  2. プロジェクトを選択します。

  3. Google Cloud コンソールに表示するサービス アカウント名を入力します。

    この名前に基づいてサービス アカウント ID が生成され、Google Cloud コンソールに表示されます。必要に応じて ID を編集します。後で ID を変更することはできません。

  4. (省略可)サービス アカウントの説明を入力します。

  5. [作成して続行] をクリックします。

  6. 省略可: [ロールを選択] フィールドをクリックします。

  7. [Cloud Run] > [Cloud Run 起動元] を選択します。

  8. [完了] をクリックします。

コマンドライン

  1. サービス アカウントを作成します。

    gcloud iam service-accounts create SERVICE_ACCOUNT_NAME \
       --display-name "DISPLAYED_SERVICE_ACCOUNT_NAME"

    次のように置き換えます。

    • SERVICE_ACCOUNT_NAME は、Google Cloud プロジェクト内で一意の小文字の名前(my-invoker-service-account-name など)に置き換えます。
    • DISPLAYED_SERVICE_ACCOUNT_NAME は、このサービス アカウントに対してコンソール上で表示する名前(My Invoker Service Account など)に置き換えます。
  2. Cloud Run では、サービス アカウントにサービスを呼び出す権限を付与します。

    gcloud run services add-iam-policy-binding SERVICE \
       --member=serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com \
       --role=roles/run.invoker

    次のように置き換えます。

    • SERVICE は、Pub/Sub によって呼び出されるサービスの名前に置き換えます。
    • SERVICE_ACCOUNT_NAME は、サービス アカウントの名前に置き換えます。
    • PROJECT_ID は、Google Cloud プロジェクト ID に置き換えます。
  3. サービス アカウントにプロジェクトへのアクセス権を付与すると、プロジェクト内のリソースに対して特定の操作を行う権限が与えられます。

    gcloud projects add-iam-policy-binding RESOURCE_ID \
       --member=PRINCIPAL --role=roles/run.invoker

    次のように置き換えます。

    • RESOURCE_ID: Google Cloud プロジェクト ID。

    • PRINCIPAL: プリンシパルまたはメンバーの識別子。通常、PRINCIPAL_TYPE:ID の形式です(たとえば、user:my-user@example.com)。PRINCIPAL に使用できる値の一覧については、ポリシー バインディングのリファレンスをご覧ください。

Terraform

Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

  1. サービス アカウントを作成するには、既存の .tf ファイルに次の行を追加します。

    resource "google_service_account" "sa" {
      account_id   = "cloud-run-pubsub-invoker"
      display_name = "Cloud Run Pub/Sub Invoker"
    }
  2. サービス アカウントにサービスを呼び出す権限を付与するには、既存の .tf ファイルに次の行を追加します。

    resource "google_cloud_run_service_iam_binding" "binding" {
      location = google_cloud_run_v2_service.default.location
      service  = google_cloud_run_v2_service.default.name
      role     = "roles/run.invoker"
      members  = ["serviceAccount:${google_service_account.sa.email}"]
    }

Pub/Sub トピックを作成する

サービスへのリクエストは Pub/Sub トピックに公開されたメッセージによってトリガーされるため、トピックを作成する必要があります。

コンソール

  1. Google Cloud コンソールの [Pub/Sub トピック] ページにアクセスします。

    [Pub/Sub トピック] ページ

  2. [トピックを作成] をクリックします。

  3. トピックに一意の名前(MyTopic など)を入力します。

コマンドライン

gcloud pubsub topics create TOPIC-NAME

TOPIC-NAME は、Google Cloud プロジェクト内で一意のトピック名に置き換えます。

Terraform

Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

このセクションでは、TerraformGoogle Cloud Platform Providergoogle_pubsub_topic リソースを使用して、Terraform 構成にサービスを定義する方法を説明します。

次の内容を既存の .tf ファイルに追加します。

resource "google_pubsub_topic" "default" {
  name = "pubsub_topic"
}

push サブスクリプションを作成してサービス アカウントに関連付ける

Pub/Sub トピックを作成後、トピックに送信されたメッセージを受信するためにサービスを登録し、サービス用に作成したサービス アカウントにサブスクリプションを関連付ける必要があります。Google Cloud コンソールまたは gcloud コマンドラインのいずれかを使用できます。

コンソール

  1. [Pub/Sub トピック] ページに移動します。

    [Pub/Sub トピック] ページ

  2. 登録するトピックをクリックします。

  3. [サブスクリプションを作成] をクリックして登録フォームを表示します。

    登録フォーム

    フォームで次の操作を行います。

    1. 配信タイプで [push] を指定します。
    2. [エンドポイント URL] で、サービスの URL を指定します。この URL はサービスの詳細ページに表示されます。
    3. [サービス アカウント] プルダウンから、必要な権限付きで作成したサービス アカウントを選択します。
    4. サブスクリプションの有効期限と 600 秒の確認応答期限を設定します。
    5. [作成] をクリックします。
  4. 登録が完了しました。これで、トピックに送信されたメッセージがサービスに push されるようになります。

コマンドライン

  1. Pub/Sub がプロジェクトで認証トークンを作成できるようにします。

    gcloud projects add-iam-policy-binding PROJECT-ID \
         --member=serviceAccount:service-PROJECT-NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com \
         --role=roles/iam.serviceAccountTokenCreator

    次のように置き換えます。

    • PROJECT-ID は、Google Cloud プロジェクト ID に置き換えます。
    • PROJECT-NUMBER は、Google Cloud プロジェクト番号に置き換えます。

      プロジェクト ID とプロジェクト番号は、Google Cloud コンソールで、対象プロジェクトの [プロジェクト情報] パネルに表示されます。

  2. 必要な権限付きで作成されたサービス アカウントで、Pub Sub サブスクリプションを作成します。

    gcloud pubsub subscriptions create SUBSCRIPTION-ID --topic TOPIC-NAME \
       --ack-deadline=600 \
       --push-endpoint=SERVICE-URL/ \
       --push-auth-service-account=SERVICE-ACCOUNT-NAME@PROJECT-ID.iam.gserviceaccount.com

    次のように置き換えます。

    • TOPIC-NAME は、以前に作成したトピックに置き換えます。
    • SERVICE-URL は、サービスをデプロイしたときに指定した HTTPS URL に置き換えます。これを確認するには、サービスの名前を指定してコマンド gcloud run services describe を実行し、出力結果の中から domain で始まる行を探します。
    • PROJECT-ID は、Google Cloud プロジェクト ID に置き換えます。

    --push-auth-service-account フラグは、認証と認可用に Pub/Sub の push 機能を有効にします。

    確認応答期限は最大 600 秒に設定されています。

  3. 登録が完了しました。これで、トピックに送信されたメッセージがサービスに push されるようになります。次のコマンドを使用して、テスト メッセージをトピックに push できます。

    gcloud pubsub topics publish TOPIC --message "hello"

    TOPIC は、作成したトピックの名前に置き換えます。

Terraform

Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

  1. Pub/Sub がプロジェクトで認証トークンを作成できるようにします。次のコードを .tf ファイルに追加します。

    resource "google_project_service_identity" "pubsub_agent" {
      provider = google-beta
      project  = data.google_project.project.project_id
      service  = "pubsub.googleapis.com"
    }
    
    resource "google_project_iam_binding" "project_token_creator" {
      project = data.google_project.project.project_id
      role    = "roles/iam.serviceAccountTokenCreator"
      members = ["serviceAccount:${google_project_service_identity.pubsub_agent.email}"]
    }
  2. 必要な権限付きで作成されたサービス アカウントで、Pub/Sub サブスクリプションを作成します。次のコードを .tf ファイルに追加します。

    resource "google_pubsub_subscription" "subscription" {
      name  = "pubsub_subscription"
      topic = google_pubsub_topic.default.name
      push_config {
        push_endpoint = google_cloud_run_v2_service.default.uri
        oidc_token {
          service_account_email = google_service_account.sa.email
        }
        attributes = {
          x-goog-version = "v1"
        }
      }
      depends_on = [google_cloud_run_v2_service.default]
    }
  3. 登録が完了しました。これで、トピックに送信されたメッセージがサービスに push されるようになります。次のコマンドを使用して、テスト メッセージをトピックに push できます。

    gcloud pubsub topics publish TOPIC --message "hello"

    TOPIC は、作成したトピックの名前に置き換えます。

次のステップ