단일 파티션 Pub/Sub Lite 주제에 메시지를 게시한 다음 게시 결과를 비동기식으로 확인합니다.
더 살펴보기
이 코드 샘플이 포함된 자세한 문서는 다음을 참조하세요.
코드 샘플
Go
Pub/Sub Lite에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
package main
import (
"context"
"flag"
"fmt"
"log"
"sync"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
"golang.org/x/sync/errgroup"
)
// main publishes -message_count messages to an existing Pub/Sub Lite topic and
// waits for every publish result concurrently.
//
// The topic is identified by the -project_id, -zone and -topic_id flags. Each
// successful publish prints the partition and offset decoded from the returned
// message ID. Messages whose publish result failed are collected in
// toRepublish so they could be resent with a new publisher client.
func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite topic when running this
	// sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	topicID := flag.String("topic_id", "", "Existing Pub/Sub Lite topic")
	messageCount := flag.Int("message_count", 100, "The number of messages to send")
	flag.Parse()

	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", *projectID, *zone, *topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		log.Fatalf("pscompat.NewPublisherClient error: %v", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Collect any messages that need to be republished with a new publisher
	// client. The slice is appended to from multiple goroutines, so it is
	// guarded by toRepublishMu.
	var toRepublish []*pubsub.Message
	var toRepublishMu sync.Mutex

	// Publish messages. Messages are automatically batched.
	g := new(errgroup.Group)
	for i := 0; i < *messageCount; i++ {
		// msg and result are declared inside the loop body, so every goroutine
		// below captures its own copies.
		msg := &pubsub.Message{
			Data: []byte(fmt.Sprintf("message-%d", i)),
		}
		result := publisher.Publish(ctx, msg)

		g.Go(func() error {
			// Get blocks until the result is ready.
			id, err := result.Get(ctx)
			if err != nil {
				// NOTE: A failed PublishResult indicates that the publisher client
				// encountered a fatal error and has permanently terminated. After the
				// fatal error has been resolved, a new publisher client instance must
				// be created to republish failed messages.
				fmt.Printf("Publish error: %v\n", err)
				toRepublishMu.Lock()
				toRepublish = append(toRepublish, msg)
				toRepublishMu.Unlock()
				return err
			}

			// Metadata decoded from the id contains the partition and offset.
			metadata, err := pscompat.ParseMessageMetadata(id)
			if err != nil {
				fmt.Printf("Failed to parse message metadata %q: %v\n", id, err)
				return err
			}
			fmt.Printf("Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("Publishing finished with error: %v\n", err)
	}

	// All goroutines have finished after g.Wait, so toRepublish can be read
	// without holding the mutex.
	fmt.Printf("Published %d messages\n", *messageCount-len(toRepublish))

	// Print the error that caused the publisher client to terminate (if any),
	// which may contain more context than PublishResults. Report the value that
	// was just checked instead of querying the publisher a second time.
	if err := publisher.Error(); err != nil {
		fmt.Printf("Publisher client terminated due to error: %v\n", err)
	}
}
Java
Pub/Sub Lite에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
/**
 * Sample that publishes a batch of messages to an existing Pub/Sub Lite topic and prints the
 * partition/offset metadata decoded from each returned message ID.
 */
public class PublisherExample {

  /** Runs the sample with placeholder values that must be replaced before use. */
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  /**
   * Publishes {@code messageCount} messages ("message-0", "message-1", ...) to a topic, waits for
   * all publish futures, prints the decoded metadata, and shuts the publisher down.
   *
   * @param cloudRegion cloud region of the topic
   * @param zoneId zone letter within the region; only used when {@code regional} is false
   * @param projectNumber numeric project number that owns the topic
   * @param topicId ID of an existing topic
   * @param messageCount number of messages to publish
   * @param regional true to address a regional topic, false for a zonal one
   * @throws ExecutionException if waiting on the publish futures fails
   * @throws InterruptedException if the thread is interrupted while waiting on the futures
   */
  public static void publisherExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder().setTopicPath(topicPath).build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      // Waiting on the futures or decoding an ID may throw. The nested try/finally guarantees
      // the publisher is still shut down in that case instead of being leaked.
      try {
        ArrayList<MessageMetadata> metadata = new ArrayList<>();
        List<String> ackIds = ApiFutures.allAsList(futures).get();
        for (String id : ackIds) {
          // Decoded metadata contains partition and offset.
          metadata.add(MessageMetadata.decode(id));
        }
        System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");
      } finally {
        if (publisher != null) {
          // Shut down the publisher.
          publisher.stopAsync().awaitTerminated();
          System.out.println("Publisher is shut down.");
        }
      }
    }
  }
}
Python
Pub/Sub Lite에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
MessageMetadata,
TopicPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True
# A regional topic is addressed by its region alone; a zonal topic also needs the zone ID.
location = (
    CloudRegion(cloud_region)
    if regional
    else CloudZone(CloudRegion(cloud_region), zone_id)
)
topic_path = TopicPath(project_number, location, topic_id)

# The client is used as a context manager here; without a `with` block,
# __enter__() has to be called before the client is used.
with PublisherClient() as publisher_client:
    payload = "Hello world!".encode("utf-8")
    # result() blocks until the publish completes. Use add_done_callback() on
    # the future instead to resolve it asynchronously.
    message_id = publisher_client.publish(topic_path, payload).result()
    # The returned message ID decodes to the partition and cursor offset.
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published a message to {topic_path} with partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )
다음 단계
다른 Google Cloud 제품의 코드 샘플을 검색하고 필터링하려면 Google Cloud 샘플 브라우저를 참조하세요.