Mensagem de publicação do Pub/Sub

Demonstra como interagir com o Pub/Sub a partir do Cloud Functions.

Páginas de documentação que incluem esta amostra de código

Para visualizar o exemplo de código usado em contexto, consulte a seguinte documentação:

Amostra de código

C++

#include <google/cloud/functions/http_request.h>
#include <google/cloud/functions/http_response.h>
#include <google/cloud/pubsub/publisher.h>
#include <nlohmann/json.hpp>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

namespace gcf = ::google::cloud::functions;
namespace pubsub = ::google::cloud::pubsub;

namespace {
// Returns a Publisher for `topic`. The underlying connection is created the
// first time a given topic is requested and is then kept in a function-local
// static map keyed by the topic's full name; a mutex serializes access to
// that map.
pubsub::Publisher GetPublisher(pubsub::Topic topic) {
  using ConnectionPtr = std::shared_ptr<pubsub::PublisherConnection>;

  static std::mutex cache_mu;
  static std::unordered_map<std::string, ConnectionPtr> cache;

  std::lock_guard<std::mutex> guard(cache_mu);
  // try_emplace stores an empty placeholder only when the key is new.
  auto const result = cache.try_emplace(topic.FullName());
  auto& connection = result.first->second;
  if (result.second) {
    connection = pubsub::MakePublisherConnection(std::move(topic), {});
  }
  return pubsub::Publisher(connection);
}
}  // namespace

// HTTP entry point: publishes a fixed test message to the Pub/Sub topic named
// by the "topic" field of the JSON request body.
//
// Throws std::runtime_error when the GCP_PROJECT environment variable is
// unset or when the body carries no non-empty "topic". Replies with
// kInternalServerError when the value obtained from the publish future tests
// false; otherwise replies with a short plain-text confirmation.
gcf::HttpResponse tips_gcp_apis(gcf::HttpRequest request) {  // NOLINT
  char const* project_id = std::getenv("GCP_PROJECT");
  if (project_id == nullptr) {
    throw std::runtime_error("GCP_PROJECT is not set");
  }

  auto const payload = nlohmann::json::parse(request.payload());
  // A missing "topic" key falls back to the empty string and is rejected.
  auto const topic_name = payload.value("topic", "");
  if (topic_name.empty()) {
    throw std::runtime_error("Missing topic in request");
  }

  auto publisher = GetPublisher(pubsub::Topic(project_id, topic_name));
  auto message = pubsub::MessageBuilder().SetData("Test message").Build();
  auto pending = publisher.Publish(message);

  gcf::HttpResponse response;
  // Block until the publish attempt completes, then report its outcome.
  if (pending.get()) {
    response.set_payload("1 message published");
    response.set_header("content-type", "text/plain");
    return response;
  }
  response.set_result(gcf::HttpResponse::kInternalServerError);
  return response;
}

Go


// Package contexttip is an example of how to use Pub/Sub and context.Context in
// a Cloud Function.
package contexttip

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"

	"cloud.google.com/go/pubsub"
)

var (
	// projectID is read from GOOGLE_CLOUD_PROJECT, a user-set environment
	// variable.
	projectID = os.Getenv("GOOGLE_CLOUD_PROJECT")

	// client is the package-wide Pub/Sub client, initialized once per
	// instance.
	client *pubsub.Client
)

func init() {
	// context.Background() is used because the client has to persist between
	// function invocations rather than be tied to a single request.
	c, err := pubsub.NewClient(context.Background(), projectID)
	if err != nil {
		log.Fatalf("pubsub.NewClient: %v", err)
	}
	// Assign to the package-level variable explicitly; using := on client
	// itself would declare a new local and leave the global nil.
	client = c
}

// publishRequest is the JSON request body decoded by PublishMessage.
type publishRequest struct {
	// Topic is the topic name handed to client.Topic (JSON key "topic").
	Topic   string `json:"topic"`
	// Message is the text sent as the message data (JSON key "message").
	Message string `json:"message"`
}

// PublishMessage is an HTTP handler that publishes a message to Pub/Sub.
// PublishMessage only works with topics that already exist.
//
// The request body must be a JSON object with non-empty "topic" and
// "message" fields; otherwise the handler replies with 400 Bad Request. A
// publish failure is logged and reported as 500 Internal Server Error. On
// success the ID returned for the published message is written to the
// response.
func PublishMessage(w http.ResponseWriter, r *http.Request) {
	// Parse the request body to get the topic name and message.
	p := publishRequest{}

	if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
		log.Printf("json.NewDecoder: %v", err)
		http.Error(w, "Error parsing request", http.StatusBadRequest)
		return
	}

	if p.Topic == "" || p.Message == "" {
		s := "missing 'topic' or 'message' parameter"
		log.Println(s)
		http.Error(w, s, http.StatusBadRequest)
		return
	}

	m := &pubsub.Message{
		Data: []byte(p.Message),
	}

	// A Topic handle that has been published to keeps background goroutines
	// alive until Stop is called. A fresh handle is created on every
	// invocation, so release it when the invocation ends instead of letting
	// the goroutines accumulate in the instance.
	topic := client.Topic(p.Topic)
	defer topic.Stop()

	// Publish and Get use r.Context() because they are only needed for this
	// function invocation. If this were a background function, they would use
	// the ctx passed as an argument.
	ctx := r.Context()
	id, err := topic.Publish(ctx, m).Get(ctx)
	if err != nil {
		log.Printf("topic(%s).Publish.Get: %v", p.Topic, err)
		http.Error(w, "Error publishing message", http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "Message published: %v", id)
}

Java

import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.BufferedWriter;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * HTTP Cloud Function that publishes the {@code message} query parameter to the Pub/Sub topic
 * named by the {@code topic} query parameter.
 */
public class PublishMessage implements HttpFunction {
  // TODO<developer> set this environment variable
  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");

  private static final Logger logger = Logger.getLogger(PublishMessage.class.getName());

  /**
   * Publishes one message and reports the outcome in the HTTP response.
   *
   * <p>Replies with 400 when either query parameter is missing and with 500 when publishing
   * fails; otherwise writes a confirmation with the default status.
   *
   * @param request incoming request; {@code topic} and {@code message} are read from its query
   *     parameters
   * @param response response whose status code and body are set by this method
   * @throws IOException if the response writer cannot be obtained or written to
   */
  @Override
  public void service(HttpRequest request, HttpResponse response) throws IOException {
    Optional<String> maybeTopicName = request.getFirstQueryParameter("topic");
    Optional<String> maybeMessage = request.getFirstQueryParameter("message");

    BufferedWriter responseWriter = response.getWriter();

    if (maybeTopicName.isEmpty() || maybeMessage.isEmpty()) {
      response.setStatusCode(HttpURLConnection.HTTP_BAD_REQUEST);

      responseWriter.write("Missing 'topic' and/or 'message' parameter(s).");
      return;
    }

    String topicName = maybeTopicName.get();
    logger.info("Publishing message to topic: " + topicName);

    // Create the PubsubMessage object
    ByteString byteStr = ByteString.copyFrom(maybeMessage.get(), StandardCharsets.UTF_8);
    PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();

    Publisher publisher = Publisher.newBuilder(
        ProjectTopicName.of(PROJECT_ID, topicName)).build();

    // Attempt to publish the message
    String responseMessage;
    try {
      publisher.publish(pubsubApiMessage).get();
      responseMessage = "Message published.";
    } catch (InterruptedException | ExecutionException e) {
      if (e instanceof InterruptedException) {
        // Preserve the interrupt so code further up the stack can observe it.
        Thread.currentThread().interrupt();
      }
      logger.log(Level.SEVERE, "Error publishing Pub/Sub message: " + e.getMessage(), e);
      // Signal the failure through the status code, not only the body text.
      response.setStatusCode(HttpURLConnection.HTTP_INTERNAL_ERROR);
      responseMessage = "Error publishing Pub/Sub message; see logs for more info.";
    } finally {
      // A Publisher is created for every request, so release its resources here.
      publisher.shutdown();
    }

    responseWriter.write(responseMessage);
  }
}

Node.js

const {PubSub} = require('@google-cloud/pubsub');

// Instantiates one Pub/Sub client at module load; the exported `publish`
// handler uses this instance for every request.
const pubsub = new PubSub();

/**
 * Publishes a message to a Cloud Pub/Sub Topic.
 *
 * @example
 * gcloud functions call publish --data '{"topic":"[YOUR_TOPIC_NAME]","message":"Hello, world!"}'
 *
 *   - Replace `[YOUR_TOPIC_NAME]` with your Cloud Pub/Sub topic name.
 *
 * @param {object} req Cloud Function request context.
 * @param {object} req.body The request body.
 * @param {string} req.body.topic Topic name on which to publish.
 * @param {string} req.body.message Message to publish.
 * @param {object} res Cloud Function response context.
 */
exports.publish = async (req, res) => {
  if (!req.body.topic || !req.body.message) {
    res
      .status(400)
      .send(
        'Missing parameter(s); include "topic" and "message" properties in your request.'
      );
    return;
  }

  console.log(`Publishing message to topic ${req.body.topic}`);

  // References an existing topic
  const topic = pubsub.topic(req.body.topic);

  const messageObject = {
    data: {
      message: req.body.message,
    },
  };
  const messageBuffer = Buffer.from(JSON.stringify(messageObject), 'utf8');

  // Publishes a message
  try {
    await topic.publish(messageBuffer);
    res.status(200).send('Message published.');
  } catch (err) {
    console.error(err);
    res.status(500).send(err);
    return Promise.reject(err);
  }
};

Python

import base64
import json
import os

from google.cloud import pubsub_v1

# Project ID used to build topic paths; taken from the GOOGLE_CLOUD_PROJECT
# environment variable (None when the variable is not set).
PROJECT_ID = os.environ.get('GOOGLE_CLOUD_PROJECT')

# Module-level Pub/Sub publisher client, created once at import time and
# used by every call to publish().
publisher = pubsub_v1.PublisherClient()

def publish(request):
    """Publish a message to a Cloud Pub/Sub topic.

    Args:
        request: HTTP request object. Its JSON body is expected to be an
            object with non-empty "topic" and "message" entries.

    Returns:
        'Message published.' on success. Otherwise a (body, status) tuple:
        status 400 when "topic" or "message" is missing (including when the
        body is not a JSON object), or status 500 with the error text when
        publishing fails.
    """
    request_json = request.get_json(silent=True)
    # get_json(silent=True) yields None for a body that is not valid JSON, and
    # valid JSON need not be an object. Treat both as "no parameters" so the
    # caller gets the 400 below instead of an AttributeError.
    if not isinstance(request_json, dict):
        request_json = {}

    topic_name = request_json.get("topic")
    message = request_json.get("message")

    if not topic_name or not message:
        return ('Missing "topic" and/or "message" parameter.', 400)

    print(f'Publishing message to topic {topic_name}')

    # References an existing topic
    topic_path = publisher.topic_path(PROJECT_ID, topic_name)

    message_json = json.dumps({
        'data': {'message': message},
    })
    message_bytes = message_json.encode('utf-8')

    # Publishes a message
    try:
        publish_future = publisher.publish(topic_path, data=message_bytes)
        publish_future.result()  # Verify the publish succeeded
        return 'Message published.'
    except Exception as e:
        print(e)
        # The response body must be text; an exception object is not a valid
        # response body, so send its string form.
        return (str(e), 500)

A seguir

Para pesquisar e filtrar exemplos de código de outros produtos do Google Cloud, consulte o navegador de exemplos do Google Cloud.