Publicar mensagens com configurações de lote.
Mais informações
Para ver a documentação detalhada que inclui este exemplo de código, consulte:
Exemplo de código
Go
Para autenticar no Pub/Sub Lite, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
import (
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
)
// publishWithBatchSettings publishes messageCount messages to the Pub/Sub Lite
// topic identified by projectID, zone and topicID, using a publisher client
// configured with custom batch settings. The outcome of every publish and a
// final summary line are written to w. It returns an error if the publisher
// client cannot be created, otherwise whatever publisher.Error() reports.
func publishWithBatchSettings(w io.Writer, projectID, zone, topicID string, messageCount int) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	// messageCount := 10
	ctx := context.Background()

	// Batching is configured per partition. A publish request is sent as soon
	// as any one of the thresholds below is reached.
	// See https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-variables
	// for DefaultPublishSettings.
	batching := pscompat.PublishSettings{
		ByteThreshold:  5 * 1024,               // 5 KiB
		CountThreshold: 1000,                   // 1,000 messages
		DelayThreshold: 100 * time.Millisecond, // 100 ms
	}

	// Build the fully qualified topic path and create the publisher client.
	path := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)
	client, err := pscompat.NewPublisherClientWithSettings(ctx, path, batching)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClientWithSettings error: %w", err)
	}
	// Make sure the publisher is shut down when this function returns.
	defer client.Stop()

	// Queue every message first; the results are collected afterwards so that
	// publishing is not held up waiting on individual results.
	var pending []*pubsub.PublishResult
	for i := 0; i < messageCount; i++ {
		payload := []byte(fmt.Sprintf("message-%d", i))
		pending = append(pending, client.Publish(ctx, &pubsub.Message{Data: payload}))
	}

	// Report the outcome of each publish, in the order the messages were queued.
	succeeded := 0
	for _, res := range pending {
		// Get blocks until the result is ready.
		msgID, getErr := res.Get(ctx)
		if getErr == nil {
			fmt.Fprintf(w, "Published: %v\n", msgID)
			succeeded++
			continue
		}
		// NOTE: A failed PublishResult indicates that the publisher client
		// encountered a fatal error and has permanently terminated. After the
		// fatal error has been resolved, a new publisher client instance must
		// be created to republish failed messages.
		fmt.Fprintf(w, "Publish error: %v\n", getErr)
	}

	fmt.Fprintf(w, "Published %d messages with batch settings\n", succeeded)
	return client.Error()
}
Java
Para autenticar no Pub/Sub Lite, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.threeten.bp.Duration;
/** Sample that publishes messages to a Pub/Sub Lite topic using custom batch settings. */
public class PublishWithBatchSettingsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithBatchSettingsExample(
        cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  /**
   * Publishes {@code messageCount} messages to a topic with batch settings.
   *
   * @param cloudRegion the cloud region of the topic
   * @param zoneId the zone letter within the region; only used when {@code regional} is false
   * @param projectNumber the number of the project that owns the topic
   * @param topicId the ID of an existing topic
   * @param messageCount how many messages to publish
   * @param regional true to address a regional location, false for a zonal location
   * @throws ApiException declared by this sample for API failures
   * @throws ExecutionException if waiting on the publish futures fails
   * @throws InterruptedException if the thread is interrupted while waiting on the futures
   */
  public static void publishWithBatchSettingsExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages.
      long requestBytesThreshold = 5000L; // default: 3_500_000 bytes
      long messageCountBatchSize = 100L; // default: 1000 messages
      Duration publishDelayThreshold = Duration.ofMillis(100); // default: 50 ms

      // A publish request is triggered based on request size, message count and time since the
      // last publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setRequestByteThreshold(requestBytesThreshold)
              .setElementCountThreshold(messageCountBatchSize)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder()
              .setTopicPath(topicPath)
              .setBatchingSettings(batchingSettings)
              .build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message and keep its future so the result can be awaited later.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      // Wait for every outstanding publish to complete before shutting down.
      // NOTE(review): if get() throws here, the publisher below is not stopped; confirm
      // whether that is intended before reordering, since it affects which exception
      // callers observe.
      List<String> messageIds = ApiFutures.allAsList(futures).get();
      System.out.println("Published " + messageIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
      }
    }
  }
}
Python
Para autenticar no Pub/Sub Lite, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
MessageMetadata,
TopicPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

# Address the topic either by region alone or by a zone within the region.
location = (
    CloudRegion(cloud_region)
    if regional
    else CloudZone(CloudRegion(cloud_region), zone_id)
)
topic_path = TopicPath(project_number, location, topic_id)

batch_settings = BatchSettings(
    # 2 MiB (default is 3 MiB). Must stay below gRPC's 4 MiB per-message limit.
    max_bytes=2 * 1024 * 1024,
    # 100 ms (default is 50 ms).
    max_latency=0.1,
    # 100 messages (default is 1000).
    max_messages=100,
)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient(per_partition_batching_settings=batch_settings) as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        # result() blocks until the publish completes. To resolve API futures
        # asynchronously, use add_done_callback() instead.
        message_id = publisher_client.publish(topic_path, data.encode("utf-8")).result()
        # The returned ID decodes into the partition and offset of the message.
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with batch settings to {str(topic_path)}."
)
A seguir
Para pesquisar e filtrar amostras de código para outros produtos do Google Cloud, consulte o navegador de amostra do Google Cloud.