Activadores de Google Cloud Pub/Sub

Cloud Functions usa funciones controladas por eventos para controlar los eventos de tu infraestructura de Cloud. Por ejemplo, las funciones de Cloud Functions se pueden activar mediante mensajes publicados en temas de Pub/Sub en el mismo proyecto de Cloud en el que se encuentran las funciones. Pub/Sub es un bus de mensajes distribuido a nivel global que se escala de forma automática cuando lo necesitas y brinda la base para que compiles tus propios servicios globales sólidos.

Tipos de eventos

Cloud Functions usa un solo evento de Pub/Sub y tiene el valor de tipo de activador google.pubsub.topic.publish.

Este evento se envía cuando se publica un mensaje en un tema de Pub/Sub que se especifica cuando se implementa una función. Cada mensaje publicado en este tema activará la ejecución de la función con el contenido del mensaje pasado como datos de entrada.

Estructura de eventos

Las funciones de Cloud Functions activadas desde un tema de Pub/Sub recibirán eventos que se ajusten al tipo de PubsubMessage, con la salvedad de que publishTime y messageId no están disponibles directamente en el PubsubMessage. En cambio, puedes acceder a publishTime y messageId a través del ID del evento y las propiedades de marca de tiempo de los metadatos del evento. Se puede acceder a estos metadatos a través del objeto de contexto que se traslada a tu función cuando se invoca.

La carga útil del objeto PubsubMessage, los datos publicados sobre el tema, se almacena como una string codificada en base64 en el atributo data de PubsubMessage. Para extraer la carga útil del objeto PubsubMessage, es posible que debas decodificar el atributo de datos, como se muestra en los ejemplos a continuación.

Código de muestra

Node.js

/**
 * Background Cloud Function to be triggered by Pub/Sub.
 * This function is exported by index.js, and executed when
 * the trigger topic receives a message.
 *
 * @param {object} message The Pub/Sub message.
 * @param {object} context The event metadata.
 */
exports.helloPubSub = (message, context) => {
  const name = message.data
    ? Buffer.from(message.data, 'base64').toString()
    : 'World';

  console.log(`Hello, ${name}!`);
};

Python

def hello_pubsub(event, context):
    """Background Cloud Function to be triggered by Pub/Sub.
    Args:
         event (dict):  The dictionary with data specific to this type of
                        event. The `@type` field maps to
                         `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
                        The `data` field maps to the PubsubMessage data
                        in a base64-encoded string. The `attributes` field maps
                        to the PubsubMessage attributes if any is present.
         context (google.cloud.functions.Context): Metadata of triggering event
                        including `event_id` which maps to the PubsubMessage
                        messageId, `timestamp` which maps to the PubsubMessage
                        publishTime, `event_type` which maps to
                        `google.pubsub.topic.publish`, and `resource` which is
                        a dictionary that describes the service API endpoint
                        pubsub.googleapis.com, the triggering topic's name, and
                        the triggering event type
                        `type.googleapis.com/google.pubsub.v1.PubsubMessage`.
    Returns:
        None. The output is written to Cloud Logging.
    """
    import base64

    print("""This Function was triggered by messageId {} published at {} to {}
    """.format(context.event_id, context.timestamp, context.resource["name"]))

    if 'data' in event:
        name = base64.b64decode(event['data']).decode('utf-8')
    else:
        name = 'World'
    print('Hello {}!'.format(name))

Go


// Package helloworld provides a set of Cloud Functions samples.
package helloworld

import (
	"context"
	"log"
)

// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
	Data []byte `json:"data"`
}

// HelloPubSub consumes a Pub/Sub message.
func HelloPubSub(ctx context.Context, m PubSubMessage) error {
	name := string(m.Data) // Automatically decoded from base64.
	if name == "" {
		name = "World"
	}
	log.Printf("Hello, %s!", name)
	return nil
}

Java


import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.events.cloud.pubsub.v1.Message;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.logging.Level;
import java.util.logging.Logger;

public class HelloPubSub implements BackgroundFunction<Message> {
  private static final Logger logger = Logger.getLogger(HelloPubSub.class.getName());

  @Override
  public void accept(Message message, Context context) {
    String name = "world";
    if (message != null && message.getData() != null) {
      name = new String(
          Base64.getDecoder().decode(message.getData().getBytes(StandardCharsets.UTF_8)),
          StandardCharsets.UTF_8);
    }
    logger.info(String.format("Hello %s!", name));
    return;
  }
}

Kotlin

import java.util.Base64
import java.util.logging.Logger

class EventExample {
    companion object {
        var log: Logger = Logger.getLogger(EventExample::class.java.name)
    }

    fun helloPubSub(message: PubSubMessage) {
        val data = String(Base64.getDecoder().decode(message.data))
        log.info(data)
    }
}

data class PubSubMessage(
    val data: String,
    val messageId: String,
    val publishTime: String,
    val attributes: Map<String, String>
)

C#

using CloudNative.CloudEvents;
using Google.Cloud.Functions.Framework;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Threading.Tasks;

namespace HelloPubSub
{
    public class Function : ICloudEventFunction<MessagePublishedData>
    {
        private readonly ILogger _logger;

        public Function(ILogger<Function> logger) =>
            _logger = logger;

        public Task HandleAsync(CloudEvent cloudEvent, MessagePublishedData data, CancellationToken cancellationToken)
        {
            string nameFromMessage = data.Message?.TextData;
            string name = string.IsNullOrEmpty(nameFromMessage) ? "world" : nameFromMessage;
            _logger.LogInformation("Hello {name}", name);
            return Task.CompletedTask;
        }
    }
}

Ruby

require "functions_framework"
require "base64"

FunctionsFramework.cloud_event "hello_pubsub" do |event|
  # The event parameter is a CloudEvents::Event::V1 object.
  # See https://cloudevents.github.io/sdk-ruby/latest/CloudEvents/Event/V1.html
  name = Base64.decode64 event.data["message"]["data"] rescue "World"

  # A cloud_event function does not return a response, but you can log messages
  # or cause side effects such as sending additional events.
  logger.info "Hello, #{name}!"
end

PHP


use Google\CloudFunctions\CloudEvent;

function helloworldPubsub(CloudEvent $event): void
{
    $log = fopen(getenv('LOGGER_OUTPUT') ?: 'php://stderr', 'wb');

    $cloudEventData = $event->getData();
    $pubSubData = base64_decode($cloudEventData['message']['data']);

    $name = $pubSubData ? htmlspecialchars($pubSubData) : 'World';
    fwrite($log, "Hello, $name!" . PHP_EOL);
}

Publica un mensaje desde una función

También puedes publicar un mensaje en un tema de Pub/Sub desde una función. Esto te permite activar invocaciones posteriores de Cloud Functions con mensajes de Cloud Pub/Sub. Puedes usar esta técnica para hacer lo siguiente:

  • Unir llamadas a funciones secuenciales
  • Distribuir (o “fan-out”) grupos de tareas en paralelo en varias instancias de Cloud Functions

En el siguiente ejemplo, una función de HTTP publish envía un mensaje a un tema de Pub/Sub y, a su vez, activa una función subscribe.

En este fragmento, se muestra la función publish que publica un mensaje en un tema de Pub/Sub:

.

Node.js

const {PubSub} = require('@google-cloud/pubsub');

// Instantiates a client
const pubsub = new PubSub();

/**
 * Publishes a message to a Cloud Pub/Sub Topic.
 *
 * @example
 * gcloud functions call publish --data '{"topic":"[YOUR_TOPIC_NAME]","message":"Hello, world!"}'
 *
 *   - Replace `[YOUR_TOPIC_NAME]` with your Cloud Pub/Sub topic name.
 *
 * @param {object} req Cloud Function request context.
 * @param {object} req.body The request body.
 * @param {string} req.body.topic Topic name on which to publish.
 * @param {string} req.body.message Message to publish.
 * @param {object} res Cloud Function response context.
 */
exports.publish = async (req, res) => {
  if (!req.body.topic || !req.body.message) {
    res
      .status(400)
      .send(
        'Missing parameter(s); include "topic" and "message" properties in your request.'
      );
    return;
  }

  console.log(`Publishing message to topic ${req.body.topic}`);

  // References an existing topic
  const topic = pubsub.topic(req.body.topic);

  const messageObject = {
    data: {
      message: req.body.message,
    },
  };
  const messageBuffer = Buffer.from(JSON.stringify(messageObject), 'utf8');

  // Publishes a message
  try {
    await topic.publish(messageBuffer);
    res.status(200).send('Message published.');
  } catch (err) {
    console.error(err);
    res.status(500).send(err);
    return Promise.reject(err);
  }
};

Python

import base64
import json
import os

from google.cloud import pubsub_v1

# Instantiates a Pub/Sub client
publisher = pubsub_v1.PublisherClient()
PROJECT_ID = os.getenv('GOOGLE_CLOUD_PROJECT')

# Publishes a message to a Cloud Pub/Sub topic.
def publish(request):
    request_json = request.get_json(silent=True)

    topic_name = request_json.get("topic")
    message = request_json.get("message")

    if not topic_name or not message:
        return ('Missing "topic" and/or "message" parameter.', 400)

    print(f'Publishing message to topic {topic_name}')

    # References an existing topic
    topic_path = publisher.topic_path(PROJECT_ID, topic_name)

    message_json = json.dumps({
        'data': {'message': message},
    })
    message_bytes = message_json.encode('utf-8')

    # Publishes a message
    try:
        publish_future = publisher.publish(topic_path, data=message_bytes)
        publish_future.result()  # Verify the publish succeeded
        return 'Message published.'
    except Exception as e:
        print(e)
        return (e, 500)

Go


// Package contexttip is an example of how to use Pub/Sub and context.Context in
// a Cloud Function.
package contexttip

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"

	"cloud.google.com/go/pubsub"
)

// GOOGLE_CLOUD_PROJECT is a user-set environment variable.
var projectID = os.Getenv("GOOGLE_CLOUD_PROJECT")

// client is a global Pub/Sub client, initialized once per instance.
var client *pubsub.Client

func init() {
	// err is pre-declared to avoid shadowing client.
	var err error

	// client is initialized with context.Background() because it should
	// persist between function invocations.
	client, err = pubsub.NewClient(context.Background(), projectID)
	if err != nil {
		log.Fatalf("pubsub.NewClient: %v", err)
	}
}

type publishRequest struct {
	Topic   string `json:"topic"`
	Message string `json:"message"`
}

// PublishMessage publishes a message to Pub/Sub. PublishMessage only works
// with topics that already exist.
func PublishMessage(w http.ResponseWriter, r *http.Request) {
	// Parse the request body to get the topic name and message.
	p := publishRequest{}

	if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
		log.Printf("json.NewDecoder: %v", err)
		http.Error(w, "Error parsing request", http.StatusBadRequest)
		return
	}

	if p.Topic == "" || p.Message == "" {
		s := "missing 'topic' or 'message' parameter"
		log.Println(s)
		http.Error(w, s, http.StatusBadRequest)
		return
	}

	m := &pubsub.Message{
		Data: []byte(p.Message),
	}
	// Publish and Get use r.Context() because they are only needed for this
	// function invocation. If this were a background function, they would use
	// the ctx passed as an argument.
	id, err := client.Topic(p.Topic).Publish(r.Context(), m).Get(r.Context())
	if err != nil {
		log.Printf("topic(%s).Publish.Get: %v", p.Topic, err)
		http.Error(w, "Error publishing message", http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "Message published: %v", id)
}

Java

import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.BufferedWriter;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class PublishMessage implements HttpFunction {
  // TODO<developer> set this environment variable
  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");

  private static final Logger logger = Logger.getLogger(PublishMessage.class.getName());

  @Override
  public void service(HttpRequest request, HttpResponse response) throws IOException {
    Optional<String> maybeTopicName = request.getFirstQueryParameter("topic");
    Optional<String> maybeMessage = request.getFirstQueryParameter("message");

    BufferedWriter responseWriter = response.getWriter();

    if (maybeTopicName.isEmpty() || maybeMessage.isEmpty()) {
      response.setStatusCode(HttpURLConnection.HTTP_BAD_REQUEST);

      responseWriter.write("Missing 'topic' and/or 'message' parameter(s).");
      return;
    }

    String topicName = maybeTopicName.get();
    logger.info("Publishing message to topic: " + topicName);

    // Create the PubsubMessage object
    ByteString byteStr = ByteString.copyFrom(maybeMessage.get(), StandardCharsets.UTF_8);
    PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();

    Publisher publisher = Publisher.newBuilder(
        ProjectTopicName.of(PROJECT_ID, topicName)).build();

    // Attempt to publish the message
    String responseMessage;
    try {
      publisher.publish(pubsubApiMessage).get();
      responseMessage = "Message published.";
    } catch (InterruptedException | ExecutionException e) {
      logger.log(Level.SEVERE, "Error publishing Pub/Sub message: " + e.getMessage(), e);
      responseMessage = "Error publishing Pub/Sub message; see logs for more info.";
    }

    responseWriter.write(responseMessage);
  }
}

En este fragmento, se muestra la función subscribe que se activa cuando el mensaje se publica en el tema de Pub/Sub:

Node.js

/**
 * Triggered from a message on a Cloud Pub/Sub topic.
 *
 * @param {object} pubsubMessage The Cloud Pub/Sub Message object.
 * @param {string} pubsubMessage.data The "data" property of the Cloud Pub/Sub Message.
 */
exports.subscribe = pubsubMessage => {
  // Print out the data from Pub/Sub, to prove that it worked
  console.log(Buffer.from(pubsubMessage.data, 'base64').toString());
};

Python

# Triggered from a message on a Cloud Pub/Sub topic.
def subscribe(event, context):
    # Print out the data from Pub/Sub, to prove that it worked
    print(base64.b64decode(event['data']))

Java


import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import com.google.events.cloud.pubsub.v1.Message;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.logging.Logger;

public class SubscribeToTopic implements BackgroundFunction<Message> {
  private static final Logger logger = Logger.getLogger(SubscribeToTopic.class.getName());

  @Override
  public void accept(Message message, Context context) {
    if (message.getData() == null) {
      logger.info("No message provided");
      return;
    }

    String messageString = new String(
        Base64.getDecoder().decode(message.getData().getBytes(StandardCharsets.UTF_8)),
        StandardCharsets.UTF_8);
    logger.info(messageString);
  }
}

En producción, puedes usar la utilidad de línea de comandos cURL para invocar la función publish, de la siguiente manera:

curl https://GCF_REGION-GCP_PROJECT_ID.cloudfunctions.net/publish -X POST  -d "{\"topic\": \"PUBSUB_TOPIC\", \"message\":\"YOUR_MESSAGE\"}" -H "Content-Type: application/json"

Sin embargo, en el caso de las pruebas y la depuración, puedes utilizar el comando gcloud functions call para llamar a la función directamente. En los siguientes pasos, se describe cómo ejecutar el ejemplo anterior con el comando gcloud functions call:

  1. Crea un tema de Pub/Sub, en el que MY_TOPIC sea el nombre del tema nuevo que estás creando:

    gcloud pubsub topics create MY_TOPIC
  2. Implementa la función publish, en la cual RUNTIME es el nombre del entorno de ejecución que estás utilizando, como nodejs8:

    gcloud functions deploy publish --trigger-http --runtime RUNTIME
  3. Implementa la función subscribe:

    gcloud functions deploy subscribe --trigger-topic MY_TOPIC --runtime RUNTIME
  4. Invoca la función publish directamente mediante el comando gcloud functions call y proporciona los datos obligatorios, como JSON en el argumento --data:

    gcloud functions call publish --data '{"topic":"MY_TOPIC","message":"Hello World!"}'
  5. Verifica los registros de la función subscribe. Ten en cuenta que los resultados aparecen en el registro luego de unos minutos:

    gcloud functions logs read subscribe

Deberías ver un resultado similar a este:

D    ...Function execution started
I    ...{"data":{"message":"Hello World!"}}
D    ...Function execution took 753 ms, finished with status: 'ok'

Implementa la función

Mediante el siguiente comando de gcloud, se implementa una función que se activa cuando un mensaje se publica en un tema de Pub/Sub:

gcloud functions deploy FUNCTION_NAME --entry-point ENTRY_POINT --trigger-topic TOPIC_NAME FLAGS...
Argumento Descripción
FUNCTION_NAME El nombre registrado de la función de Cloud Functions que estás implementando. Puede ser el nombre de una función en tu código fuente o una string arbitraria. Si FUNCTION_NAME es una string arbitraria, debes incluir la marca --entry-point.
--entry-point ENTRY_POINT El nombre de una función o clase en tu código fuente. Opcional, a menos que no hayas usado FUNCTION_NAME para especificar la función en tu código fuente que se ejecutará durante la implementación. En ese caso, debes usar --entry-point para proporcionar el nombre de la función ejecutable.
--trigger-topic TOPIC_NAME Es el nombre del tema de Pub/Sub al que se suscribe la función. Si el tema no existe, se crea durante la implementación.
FLAGS... Son marcas adicionales que debes especificar durante la implementación, como --runtime. Para obtener una referencia completa, consulta la documentación de gcloud functions deploy.

Consulta el Instructivo de Pub/Sub para ver un ejemplo completo de cómo usar los activadores de Pub/Sub.

Activadores de Cloud Pub/Sub heredados

Con el comando de gcloud que aparece a continuación, se implementa una función que se activa mediante notificaciones heredadas de Pub/Sub en un tema específico. Estas notificaciones son compatibles con las funciones heredadas que ya consumen estos eventos. Sin embargo, recomendamos usar la marca --trigger-topic en su lugar, ya que las notificaciones heredadas podrían quitarse en el futuro.

gcloud functions deploy FUNCTION_NAME \
--trigger-resource TOPIC_NAME \
--trigger-event providers/cloud.pubsub/eventTypes/topic.publish \
FLAGS...
.

Próximos pasos

Consulta el Instructivo de Pub/Sub para ver un ejemplo sobre cómo implementar una función controlada por eventos que activa Pub/Sub.