Pesan publikasi Pub/Sub

Menunjukkan cara berinteraksi dengan Pub/Sub dari Cloud Functions.

Contoh kode

Go

Untuk melakukan autentikasi ke Cloud Functions, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


// Package contexttip is an example of how to use Pub/Sub and context.Context in
// a Cloud Function.
package contexttip

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"sync"

	"cloud.google.com/go/pubsub"
	"github.com/GoogleCloudPlatform/functions-framework-go/functions"
)

// client is a global Pub/Sub client, initialized once per instance.
var client *pubsub.Client
var once sync.Once

// createClient creates the global pubsub Client
func createClient() {
	// GOOGLE_CLOUD_PROJECT is a user-set environment variable.
	var projectID = os.Getenv("GOOGLE_CLOUD_PROJECT")
	// err is pre-declared to avoid shadowing client.
	var err error

	// client is initialized with context.Background() because it should
	// persist between function invocations.
	client, err = pubsub.NewClient(context.Background(), projectID)
	if err != nil {
		log.Fatalf("pubsub.NewClient: %v", err)
	}
}

func init() {
	// register http function
	functions.HTTP("PublishMessage", PublishMessage)
}

type publishRequest struct {
	Topic   string `json:"topic"`
	Message string `json:"message"`
}

// PublishMessage publishes a message to Pub/Sub. PublishMessage only works
// with topics that already exist.
func PublishMessage(w http.ResponseWriter, r *http.Request) {
	// use of sync.Once ensures client is only created once.
	once.Do(createClient)
	// Parse the request body to get the topic name and message.
	p := publishRequest{}

	if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
		log.Printf("json.NewDecoder: %v", err)
		http.Error(w, "Error parsing request", http.StatusBadRequest)
		return
	}

	if p.Topic == "" || p.Message == "" {
		s := "missing 'topic' or 'message' parameter"
		log.Println(s)
		http.Error(w, s, http.StatusBadRequest)
		return
	}

	m := &pubsub.Message{
		Data: []byte(p.Message),
	}
	// Publish and Get use r.Context() because they are only needed for this
	// function invocation. If this were a background function, they would use
	// the ctx passed as an argument.
	id, err := client.Topic(p.Topic).Publish(r.Context(), m).Get(r.Context())
	if err != nil {
		log.Printf("topic(%s).Publish.Get: %v", p.Topic, err)
		http.Error(w, "Error publishing message", http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "Message published: %v", id)
}

Java

Untuk melakukan autentikasi ke Cloud Functions, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.BufferedWriter;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class PublishMessage implements HttpFunction {
  // TODO<developer> set this environment variable
  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");

  private static final Logger logger = Logger.getLogger(PublishMessage.class.getName());

  @Override
  public void service(HttpRequest request, HttpResponse response) throws IOException {
    Optional<String> maybeTopicName = request.getFirstQueryParameter("topic");
    Optional<String> maybeMessage = request.getFirstQueryParameter("message");

    BufferedWriter responseWriter = response.getWriter();

    if (maybeTopicName.isEmpty() || maybeMessage.isEmpty()) {
      response.setStatusCode(HttpURLConnection.HTTP_BAD_REQUEST);

      responseWriter.write("Missing 'topic' and/or 'message' parameter(s).");
      return;
    }

    String topicName = maybeTopicName.get();
    logger.info("Publishing message to topic: " + topicName);

    // Create the PubsubMessage object
    // (This is different than the PubsubMessage POJO used in Pub/Sub-triggered functions)
    ByteString byteStr = ByteString.copyFrom(maybeMessage.get(), StandardCharsets.UTF_8);
    PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();

    Publisher publisher = Publisher.newBuilder(
        ProjectTopicName.of(PROJECT_ID, topicName)).build();

    // Attempt to publish the message
    String responseMessage;
    try {
      publisher.publish(pubsubApiMessage).get();
      responseMessage = "Message published.";
    } catch (InterruptedException | ExecutionException e) {
      logger.log(Level.SEVERE, "Error publishing Pub/Sub message: " + e.getMessage(), e);
      responseMessage = "Error publishing Pub/Sub message; see logs for more info.";
    }

    responseWriter.write(responseMessage);
  }
}

Node.js

Untuk melakukan autentikasi ke Cloud Functions, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

const {PubSub} = require('@google-cloud/pubsub');

// Instantiates a client
const pubsub = new PubSub();

/**
 * Publishes a message to a Cloud Pub/Sub Topic.
 *
 * @example
 * gcloud functions call publish --data '{"topic":"[YOUR_TOPIC_NAME]","message":"Hello, world!"}'
 *
 *   - Replace `[YOUR_TOPIC_NAME]` with your Cloud Pub/Sub topic name.
 *
 * @param {object} req Cloud Function request context.
 * @param {object} req.body The request body.
 * @param {string} req.body.topic Topic name on which to publish.
 * @param {string} req.body.message Message to publish.
 * @param {object} res Cloud Function response context.
 */
exports.publish = async (req, res) => {
  if (!req.body.topic || !req.body.message) {
    res
      .status(400)
      .send(
        'Missing parameter(s); include "topic" and "message" properties in your request.'
      );
    return;
  }

  console.log(`Publishing message to topic ${req.body.topic}`);

  // References an existing topic
  const topic = pubsub.topic(req.body.topic);

  const messageObject = {
    data: {
      message: req.body.message,
    },
  };
  const messageBuffer = Buffer.from(JSON.stringify(messageObject), 'utf8');

  // Publishes a message
  try {
    topic.publishMessage(messageBuffer);
    res.status(200).send('Message published.');
  } catch (err) {
    console.error(err);
    res.status(500).send(err);
    return Promise.reject(err);
  }
};

Python

Untuk melakukan autentikasi ke Cloud Functions, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import base64
import json
import os

from google.cloud import pubsub_v1

# TODO(developer): set this environment variable
PROJECT_ID = os.getenv("GOOGLE_CLOUD_PROJECT")

# Instantiates a Pub/Sub client
publisher = pubsub_v1.PublisherClient()

# Publishes a message to a Cloud Pub/Sub topic.
def publish(request):
    request_json = request.get_json(silent=True)

    topic_name = request_json.get("topic")
    message = request_json.get("message")

    if not topic_name or not message:
        return ('Missing "topic" and/or "message" parameter.', 400)

    print(f"Publishing message to topic {topic_name}")

    # References an existing topic
    topic_path = publisher.topic_path(PROJECT_ID, topic_name)

    message_json = json.dumps(
        {
            "data": {"message": message},
        }
    )
    message_bytes = message_json.encode("utf-8")

    # Publishes a message
    try:
        publish_future = publisher.publish(topic_path, data=message_bytes)
        publish_future.result()  # Verify the publish succeeded
        return "Message published."
    except Exception as e:
        print(e)
        return (e, 500)

Langkah selanjutnya

Untuk menelusuri dan memfilter contoh kode untuk produk Google Cloud lainnya, lihat browser contoh Google Cloud.