在订阅抛出错误时处理错误。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
C++
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。如需了解详情,请参阅 Pub/Sub C++ API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
auto sample = [](pubsub::Subscriber subscriber) {
return subscriber
.Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
std::cout << "Received message " << m << "\n";
std::move(h).ack();
PleaseIgnoreThisSimplifiesTestingTheSamples();
})
// Setup an error handler for the subscription session
.then([](future<google::cloud::Status> f) {
std::cout << "Subscription session result: " << f.get() << "\n";
});
};
Go
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Go API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
)
func pullMsgsError(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer client.Close()
// If the service returns a non-retryable error, Receive returns that error after
// all of the outstanding calls to the handler have returned.
err = client.Subscription(subID).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
msg.Ack()
})
if err != nil {
return fmt.Errorf("Receive: %w", err)
}
return nil
}
Java
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Java API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeWithErrorListenerExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
subscribeWithErrorListenerExample(projectId, subscriptionId);
}
public static void subscribeWithErrorListenerExample(String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
consumer.ack();
};
Subscriber subscriber = null;
try {
// Provides an executor service for processing messages.
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();
subscriber =
Subscriber.newBuilder(subscriptionName, receiver)
.setExecutorProvider(executorProvider)
.build();
// Listen for unrecoverable failures. Rebuild a subscriber and restart subscribing
// when the current subscriber encounters permanent errors.
subscriber.addListener(
new Subscriber.Listener() {
public void failed(Subscriber.State from, Throwable failure) {
System.out.println(failure.getStackTrace());
if (!executorProvider.getExecutor().isShutdown()) {
subscribeWithErrorListenerExample(projectId, subscriptionId);
}
}
},
MoreExecutors.directExecutor());
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 10;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
function listenForErrors(subscriptionNameOrId, timeout) {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionNameOrId);
// Create an event handler to handle messages
const messageHandler = message => {
// Do something with the message
console.log(`Message: ${message}`);
// "Ack" (acknowledge receipt of) the message
message.ack();
};
// Create an event handler to handle errors
const errorHandler = error => {
// Do something with the error
console.error(`ERROR: ${error}`);
throw error;
};
// Listen for new messages/errors until timeout is hit
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);
// Wait a while for the subscription to run. (Part of the sample only.)
setTimeout(() => {
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
}, timeout * 1000);
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 10;
// Imports the Google Cloud client library
import {Message, PubSub} from '@google-cloud/pubsub';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
function listenForErrors(subscriptionNameOrId: string, timeout: number) {
// References an existing subscription
const subscription = pubSubClient.subscription(subscriptionNameOrId);
// Create an event handler to handle messages
const messageHandler = (message: Message) => {
// Do something with the message
console.log(`Message: ${message}`);
// "Ack" (acknowledge receipt of) the message
message.ack();
};
// Create an event handler to handle errors
const errorHandler = (error: Error) => {
// Do something with the error
console.error(`ERROR: ${error}`);
throw error;
};
// Listen for new messages/errors until timeout is hit
subscription.on('message', messageHandler);
subscription.on('error', errorHandler);
// Wait a while for the subscription to run. (Part of the sample only.)
setTimeout(() => {
subscription.removeListener('message', messageHandler);
subscription.removeListener('error', errorHandler);
}, timeout * 1000);
}
Python
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Python API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"Received {message}.")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except Exception as e:
print(
f"Listening for messages on {subscription_path} threw an exception: {e}."
)
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
Ruby
试用此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Ruby 设置说明进行操作。如需了解详情,请参阅 Pub/Sub Ruby API 参考文档。
要向 Pub/Sub 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅为本地开发环境设置身份验证。
# subscription_id = "your-subscription-id"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_id
subscriber = subscription.listen do |received_message|
puts "Received message: #{received_message.data}"
received_message.acknowledge!
end
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true
begin
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
rescue StandardError => e
puts "Exception #{e.inspect}: #{e.message}"
raise "Stopped listening for messages."
end
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。