このページでは、Lite トピックにメッセージをパブリッシュする方法について説明します。メッセージは、Java 用の Pub/Sub Lite クライアント ライブラリを使用してパブリッシュできます。
Lite トピックにメッセージをパブリッシュして、Lite サブスクリプションを作成すると、Lite サブスクリプションからメッセージを受信できます。
メッセージの形式
メッセージは、メッセージ データとメタデータを含むフィールドで構成されます。 メッセージに指定できる項目は、次のとおりです。
- メッセージ データ
- 順序指定キー
- イベントのタイムスタンプ
- 追加のメタデータが含まれる属性
クライアント ライブラリによってメッセージがパーティションに自動的に割り当てられ、Pub/Sub Lite サービスによってメッセージに次のフィールドが追加されます。
- パーティション内で一意のメッセージ ID
- Pub/Sub Lite サービスでパーティションにメッセージが格納されるタイムスタンプ。
メッセージの公開
メッセージをパブリッシュするには、Lite トピックへのストリーミング接続をリクエストし、ストリーミング接続を介してメッセージを送信します。
次のサンプルは、Lite トピックにメッセージをパブリッシュする方法を示しています。
Java
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
public class PublisherExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String cloudRegion = "your-cloud-region";
char zoneId = 'b';
// Choose an existing topic for the publish example to work.
String topicId = "your-topic-id";
long projectNumber = Long.parseLong("123456789");
int messageCount = 100;
publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount);
}
// Publish messages to a topic.
public static void publisherExample(
String cloudRegion, char zoneId, long projectNumber, String topicId, int messageCount)
throws ApiException, ExecutionException, InterruptedException {
TopicPath topicPath =
TopicPath.newBuilder()
.setProject(ProjectNumber.of(projectNumber))
.setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setName(TopicName.of(topicId))
.build();
Publisher publisher = null;
List<ApiFuture<String>> futures = new ArrayList<>();
try {
PublisherSettings publisherSettings =
PublisherSettings.newBuilder().setTopicPath(topicPath).build();
publisher = Publisher.create(publisherSettings);
// Start the publisher. Upon successful starting, its state will become RUNNING.
publisher.startAsync().awaitRunning();
for (int i = 0; i < messageCount; i++) {
String message = "message-" + i;
// Convert the message to a byte string.
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Publish a message. Messages are automatically batched.
ApiFuture<String> future = publisher.publish(pubsubMessage);
futures.add(future);
}
} finally {
ArrayList<PublishMetadata> metadata = new ArrayList<>();
List<String> ackIds = ApiFutures.allAsList(futures).get();
for (String id : ackIds) {
// Decoded metadata contains partition and offset.
metadata.add(PublishMetadata.decode(id));
}
System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");
if (publisher != null) {
// Shut down the publisher.
publisher.stopAsync().awaitTerminated();
System.out.println("Publisher is shut down.");
}
}
}
}
Python
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
PublishMetadata,
TopicPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)
# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
data = "Hello world!"
api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
# result() blocks. To resolve API futures asynchronously, use add_done_callback().
message_id = api_future.result()
publish_metadata = PublishMetadata.decode(message_id)
print(
f"Published a message to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
)
クライアント ライブラリは非同期でメッセージを送信し、エラーを処理します。エラーが発生すると、クライアント ライブラリによりメッセージが再送されます。
- Pub/Sub Lite サービスはストリームを閉じます。
- クライアント ライブラリはメッセージをバッファして、Lite トピックへの接続を再確立します。
- クライアント ライブラリはメッセージを順番に送信します。
メッセージをパブリッシュすると、Pub/Sub Lite サービスによってメッセージはパーティションに格納され、メッセージ ID がパブリッシャーに返されます。
順序指定キーの使用
メッセージの順序指定キーが同じ場合、クライアント ライブラリは同じパーティションにメッセージを割り当てます。順序指定キーは最大 1,024 バイトの文字列にする必要があります。
順序指定キーは、メッセージの key
フィールドにあり、クライアント ライブラリを使って設定できます。
Java
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。
import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
public class PublishWithOrderingKeyExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String cloudRegion = "your-cloud-region";
char zoneId = 'b';
// Choose an existing topic for the publish example to work.
String topicId = "your-topic-id";
long projectNumber = Long.parseLong("123456789");
publishWithOrderingKeyExample(cloudRegion, zoneId, projectNumber, topicId);
}
// Publish a message to a topic with an ordering key.
public static void publishWithOrderingKeyExample(
String cloudRegion, char zoneId, long projectNumber, String topicId)
throws ApiException, ExecutionException, InterruptedException {
TopicPath topicPath =
TopicPath.newBuilder()
.setProject(ProjectNumber.of(projectNumber))
.setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setName(TopicName.of(topicId))
.build();
PublisherSettings publisherSettings =
PublisherSettings.newBuilder().setTopicPath(topicPath).build();
Publisher publisher = Publisher.create(publisherSettings);
// Start the publisher. Upon successful starting, its state will become RUNNING.
publisher.startAsync().awaitRunning();
String message = "message-with-ordering-key";
// Convert the message to a byte string.
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder()
.setData(data)
// Messages of the same ordering key will always get published to the
// same partition. When OrderingKey is unset, messages can get published
// to different partitions if more than one partition exists for the topic.
.setOrderingKey("testing")
.build();
// Publish a message.
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Shut down the publisher.
publisher.stopAsync().awaitTerminated();
String ackId = future.get();
PublishMetadata metadata = PublishMetadata.decode(ackId);
System.out.println("Published a message with ordering key:\n" + metadata);
}
}
Python
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
PublishMetadata,
TopicPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)
# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
for message in range(num_messages):
data = f"{message}"
# Messages of the same ordering key will always get published to the same partition.
# When ordering_key is unset, messsages can get published ot different partitions if
# more than one partition exists for the topic.
api_future = publisher_client.publish(
topic_path, data.encode("utf-8"), ordering_key="testing"
)
# result() blocks. To resolve api futures asynchronously, use add_done_callback().
message_id = api_future.result()
publish_metadata = PublishMetadata.decode(message_id)
print(
f"Published {data} to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
)
print(
f"Finished publishing {num_messages} messages with an ordering key to {str(topic_path)}."
)
順序指定キーを使用して、複数のメッセージを同じパーティションに送信できます。これにより、サブスクライバーはメッセージを順番に受信します。クライアント ライブラリによって、同じパーティションに複数の順序指定キーが割り当てられることがあります。
属性の使用
メッセージ属性は、メッセージに関するメタデータが含まれる Key-Value ペアです。属性はテキスト文字列かバイト文字列で指定します。
属性はメッセージの attributes
フィールドにあります。クライアント ライブラリを使用して属性を設定できます。
Java
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。
import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
public class PublishWithCustomAttributesExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String cloudRegion = "your-cloud-region";
char zoneId = 'b';
// Choose an existing topic for the publish example to work.
String topicId = "your-topic-id";
long projectNumber = Long.parseLong("123456789");
publishWithCustomAttributesExample(cloudRegion, zoneId, projectNumber, topicId);
}
// Publish messages to a topic with custom attributes.
public static void publishWithCustomAttributesExample(
String cloudRegion, char zoneId, long projectNumber, String topicId)
throws ApiException, ExecutionException, InterruptedException {
TopicPath topicPath =
TopicPath.newBuilder()
.setProject(ProjectNumber.of(projectNumber))
.setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setName(TopicName.of(topicId))
.build();
PublisherSettings publisherSettings =
PublisherSettings.newBuilder().setTopicPath(topicPath).build();
Publisher publisher = Publisher.create(publisherSettings);
// Start the publisher. Upon successful starting, its state will become RUNNING.
publisher.startAsync().awaitRunning();
String message = "message-with-custom-attributes";
// Convert the message to a byte string.
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder()
.setData(data)
// Add two sets of custom attributes to the message.
.putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
.build();
// Publish a message.
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Shut down the publisher.
publisher.stopAsync().awaitTerminated();
String ackId = future.get();
PublishMetadata metadata = PublishMetadata.decode(ackId);
System.out.println("Published a message with custom attributes:\n" + metadata);
}
}
Python
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
PublishMetadata,
TopicPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)
# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
data = "Hello world!"
api_future = publisher_client.publish(
topic_path, data.encode("utf-8"), year="2020", author="unknown",
)
# result() blocks. To resolve api futures asynchronously, use add_done_callback().
message_id = api_future.result()
publish_metadata = PublishMetadata.decode(message_id)
print(
f"Published {data} to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
)
print(f"Finished publishing a message with custom attributes to {str(topic_path)}.")
属性によってメッセージの処理方法を知ることが可能です。サブスクライバーは、メッセージの attributes
フィールドを解析し、その属性に基づいてメッセージを処理できます。
メッセージのバッチ処理
クライアント ライブラリはメッセージをバッチで公開します。バッチが大きいほど、コンピューティング リソースの使用量は少なくなりますが、レイテンシが増加します。バッチサイズはバッチ設定で変更できます。
次の表に、構成できるバッチ処理の設定を一覧表示します。
設定 | 説明 | デフォルト |
---|---|---|
リクエスト サイズ | バッチの最大サイズ(バイト単位)。 | 3.5 MiB |
メッセージの数 | バッチ内のメッセージの最大数。 | メッセージ 1,000 件 |
パブリッシュの遅延 | メッセージをバッチに追加してからバッチを Lite トピックに送信するまでの時間(ミリ秒単位)。 | 50 ミリ秒 |
クライアント ライブラリでバッチ処理の設定を構成できます。
Java
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.threeten.bp.Duration;
public class PublishWithBatchSettingsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String cloudRegion = "your-cloud-region";
char zoneId = 'b';
// Choose an existing topic for the publish example to work.
String topicId = "your-topic-id";
long projectNumber = Long.parseLong("123456789");
int messageCount = 100;
publishWithBatchSettingsExample(cloudRegion, zoneId, projectNumber, topicId, messageCount);
}
// Publish messages to a topic with batch settings.
public static void publishWithBatchSettingsExample(
String cloudRegion, char zoneId, long projectNumber, String topicId, int messageCount)
throws ApiException, ExecutionException, InterruptedException {
TopicPath topicPath =
TopicPath.newBuilder()
.setProject(ProjectNumber.of(projectNumber))
.setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setName(TopicName.of(topicId))
.build();
Publisher publisher = null;
List<ApiFuture<String>> futures = new ArrayList<>();
try {
// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 3_500_000 bytes
long messageCountBatchSize = 100L; // default : 1000L message
Duration publishDelayThreshold = Duration.ofMillis(100); // default : 50 ms
// Publish request get triggered based on request size, messages count & time since last
// publish, whichever condition is met first.
BatchingSettings batchingSettings =
BatchingSettings.newBuilder()
.setRequestByteThreshold(requestBytesThreshold)
.setElementCountThreshold(messageCountBatchSize)
.setDelayThreshold(publishDelayThreshold)
.build();
PublisherSettings publisherSettings =
PublisherSettings.newBuilder()
.setTopicPath(topicPath)
.setBatchingSettings(batchingSettings)
.build();
publisher = Publisher.create(publisherSettings);
// Start the publisher. Upon successful starting, its state will become RUNNING.
publisher.startAsync().awaitRunning();
for (int i = 0; i < messageCount; i++) {
String message = "message-" + i;
// Convert the message to a byte string.
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Publish a message.
ApiFuture<String> future = publisher.publish(pubsubMessage);
futures.add(future);
}
} finally {
ArrayList<PublishMetadata> metadata = new ArrayList<>();
List<String> ackIds = ApiFutures.allAsList(futures).get();
System.out.println("Published " + ackIds.size() + " messages with batch settings.");
if (publisher != null) {
// Shut down the publisher.
publisher.stopAsync().awaitTerminated();
}
}
}
}
Python
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。
from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
PublishMetadata,
TopicPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)
batch_setttings = BatchSettings(
# 2 MiB. Default to 3 MiB. Must be less than 4 MiB gRPC's per-message limit.
max_bytes=2 * 1024 * 1024,
# 100 ms. Default to 50 ms.
max_latency=0.1,
# Default to 1000.
max_messages=100,
)
# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient(
per_partition_batching_settings=batch_setttings
) as publisher_client:
for message in range(num_messages):
data = f"{message}"
api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
# result() blocks. To resolve API futures asynchronously, use add_done_callback().
message_id = api_future.result()
publish_metadata = PublishMetadata.decode(message_id)
print(
f"Published {data} to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
)
print(
f"Finished publishing {num_messages} messages with batch settings to {str(topic_path)}."
)
パブリッシャー アプリケーションが開始されると、クライアント ライブラリによって Lite トピックの各パーティションのバッチが作成されます。たとえば、Lite トピックに 2 つのパーティションがある場合、パブリッシャーは 2 つのバッチを作成し、各バッチをパーティションに送信します。
メッセージをパブリッシュした後、クライアント ライブラリは、バッチが最大リクエスト サイズ、メッセージの最大数、またはパブリッシュの遅延を超えるまでメッセージをバッファします。
メッセージの順序指定
Lite トピックは、メッセージをパブリッシュするときに各パーティション内のメッセージを並べ替えます。同じパーティションにメッセージを割り当てるには、順序指定キーを使用します。
Pub/Sub Lite はパーティションのメッセージを順番に配信し、サブスクライバーは順番にメッセージを処理できます。詳細については、メッセージの受信をご覧ください。