Déclencheurs Google Cloud Pub/Sub

Cloud Functions utilise des fonctions basées sur des événements pour gérer les événements de votre infrastructure Cloud. Par exemple, les fonctions Cloud Functions peuvent être déclenchées par des messages publiés dans les sujets Pub/Sub dans le même projet Cloud que la fonction. Pub/Sub est un service de messagerie distribué dans le monde entier qui effectue un scaling automatique en fonction des besoins et vous permet de développer des services robustes à l'échelle mondiale.

Types d'événement

Un seul événement Pub/Sub est utilisé par Cloud Functions. Il est associé à la valeur du type de déclencheur suivante : google.pubsub.topic.publish.

Cet événement est envoyé lorsqu'un message est publié dans un sujet Pub/Sub, qui est spécifié lorsqu'une fonction est déployée. Chaque message publié dans ce sujet déclenche l'exécution de la fonction avec le contenu des messages transmis en tant que données d'entrée.

Structure de l'événement

Les fonctions Cloud Functions déclenchées à partir d'un sujet Pub/Sub reçoivent des événements conformes au type PubsubMessage, avec la mise en garde suivante : publishTime et messageId ne sont pas directement disponibles dans le PubsubMessage. À la place, vous pouvez accéder à publishTime et messageId via l'ID de l'événement et les propriétés d'horodatage des métadonnées de l'événement. Ces métadonnées sont accessibles via l'objet de contexte transmis à votre fonction lorsqu'elle est appelée.

La charge utile de l'objet PubsubMessage (les données que vous avez publiées sur le sujet) est stockée sous la forme d'une chaîne encodée en base64 dans l'attribut data du PubsubMessage. Pour extraire la charge utile de l'objet PubsubMessage, vous devrez peut-être décoder l'attribut de données, comme indiqué dans les exemples ci-dessous.

Exemple de code

Node.js

/**
 * Background Cloud Function to be triggered by Pub/Sub.
 * This function is exported by index.js, and executed when
 * the trigger topic receives a message.
 *
 * @param {object} message The Pub/Sub message.
 * @param {object} context The event metadata.
 */
exports.helloPubSub = (message, context) => {
  const name = message.data
    ? Buffer.from(message.data, 'base64').toString()
    : 'World';

  console.log(`Hello, ${name}!`);
};

Python

def hello_pubsub(event, context):
    """Background Cloud Function to be triggered by Pub/Sub.
    Args:
         event (dict):  The dictionary with data specific to this type of
                        event. The `@type` field maps to
                         `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
                        The `data` field maps to the PubsubMessage data
                        in a base64-encoded string. The `attributes` field maps
                        to the PubsubMessage attributes if any is present.
         context (google.cloud.functions.Context): Metadata of triggering event
                        including `event_id` which maps to the PubsubMessage
                        messageId, `timestamp` which maps to the PubsubMessage
                        publishTime, `event_type` which maps to
                        `google.pubsub.topic.publish`, and `resource` which is
                        a dictionary that describes the service API endpoint
                        pubsub.googleapis.com, the triggering topic's name, and
                        the triggering event type
                        `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
    Returns:
        None. The output is written to Cloud Logging.
    """
    import base64

    print("""This Function was triggered by messageId {} published at {} to {}
    """.format(context.event_id, context.timestamp, context.resource["name"]))

    if 'data' in event:
        name = base64.b64decode(event['data']).decode('utf-8')
    else:
        name = 'World'
    print('Hello {}!'.format(name))

Go


// Package helloworld provides a set of Cloud Functions samples.
package helloworld

import (
	"context"
	"log"
)

// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
	Data []byte `json:"data"`
}

// HelloPubSub consumes a Pub/Sub message.
func HelloPubSub(ctx context.Context, m PubSubMessage) error {
	name := string(m.Data) // Automatically decoded from base64.
	if name == "" {
		name = "World"
	}
	log.Printf("Hello, %s!", name)
	return nil
}

Java


import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.events.cloud.pubsub.v1.Message;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.logging.Level;
import java.util.logging.Logger;

public class HelloPubSub implements BackgroundFunction<Message> {
  private static final Logger logger = Logger.getLogger(HelloPubSub.class.getName());

  @Override
  public void accept(Message message, Context context) {
    String name = "world";
    if (message != null && message.getData() != null) {
      name = new String(
          Base64.getDecoder().decode(message.getData().getBytes(StandardCharsets.UTF_8)),
          StandardCharsets.UTF_8);
    }
    logger.info(String.format("Hello %s!", name));
    return;
  }
}

Kotlin

import java.util.Base64
import java.util.logging.Logger

class EventExample {
    companion object {
        var log: Logger = Logger.getLogger(EventExample::class.java.name)
    }

    fun helloPubSub(message: PubSubMessage) {
        val data = String(Base64.getDecoder().decode(message.data))
        log.info(data)
    }
}

data class PubSubMessage(
    val data: String,
    val messageId: String,
    val publishTime: String,
    val attributes: Map<String, String>
)

C#

using CloudNative.CloudEvents;
using Google.Cloud.Functions.Framework;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;

namespace HelloPubSub
{
    public class Function : ICloudEventFunction<MessagePublishedData>
    {
        private readonly ILogger _logger;

        public Function(ILogger<Function> logger) =>
            _logger = logger;

        public Task HandleAsync(CloudEvent cloudEvent, MessagePublishedData data, CancellationToken cancellationToken)
        {
            string nameFromMessage = data.Message?.TextData;
            string name = string.IsNullOrEmpty(nameFromMessage) ? "world" : nameFromMessage;
            _logger.LogInformation("Hello {name}", name);
            return Task.CompletedTask;
        }
    }
}

Ruby

require "functions_framework"
require "base64"

FunctionsFramework.cloud_event "hello_pubsub" do |event|
  # The event parameter is a CloudEvents::Event::V1 object.
  # See https://cloudevents.github.io/sdk-ruby/latest/CloudEvents/Event/V1.html
  name = Base64.decode64 event.data["message"]["data"] rescue "World"

  # A cloud_event function does not return a response, but you can log messages
  # or cause side effects such as sending additional events.
  logger.info "Hello, #{name}!"
end

PHP


use Google\CloudFunctions\CloudEvent;

function helloworldPubsub(CloudEvent $event): void
{
    $log = fopen(getenv('LOGGER_OUTPUT') ?: 'php://stderr', 'wb');

    $cloudEventData = $event->getData();
    $pubSubData = base64_decode($cloudEventData['message']['data']);

    $name = $pubSubData ? htmlspecialchars($pubSubData) : 'World';
    fwrite($log, "Hello, $name!" . PHP_EOL);
}

Publier un message à partir d'une fonction

Vous pouvez également publier un message dans un sujet Pub/Sub à partir d'une fonction. Cela vous permet de déclencher des appels de fonctions Cloud ultérieurs à l'aide des messages Cloud Pub/Sub. Cette technique vous permet :

  • d'associer des appels de fonction séquentiels ;
  • de distribuer (ou d'effectuer une distribution ramifiée) des groupes de tâches en parallèle sur plusieurs instances de Cloud Functions.

Dans l'exemple suivant, une fonction publish HTTP envoie un message à un sujet Pub/Sub, qui à son tour déclenche une fonction subscribe.

Dans cet extrait, la fonction publish publie un message sur un sujet Pub/Sub.

Node.js

const {PubSub} = require('@google-cloud/pubsub');

// Instantiates a client
const pubsub = new PubSub();

/**
 * Publishes a message to a Cloud Pub/Sub Topic.
 *
 * @example
 * gcloud functions call publish --data '{"topic":"[YOUR_TOPIC_NAME]","message":"Hello, world!"}'
 *
 *   - Replace `[YOUR_TOPIC_NAME]` with your Cloud Pub/Sub topic name.
 *
 * @param {object} req Cloud Function request context.
 * @param {object} req.body The request body.
 * @param {string} req.body.topic Topic name on which to publish.
 * @param {string} req.body.message Message to publish.
 * @param {object} res Cloud Function response context.
 */
exports.publish = async (req, res) => {
  if (!req.body.topic || !req.body.message) {
    res
      .status(400)
      .send(
        'Missing parameter(s); include "topic" and "message" properties in your request.'
      );
    return;
  }

  console.log(`Publishing message to topic ${req.body.topic}`);

  // References an existing topic
  const topic = pubsub.topic(req.body.topic);

  const messageObject = {
    data: {
      message: req.body.message,
    },
  };
  const messageBuffer = Buffer.from(JSON.stringify(messageObject), 'utf8');

  // Publishes a message
  try {
    await topic.publish(messageBuffer);
    res.status(200).send('Message published.');
  } catch (err) {
    console.error(err);
    res.status(500).send(err);
    return Promise.reject(err);
  }
};

Python

import base64
import json
import os

from google.cloud import pubsub_v1

# Instantiates a Pub/Sub client
publisher = pubsub_v1.PublisherClient()
PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')

# Publishes a message to a Cloud Pub/Sub topic.
def publish(request):
    request_json = request.get_json(silent=True)

    topic_name = request_json.get("topic")
    message = request_json.get("message")

    if not topic_name or not message:
        return ('Missing "topic" and/or "message" parameter.', 400)

    print(f'Publishing message to topic {topic_name}')

    # References an existing topic
    topic_path = publisher.topic_path(PROJECT_ID, topic_name)

    message_json = json.dumps({
        'data': {'message': message},
    })
    message_bytes = message_json.encode('utf-8')

    # Publishes a message
    try:
        publish_future = publisher.publish(topic_path, data=message_bytes)
        publish_future.result()  # Verify the publish succeeded
        return 'Message published.'
    except Exception as e:
        print(e)
        return (e, 500)

Go


// Package contexttip is an example of how to use Pub/Sub and context.Context in
// a Cloud Function.
package contexttip

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"

	"cloud.google.com/go/pubsub"
)

// GOOGLE_CLOUD_PROJECT is a user-set environment variable.
var projectID = os.Getenv("GOOGLE_CLOUD_PROJECT")

// client is a global Pub/Sub client, initialized once per instance.
var client *pubsub.Client

func init() {
	// err is pre-declared to avoid shadowing client.
	var err error

	// client is initialized with context.Background() because it should
	// persist between function invocations.
	client, err = pubsub.NewClient(context.Background(), projectID)
	if err != nil {
		log.Fatalf("pubsub.NewClient: %v", err)
	}
}

type publishRequest struct {
	Topic   string `json:"topic"`
	Message string `json:"message"`
}

// PublishMessage publishes a message to Pub/Sub. PublishMessage only works
// with topics that already exist.
func PublishMessage(w http.ResponseWriter, r *http.Request) {
	// Parse the request body to get the topic name and message.
	p := publishRequest{}

	if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
		log.Printf("json.NewDecoder: %v", err)
		http.Error(w, "Error parsing request", http.StatusBadRequest)
		return
	}

	if p.Topic == "" || p.Message == "" {
		s := "missing 'topic' or 'message' parameter"
		log.Println(s)
		http.Error(w, s, http.StatusBadRequest)
		return
	}

	m := &pubsub.Message{
		Data: []byte(p.Message),
	}
	// Publish and Get use r.Context() because they are only needed for this
	// function invocation. If this were a background function, they would use
	// the ctx passed as an argument.
	id, err := client.Topic(p.Topic).Publish(r.Context(), m).Get(r.Context())
	if err != nil {
		log.Printf("topic(%s).Publish.Get: %v", p.Topic, err)
		http.Error(w, "Error publishing message", http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "Message published: %v", id)
}

Java

import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.BufferedWriter;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class PublishMessage implements HttpFunction {
  // TODO<developer> set this environment variable
  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");

  private static final Logger logger = Logger.getLogger(PublishMessage.class.getName());

  @Override
  public void service(HttpRequest request, HttpResponse response) throws IOException {
    Optional<String> maybeTopicName = request.getFirstQueryParameter("topic");
    Optional<String> maybeMessage = request.getFirstQueryParameter("message");

    BufferedWriter responseWriter = response.getWriter();

    if (maybeTopicName.isEmpty() || maybeMessage.isEmpty()) {
      response.setStatusCode(HttpURLConnection.HTTP_BAD_REQUEST);

      responseWriter.write("Missing 'topic' and/or 'message' parameter(s).");
      return;
    }

    String topicName = maybeTopicName.get();
    logger.info("Publishing message to topic: " + topicName);

    // Create the PubsubMessage object
    ByteString byteStr = ByteString.copyFrom(maybeMessage.get(), StandardCharsets.UTF_8);
    PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();

    Publisher publisher = Publisher.newBuilder(
        ProjectTopicName.of(PROJECT_ID, topicName)).build();

    // Attempt to publish the message
    String responseMessage;
    try {
      publisher.publish(pubsubApiMessage).get();
      responseMessage = "Message published.";
    } catch (InterruptedException | ExecutionException e) {
      logger.log(Level.SEVERE, "Error publishing Pub/Sub message: " + e.getMessage(), e);
      responseMessage = "Error publishing Pub/Sub message; see logs for more info.";
    }

    responseWriter.write(responseMessage);
  }
}

Cet extrait illustre la fonction subscribe qui est déclenchée lorsque le message est publié dans le sujet Pub/Sub :

Node.js

/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {object} pubsubMessage The Cloud Pub/Sub Message object.
 * @param {string} pubsubMessage.data The "data" property of the Cloud Pub/Sub Message.
 */
exports.subscribe = pubsubMessage => {
  // Print out the data from Pub/Sub, to prove that it worked
  console.log(Buffer.from(pubsubMessage.data, 'base64').toString());
};

Python

# Triggered from a message on a Cloud Pub/Sub topic.
def subscribe(event, context):
    # Print out the data from Pub/Sub, to prove that it worked
    print(base64.b64decode(event['data']))

Java


import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.events.cloud.pubsub.v1.Message;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.logging.Logger;

public class SubscribeToTopic implements BackgroundFunction<Message> {
  private static final Logger logger = Logger.getLogger(SubscribeToTopic.class.getName());

  @Override
  public void accept(Message message, Context context) {
    if (message.getData() == null) {
      logger.info("No message provided");
      return;
    }

    String messageString = new String(
        Base64.getDecoder().decode(message.getData().getBytes(StandardCharsets.UTF_8)),
        StandardCharsets.UTF_8);
    logger.info(messageString);
  }
}

En production, vous pouvez utiliser l'utilitaire de ligne de commande cURL pour appeler la fonction publish, comme suit :

curl https://GCF_REGION-GCP_PROJECT_ID.cloudfunctions.net/publish -X POST  -d "{\"topic\": \"PUBSUB_TOPIC\", \"message\":\"YOUR_MESSAGE\"}" -H "Content-Type: application/json"

Toutefois, à des fins de test et de débogage, utilisez plutôt la commande gcloud functions call pour appeler directement la fonction. Dans les étapes suivantes, vous allez apprendre à exécuter l'exemple ci-dessus à l'aide de la commande gcloud functions call :

  1. Créez un sujet Pub/Sub, où MY_TOPIC est le nom du sujet que vous créez :

    gcloud pubsub topics create MY_TOPIC
  2. Déployez la fonction publish, où RUNTIME est le nom de l'environnement d'exécution que vous utilisez (par exemple, nodejs8) :

    gcloud functions deploy publish --trigger-http --runtime RUNTIME
  3. Déployez la fonction subscribe :

    gcloud functions deploy subscribe --trigger-topic MY_TOPIC --runtime RUNTIME
  4. Appelez directement la fonction publish à l'aide de la commande gcloud functions call et fournissez les données requises au format JSON dans l'argument --data :

    gcloud functions call publish --data '{"topic":"MY_TOPIC","message":"Hello World!"}'
  5. Recherchez la fonction subscribe dans les journaux. L'affichage de vos résultats dans le journal peut prendre quelques minutes :

    gcloud functions logs read subscribe

Un résultat semblable à celui-ci doit s'afficher :

D    ...Function execution started
I    ...{"data":{"message":"Hello World!"}}
D    ...Function execution took 753 ms, finished with status: 'ok'

Déployer votre fonction

La commande gcloud suivante déploie une fonction qui se déclenche lorsqu'un message est publié dans un sujet Pub/Sub :

gcloud functions deploy FUNCTION_NAME --trigger-topic TOPIC_NAME FLAGS...
Argument Description
FUNCTION_NAME Nom de la fonction.
--trigger-topic TOPIC_NAME Nom du sujet Pub/Sub auquel la fonction est abonnée. Si le sujet n'existe pas, il est créé lors du déploiement.
FLAGS... Options supplémentaires à spécifier lors du déploiement, telles que --runtime. Pour accéder aux informations complètes, consultez la documentation sur gcloud functions deploy.

Consultez le Tutoriel Pub/Sub pour obtenir un exemple complet d'utilisation des déclencheurs Pub/Sub.

Déclencheurs de l'ancien service Cloud Pub/Sub

La commande gcloud ci-dessous déploie une fonction déclenchée par les notifications de l'ancien service Pub/Sub sur un sujet spécifique. Ces notifications sont compatibles avec les anciennes fonctions qui utilisent déjà ces événements. Nous vous recommandons cependant d'utiliser l'option --trigger-topic, car les anciennes notifications risquent d'être supprimées plus tard.

gcloud functions deploy FUNCTION_NAME \
--trigger-resource TOPIC_NAME \
--trigger-event providers/cloud.pubsub/eventTypes/topic.publish \
FLAGS...

Étapes suivantes

Reportez-vous au tutoriel Pub/Sub pour obtenir un exemple de mise en œuvre d'une fonction basée sur des événements déclenchée par Pub/Sub.