カスタム属性を使用してメッセージをパブリッシュする。
もっと見る
このコードサンプルを含む詳細なドキュメントについては、以下をご覧ください。
コードサンプル
Go
Pub/Sub Lite に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/pubsublite/pscompat"
)
func publishWithCustomAttributes(w io.Writer, projectID, zone, topicID string) error {
// projectID := "my-project-id"
// zone := "us-central1-a"
// topicID := "my-topic"
ctx := context.Background()
topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)
// Create the publisher client.
publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
if err != nil {
return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
}
// Ensure the publisher will be shut down.
defer publisher.Stop()
// Publish a message with custom attributes.
result := publisher.Publish(ctx, &pubsub.Message{
Data: []byte("message-with-custom-attributes"),
Attributes: map[string]string{
"year": "2020",
"author": "unknown",
},
})
// Get blocks until the result is ready.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("publish error: %w", err)
}
fmt.Fprintf(w, "Published a message with custom attributes: %v\n", id)
return publisher.Error()
}
Java
Pub/Sub Lite に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.PubsubMessage;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
public class PublishWithCustomAttributesExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String cloudRegion = "your-cloud-region";
char zoneId = 'b';
// Choose an existing topic for the publish example to work.
String topicId = "your-topic-id";
long projectNumber = Long.parseLong("123456789");
// True if using a regional location. False if using a zonal location.
// https://cloud.google.com/pubsub/lite/docs/topics
boolean regional = false;
publishWithCustomAttributesExample(cloudRegion, zoneId, projectNumber, topicId, regional);
}
// Publish messages to a topic with custom attributes.
public static void publishWithCustomAttributesExample(
String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
throws ApiException, ExecutionException, InterruptedException {
CloudRegionOrZone location;
if (regional) {
location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
} else {
location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
}
TopicPath topicPath =
TopicPath.newBuilder()
.setProject(ProjectNumber.of(projectNumber))
.setLocation(location)
.setName(TopicName.of(topicId))
.build();
PublisherSettings publisherSettings =
PublisherSettings.newBuilder().setTopicPath(topicPath).build();
Publisher publisher = Publisher.create(publisherSettings);
// Start the publisher. Upon successful starting, its state will become RUNNING.
publisher.startAsync().awaitRunning();
// Prepare the message data as a byte string.
String messageData = "message-with-custom-attributes";
ByteString data = ByteString.copyFromUtf8(messageData);
// Prepare a protobuf-encoded event timestamp for the message.
Instant now = Instant.now();
String eventTime =
MessageTransforms.encodeAttributeEventTime(Timestamps.fromMillis(now.toEpochMilli()));
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder()
.setData(data)
// Add two sets of custom attributes to the message.
.putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
// Add an event timestamp as an attribute.
.putAttributes(MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO, eventTime)
.build();
// Publish a message.
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Shut down the publisher.
publisher.stopAsync().awaitTerminated();
String ackId = future.get();
MessageMetadata metadata = MessageMetadata.decode(ackId);
System.out.println("Published a message with custom attributes:\n" + metadata);
}
}
Python
Pub/Sub Lite に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
CloudRegion,
CloudZone,
MessageMetadata,
TopicPath,
)
# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True
if regional:
location = CloudRegion(cloud_region)
else:
location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)
# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
data = "Hello world!"
api_future = publisher_client.publish(
topic_path,
data.encode("utf-8"),
year="2020",
author="unknown",
)
# result() blocks. To resolve api futures asynchronously, use add_done_callback().
message_id = api_future.result()
message_metadata = MessageMetadata.decode(message_id)
print(
f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
)
print(f"Finished publishing a message with custom attributes to {str(topic_path)}.")
次のステップ
他の Google Cloud プロダクトに関連するコードサンプルの検索およびフィルタ検索を行うには、Google Cloud のサンプルをご覧ください。