Google Cloud Pub/Sub トリガー

Cloud Functions は、関数と同じ Google Cloud プロジェクトで Pub/Sub トピックに公開されるメッセージによってトリガーできます。Pub/Sub はグローバルに分散されたメッセージバスであり、必要に応じて自動的にスケーリングし、堅牢でグローバルな独自のサービスを構築するための基盤を提供します。

イベントタイプ

Cloud Functions で使用される単一の Pub/Sub イベントが存在し、そのトリガータイプ値は google.pubsub.topic.publish です。

このイベントは、関数のデプロイ時に指定される Pub/Sub トピックにメッセージが公開されたときに送信されます。このトピックに公開されるすべてのメッセージによって関数の実行がトリガーされ、メッセージのコンテンツが入力データとして送られます。

イベントの構造

Pub/Sub トピックからトリガーされる Cloud Functions には、PubsubMessage タイプに準拠するイベントが送信されますが、PubsubMessage では publishTimemessageId を直接的に使用できないことに注意します。代わりに、publishTimemessageId にはイベント メタデータのイベント ID とタイムスタンプのプロパティを使用するとアクセスできます。このメタデータには、呼び出しの際に関数に渡されるコンテキスト オブジェクトを介してアクセスできます

トピックに公開されるデータである PubsubMessage オブジェクトのペイロードは、base64 エンコード文字列として PubsubMessagedata 属性に格納されます。PubsubMessage オブジェクトのペイロードを抽出するには、次の例に示されているように、データ属性のデコードが必要な場合もあります。

サンプルコード

Node.js

/**
 * Background Cloud Function to be triggered by Pub/Sub.
 * This function is exported by index.js, and executed when
 * the trigger topic receives a message.
 *
 * @param {object} data The event payload.
 * @param {object} context The event metadata.
 */
exports.helloPubSub = (data, context) => {
  const pubSubMessage = data;
  const name = pubSubMessage.data
    ? Buffer.from(pubSubMessage.data, 'base64').toString()
    : 'World';

  console.log(`Hello, ${name}!`);
};

Python

def hello_pubsub(event, context):
    """Background Cloud Function to be triggered by Pub/Sub.
    Args:
         event (dict):  The dictionary with data specific to this type of
         event. The `data` field contains the PubsubMessage message. The
         `attributes` field will contain custom attributes if there are any.
         context (google.cloud.functions.Context): The Cloud Functions event
         metadata. The `event_id` field contains the Pub/Sub message ID. The
         `timestamp` field contains the publish time.
    """
    import base64

    print("""This Function was triggered by messageId {} published at {}
    """.format(context.event_id, context.timestamp))

    if 'data' in event:
        name = base64.b64decode(event['data']).decode('utf-8')
    else:
        name = 'World'
    print('Hello {}!'.format(name))

Go


// Package helloworld provides a set of Cloud Functions samples.
package helloworld

import (
	"context"
	"log"
)

// PubSubMessage is the payload of a Pub/Sub event.
type PubSubMessage struct {
	Data []byte `json:"data"`
}

// HelloPubSub consumes a Pub/Sub message.
func HelloPubSub(ctx context.Context, m PubSubMessage) error {
	name := string(m.Data)
	if name == "" {
		name = "World"
	}
	log.Printf("Hello, %s!", name)
	return nil
}

Java


import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import functions.eventpojos.PubSubMessage;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.logging.Level;
import java.util.logging.Logger;

public class HelloPubSub implements BackgroundFunction<PubSubMessage> {
  private static final Logger logger = Logger.getLogger(HelloPubSub.class.getName());

  @Override
  public void accept(PubSubMessage message, Context context) {
    String name = "world";
    if (message != null && message.getData() != null) {
      name = new String(
          Base64.getDecoder().decode(message.getData().getBytes(StandardCharsets.UTF_8)),
          StandardCharsets.UTF_8);
    }
    logger.info(String.format("Hello %s!", name));
    return;
  }
}

関数内からメッセージを公開する

関数内からメッセージを Pub/Sub トピックに公開することもできます。これにより、Cloud Pub/Sub メッセージを使用して、後続の Cloud Function の呼び出しをトリガーできます。この手法は、以下の処理に使用できます。

  • 順次関数呼び出しを一緒に行う。
  • 複数の Cloud Functions の関数インスタンス間に同時に、タスクのグループを分配(または「ファンアウト」)する。

次の例では、HTTP publish 関数によって、Pub/Sub トピックにメッセージが送信され、それによって subscribe 関数がトリガーされます。

このスニペットは、Pub/Sub トピックにメッセージを公開する publish 関数を示します。

Node.js

/**
 * Publishes a message to a Cloud Pub/Sub Topic.
 *
 * @example
 * gcloud functions call publish --data '{"topic":"[YOUR_TOPIC_NAME]","message":"Hello, world!"}'
 *
 *   - Replace `[YOUR_TOPIC_NAME]` with your Cloud Pub/Sub topic name.
 *
 * @param {object} req Cloud Function request context.
 * @param {object} req.body The request body.
 * @param {string} req.body.topic Topic name on which to publish.
 * @param {string} req.body.message Message to publish.
 * @param {object} res Cloud Function response context.
 */
exports.publish = async (req, res) => {
  if (!req.body.topic || !req.body.message) {
    res
      .status(500)
      .send(
        'Missing parameter(s); include "topic" and "subscription" properties in your request.'
      );
    return;
  }

  console.log(`Publishing message to topic ${req.body.topic}`);

  // References an existing topic
  const topic = pubsub.topic(req.body.topic);

  const messageObject = {
    data: {
      message: req.body.message,
    },
  };
  const messageBuffer = Buffer.from(JSON.stringify(messageObject), 'utf8');

  // Publishes a message
  try {
    await topic.publish(messageBuffer);
    res.status(200).send('Message published.');
  } catch (err) {
    console.error(err);
    res.status(500).send(err);
    return Promise.reject(err);
  }
};

このスニペットは、Pub/Sub トピックにメッセージが公開されるとトリガーされる subscribe 関数を示しています。

Node.js

/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {object} pubsubMessage The Cloud Pub/Sub Message object.
 * @param {string} pubsubMessage.data The "data" property of the Cloud Pub/Sub Message.
 */
exports.subscribe = (pubsubMessage) => {
  // Print out the data from Pub/Sub, to prove that it worked
  console.log(Buffer.from(pubsubMessage.data, 'base64').toString());
};

本番環境では、次のように、cURL コマンドライン ユーティリティを使用しても publish 関数を呼び出せます。

curl https://GCF_REGION-GCP_PROJECT_ID.cloudfunctions.net/publish -X POST  -d "{\"topic\": \"PUBSUB_TOPIC\", \"message\":\"YOUR_MESSAGE\"}" -H "Content-Type: application/json"

テストとデバッグ以外の場合は、gcloud functions call コマンドを代わりに使用して、関数を直接呼び出すことができます。次の手順では、gcloud functions call コマンドを使用して、上記のサンプルを実行する方法について説明します。

  1. Pub/Sub トピックを作成します。ここで、MY_TOPIC は、作成する新しいトピックの名前です。

    gcloud pubsub topics create MY_TOPIC
  2. publish 関数をデプロイします。ここで、RUNTIME は、使用するランタイムの名前(nodejs8 など)です。

    gcloud functions deploy publish --trigger-http --runtime RUNTIME
  3. subscribe 関数をデプロイします。

    gcloud functions deploy subscribe --trigger-topic MY_TOPIC --runtime RUNTIME
  4. gcloud functions call コマンドを使用して publish 関数を直接呼び出し、必要なデータは、--data 引数で JSON 形式として指定します。

    gcloud functions call publish --data '{"topic":"MY_TOPIC","message":"Hello World!"}'
  5. ログで subscribe 関数を確認します。ログに結果が表示されるまで、数分間かかることがあります。

    gcloud functions logs read subscribe

次のような出力が表示されます。

D    ...Function execution started
I    ...{"data":{"message":"Hello World!"}}
D    ...Function execution took 753 ms, finished with status: 'ok'

関数のデプロイ

次の gcloud コマンドは、Pub/Sub トピックにメッセージが公開されるとトリガーされる関数をデプロイします。

gcloud functions deploy FUNCTION_NAME --trigger-topic TOPIC_NAME FLAGS...
引数 説明
FUNCTION_NAME 関数名。
--trigger-topic TOPIC_NAME 関数が登録されている Pub/Sub トピックの名前。トピックが存在しない場合は、デプロイ時に作成されます。
FLAGS... デプロイ時に指定する必要がある --runtime などの追加フラグ。網羅的なリファレンスについては、gcloud functions deploy のドキュメントをご覧ください。

Pub/Sub トリガーの使用法を示す完全な例については、Pub/Sub チュートリアルをご覧ください。

レガシー Cloud Pub/Sub トリガー

次の gcloud コマンドは、特定のトピックでレガシー Pub/Sub 通知によってトリガーされる関数をデプロイします。これらのイベントをすでに使用しているレガシー関数では、これらの通知がサポートされます。ただし、将来、レガシー通知は削除される可能性があるため、代わりに --trigger-topic フラグの使用をおすすめします。

gcloud functions deploy FUNCTION_NAME \
--trigger-resource TOPIC_NAME \
--trigger-event providers/cloud.pubsub/eventTypes/topic.publish \
FLAGS...

次のステップ

Pub/Sub によってトリガーされるバックグラウンド関数を実装する方法の例については、Pub/Sub チュートリアルをご覧ください。