Cloud Functions에서 Pub/Sub와 상호작용하는 방법을 보여줍니다.
코드 샘플
Go
Cloud Functions에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
// Package contexttip is an example of how to use Pub/Sub and context.Context in
// a Cloud Function.
package contexttip
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"sync"
"cloud.google.com/go/pubsub"
"github.com/GoogleCloudPlatform/functions-framework-go/functions"
)
// Package-level state shared by every invocation served by this instance.
var (
	// client is the global Pub/Sub client; it is populated by createClient.
	client *pubsub.Client
	// once guards createClient so that the client is built a single time.
	once sync.Once
)
// createClient builds the global Pub/Sub client and stores it in client.
// The process is terminated via log.Fatalf if the client cannot be created.
func createClient() {
	// GOOGLE_CLOUD_PROJECT is a user-set environment variable.
	project := os.Getenv("GOOGLE_CLOUD_PROJECT")

	// context.Background() is used rather than a request context because the
	// client should persist between function invocations.
	c, err := pubsub.NewClient(context.Background(), project)
	if err != nil {
		log.Fatalf("pubsub.NewClient: %v", err)
	}

	// Publish the new client to the package-level variable.
	client = c
}
// init registers PublishMessage with the Functions Framework as an HTTP
// function named "PublishMessage".
func init() {
	const entryPoint = "PublishMessage"
	functions.HTTP(entryPoint, PublishMessage)
}
// publishRequest is the JSON request body decoded by PublishMessage.
type publishRequest struct {
	// Topic is read from the "topic" JSON field.
	Topic string `json:"topic"`
	// Message is read from the "message" JSON field.
	Message string `json:"message"`
}
// PublishMessage publishes a message to Pub/Sub. PublishMessage only works
// with topics that already exist.
//
// The request body must be a JSON object with non-empty "topic" and "message"
// fields; otherwise the handler replies with 400 Bad Request. A publish
// failure is logged and reported as 500 Internal Server Error. On success the
// published message ID is written to the response.
func PublishMessage(w http.ResponseWriter, r *http.Request) {
	// use of sync.Once ensures client is only created once.
	once.Do(createClient)

	// Parse the request body to get the topic name and message.
	p := publishRequest{}
	if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
		log.Printf("json.NewDecoder: %v", err)
		http.Error(w, "Error parsing request", http.StatusBadRequest)
		return
	}

	if p.Topic == "" || p.Message == "" {
		s := "missing 'topic' or 'message' parameter"
		log.Println(s)
		http.Error(w, s, http.StatusBadRequest)
		return
	}

	m := &pubsub.Message{
		Data: []byte(p.Message),
	}

	// A Topic handle that has been published on has background goroutines
	// associated with it. A new handle is created per invocation, so it is
	// stopped before returning to avoid leaking those goroutines on a
	// long-lived instance.
	topic := client.Topic(p.Topic)
	defer topic.Stop()

	// Publish and Get use r.Context() because they are only needed for this
	// function invocation. If this were a background function, they would use
	// the ctx passed as an argument.
	id, err := topic.Publish(r.Context(), m).Get(r.Context())
	if err != nil {
		log.Printf("topic(%s).Publish.Get: %v", p.Topic, err)
		http.Error(w, "Error publishing message", http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "Message published: %v", id)
}
Java
Cloud Functions에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
import com.google.cloud.functions.HttpFunction;
import com.google.cloud.functions.HttpRequest;
import com.google.cloud.functions.HttpResponse;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import java.io.BufferedWriter;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
 * HTTP function that publishes the {@code message} query parameter to the Pub/Sub topic named by
 * the {@code topic} query parameter.
 */
public class PublishMessage implements HttpFunction {
  // TODO<developer> set this environment variable
  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
  private static final Logger logger = Logger.getLogger(PublishMessage.class.getName());

  /**
   * Publishes a single message and reports the outcome in the HTTP response.
   *
   * <p>Responds with 400 when either query parameter is missing and with 500 when publishing
   * fails; otherwise writes a confirmation message.
   *
   * @param request incoming request; the {@code topic} and {@code message} query parameters are read
   * @param response response whose status code and body are written
   * @throws IOException if the response writer or the publisher cannot be created or written to
   */
  @Override
  public void service(HttpRequest request, HttpResponse response) throws IOException {
    Optional<String> maybeTopicName = request.getFirstQueryParameter("topic");
    Optional<String> maybeMessage = request.getFirstQueryParameter("message");

    BufferedWriter responseWriter = response.getWriter();

    if (maybeTopicName.isEmpty() || maybeMessage.isEmpty()) {
      response.setStatusCode(HttpURLConnection.HTTP_BAD_REQUEST);
      responseWriter.write("Missing 'topic' and/or 'message' parameter(s).");
      return;
    }

    String topicName = maybeTopicName.get();
    logger.info("Publishing message to topic: " + topicName);

    // Create the PubsubMessage object
    // (This is different than the PubsubMessage POJO used in Pub/Sub-triggered functions)
    ByteString byteStr = ByteString.copyFrom(maybeMessage.get(), StandardCharsets.UTF_8);
    PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();

    Publisher publisher = Publisher.newBuilder(
        ProjectTopicName.of(PROJECT_ID, topicName)).build();

    // Attempt to publish the message
    String responseMessage;
    try {
      publisher.publish(pubsubApiMessage).get();
      responseMessage = "Message published.";
    } catch (InterruptedException | ExecutionException e) {
      if (e instanceof InterruptedException) {
        // Restore the interrupt flag so code further up the stack can observe it.
        Thread.currentThread().interrupt();
      }
      logger.log(Level.SEVERE, "Error publishing Pub/Sub message: " + e.getMessage(), e);
      // Report the failure to the caller instead of leaving the default success status.
      response.setStatusCode(HttpURLConnection.HTTP_INTERNAL_ERROR);
      responseMessage = "Error publishing Pub/Sub message; see logs for more info.";
    } finally {
      // A new Publisher is built for every request, so release it once the request is done.
      publisher.shutdown();
    }

    responseWriter.write(responseMessage);
  }
}
Node.js
Cloud Functions에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
const {PubSub} = require('@google-cloud/pubsub');
// Instantiates a Pub/Sub client at module scope (with default options); the
// `publish` handler below uses it for every request.
const pubsub = new PubSub();
/**
* Publishes a message to a Cloud Pub/Sub Topic.
*
* @example
* gcloud functions call publish --data '{"topic":"[YOUR_TOPIC_NAME]","message":"Hello, world!"}'
*
* - Replace `[YOUR_TOPIC_NAME]` with your Cloud Pub/Sub topic name.
*
* @param {object} req Cloud Function request context.
* @param {object} req.body The request body.
* @param {string} req.body.topic Topic name on which to publish.
* @param {string} req.body.message Message to publish.
* @param {object} res Cloud Function response context.
*/
exports.publish = async (req, res) => {
if (!req.body.topic || !req.body.message) {
res
.status(400)
.send(
'Missing parameter(s); include "topic" and "message" properties in your request.'
);
return;
}
console.log(`Publishing message to topic ${req.body.topic}`);
// References an existing topic
const topic = pubsub.topic(req.body.topic);
const messageObject = {
data: {
message: req.body.message,
},
};
const messageBuffer = Buffer.from(JSON.stringify(messageObject), 'utf8');
// Publishes a message
try {
topic.publishMessage(messageBuffer);
res.status(200).send('Message published.');
} catch (err) {
console.error(err);
res.status(500).send(err);
return Promise.reject(err);
}
};
Python
Cloud Functions에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
import base64
import json
import os
from google.cloud import pubsub_v1
# TODO(developer): set this environment variable
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")

# Module-level Pub/Sub publisher client, created once at import time
publisher = pubsub_v1.PublisherClient()
# Publishes a message to a Cloud Pub/Sub topic.
def publish(request):
    """Publish the "message" from the request's JSON body to the given "topic".

    Args:
        request: The incoming HTTP request. Its JSON body is read via
            ``request.get_json(silent=True)`` and is expected to be an object
            with "topic" and "message" entries.

    Returns:
        ``"Message published."`` on success, or a ``(body, status)`` tuple:
        status 400 when "topic" or "message" is missing, status 500 with the
        error text when publishing fails.
    """
    request_json = request.get_json(silent=True)
    # get_json(silent=True) yields None for a missing/invalid JSON body, and a
    # valid body may still be a non-object; treat both as "no parameters" so
    # the caller gets a 400 instead of an unhandled AttributeError.
    if not isinstance(request_json, dict):
        request_json = {}

    topic_name = request_json.get("topic")
    message = request_json.get("message")

    if not topic_name or not message:
        return ('Missing "topic" and/or "message" parameter.', 400)

    print(f"Publishing message to topic {topic_name}")

    # References an existing topic
    topic_path = publisher.topic_path(PROJECT_ID, topic_name)

    message_json = json.dumps(
        {
            "data": {"message": message},
        }
    )
    message_bytes = message_json.encode("utf-8")

    # Publishes a message
    try:
        publish_future = publisher.publish(topic_path, data=message_bytes)
        publish_future.result()  # Verify the publish succeeded
        return "Message published."
    except Exception as e:
        print(e)
        # The response body must be text; an exception object is not a valid
        # response body.
        return (str(e), 500)
다음 단계
다른 Google Cloud 제품의 코드 샘플을 검색하고 필터링하려면 Google Cloud 샘플 브라우저를 참조하세요.