単一パーティション Lite トピックにメッセージをパブリッシュして、今後のパブリッシュを非同期的に解決します。
このコードサンプルが含まれるドキュメント ページ
コンテキストで使用されているコードサンプルを表示するには、次のドキュメントをご覧ください。
コードサンプル
Java
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
public class PublisherExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String cloudRegion = "your-cloud-region";
char zoneId = 'b';
// Choose an existing topic for the publish example to work.
String topicId = "your-topic-id";
long projectNumber = Long.parseLong("123456789");
int messageCount = 100;
publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount);
}
// Publish messages to a topic.
public static void publisherExample(
String cloudRegion, char zoneId, long projectNumber, String topicId, int messageCount)
throws ApiException, ExecutionException, InterruptedException {
TopicPath topicPath =
TopicPath.newBuilder()
.setProject(ProjectNumber.of(projectNumber))
.setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
.setName(TopicName.of(topicId))
.build();
Publisher publisher = null;
List<ApiFuture<String>> futures = new ArrayList<>();
try {
PublisherSettings publisherSettings =
PublisherSettings.newBuilder().setTopicPath(topicPath).build();
publisher = Publisher.create(publisherSettings);
// Start the publisher. Upon successful starting, its state will become RUNNING.
publisher.startAsync().awaitRunning();
for (int i = 0; i < messageCount; i++) {
String message = "message-" + i;
// Convert the message to a byte string.
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Publish a message. Messages are automatically batched.
ApiFuture<String> future = publisher.publish(pubsubMessage);
futures.add(future);
}
} finally {
ArrayList<PublishMetadata> metadata = new ArrayList<>();
List<String> ackIds = ApiFutures.allAsList(futures).get();
for (String id : ackIds) {
// Decoded metadata contains partition and offset.
metadata.add(PublishMetadata.decode(id));
}
System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");
if (publisher != null) {
// Shut down the publisher.
publisher.stopAsync().awaitTerminated();
System.out.println("Publisher is shut down.");
}
}
}
}
Python
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
PublishMetadata,
TopicPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)
# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
data = "Hello world!"
api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
# result() blocks. To resolve API futures asynchronously, use add_done_callback().
message_id = api_future.result()
publish_metadata = PublishMetadata.decode(message_id)
print(
f"Published a message to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
)