利用 Pub/Sub 服务,应用能够可靠、快速地异步交换消息。为了实现此目标,数据的生产者将消息发布到某 Pub/Sub 主题。然后,订阅者客户端创建对该主题的订阅,并处理来自订阅的消息。对于无法可靠地传送的消息,Pub/Sub 会将其最多保留七天。本页面介绍如何开始使用客户端库发布 Pub/Sub 消息。
准备工作
-
登录您的 Google 帐号。
如果您还没有 Google 帐号,请注册一个新帐号。
- 设置 Cloud Console 项目。
-
将环境变量
GOOGLE_APPLICATION_CREDENTIALS
设置为包含您的服务帐号密钥的 JSON 文件的路径。 此变量仅适用于当前的 shell 会话,因此,如果您打开新的会话,请重新设置该变量。示例:Linux 或 macOS
将 [PATH] 替换为包含您的服务帐号密钥的 JSON 文件的路径。
export GOOGLE_APPLICATION_CREDENTIALS="[PATH]"
例如:
export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/service-account-file.json"
示例:Windows
将 [PATH] 替换为包含您的服务帐号密钥的 JSON 文件的路径。
使用 PowerShell:
$env:GOOGLE_APPLICATION_CREDENTIALS="[PATH]"
例如:
$env:GOOGLE_APPLICATION_CREDENTIALS="C:\Users\username\Downloads\my-key.json"
使用命令提示符:
set GOOGLE_APPLICATION_CREDENTIALS=[PATH]
- 安装并初始化 Cloud SDK。
安装客户端库
以下示例向您展示了如何安装客户端库:
Python
如需详细了解如何设置 Python 开发环境,请参阅 Python 开发环境设置指南。
# ensure that you are using virtualenv
# as described in the python dev setup guide
pip install --upgrade google-cloud-pubsub
C++
如需详细了解如何安装 C++ 库,请参阅 GitHub README
。
C#
Install-Package Google.Cloud.PubSub.V1 -Pre
Go
go get -u cloud.google.com/go/pubsub
Java
如果您使用的是 Maven,请将以下代码添加到您的 pom.xml
文件中。如需详细了解 BOM,请参阅 Google Cloud Platform 库 BOM。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>libraries-bom</artifactId>
<version>16.3.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
</dependency>
如果您使用的是 Gradle,请将以下代码添加到您的依赖项中:
compile 'com.google.cloud:google-cloud-pubsub:1.109.0'
如果您使用的是 sbt,请将以下代码添加到您的依赖项中:
libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.109.0"
如果您使用的是 IntelliJ 或 Eclipse,请通过以下 IDE 插件将客户端库添加到您的项目中:
上述插件还提供其他功能,例如服务帐号密钥管理。如需了解详情,请参阅各个插件相应的文档。
Node.js
npm install --save @google-cloud/pubsub
PHP
composer require google/cloud-pubsub
Ruby
gem install google-cloud-pubsub
创建主题和订阅
创建主题后,您可以订阅该主题或向其发布消息。
使用 gcloud pubsub topics create 命令创建主题:
gcloud pubsub topics create my-topic
使用 gcloud pubsub subscriptions create 命令创建订阅。订阅者应用只能获取创建订阅后发布到该主题的消息。
gcloud pubsub subscriptions create my-sub --topic my-topic
如需详细了解如何命名您的主题和订阅,请参阅资源名称。
发布消息
在以下示例中,请使用 my-topic
作为主题 ID。
Python
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)
for n in range(1, 10):
data = "Message number {}".format(n)
# Data must be a bytestring
data = data.encode("utf-8")
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data)
print(future.result())
print(f"Published messages to {topic_path}.")
C++
#include "google/cloud/pubsub/publisher.h"
#include <iostream>
#include <stdexcept>
int main(int argc, char* argv[]) try {
if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " <project-id> <topic-id>\n";
return 1;
}
std::string const project_id = argv[1];
std::string const topic_id = argv[2];
// Create a namespace alias to make the code easier to read.
namespace pubsub = google::cloud::pubsub;
auto publisher = pubsub::Publisher(
pubsub::MakePublisherConnection(pubsub::Topic(project_id, topic_id), {}));
auto id =
publisher
.Publish(pubsub::MessageBuilder{}.SetData("Hello World!").Build())
.get();
if (!id) throw std::runtime_error(id.status().message());
std::cout << "Hello World published with id=" << *id << "\n";
return 0;
} catch (std::exception const& ex) {
std::cerr << "Standard exception raised: " << ex.what() << "\n";
return 1;
}
C#
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishMessagesAsyncSample
{
public async Task<int> PublishMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
int publishedMessageCount = 0;
var publishTasks = messageTexts.Select(async text =>
{
try
{
string message = await publisher.PublishAsync(text);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error ocurred when publishing message {text}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
)
func publish(w io.Writer, projectID, topicID, msg string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
// msg := "Hello World"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
t := client.Topic(topicID)
result := t.Publish(ctx, &pubsub.Message{
Data: []byte(msg),
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Get: %v", err)
}
fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
return nil
}
Java
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class PublisherExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publisherExample(projectId, topicId);
}
public static void publisherExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).build();
String message = "Hello World!";
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get();
System.out.println("Published message ID: " + messageId);
} finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishMessage() {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
try {
const messageId = await pubSubClient.topic(topicName).publish(dataBuffer);
console.log(`Message ${messageId} published.`);
} catch (error) {
console.error(`Received error while publishing: ${error.message}`);
process.exitCode = 1;
}
}
publishMessage();
PHP
use Google\Cloud\PubSub\PubSubClient;
/**
* Publishes a message for a Pub/Sub topic.
*
* @param string $projectId The Google project ID.
* @param string $topicName The Pub/Sub topic name.
* @param string $message The message to publish.
*/
function publish_message($projectId, $topicName, $message)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$topic = $pubsub->topic($topicName);
$topic->publish(['data' => $message]);
print('Message published' . PHP_EOL);
}
Ruby
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic topic_name
topic.publish "This is a test message."
puts "Message published."
接收消息
设置订阅者以拉取您刚刚发布的消息。每个订阅者必须在可配置的时间范围内确认每条消息。未确认的消息将被再次传送。请注意,Pub/Sub 偶尔会将一条消息传送多次,这是为了确保所有消息至少发送到订阅者一次。
在以下示例中,请使用 my-sub
作为订阅 ID。
Python
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message):
print(f"Received {message}.")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel()
C++
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](pubsub::Subscriber subscriber) {
std::mutex mu;
std::condition_variable cv;
int message_count = 0;
auto session = subscriber.Subscribe(
[&](pubsub::Message const& m, pubsub::AckHandler h) {
std::cout << "Received message " << m << "\n";
std::unique_lock<std::mutex> lk(mu);
++message_count;
lk.unlock();
cv.notify_one();
std::move(h).ack();
});
// Wait until at least one message has been received.
std::unique_lock<std::mutex> lk(mu);
cv.wait(lk, [&message_count] { return message_count > 0; });
lk.unlock();
// Cancel the subscription session.
session.cancel();
// Wait for the session to complete, no more callbacks can happen after this
// point.
auto status = session.get();
// Report any final status, blocking.
std::cout << "Message count: " << message_count << ", status: " << status
<< "\n";
}
C#
using Google.Cloud.PubSub.V1;
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
public class PullMessagesAsyncSample
{
public async Task<int> PullMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
{
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
int messageCount = 0;
Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
{
string text = Encoding.UTF8.GetString(message.Data.ToArray());
Console.WriteLine($"Message {message.MessageId}: {text}");
Interlocked.Increment(ref messageCount);
return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
});
// Run for 5 seconds.
await Task.Delay(5000);
await subscriber.StopAsync(CancellationToken.None);
// Lets make sure that the start task finished successfully after the call to stop.
await startTask;
return messageCount;
}
}
Go
import (
"context"
"fmt"
"io"
"sync"
"cloud.google.com/go/pubsub"
)
func pullMsgs(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(subID)
cctx, cancel := context.WithCancel(ctx)
err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
mu.Lock()
defer mu.Unlock()
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
msg.Ack()
received++
if received == 10 {
cancel()
}
})
if err != nil {
return fmt.Errorf("Receive: %v", err)
}
return nil
}
Java
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeAsyncExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
subscribeAsyncExample(projectId, subscriptionId);
}
public static void subscribeAsyncExample(String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
consumer.ack();
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const timeout = 60;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
function listenForMessages() {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionName);
// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
console.log(`Received message ${message.id}:`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
messageCount += 1;
// "Ack" (acknowledge receipt of) the message
message.ack();
};
// Listen for new messages until timeout is hit
subscription.on('message', messageHandler);
setTimeout(() => {
subscription.removeListener('message', messageHandler);
console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);
}
listenForMessages();
PHP
use Google\Cloud\PubSub\PubSubClient;
/**
* Pulls all Pub/Sub messages for a subscription.
*
* @param string $projectId The Google project ID.
* @param string $subscriptionName The Pub/Sub subscription name.
*/
function pull_messages($projectId, $subscriptionName)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$subscription = $pubsub->subscription($subscriptionName);
foreach ($subscription->pull() as $message) {
printf('Message: %s' . PHP_EOL, $message->data());
// Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
$subscription->acknowledge($message);
}
}
Ruby
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_name
subscriber = subscription.listen do |received_message|
puts "Received message: #{received_message.data}"
received_message.acknowledge!
end
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
结果怎么样?
清理(可选)
为避免系统因本指南中使用的资源向您的 Google Cloud Platform 帐号收取费用,您可以使用命令行删除主题和订阅。
gcloud pubsub subscriptions delete my-sub gcloud pubsub topics delete my-topic
后续步骤
请参阅“什么是 Pub/Sub?”。
如需查看端到端示例,请参阅构建 Pub/Sub 系统。
如需查看其他用例,请参阅 Pub/Sub 教程。
要获取有关所选语言的客户端库的详细信息,请参阅客户端库。