使用多分区和流控制的订阅者(订阅者的集合)订阅多分区主题,并在回调中异步处理消息。
深入探索
如需查看包含此代码示例的详细文档,请参阅以下内容:
代码示例
Go
如需向 Pub/Sub Lite 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
package main
import (
"context"
"flag"
"fmt"
"log"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
)
func main() {
// NOTE: Set these flags for an existing Pub/Sub Lite subscription containing
// published messages when running this sample.
projectID := flag.String("project_id", "", "Cloud Project ID")
zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
subscriptionID := flag.String("subscription_id", "", "Existing Pub/Sub Lite subscription")
timeout := flag.Duration("timeout", 90*time.Second, "The duration to receive messages")
flag.Parse()
ctx := context.Background()
subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)
// Configure flow control settings. These settings apply per partition.
// The message stream is paused based on the maximum size or number of
// messages that the subscriber has already received, whichever condition is
// met first.
settings := pscompat.ReceiveSettings{
// 10 MiB. Must be greater than the allowed size of the largest message
// (1 MiB).
MaxOutstandingBytes: 10 * 1024 * 1024,
// 1,000 outstanding messages. Must be > 0.
MaxOutstandingMessages: 1000,
}
// Create the subscriber client.
subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
if err != nil {
log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
}
// Listen for messages until the timeout expires.
log.Printf("Listening to messages on %s for %v...\n", subscriptionPath, *timeout)
cctx, cancel := context.WithTimeout(ctx, *timeout)
defer cancel()
var receiveCount int32
// Receive blocks until the context is cancelled or an error occurs.
if err := subscriber.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
// NOTE: May be called concurrently; synchronize access to shared memory.
atomic.AddInt32(&receiveCount, 1)
// Metadata decoded from the message ID contains the partition and offset.
metadata, err := pscompat.ParseMessageMetadata(msg.ID)
if err != nil {
log.Fatalf("Failed to parse %q: %v", msg.ID, err)
}
fmt.Printf("Received (partition=%d, offset=%d): %s\n", metadata.Partition, metadata.Offset, string(msg.Data))
msg.Ack()
}); err != nil {
log.Fatalf("SubscriberClient.Receive error: %v", err)
}
fmt.Printf("Received %d messages\n", receiveCount)
}
Java
如需向 Pub/Sub Lite 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscriberExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String cloudRegion = "your-cloud-region";
char zoneId = 'b';
// Choose an existing subscription for the subscribe example to work.
String subscriptionId = "your-subscription-id";
long projectNumber = Long.parseLong("123456789");
// True if using a regional location. False if using a zonal location.
// https://cloud.google.com/pubsub/lite/docs/topics
boolean regional = false;
subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, regional);
}
public static void subscriberExample(
String cloudRegion, char zoneId, long projectNumber, String subscriptionId, boolean regional)
throws ApiException {
CloudRegionOrZone location;
if (regional) {
location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
} else {
location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
}
SubscriptionPath subscriptionPath =
SubscriptionPath.newBuilder()
.setLocation(location)
.setProject(ProjectNumber.of(projectNumber))
.setName(SubscriptionName.of(subscriptionId))
.build();
// The message stream is paused based on the maximum size or number of messages that the
// subscriber has already received, whichever condition is met first.
FlowControlSettings flowControlSettings =
FlowControlSettings.builder()
// 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
.setBytesOutstanding(10 * 1024 * 1024L)
// 1,000 outstanding messages. Must be >0.
.setMessagesOutstanding(1000L)
.build();
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
System.out.println("Id : " + MessageMetadata.decode(message.getMessageId()));
System.out.println("Data : " + message.getData().toStringUtf8());
System.out.println("Ordering key : " + message.getOrderingKey());
System.out.println("Attributes : ");
message
.getAttributesMap()
.forEach(
(key, value) -> {
if (key == MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO) {
Timestamp ts = MessageTransforms.decodeAttributeEventTime(value);
System.out.println(key + " = " + ts.toString());
} else {
System.out.println(key + " = " + value);
}
});
// Acknowledge the message.
consumer.ack();
};
SubscriberSettings subscriberSettings =
SubscriberSettings.newBuilder()
.setSubscriptionPath(subscriptionPath)
.setReceiver(receiver)
// Flow control settings are set at the partition level.
.setPerPartitionFlowControlSettings(flowControlSettings)
.build();
Subscriber subscriber = Subscriber.create(subscriberSettings);
// Start the subscriber. Upon successful starting, its state will become RUNNING.
subscriber.startAsync().awaitRunning();
System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");
try {
System.out.println(subscriber.state());
// Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
// unrecoverable errors before then, its state will change to FAILED and an
// IllegalStateException will be thrown.
subscriber.awaitTerminated(90, TimeUnit.SECONDS);
} catch (TimeoutException t) {
// Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
subscriber.stopAsync().awaitTerminated();
System.out.println("Subscriber is shut down: " + subscriber.state());
}
}
}
Python
如需向 Pub/Sub Lite 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证。
from concurrent.futures._base import TimeoutError
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
FlowControlSettings,
MessageMetadata,
SubscriptionPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# timeout = 90
# regional = True
if regional:
location = CloudRegion(cloud_region)
else:
location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
# 1,000 outstanding messages. Must be >0.
messages_outstanding=1000,
# 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
bytes_outstanding=10 * 1024 * 1024,
)
def callback(message: PubsubMessage):
message_data = message.data.decode("utf-8")
metadata = MessageMetadata.decode(message.message_id)
print(
f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
)
message.ack()
# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:
streaming_pull_future = subscriber_client.subscribe(
subscription_path,
callback=callback,
per_partition_flow_control_settings=per_partition_flow_control_settings,
)
print(f"Listening for messages on {str(subscription_path)}...")
try:
streaming_pull_future.result(timeout=timeout)
except TimeoutError or KeyboardInterrupt:
streaming_pull_future.cancel()
assert streaming_pull_future.done()
后续步骤
如需搜索和过滤其他 Google Cloud 产品的代码示例,请参阅 Google Cloud 示例浏览器。