This page explains how to publish messages to Lite topics. You can publish messages using the gcloud CLI or the Pub/Sub Lite client libraries.
After you publish messages to a Lite topic and create a Lite subscription, you can receive the messages from the Lite subscription.
Message format
A message consists of fields with the message data and metadata. Specify any of the following in the message:
- The message data
- An ordering key
- Attributes
- An event timestamp
The client library automatically assigns messages to partitions, and the Pub/Sub Lite service adds the following fields to each message:
- A message ID unique to the message within the partition
- A timestamp for when the Pub/Sub Lite service stores the message in the partition
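As an illustrative sketch (not the wire format, and with field names chosen for illustration only), a Lite message and the service-assigned metadata can be modeled like this:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Dict, Optional

@dataclass
class LiteMessage:
    # Fields the publisher sets.
    data: bytes
    ordering_key: str = ""
    attributes: Dict[str, str] = field(default_factory=dict)
    # Fields the Pub/Sub Lite service assigns when it stores the message.
    offset: Optional[int] = None             # message ID, unique within the partition
    publish_time: Optional[datetime] = None  # when the service stored the message

def store(msg: LiteMessage, next_offset: int) -> LiteMessage:
    """Simulate the service storing a message in a partition."""
    msg.offset = next_offset
    msg.publish_time = datetime.now(timezone.utc)
    return msg

msg = store(LiteMessage(data=b"hello"), next_offset=0)
print(msg.offset)  # 0
```

The publisher never sets the offset or publish time; the service assigns both when the message is stored.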
Publish messages
To publish messages, request a streaming connection to the Lite topic and then send messages over the streaming connection.
The following samples show how to publish messages to a Lite topic:
gcloud
This command requires Python 3.6 or greater, and requires the grpcio Python package to be installed. For MacOS, Linux, and Cloud Shell users, run the following:
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
To publish a message, use the gcloud pubsub lite-topics publish command:
gcloud pubsub lite-topics publish TOPIC_ID \
--location=LITE_LOCATION \
--message=MESSAGE_DATA
Replace the following:
- TOPIC_ID: the ID of the Lite topic
- LITE_LOCATION: the location of the Lite topic
- MESSAGE_DATA: a string with the message data
Go
Before running this sample, follow the Go setup instructions in Pub/Sub Lite client libraries.
package main
import (
"context"
"flag"
"fmt"
"log"
"sync"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
"golang.org/x/sync/errgroup"
)
func main() {
// NOTE: Set these flags for an existing Pub/Sub Lite topic when running this
// sample.
projectID := flag.String("project_id", "", "Cloud Project ID")
zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
topicID := flag.String("topic_id", "", "Existing Pub/Sub Lite topic")
messageCount := flag.Int("message_count", 100, "The number of messages to send")
flag.Parse()
ctx := context.Background()
topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", *projectID, *zone, *topicID)
// Create the publisher client.
publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
if err != nil {
log.Fatalf("pscompat.NewPublisherClient error: %v", err)
}
// Ensure the publisher will be shut down.
defer publisher.Stop()
// Collect any messages that need to be republished with a new publisher
// client.
var toRepublish []*pubsub.Message
var toRepublishMu sync.Mutex
// Publish messages. Messages are automatically batched.
g := new(errgroup.Group)
for i := 0; i < *messageCount; i++ {
msg := &pubsub.Message{
Data: []byte(fmt.Sprintf("message-%d", i)),
}
result := publisher.Publish(ctx, msg)
g.Go(func() error {
// Get blocks until the result is ready.
id, err := result.Get(ctx)
if err != nil {
// NOTE: A failed PublishResult indicates that the publisher client
// encountered a fatal error and has permanently terminated. After the
// fatal error has been resolved, a new publisher client instance must
// be created to republish failed messages.
fmt.Printf("Publish error: %v\n", err)
toRepublishMu.Lock()
toRepublish = append(toRepublish, msg)
toRepublishMu.Unlock()
return err
}
// Metadata decoded from the id contains the partition and offset.
metadata, err := pscompat.ParseMessageMetadata(id)
if err != nil {
fmt.Printf("Failed to parse message metadata %q: %v\n", id, err)
return err
}
fmt.Printf("Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
return nil
})
}
if err := g.Wait(); err != nil {
fmt.Printf("Publishing finished with error: %v\n", err)
}
fmt.Printf("Published %d messages\n", *messageCount-len(toRepublish))
// Print the error that caused the publisher client to terminate (if any),
// which may contain more context than PublishResults.
if err := publisher.Error(); err != nil {
fmt.Printf("Publisher client terminated due to error: %v\n", err)
}
}
Java
Before running this sample, follow the Java setup instructions in Pub/Sub Lite client libraries.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
public class PublisherExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String cloudRegion = "your-cloud-region";
char zoneId = 'b';
// Choose an existing topic for the publish example to work.
String topicId = "your-topic-id";
long projectNumber = Long.parseLong("123456789");
int messageCount = 100;
// True if using a regional location. False if using a zonal location.
// https://cloud.google.com/pubsub/lite/docs/topics
boolean regional = false;
publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
}
// Publish messages to a topic.
public static void publisherExample(
String cloudRegion,
char zoneId,
long projectNumber,
String topicId,
int messageCount,
boolean regional)
throws ApiException, ExecutionException, InterruptedException {
CloudRegionOrZone location;
if (regional) {
location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
} else {
location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
}
TopicPath topicPath =
TopicPath.newBuilder()
.setProject(ProjectNumber.of(projectNumber))
.setLocation(location)
.setName(TopicName.of(topicId))
.build();
Publisher publisher = null;
List<ApiFuture<String>> futures = new ArrayList<>();
try {
PublisherSettings publisherSettings =
PublisherSettings.newBuilder().setTopicPath(topicPath).build();
publisher = Publisher.create(publisherSettings);
// Start the publisher. Upon successful starting, its state will become RUNNING.
publisher.startAsync().awaitRunning();
for (int i = 0; i < messageCount; i++) {
String message = "message-" + i;
// Convert the message to a byte string.
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Publish a message. Messages are automatically batched.
ApiFuture<String> future = publisher.publish(pubsubMessage);
futures.add(future);
}
} finally {
ArrayList<MessageMetadata> metadata = new ArrayList<>();
List<String> ackIds = ApiFutures.allAsList(futures).get();
for (String id : ackIds) {
// Decoded metadata contains partition and offset.
metadata.add(MessageMetadata.decode(id));
}
System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");
if (publisher != null) {
// Shut down the publisher.
publisher.stopAsync().awaitTerminated();
System.out.println("Publisher is shut down.");
}
}
}
}
Python
Before running this sample, follow the Python setup instructions in Pub/Sub Lite client libraries.
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
MessageMetadata,
TopicPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True
if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
    # result() blocks. To resolve API futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published a message to {topic_path} with partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )
The client libraries send messages asynchronously and handle errors. If an error occurs, the client library resends the messages as follows:
- The Pub/Sub Lite service closes the stream.
- The client library buffers the messages and re-establishes the connection to the Lite topic.
- The client library sends the messages in order.
After you publish a message, the Pub/Sub Lite service stores the message in a partition and returns the message ID to the publisher.
Use ordering keys
If messages have the same ordering key, the client library assigns the messages to the same partition. An ordering key is a string of at most 1,024 bytes.
The ordering key is in the key field of the message. You can set ordering keys using the client libraries.
gcloud
This command requires Python 3.6 or greater, and requires the grpcio Python package to be installed. For MacOS, Linux, and Cloud Shell users, run the following:
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
To publish a message, use the gcloud pubsub lite-topics publish command:
gcloud pubsub lite-topics publish TOPIC_ID \
--location=LITE_LOCATION \
--ordering-key=ORDERING_KEY \
--message=MESSAGE_DATA
Replace the following:
- TOPIC_ID: the ID of the Lite topic
- LITE_LOCATION: the location of the Lite topic
- ORDERING_KEY: the string used to assign messages to a partition
- MESSAGE_DATA: a string with the message data
Go
Before running this sample, follow the Go setup instructions in Pub/Sub Lite client libraries.
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
)
func publishWithOrderingKey(w io.Writer, projectID, zone, topicID string, messageCount int) error {
// projectID := "my-project-id"
// zone := "us-central1-a"
// topicID := "my-topic"
// messageCount := 10
ctx := context.Background()
topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)
// Create the publisher client.
publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
if err != nil {
return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
}
// Ensure the publisher will be shut down.
defer publisher.Stop()
// Messages of the same ordering key will always get published to the same
// partition. When OrderingKey is unset, messages can get published to
// different partitions if more than one partition exists for the topic.
var results []*pubsub.PublishResult
for i := 0; i < messageCount; i++ {
r := publisher.Publish(ctx, &pubsub.Message{
OrderingKey: "test_ordering_key",
Data: []byte(fmt.Sprintf("message-%d", i)),
})
results = append(results, r)
}
// Print publish results.
var publishedCount int
for _, r := range results {
// Get blocks until the result is ready.
id, err := r.Get(ctx)
if err != nil {
// NOTE: A failed PublishResult indicates that the publisher client
// encountered a fatal error and has permanently terminated. After the
// fatal error has been resolved, a new publisher client instance must be
// created to republish failed messages.
fmt.Fprintf(w, "Publish error: %v\n", err)
continue
}
// Metadata decoded from the id contains the partition and offset.
metadata, err := pscompat.ParseMessageMetadata(id)
if err != nil {
return fmt.Errorf("failed to parse message metadata %q: %w", id, err)
}
fmt.Fprintf(w, "Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
publishedCount++
}
fmt.Fprintf(w, "Published %d messages with ordering key\n", publishedCount)
return publisher.Error()
}
Java
Before running this sample, follow the Java setup instructions in Pub/Sub Lite client libraries.
import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
public class PublishWithOrderingKeyExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String cloudRegion = "your-cloud-region";
char zoneId = 'b';
// Choose an existing topic for the publish example to work.
String topicId = "your-topic-id";
long projectNumber = Long.parseLong("123456789");
// True if using a regional location. False if using a zonal location.
// https://cloud.google.com/pubsub/lite/docs/topics
boolean regional = false;
publishWithOrderingKeyExample(cloudRegion, zoneId, projectNumber, topicId, regional);
}
// Publish a message to a topic with an ordering key.
public static void publishWithOrderingKeyExample(
String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
throws ApiException, ExecutionException, InterruptedException {
CloudRegionOrZone location;
if (regional) {
location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
} else {
location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
}
TopicPath topicPath =
TopicPath.newBuilder()
.setProject(ProjectNumber.of(projectNumber))
.setLocation(location)
.setName(TopicName.of(topicId))
.build();
PublisherSettings publisherSettings =
PublisherSettings.newBuilder().setTopicPath(topicPath).build();
Publisher publisher = Publisher.create(publisherSettings);
// Start the publisher. Upon successful starting, its state will become RUNNING.
publisher.startAsync().awaitRunning();
String message = "message-with-ordering-key";
// Convert the message to a byte string.
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder()
.setData(data)
// Messages of the same ordering key will always get published to the
// same partition. When OrderingKey is unset, messages can get published
// to different partitions if more than one partition exists for the topic.
.setOrderingKey("testing")
.build();
// Publish a message.
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Shut down the publisher.
publisher.stopAsync().awaitTerminated();
String ackId = future.get();
MessageMetadata metadata = MessageMetadata.decode(ackId);
System.out.println("Published a message with ordering key:\n" + metadata);
}
}
Python
Before running this sample, follow the Python setup instructions in Pub/Sub Lite client libraries.
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
MessageMetadata,
TopicPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True
if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        # Messages of the same ordering key will always get published to the same partition.
        # When ordering_key is unset, messages can get published to different partitions if
        # more than one partition exists for the topic.
        api_future = publisher_client.publish(
            topic_path, data.encode("utf-8"), ordering_key="testing"
        )
        # result() blocks. To resolve API futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with an ordering key to {str(topic_path)}."
)
You can use ordering keys to send multiple messages to the same partition so that subscribers receive the messages in order. The client library might assign multiple ordering keys to the same partition.
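The key-to-partition mapping can be pictured with a simple hash sketch (the client library's actual hash function differs; this only demonstrates the properties): publishes with the same key always land on one partition, and with more keys than partitions, distinct keys must share a partition.

```python
import hashlib

def assign_partition(ordering_key: str, num_partitions: int) -> int:
    # A stable hash, so the same key always maps to the same partition.
    digest = hashlib.sha256(ordering_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions

# Same key -> same partition on every call.
assert assign_partition("customer-42", 4) == assign_partition("customer-42", 4)

# With 10 keys and only 2 partitions, multiple keys share a partition.
partitions = {assign_partition(f"key-{i}", 2) for i in range(10)}
print(sorted(partitions))
```

The important property is determinism: ordering within a key holds because every message with that key goes through the same partition's ordered log.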
Set event times
You can publish Lite messages with an event time. An event time is a custom attribute that you can add to a message.
You can set event timestamps using the client libraries or the gcloud CLI.
gcloud
This command requires Python 3.6 or greater, and requires the grpcio Python package to be installed. For MacOS, Linux, and Cloud Shell users, run the following:
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
To publish a message, use the gcloud pubsub lite-topics publish command:
gcloud pubsub lite-topics publish TOPIC_ID \
--location=LITE_LOCATION \
--event-time=EVENT_TIME \
--message=MESSAGE_DATA
Replace the following:
- TOPIC_ID: the ID of the Lite topic
- LITE_LOCATION: the location of the Lite topic
- EVENT_TIME: a user-specified event time. For details about time formats, run gcloud topic datetimes.
- MESSAGE_DATA: a string with the message data
Use attributes
Message attributes are key-value pairs with metadata about a message. The attributes can be text or byte strings.
The attributes are in the attributes field of the message. You can set attributes using the client libraries.
gcloud
This command requires Python 3.6 or greater, and requires the grpcio Python package to be installed. For MacOS, Linux, and Cloud Shell users, run the following:
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
To publish a message, use the gcloud pubsub lite-topics publish command:
gcloud pubsub lite-topics publish TOPIC_ID \
--location=LITE_LOCATION \
--message=MESSAGE_DATA \
--attribute=KEY=VALUE,...
Replace the following:
- TOPIC_ID: the ID of the Lite topic
- LITE_LOCATION: the location of the Lite topic
- MESSAGE_DATA: a string with the message data
- KEY: the key of an attribute
- VALUE: the value of the attribute
Go
Before running this sample, follow the Go setup instructions in Pub/Sub Lite client libraries.
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
)
func publishWithCustomAttributes(w io.Writer, projectID, zone, topicID string) error {
// projectID := "my-project-id"
// zone := "us-central1-a"
// topicID := "my-topic"
ctx := context.Background()
topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)
// Create the publisher client.
publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
if err != nil {
return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
}
// Ensure the publisher will be shut down.
defer publisher.Stop()
// Publish a message with custom attributes.
result := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("message-with-custom-attributes"),
Attributes: map[string]string{
"year": "2020",
"author": "unknown",
},
})
// Get blocks until the result is ready.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("publish error: %w", err)
}
fmt.Fprintf(w, "Published a message with custom attributes: %v\n", id)
return publisher.Error()
}
Java
Before running this sample, follow the Java setup instructions in Pub/Sub Lite client libraries.
import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.PubsubMessage;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
public class PublishWithCustomAttributesExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String cloudRegion = "your-cloud-region";
char zoneId = 'b';
// Choose an existing topic for the publish example to work.
String topicId = "your-topic-id";
long projectNumber = Long.parseLong("123456789");
// True if using a regional location. False if using a zonal location.
// https://cloud.google.com/pubsub/lite/docs/topics
boolean regional = false;
publishWithCustomAttributesExample(cloudRegion, zoneId, projectNumber, topicId, regional);
}
// Publish messages to a topic with custom attributes.
public static void publishWithCustomAttributesExample(
String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
throws ApiException, ExecutionException, InterruptedException {
CloudRegionOrZone location;
if (regional) {
location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
} else {
location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
}
TopicPath topicPath =
TopicPath.newBuilder()
.setProject(ProjectNumber.of(projectNumber))
.setLocation(location)
.setName(TopicName.of(topicId))
.build();
PublisherSettings publisherSettings =
PublisherSettings.newBuilder().setTopicPath(topicPath).build();
Publisher publisher = Publisher.create(publisherSettings);
// Start the publisher. Upon successful starting, its state will become RUNNING.
publisher.startAsync().awaitRunning();
// Prepare the message data as a byte string.
String messageData = "message-with-custom-attributes";
ByteString data = ByteString.copyFromUtf8(messageData);
// Prepare a protobuf-encoded event timestamp for the message.
Instant now = Instant.now();
String eventTime =
MessageTransforms.encodeAttributeEventTime(Timestamps.fromMillis(now.toEpochMilli()));
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder()
.setData(data)
// Add two sets of custom attributes to the message.
.putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
// Add an event timestamp as an attribute.
.putAttributes(MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO, eventTime)
.build();
// Publish a message.
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Shut down the publisher.
publisher.stopAsync().awaitTerminated();
String ackId = future.get();
MessageMetadata metadata = MessageMetadata.decode(ackId);
System.out.println("Published a message with custom attributes:\n" + metadata);
}
}
Python
Before running this sample, follow the Python setup instructions in Pub/Sub Lite client libraries.
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
MessageMetadata,
TopicPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True
if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(
        topic_path,
        data.encode("utf-8"),
        year="2020",
        author="unknown",
    )
    # result() blocks. To resolve API futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )

print(f"Finished publishing a message with custom attributes to {str(topic_path)}.")
The attributes can indicate how to process a message. Subscribers can parse the attributes field of a message and process the message based on its attributes.
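For example, a subscriber callback might route messages on their attributes. This is a library-free sketch: the attribute names and routing rules are hypothetical, and a real Pub/Sub Lite subscriber would read the attributes from the received message object.

```python
def route_message(data: bytes, attributes: dict) -> str:
    """Decide how to handle a message based on its attributes."""
    if attributes.get("author") == "unknown":
        return "quarantine"
    # Lexicographic comparison works here because years are fixed-width strings.
    if attributes.get("year", "") < "2021":
        return "archive"
    return "process"

print(route_message(b"m1", {"year": "2020", "author": "unknown"}))  # quarantine
print(route_message(b"m2", {"year": "2020", "author": "alice"}))    # archive
print(route_message(b"m3", {"year": "2023", "author": "alice"}))    # process
```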
Batch messages
The client libraries publish messages in batches. Larger batches use fewer compute resources but increase latency. You can change the batch size with the batch settings.
The following table lists the batch settings that you can configure:
Setting | Description | Default
---|---|---
Request size | The maximum size of a batch, in bytes. | 3.5 MiB
Number of messages | The maximum number of messages in a batch. | 1,000 messages
Publish delay | The amount of time, in milliseconds, between adding a message to a batch and sending the batch to the Lite topic. | 50 ms
You can configure the batch settings using the client libraries.
Go
Before running this sample, follow the Go setup instructions in Pub/Sub Lite client libraries.
import (
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
)
func publishWithBatchSettings(w io.Writer, projectID, zone, topicID string, messageCount int) error {
// projectID := "my-project-id"
// zone := "us-central1-a"
// topicID := "my-topic"
// messageCount := 10
ctx := context.Background()
topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)
// Batch settings control how the publisher batches messages. These settings
// apply per partition.
// See https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-variables
// for DefaultPublishSettings.
settings := pscompat.PublishSettings{
ByteThreshold: 5 * 1024, // 5 KiB
CountThreshold: 1000, // 1,000 messages
DelayThreshold: 100 * time.Millisecond,
}
// Create the publisher client.
publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topicPath, settings)
if err != nil {
return fmt.Errorf("pscompat.NewPublisherClientWithSettings error: %w", err)
}
// Ensure the publisher will be shut down.
defer publisher.Stop()
// Publish requests are sent to the server based on request size, message
// count and time since last publish, whichever condition is met first.
var results []*pubsub.PublishResult
for i := 0; i < messageCount; i++ {
r := publisher.Publish(ctx, &pubsub.Message{
Data: []byte(fmt.Sprintf("message-%d", i)),
})
results = append(results, r)
}
// Print publish results.
var publishedCount int
for _, r := range results {
// Get blocks until the result is ready.
id, err := r.Get(ctx)
if err != nil {
// NOTE: A failed PublishResult indicates that the publisher client
// encountered a fatal error and has permanently terminated. After the
// fatal error has been resolved, a new publisher client instance must be
// created to republish failed messages.
fmt.Fprintf(w, "Publish error: %v\n", err)
continue
}
fmt.Fprintf(w, "Published: %v\n", id)
publishedCount++
}
fmt.Fprintf(w, "Published %d messages with batch settings\n", publishedCount)
return publisher.Error()
}
Java
Before running this sample, follow the Java setup instructions in Pub/Sub Lite client libraries.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.threeten.bp.Duration;
public class PublishWithBatchSettingsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String cloudRegion = "your-cloud-region";
char zoneId = 'b';
// Choose an existing topic for the publish example to work.
String topicId = "your-topic-id";
long projectNumber = Long.parseLong("123456789");
int messageCount = 100;
// True if using a regional location. False if using a zonal location.
// https://cloud.google.com/pubsub/lite/docs/topics
boolean regional = false;
publishWithBatchSettingsExample(
cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
}
// Publish messages to a topic with batch settings.
public static void publishWithBatchSettingsExample(
String cloudRegion,
char zoneId,
long projectNumber,
String topicId,
int messageCount,
boolean regional)
throws ApiException, ExecutionException, InterruptedException {
CloudRegionOrZone location;
if (regional) {
location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
} else {
location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
}
TopicPath topicPath =
TopicPath.newBuilder()
.setProject(ProjectNumber.of(projectNumber))
.setLocation(location)
.setName(TopicName.of(topicId))
.build();
Publisher publisher = null;
List<ApiFuture<String>> futures = new ArrayList<>();
try {
// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 3_500_000 bytes
long messageCountBatchSize = 100L; // default : 1000L message
Duration publishDelayThreshold = Duration.ofMillis(100); // default : 50 ms
// Publish request get triggered based on request size, messages count & time since last
// publish, whichever condition is met first.
BatchingSettings batchingSettings =
BatchingSettings.newBuilder()
.setRequestByteThreshold(requestBytesThreshold)
.setElementCountThreshold(messageCountBatchSize)
.setDelayThreshold(publishDelayThreshold)
.build();
PublisherSettings publisherSettings =
PublisherSettings.newBuilder()
.setTopicPath(topicPath)
.setBatchingSettings(batchingSettings)
.build();
publisher = Publisher.create(publisherSettings);
// Start the publisher. Upon successful starting, its state will become RUNNING.
publisher.startAsync().awaitRunning();
for (int i = 0; i < messageCount; i++) {
String message = "message-" + i;
// Convert the message to a byte string.
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Publish a message.
ApiFuture<String> future = publisher.publish(pubsubMessage);
futures.add(future);
}
} finally {
ArrayList<MessageMetadata> metadata = new ArrayList<>();
List<String> ackIds = ApiFutures.allAsList(futures).get();
System.out.println("Published " + ackIds.size() + " messages with batch settings.");
if (publisher != null) {
// Shut down the publisher.
publisher.stopAsync().awaitTerminated();
}
}
}
}
Python
Before running this sample, follow the Python setup instructions in Pub/Sub Lite client libraries.
from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
MessageMetadata,
TopicPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True
if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

batch_settings = BatchSettings(
    # 2 MiB. Defaults to 3 MiB. Must be less than the 4 MiB gRPC per-message limit.
    max_bytes=2 * 1024 * 1024,
    # 100 ms. Defaults to 50 ms.
    max_latency=0.1,
    # Defaults to 1000.
    max_messages=100,
)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient(
    per_partition_batching_settings=batch_settings
) as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
        # result() blocks. To resolve API futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with batch settings to {str(topic_path)}."
)
When a publisher application starts, the client library creates one batch for each partition in the Lite topic. For example, if a Lite topic has two partitions, the publisher creates two batches and sends each batch to one partition.
After you publish a message, the client library buffers the message until the batch exceeds the maximum request size, the maximum number of messages, or the publish delay.
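The three flush conditions described above can be sketched as follows. This is a simplified model of the batching logic, not the client library's implementation:

```python
import time

class Batcher:
    """Buffers messages and flushes when any one threshold is reached."""

    def __init__(self, max_bytes=3_500_000, max_messages=1000, max_delay=0.05):
        self.max_bytes = max_bytes        # request size threshold
        self.max_messages = max_messages  # message count threshold
        self.max_delay = max_delay        # publish delay threshold, in seconds
        self.buffer = []
        self.buffered_bytes = 0
        self.first_add = None

    def add(self, data: bytes):
        if self.first_add is None:
            self.first_add = time.monotonic()
        self.buffer.append(data)
        self.buffered_bytes += len(data)

    def should_flush(self) -> bool:
        # Flush when request size, message count, or elapsed delay is exceeded,
        # whichever condition is met first.
        return (
            self.buffered_bytes >= self.max_bytes
            or len(self.buffer) >= self.max_messages
            or (self.first_add is not None
                and time.monotonic() - self.first_add >= self.max_delay)
        )

b = Batcher(max_bytes=100, max_messages=3, max_delay=10.0)
b.add(b"a"); b.add(b"b")
print(b.should_flush())  # False: no threshold reached yet
b.add(b"c")
print(b.should_flush())  # True: message-count threshold reached
```

In the real client libraries one such buffer exists per partition, which is why the batch settings apply per partition.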
Order messages
Lite topics order the messages in each partition by the time that you publish the messages. To assign messages to the same partition, use an ordering key.
Pub/Sub Lite delivers messages from a partition in order, and subscribers can process the messages in order. For details, see Receiving messages.
Publish idempotency
The following versions of the Pub/Sub Lite client libraries support idempotent publishing:
- java-pubsublite: version 1.10.0
- python-pubsublite: version 1.8.0
- google-cloud-go: pubsublite version 1.7.0
If a message publish is retried due to a network or server error, the message is stored only once. Idempotency is guaranteed only within the same session; if the same message is republished with a new publisher client, idempotency is not guaranteed. Idempotent publishing incurs no additional service charges and does not increase publish latency.
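The session-scoped guarantee can be pictured with this simplified model (not the actual wire protocol): the publisher numbers each message within its session, and the server stores each (session, sequence number) pair at most once. A new publisher client starts a new session, so the server cannot recognize a resend from it.

```python
class Server:
    """Toy model of session-scoped deduplication on the server."""

    def __init__(self):
        self.stored = {}      # (session_id, seq) -> assigned offset
        self.next_offset = 0

    def publish(self, session_id: str, seq: int, data: bytes) -> int:
        key = (session_id, seq)
        if key in self.stored:
            # Retry of an already-stored message: don't store it again.
            return self.stored[key]
        offset = self.next_offset
        self.next_offset += 1
        self.stored[key] = offset
        return offset

server = Server()
# A retry within the same session is deduplicated.
first = server.publish("session-A", seq=0, data=b"m")
retry = server.publish("session-A", seq=0, data=b"m")
print(first == retry)  # True: stored only once

# The same payload from a new publisher client (new session) is stored again.
other = server.publish("session-B", seq=0, data=b"m")
print(other == first)  # False: a duplicate across sessions
```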
Enable or disable idempotent publishing
Idempotent publishing is enabled by default in the Pub/Sub Lite client libraries. You can disable it through the publisher client settings in the respective client library.
If idempotent publishing is enabled, the offset returned in publish results may be -1. This value is returned when a message is identified as a duplicate of a message that was already successfully published, but the server has insufficient information to return the message's offset at publish time. Messages received by subscribers always have valid offsets.
Troubleshooting
Receiving duplicates
Because idempotency is limited to a single session, you may receive duplicates if you re-create a publisher client to publish the same messages.
If the Pub/Sub Lite service automatically assigns partitions to subscribers (the default), a subscriber client may receive the same message more than once. When a reassignment occurs, messages can be redelivered to a different subscriber client.
Publisher errors
The state of a publisher session is garbage collected on the server after 7 days of inactivity. If the session resumes after this period, the publisher client terminates with an error message similar to "Failed precondition: Expected message with publish sequence number ..." and does not accept new messages. Re-create the publisher client to resolve this error.