发布带有错误处理程序的消息。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
C++
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
// Publishes a single "Hello World!" message and blocks until the publish
// attempt has either succeeded (printing the message id) or failed (throwing
// the resulting status).
[](pubsub::Publisher publisher) {
  auto message = pubsub::MessageBuilder{}.SetData("Hello World!").Build();
  auto pending_id = publisher.Publish(std::move(message));

  // The continuation runs once the publish result is available.
  auto completion =
      pending_id.then([](future<StatusOr<std::string>> result) {
        auto id = result.get();
        if (!id) {
          // Surface the failure to whoever waits on `completion`.
          throw std::move(id).status();
        }
        std::cout << "Hello World! published with id=" << *id << "\n";
      });

  // Block until the message is published
  completion.get();
}
C#
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C# 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C# API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Sample that publishes a batch of text messages to a Pub/Sub topic and
/// handles publish failures on a per-message basis.
/// </summary>
public class PublishMessagesAsyncSample
{
    /// <summary>
    /// Publishes every entry of <paramref name="messageTexts"/> to the given topic.
    /// A failure to publish one message is logged to the console and does not
    /// prevent the remaining messages from being published.
    /// </summary>
    /// <param name="projectId">ID of the project that owns the topic.</param>
    /// <param name="topicId">ID of the topic to publish to.</param>
    /// <param name="messageTexts">Texts to publish, one message per entry.</param>
    /// <returns>The number of messages that were published successfully.</returns>
    public async Task<int> PublishMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
        int publishedMessageCount = 0;
        try
        {
            var publishTasks = messageTexts.Select(async text =>
            {
                try
                {
                    string message = await publisher.PublishAsync(text);
                    Console.WriteLine($"Published message {message}");
                    // Several publish continuations may finish concurrently.
                    Interlocked.Increment(ref publishedMessageCount);
                }
                catch (Exception exception)
                {
                    Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
                }
            });
            await Task.WhenAll(publishTasks);
        }
        finally
        {
            // The publisher should be shut down after use so it does not leak;
            // the timeout bounds how long to wait for any still-queued messages.
            await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));
        }
        return publishedMessageCount;
    }
}
Go
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"strconv"
"sync"
"sync/atomic"
"cloud.google.com/go/pubsub"
)
// publishThatScales publishes n messages ("Message 0" .. "Message n-1") to the
// given topic and waits for the outcome of every publish call. Progress and
// failures are written to w. It returns an error if the client cannot be
// created or if at least one message failed to publish.
func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	var wg sync.WaitGroup
	var totalErrors uint64
	t := client.Topic(topicID)
	// Stop the topic when done so its publishing goroutines are released.
	defer t.Stop()

	for i := 0; i < n; i++ {
		result := t.Publish(ctx, &pubsub.Message{
			Data: []byte("Message " + strconv.Itoa(i)),
		})

		wg.Add(1)
		// NOTE(review): w is written from several goroutines at once; confirm
		// that callers pass a writer that tolerates concurrent writes.
		go func(i int, res *pubsub.PublishResult) {
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			id, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				// Terminate the line so consecutive failures do not run together.
				fmt.Fprintf(w, "Failed to publish: %v\n", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
			fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
		}(i, result)
	}

	wg.Wait()

	// Read the counter with the same atomic API used to update it.
	if failed := atomic.LoadUint64(&totalErrors); failed > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", failed, n)
	}
	return nil
}
Java
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
/** Publishes a couple of messages and reports each outcome through an asynchronous callback. */
public class PublishWithErrorHandlerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithErrorHandlerExample(projectId, topicId);
  }

  /**
   * Publishes two fixed messages to the given topic, attaching a callback to each publish future,
   * then shuts the publisher down and waits up to one minute for it to terminate.
   *
   * @param projectId ID of the project that owns the topic
   * @param topicId ID of the topic to publish to
   */
  public static void publishWithErrorHandlerExample(String projectId, String topicId)
      throws IOException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;

    try {
      // Publisher with default settings, bound to the topic.
      publisher = Publisher.newBuilder(topicName).build();

      List<String> payloads = Arrays.asList("first message", "second message");
      for (final String message : payloads) {
        PubsubMessage outgoing =
            PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(message)).build();

        // The future resolves to the server-assigned message id (unique within the topic).
        ApiFuture<String> pendingId = publisher.publish(outgoing);

        // Handle success / failure asynchronously.
        ApiFutures.addCallback(pendingId, outcomeReporter(message), MoreExecutors.directExecutor());
      }
    } finally {
      if (publisher != null) {
        // Shut the publisher down once finished to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }

  /** Builds the callback that prints the outcome of publishing {@code message}. */
  private static ApiFutureCallback<String> outcomeReporter(final String message) {
    return new ApiFutureCallback<String>() {
      @Override
      public void onSuccess(String messageId) {
        System.out.println("Published message ID: " + messageId);
      }

      @Override
      public void onFailure(Throwable throwable) {
        if (throwable instanceof ApiException) {
          // Print the status code and retryability of the API exception.
          ApiException apiException = (ApiException) throwable;
          System.out.println(apiException.getStatusCode().getCode());
          System.out.println(apiException.isRetryable());
        }
        System.out.println("Error publishing message : " + message);
      }
    };
  }
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Publishes `data` as a single message to the given topic.
 * On success the message ID is logged; on failure the error is logged and
 * the process exit code is set to 1.
 *
 * @param {string} topicNameOrId Name or ID of the topic to publish to.
 * @param {string} data Message payload, e.g. "Hello, world!" or JSON.stringify(someObject).
 */
async function publishMessage(topicNameOrId, data) {
  // The payload is sent as a Buffer built from the string.
  const payload = {data: Buffer.from(data)};

  try {
    const topic = pubSubClient.topic(topicNameOrId);
    const id = await topic.publishMessage(payload);
    console.log(`Message ${id} published.`);
  } catch (err) {
    console.error(`Received error while publishing: ${err.message}`);
    process.exitCode = 1;
  }
}
Node.js (TypeScript)
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Publishes `data` as a single message to the given topic.
 * On success the message ID is logged; on failure the error is logged and
 * the process exit code is set to 1.
 *
 * @param topicNameOrId Name or ID of the topic to publish to.
 * @param data Message payload, e.g. "Hello, world!" or JSON.stringify(someObject).
 */
async function publishMessage(topicNameOrId: string, data: string) {
  // The payload is sent as a Buffer built from the string.
  const payload = {data: Buffer.from(data)};

  try {
    const topic = pubSubClient.topic(topicNameOrId);
    const id = await topic.publishMessage(payload);
    console.log(`Message ${id} published.`);
  } catch (err) {
    const reason = (err as Error).message;
    console.error(`Received error while publishing: ${reason}`);
    process.exitCode = 1;
  }
}
Python
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Python API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Callable
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# Create the publisher client and build the topic path.
# project_id and topic_id must be defined first (see the TODO above).
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
# Collects every publish future so the script can wait on all of them.
publish_futures = []
def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    """Build a done-callback that reports the outcome of publishing `data`.

    Args:
        publish_future: The future returned by the publish call. It is not
            used here; the callback receives the future as its own argument.
        data: The message payload, used only in the failure messages.

    Returns:
        A callback that prints the publish result, or prints a message if
        waiting for the result times out or the publish call failed.
    """

    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")
        except Exception as error:
            # result() re-raises the publish failure; report it here instead
            # of letting it escape the callback unhandled.
            print(f"Publishing {data} failed: {error}")

    return callback
for i in range(10):
    data = str(i)
    encoded = data.encode("utf-8")
    # publish() returns a future for the message that was handed to the client.
    publish_future = publisher.publish(topic_path, encoded)
    # Non-blocking: the outcome is reported by the callback once the future is done.
    on_done = get_callback(publish_future, data)
    publish_future.add_done_callback(on_done)
    publish_futures.append(publish_future)

# Do not exit until every publish future has resolved.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")
Ruby
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
# topic_id = "your-topic-id"
# Publishes one test message asynchronously and reports the outcome.
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic(topic_id)

begin
  topic.publish_async("This is a test message.") do |result|
    # NOTE(review): if this callback runs on a different thread, the raise
    # below may never reach the rescue clause of this begin block - confirm.
    unless result.succeeded?
      raise "Failed to publish the message."
    end
    puts "Message published asynchronously."
  end

  # Stop the async_publisher to send all queued messages immediately.
  topic.async_publisher.stop.wait!
rescue StandardError => error
  puts "Received error while publishing: #{error.message}"
end
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。