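Google Cloud Pub/Sub triggers

Cloud Functions uses event-driven functions to handle events from your Cloud infrastructure. A function can be triggered by messages published to a Pub/Sub topic in the same Cloud project as the function. Pub/Sub is a globally distributed message bus that scales automatically as needed and provides a foundation for building your own robust, global services.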

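Event types

Cloud Functions uses only one Pub/Sub event, which has the trigger type value google.pubsub.topic.publish.

This event is sent whenever a message is published to the Pub/Sub topic that was specified when the function was deployed. Every message published to this topic triggers one execution of the function, with the message contents passed as input data.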

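Event structure

Functions triggered from a Pub/Sub topic receive events that conform to the PubsubMessage type, with the caveat that publishTime and messageId are not directly available in the PubsubMessage. Instead, you can access publishTime and messageId through the timestamp and event ID properties of the event metadata. This metadata is accessible through the context object that is passed to your function when it is invoked.

The payload of the PubsubMessage object (the data published to the topic) is stored as a base64-encoded string in the data attribute of the PubsubMessage. To extract the payload, you may need to decode the data attribute, as shown in the examples below.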
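
The relationship between the message fields and the context metadata can be sketched as follows. This is a minimal sketch for local experimentation, not part of the Cloud Functions API: `FakeContext` and `describe_message` are hypothetical names, and the sample values are made up. It assumes the event is a dict with a base64-encoded `data` field and that the context exposes `event_id` and `timestamp`, as described above.

```python
import base64
from dataclasses import dataclass


@dataclass
class FakeContext:
    """Hypothetical stand-in for the context object, for local runs only."""
    event_id: str   # corresponds to the PubsubMessage messageId
    timestamp: str  # corresponds to the PubsubMessage publishTime


def describe_message(event, context):
    """Combine the decoded payload with the metadata taken from the context."""
    raw = event.get("data")
    payload = base64.b64decode(raw).decode("utf-8") if raw else ""
    return {
        "messageId": context.event_id,
        "publishTime": context.timestamp,
        "attributes": event.get("attributes") or {},
        "payload": payload,
    }


# Build an event the way the payload is described above: base64-encoded data.
sample_event = {"data": base64.b64encode(b"Hello").decode("ascii")}
sample_context = FakeContext(event_id="12345", timestamp="2020-01-01T00:00:00Z")
result = describe_message(sample_event, sample_context)
print(result["payload"])
```

Calling a handler with a hand-built event and context like this is one way to check the decoding logic before deploying.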

Sample code

Node.js

/**
 * Background Cloud Function to be triggered by Pub/Sub.
 * This function is exported by index.js, and executed when
 * the trigger topic receives a message.
 *
 * @param {object} message The Pub/Sub message.
 * @param {object} context The event metadata.
 */
exports.helloPubSub = (message, context) => {
  const name = message.data
    ? Buffer.from(message.data, 'base64').toString()
    : 'World';

  console.log(`Hello, ${name}!`);
};

Python

def hello_pubsub(event, context):
    """Background Cloud Function to be triggered by Pub/Sub.
    Args:
         event (dict):  The dictionary with data specific to this type of
                        event. The `@type` field maps to
                         `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
                        The `data` field maps to the PubsubMessage data
                        in a base64-encoded string. The `attributes` field maps
                        to the PubsubMessage attributes if any is present.
         context (google.cloud.functions.Context): Metadata of triggering event
                        including `event_id` which maps to the PubsubMessage
                        messageId, `timestamp` which maps to the PubsubMessage
                        publishTime, `event_type` which maps to
                        `google.pubsub.topic.publish`, and `resource` which is
                        a dictionary that describes the service API endpoint
                        pubsub.googleapis.com, the triggering topic's name, and
                        the triggering event type
                        `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
    Returns:
        None. The output is written to Cloud Logging.
    """
    import base64

    print("""This Function was triggered by messageId {} published at {} to {}
    """.format(context.event_id, context.timestamp, context.resource["name"]))

    if 'data' in event:
        name = base64.b64decode(event['data']).decode('utf-8')
    else:
        name = 'World'
    print('Hello {}!'.format(name))

Go


// Package helloworld provides a set of Cloud Functions samples.
package helloworld

import (
	"context"
	"log"
)

// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
	Data []byte `json:"data"`
}

// HelloPubSub consumes a Pub/Sub message.
func HelloPubSub(ctx context.Context, m PubSubMessage) error {
	name := string(m.Data) // Automatically decoded from base64.
	if name == "" {
		name = "World"
	}
	log.Printf("Hello, %s!", name)
	return nil
}

Java


import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.events.cloud.pubsub.v1.Message;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.logging.Logger;

public class HelloPubSub implements BackgroundFunction<Message> {
  private static final Logger logger = Logger.getLogger(HelloPubSub.class.getName());

  @Override
  public void accept(Message message, Context context) {
    String name = "world";
    if (message != null && message.getData() != null) {
      name = new String(
          Base64.getDecoder().decode(message.getData().getBytes(StandardCharsets.UTF_8)),
          StandardCharsets.UTF_8);
    }
    logger.info(String.format("Hello %s!", name));
  }
}

Kotlin

import java.util.Base64
import java.util.logging.Logger

class EventExample {
    companion object {
        var log: Logger = Logger.getLogger(EventExample::class.java.name)
    }

    fun helloPubSub(message: PubSubMessage) {
        val data = String(Base64.getDecoder().decode(message.data))
        log.info(data)
    }
}

data class PubSubMessage(
    val data: String,
    val messageId: String,
    val publishTime: String,
    val attributes: Map<String, String>
)

C#

using CloudNative.CloudEvents;
using Google.Cloud.Functions.Framework;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;

namespace HelloPubSub
{
    public class Function : ICloudEventFunction<MessagePublishedData>
    {
        private readonly ILogger _logger;

        public Function(ILogger<Function> logger) =>
            _logger = logger;

        public Task HandleAsync(CloudEvent cloudEvent, MessagePublishedData data, CancellationToken cancellationToken)
        {
            string nameFromMessage = data.Message?.TextData;
            string name = string.IsNullOrEmpty(nameFromMessage) ? "world" : nameFromMessage;
            _logger.LogInformation("Hello {name}", name);
            return Task.CompletedTask;
        }
    }
}

Ruby

require "functions_framework"
require "base64"

FunctionsFramework.cloud_event "hello_pubsub" do |event|
  # The event parameter is a CloudEvents::Event::V1 object.
  # See https://cloudevents.github.io/sdk-ruby/latest/CloudEvents/Event/V1.html
  name = Base64.decode64 event.data["message"]["data"] rescue "World"

  # A cloud_event function does not return a response, but you can log messages
  # or cause side effects such as sending additional events.
  logger.info "Hello, #{name}!"
end

PHP


use CloudEvents\V1\CloudEventInterface;
use Google\CloudFunctions\FunctionsFramework;

// Register the function with Functions Framework.
// This enables omitting the `FUNCTIONS_SIGNATURE_TYPE=cloudevent` environment
// variable when deploying. The `FUNCTION_TARGET` environment variable should
// match the first parameter.
FunctionsFramework::cloudEvent('helloworldPubsub', 'helloworldPubsub');

function helloworldPubsub(CloudEventInterface $event): void
{
    $log = fopen(getenv('LOGGER_OUTPUT') ?: 'php://stderr', 'wb');

    $cloudEventData = $event->getData();
    $pubSubData = base64_decode($cloudEventData['message']['data']);

    $name = $pubSubData ? htmlspecialchars($pubSubData) : 'World';
    fwrite($log, "Hello, $name!" . PHP_EOL);
}

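Publishing messages from within a function

You can also publish messages to a Pub/Sub topic from within a function. This lets you use Cloud Pub/Sub messages to trigger subsequent Cloud Functions invocations. You can use this technique to:

  • Chain sequential function calls together.
  • Distribute (or "fan out") groups of tasks in parallel across multiple Cloud Functions instances.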
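
The fan-out pattern can be sketched as one message per task. This is a minimal sketch, not a definitive implementation: `fan_out`, `record`, and the task dictionaries are hypothetical, and the publisher is injected as a plain callable so the sketch runs without the client library. In a deployed function, that callable could wrap the `publisher.publish(topic_path, data=...)` call used in the Python publish sample on this page.

```python
import json


def fan_out(tasks, publish):
    """Publish one message per task and return the publish results.

    `publish` is any callable that accepts the message payload as bytes.
    """
    results = []
    for task in tasks:
        # Pub/Sub payloads are bytes, so serialize each task to UTF-8 JSON.
        data = json.dumps(task).encode("utf-8")
        results.append(publish(data))
    return results


# Local stand-in for a real publisher: it only records what was sent.
sent = []


def record(data):
    sent.append(data)
    return len(sent)  # pretend message ID


ids = fan_out([{"page": 1}, {"page": 2}, {"page": 3}], record)
print(ids)
```

Each published message then triggers its own invocation of the subscribed function, which is what spreads the tasks across instances.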

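In the following example, an HTTP publish function sends a message to a Pub/Sub topic, which in turn triggers a subscribe function.

This snippet shows the publish function that publishes a message to a Pub/Sub topic: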

Node.js

const {PubSub} = require('@google-cloud/pubsub');

// Instantiates a client
const pubsub = new PubSub();

/**
 * Publishes a message to a Cloud Pub/Sub Topic.
 *
 * @example
 * gcloud functions call publish --data '{"topic":"[YOUR_TOPIC_NAME]","message":"Hello, world!"}'
 *
 *   - Replace `[YOUR_TOPIC_NAME]` with your Cloud Pub/Sub topic name.
 *
 * @param {object} req Cloud Function request context.
 * @param {object} req.body The request body.
 * @param {string} req.body.topic Topic name on which to publish.
 * @param {string} req.body.message Message to publish.
 * @param {object} res Cloud Function response context.
 */
exports.publish = async (req, res) => {
  if (!req.body.topic || !req.body.message) {
    res
      .status(400)
      .send(
        'Missing parameter(s); include "topic" and "message" properties in your request.'
      );
    return;
  }

  console.log(`Publishing message to topic ${req.body.topic}`);

  // References an existing topic
  const topic = pubsub.topic(req.body.topic);

  const messageObject = {
    data: {
      message: req.body.message,
    },
  };
  const messageBuffer = Buffer.from(JSON.stringify(messageObject), 'utf8');

  // Publishes a message
  try {
    await topic.publish(messageBuffer);
    res.status(200).send('Message published.');
  } catch (err) {
    console.error(err);
    res.status(500).send(err);
    return Promise.reject(err);
  }
};

Python

import base64
import json
import os

from google.cloud import pubsub_v1

# Instantiates a Pub/Sub client
publisher = pubsub_v1.PublisherClient()
PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')

# Publishes a message to a Cloud Pub/Sub topic.
def publish(request):
    # Fall back to an empty dict so a missing or non-JSON body yields a 400 below.
    request_json = request.get_json(silent=True) or {}

    topic_name = request_json.get("topic")
    message = request_json.get("message")

    if not topic_name or not message:
        return ('Missing "topic" and/or "message" parameter.', 400)

    print(f'Publishing message to topic {topic_name}')

    # References an existing topic
    topic_path = publisher.topic_path(PROJECT_ID, topic_name)

    message_json = json.dumps({
        'data': {'message': message},
    })
    message_bytes = message_json.encode('utf-8')

    # Publishes a message
    try:
        publish_future = publisher.publish(topic_path, data=message_bytes)
        publish_future.result()  # Verify the publish succeeded
        return 'Message published.'
    except Exception as e:
        print(e)
        return (str(e), 500)

Go


// Package contexttip is an example of how to use Pub/Sub and context.Context in
// a Cloud Function.
package contexttip

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"

	"cloud.google.com/go/pubsub"
)

// GOOGLE_CLOUD_PROJECT is a user-set environment variable.
var projectID = os.Getenv("GOOGLE_CLOUD_PROJECT")

// client is a global Pub/Sub client, initialized once per instance.
var client *pubsub.Client

func init() {
	// err is pre-declared to avoid shadowing client.
	var err error

	// client is initialized with context.Background() because it should
	// persist between function invocations.
	client, err = pubsub.NewClient(context.Background(), projectID)
	if err != nil {
		log.Fatalf("pubsub.NewClient: %v", err)
	}
}

type publishRequest struct {
	Topic   string `json:"topic"`
	Message string `json:"message"`
}

// PublishMessage publishes a message to Pub/Sub. PublishMessage only works
// with topics that already exist.
func PublishMessage(w http.ResponseWriter, r *http.Request) {
	// Parse the request body to get the topic name and message.
	p := publishRequest{}

	if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
		log.Printf("json.NewDecoder: %v", err)
		http.Error(w, "Error parsing request", http.StatusBadRequest)
		return
	}

	if p.Topic == "" || p.Message == "" {
		s := "missing 'topic' or 'message' parameter"
		log.Println(s)
		http.Error(w, s, http.StatusBadRequest)
		return
	}

	m := &pubsub.Message{
		Data: []byte(p.Message),
	}
	// Publish and Get use r.Context() because they are only needed for this
	// function invocation. If this were a background function, they would use
	// the ctx passed as an argument.
	id, err := client.Topic(p.Topic).Publish(r.Context(), m).Get(r.Context())
	if err != nil {
		log.Printf("topic(%s).Publish.Get: %v", p.Topic, err)
		http.Error(w, "Error publishing message", http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "Message published: %v", id)
}

Java

import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.BufferedWriter;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class PublishMessage implements HttpFunction {
  // TODO<developer> set this environment variable
  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");

  private static final Logger logger = Logger.getLogger(PublishMessage.class.getName());

  @Override
  public void service(HttpRequest request, HttpResponse response) throws IOException {
    Optional<String> maybeTopicName = request.getFirstQueryParameter("topic");
    Optional<String> maybeMessage = request.getFirstQueryParameter("message");

    BufferedWriter responseWriter = response.getWriter();

    if (maybeTopicName.isEmpty() || maybeMessage.isEmpty()) {
      response.setStatusCode(HttpURLConnection.HTTP_BAD_REQUEST);

      responseWriter.write("Missing 'topic' and/or 'message' parameter(s).");
      return;
    }

    String topicName = maybeTopicName.get();
    logger.info("Publishing message to topic: " + topicName);

    // Create the PubsubMessage object
    ByteString byteStr = ByteString.copyFrom(maybeMessage.get(), StandardCharsets.UTF_8);
    PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();

    Publisher publisher = Publisher.newBuilder(
        ProjectTopicName.of(PROJECT_ID, topicName)).build();

    // Attempt to publish the message
    String responseMessage;
    try {
      publisher.publish(pubsubApiMessage).get();
      responseMessage = "Message published.";
    } catch (InterruptedException | ExecutionException e) {
      logger.log(Level.SEVERE, "Error publishing Pub/Sub message: " + e.getMessage(), e);
      response.setStatusCode(HttpURLConnection.HTTP_INTERNAL_ERROR);
      responseMessage = "Error publishing Pub/Sub message; see logs for more info.";
    }

    responseWriter.write(responseMessage);
  }
}

This snippet shows the subscribe function that is triggered when the message is published to the Pub/Sub topic:

Node.js

/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {object} pubsubMessage The Cloud Pub/Sub Message object.
 * @param {string} pubsubMessage.data The "data" property of the Cloud Pub/Sub Message.
 */
exports.subscribe = pubsubMessage => {
  // Print out the data from Pub/Sub, to prove that it worked
  console.log(Buffer.from(pubsubMessage.data, 'base64').toString());
};

Python

import base64


# Triggered from a message on a Cloud Pub/Sub topic.
def subscribe(event, context):
    # Print out the data from Pub/Sub, to prove that it worked
    print(base64.b64decode(event['data']).decode('utf-8'))

Java


import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.events.cloud.pubsub.v1.Message;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.logging.Logger;

public class SubscribeToTopic implements BackgroundFunction<Message> {
  private static final Logger logger = Logger.getLogger(SubscribeToTopic.class.getName());

  @Override
  public void accept(Message message, Context context) {
    if (message.getData() == null) {
      logger.info("No message provided");
      return;
    }

    String messageString = new String(
        Base64.getDecoder().decode(message.getData().getBytes(StandardCharsets.UTF_8)),
        StandardCharsets.UTF_8);
    logger.info(messageString);
  }
}
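
The Node.js and Python publish samples above wrap the text in a JSON object of the form {"data": {"message": ...}}, while the Go and Java samples send the raw text. A subscriber that wants the original text back therefore has to base64-decode the payload and, where applicable, parse the JSON. The following is a minimal sketch under that assumption; `extract_message` is a hypothetical helper, not part of any library.

```python
import base64
import json


def extract_message(event):
    """Return the text sent by the publish samples, or None if there is no data."""
    raw = event.get("data")
    if not raw:
        return None
    decoded = base64.b64decode(raw).decode("utf-8")
    try:
        body = json.loads(decoded)
    except json.JSONDecodeError:
        # Not JSON: treat the payload as plain text.
        return decoded
    if isinstance(body, dict) and isinstance(body.get("data"), dict):
        return body["data"].get("message")
    return decoded


# Simulate the event produced by the JSON-wrapping publish samples.
wrapped = json.dumps({"data": {"message": "Hello World!"}}).encode("utf-8")
event = {"data": base64.b64encode(wrapped).decode("ascii")}
print(extract_message(event))
```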

In production, you might use the cURL command-line utility to invoke the publish function, as follows:

curl https://GCF_REGION-GCP_PROJECT_ID.cloudfunctions.net/publish -X POST  -d "{\"topic\": \"PUBSUB_TOPIC\", \"message\":\"YOUR_MESSAGE\"}" -H "Content-Type: application/json"
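
The same request can be prepared from Python. This is a minimal sketch using only the standard library; `build_publish_request` is a hypothetical helper, and the URL, topic, and message are the placeholders from the cURL command above. The request is built but not sent.

```python
import json
import urllib.request


def build_publish_request(url, topic, message):
    """Build (but do not send) a POST request equivalent to the cURL call."""
    body = json.dumps({"topic": topic, "message": message}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_publish_request(
    "https://GCF_REGION-GCP_PROJECT_ID.cloudfunctions.net/publish",  # placeholder URL
    "PUBSUB_TOPIC",
    "YOUR_MESSAGE",
)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```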

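However, for testing and debugging, you can instead use the gcloud functions call command to invoke the function directly. The following steps describe how to run the example above using the gcloud functions call command:

  1. Create a Pub/Sub topic, where MY_TOPIC is the name of the new topic you are creating:

    gcloud pubsub topics create MY_TOPIC
  2. Deploy the publish function, where RUNTIME is the name of the runtime you are using, such as nodejs8:

    gcloud functions deploy publish --trigger-http --runtime RUNTIME
  3. Deploy the subscribe function:

    gcloud functions deploy subscribe --trigger-topic MY_TOPIC --runtime RUNTIME
  4. Invoke the publish function directly using the gcloud functions call command, supplying the required data as JSON in the --data argument:

    gcloud functions call publish --data '{"topic":"MY_TOPIC","message":"Hello World!"}'
  5. Check the logs for the subscribe function. Note that it may take a few minutes for the results to appear in the logs:

    gcloud functions logs read subscribe

You should see output resembling the following: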

D    ...Function execution started
I    ...{"data":{"message":"Hello World!"}}
D    ...Function execution took 753 ms, finished with status: 'ok'

Deploying your function

The following gcloud command deploys a function that is triggered when a message is published to a Pub/Sub topic:

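gcloud functions deploy FUNCTION_NAME --entry-point ENTRY_POINT --trigger-topic TOPIC_NAME FLAGS...

Argument | Description
FUNCTION_NAME | The registered name of the Cloud Function you are deploying. This can be either the name of a function in your source code or an arbitrary string. If FUNCTION_NAME is an arbitrary string, you must include the --entry-point flag.
--entry-point ENTRY_POINT | The name of a function or class in your source code. Optional, unless you did not use FUNCTION_NAME to specify the function in your source code to be executed during deployment. In that case, you must use --entry-point to supply the name of the executable function.
--trigger-topic TOPIC_NAME | The name of the Pub/Sub topic to which the function is subscribed. If the topic does not exist, it is created during deployment.
FLAGS... | Additional flags you must specify during deployment, such as --runtime. For a full reference, see the gcloud functions deploy documentation.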
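
How the arguments fit together can be sketched by assembling the command line programmatically. This is a minimal sketch that only builds the argument list and does not run gcloud; `build_deploy_command` is a hypothetical helper, and the function name, entry point, and topic below are illustrative values.

```python
import shlex


def build_deploy_command(function_name, topic, runtime, entry_point=None):
    """Assemble the gcloud argument list for a Pub/Sub-triggered function."""
    command = ["gcloud", "functions", "deploy", function_name]
    if entry_point:
        # Needed when the registered name differs from the function in the source.
        command += ["--entry-point", entry_point]
    command += ["--trigger-topic", topic, "--runtime", runtime]
    return command


command = build_deploy_command(
    "my-function", "MY_TOPIC", "RUNTIME", entry_point="hello_pubsub"
)
print(shlex.join(command))
```

The resulting list can be passed to `subprocess.run` if you want to script deployments, provided gcloud is installed and authenticated.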

For a complete example of how to use Pub/Sub triggers, see the Pub/Sub tutorial.

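Legacy Cloud Pub/Sub triggers

The gcloud command below deploys a function that is triggered by legacy Pub/Sub notifications on a specific topic. These notifications are supported for legacy functions that already consume these events. However, we recommend using the --trigger-topic flag instead, because the legacy notifications might be removed in the future.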

gcloud functions deploy FUNCTION_NAME \
--trigger-resource TOPIC_NAME \
--trigger-event providers/cloud.pubsub/eventTypes/topic.publish \
FLAGS...

Next steps

For an example of how to implement an event-driven function that is triggered by Pub/Sub, see the Pub/Sub tutorial.