Google Cloud Pub/Sub トリガー

Cloud Functions は、関数と同じ Google Cloud プロジェクトで Pub/Sub トピックに公開されるメッセージによってトリガーできます。Pub/Sub はグローバルに分散されたメッセージバスであり、必要に応じて自動的にスケーリングし、堅牢でグローバルな独自のサービスを構築するための基盤を提供します。

イベントタイプ

Cloud Functions で使用される単一の Pub/Sub イベントが存在し、そのトリガータイプ値は google.pubsub.topic.publish です。

このイベントは、関数のデプロイ時に指定される Pub/Sub トピックにメッセージが公開されたときに送信されます。このトピックに公開されるすべてのメッセージによって関数の実行がトリガーされ、メッセージのコンテンツが入力データとして送られます。

イベントの構造

Pub/Sub トピックからトリガーされる Cloud Functions には、PubsubMessage タイプに準拠するイベントが送信されますが、PubsubMessage では publishTimemessageId を直接的に使用できないことに注意します。代わりに、publishTimemessageId にはイベント メタデータのイベント ID とタイムスタンプのプロパティを使用するとアクセスできます。このメタデータには、呼び出しの際に関数に渡されるコンテキスト オブジェクトを介してアクセスできます

トピックに公開されるデータである PubsubMessage オブジェクトのペイロードは、base64 エンコード文字列として PubsubMessagedata 属性に格納されます。PubsubMessage オブジェクトのペイロードを抽出するには、次の例に示されているように、データ属性のデコードが必要な場合もあります。

サンプルコード

Node.js 8+

/**
     * Background Cloud Function to be triggered by Pub/Sub.
     * This function is exported by index.js, and executed when
     * the trigger topic receives a message.
     *
     * @param {object} pubSubEvent The event payload.
     * @param {object} context The event metadata.
     */
    exports.helloPubSub = (pubSubEvent, context) => {
      const name = pubSubEvent.data
        ? Buffer.from(pubSubEvent.data, 'base64').toString()
        : 'World';

      console.log(`Hello, ${name}!`);
    };

Node.js 6(非推奨)

/**
     * Background Cloud Function to be triggered by Pub/Sub.
     * This function is exported by index.js, and executed when
     * the trigger topic receives a message.
     *
     * @param {object} event The Cloud Functions event.
     * @param {function} callback The callback function.
     */
    exports.helloPubSub = (event, callback) => {
      const pubsubMessage = event.data;
      const name = pubsubMessage.data
        ? Buffer.from(pubsubMessage.data, 'base64').toString()
        : 'World';

      console.log(`Hello, ${name}!`);

      callback();
    };

Python

def hello_pubsub(event, context):
        """Background Cloud Function to be triggered by Pub/Sub.
        Args:
             event (dict):  The dictionary with data specific to this type of
             event. The `data` field contains the PubsubMessage message. The
             `attributes` field will contain custom attributes if there are any.
             context (google.cloud.functions.Context): The Cloud Functions event
             metadata. The `event_id` field contains the Pub/Sub message ID. The
             `timestamp` field contains the publish time.
        """
        import base64

        print("""This Function was triggered by messageId {} published at {}
        """.format(context.event_id, context.timestamp))

        if 'data' in event:
            name = base64.b64decode(event['data']).decode('utf-8')
        else:
            name = 'World'
        print('Hello {}!'.format(name))

Go


    // Package helloworld provides a set of Cloud Functions samples.
    package helloworld

    import (
    	"context"
    	"log"
    )

    // PubSubMessage is the payload of a Pub/Sub event.
    type PubSubMessage struct {
    	Data []byte `json:"data"`
    }

    // HelloPubSub consumes a Pub/Sub message.
    func HelloPubSub(ctx context.Context, m PubSubMessage) error {
    	name := string(m.Data)
    	if name == "" {
    		name = "World"
    	}
    	log.Printf("Hello, %s!", name)
    	return nil
    }
    

関数内からメッセージを公開する

関数内からメッセージを Pub/Sub トピックに公開することもできます。これにより、Cloud Pub/Sub メッセージを使用して、後続の Cloud 関数の呼び出しをトリガーできます。この手法は、以下の処理に使用できます。

  • 順次関数呼び出しを一緒に行う。
  • 複数の Cloud 関数インスタンス間に同時に、タスクのグループを分配(または「ファンアウト」)する。

次の例では、HTTP publish 関数によって、Pub/Sub トピックにメッセージが送信され、それによって subscribe 関数がトリガーされます。

このスニペットは、Pub/Sub トピックにメッセージを公開する publish 関数を示します。

Node.js

/**
     * Publishes a message to a Cloud Pub/Sub Topic.
     *
     * @example
     * gcloud functions call publish --data '{"topic":"[YOUR_TOPIC_NAME]","message":"Hello, world!"}'
     *
     *   - Replace `[YOUR_TOPIC_NAME]` with your Cloud Pub/Sub topic name.
     *
     * @param {object} req Cloud Function request context.
     * @param {object} req.body The request body.
     * @param {string} req.body.topic Topic name on which to publish.
     * @param {string} req.body.message Message to publish.
     * @param {object} res Cloud Function response context.
     */
    exports.publish = async (req, res) => {
      if (!req.body.topic || !req.body.message) {
        res
          .status(500)
          .send(
            'Missing parameter(s); include "topic" and "subscription" properties in your request.'
          );
        return;
      }

      console.log(`Publishing message to topic ${req.body.topic}`);

      // References an existing topic
      const topic = pubsub.topic(req.body.topic);

      const messageObject = {
        data: {
          message: req.body.message,
        },
      };
      const messageBuffer = Buffer.from(JSON.stringify(messageObject), 'utf8');

      // Publishes a message
      try {
        await topic.publish(messageBuffer);
        res.status(200).send('Message published.');
      } catch (err) {
        console.error(err);
        res.status(500).send(err);
        return Promise.reject(err);
      }
    };

このスニペットは、Pub/Sub トピックにメッセージが公開されるとトリガーされる subscribe 関数を示しています。

Node.js

/**
     * Triggered from a message on a Cloud Pub/Sub topic.
     *
     * @param {object} pubsubMessage The Cloud Pub/Sub Message object.
     * @param {string} pubsubMessage.data The "data" property of the Cloud Pub/Sub Message.
     */
    exports.subscribe = pubsubMessage => {
      // Print out the data from Pub/Sub, to prove that it worked
      console.log(Buffer.from(pubsubMessage.data, 'base64').toString());
    };

本番環境では、次のように、cURL コマンドライン ユーティリティを使用しても publish 関数を呼び出せます。

    curl https://GCF_REGION-GCP_PROJECT_ID.cloudfunctions.net/publish -X POST  -d "{\"topic\": \"PUBSUB_TOPIC\", \"message\":\"YOUR_MESSAGE\"}" -H "Content-Type: application/json"
    

一方で、テスト環境やデバッグでは、gcloud functions call コマンドを代わりに使用して、関数を直接呼び出すことができます。次の手順では、gcloud functions call コマンドを使用して、上記のサンプルを実行する方法について説明します。

  1. Pub/Sub トピックを作成します。ここで、MY_TOPIC は、作成する新しいトピックの名前です。

    gcloud pubsub topics create MY_TOPIC
  2. publish 関数をデプロイします。ここで、RUNTIME は、使用するランタイムの名前(nodejs8 など)です。

    gcloud functions deploy publish --trigger-http --runtime RUNTIME
  3. subscribe 関数をデプロイします。

    gcloud functions deploy subscribe --trigger-topic MY_TOPIC --runtime RUNTIME
  4. gcloud functions call コマンドを使用して publish 関数を直接呼び出し、必要なデータは、--data 引数で JSON 形式として指定します。

    gcloud functions call publish --data '{"topic":"MY_TOPIC","message":"Hello World!"}'
  5. ログで subscribe 関数を確認します。ログに結果が表示されるまで、数分間かかることがあります。

    gcloud functions logs read subscribe

次のような出力が表示されます。

    D    ...Function execution started
    I    ...{"data":{"message":"Hello World!"}}
    D    ...Function execution took 753 ms, finished with status: 'ok'
    

関数のデプロイ

次の gcloud コマンドは、Pub/Sub トピックにメッセージが公開されるとトリガーされる関数をデプロイします。

    gcloud functions deploy FUNCTION_NAME --trigger-topic TOPIC_NAME FLAGS...
    
引数 説明
FUNCTION_NAME 関数名。
--trigger-topic TOPIC_NAME 関数が登録されている Pub/Sub トピックの名前。トピックが存在しない場合は、デプロイ時に作成されます。
FLAGS... デプロイ時に指定する必要がある --runtime などの追加フラグ。詳細については、gcloud functions deploy ドキュメントをご覧ください。

Pub/Sub トリガーの使用法を示す完全な例については、Pub/Sub チュートリアルをご覧ください。

レガシー Cloud Pub/Sub トリガー

次の gcloud コマンドは、特定のトピックでレガシー Pub/Sub 通知によってトリガーされる関数をデプロイします。これらのイベントをすでに使用しているレガシー関数では、これらの通知がサポートされます。ただし、将来、レガシー通知は削除される可能性があるため、代わりに --trigger-topic フラグの使用をおすすめします。

    gcloud functions deploy FUNCTION_NAME \
    --trigger-resource TOPIC_NAME \
    --trigger-event providers/cloud.pubsub/eventTypes/topic.publish \
    FLAGS...
    

次のステップ

Pub/Sub によってトリガーされるバックグラウンド関数を実装する方法の例については、Pub/Sub チュートリアルをご覧ください。