このドキュメントでは、メッセージのパブリッシュについて説明します。
パブリッシャー アプリケーションによって、メッセージが作成され、トピックに送信されます。Pub/Sub では、既存のサブスクライバーに対して、メッセージの at-least-once 配信とベストエフォートの順序指定を実施しています。
パブリッシャー アプリケーションの一般的なフローは次のとおりです。
- データを含むメッセージを作成します。
- リクエストを Pub/Sub サーバーに送信し、指定されたトピックにメッセージをパブリッシュします。
始める前に
パブリッシュ ワークフローを構成する前に、次のタスクを完了していることを確認してください。
メッセージの形式
メッセージは、メッセージ データとメタデータを含むフィールドで構成されます。メッセージに少なくとも次のいずれかを指定します。
REST API を使用している場合、メッセージ データは Base64 でエンコードされている必要があります。
Pub/Sub サービスは、次のフィールドをメッセージに追加します。
- トピックに一意のメッセージ ID
- Pub/Sub サービスがメッセージを受信した時点のタイムスタンプ
メッセージをパブリッシュする
メッセージは、Google Cloud CLI または Pub/Sub API を使用してパブリッシュできます。クライアント ライブラリは、メッセージを非同期的にパブリッシュできます。
Console
メッセージをパブリッシュする手順は次のとおりです。
Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。
トピック ID をクリックします。
[トピックの詳細] ページの [メッセージ] で、[メッセージをパブリッシュ] をクリックします。
[メッセージ本文] フィールドにメッセージのデータを入力します。
省略可: メッセージ属性を追加します。
[属性を追加] をクリックします。
属性に対応するキーと値を入力します。
[発行] をクリックします。
gcloud
メッセージをパブリッシュするには、gcloud pubsub topics publish コマンドを使用します。
gcloud pubsub topics publish TOPIC_ID \ --message=MESSAGE_DATA \ [--attribute=KEY="VALUE",...]
以下を置き換えます。
- TOPIC_ID: トピックの ID
- MESSAGE_DATA: メッセージ データを含む文字列
- KEY: メッセージ属性のキー
- VALUE: メッセージ属性のキーの値
REST
メッセージをパブリッシュするには、次のような POST リクエストを送信します。
POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish Content-Type: application/json Authorization: Bearer $(gcloud auth application-default print-access-token)
以下を置き換えます。
- PROJECT_ID: トピックがあるプロジェクトのプロジェクト ID
- TOPIC_ID: トピックの ID
リクエスト本文に次のフィールドを指定します。
{ "messages": [ { "attributes": { "KEY": "VALUE", ... }, "data": "MESSAGE_DATA", } ] }
以下を置き換えます。
- KEY: メッセージ属性のキー
- VALUE: メッセージ属性のキーの値
- MESSAGE_DATA: Base64 でエンコードされたメッセージ データの文字列
メッセージには、空でないデータ フィールドか、少なくとも 1 つの属性が含まれている必要があります。
リクエストが成功した場合のレスポンスは、メッセージ ID が含まれる JSON オブジェクトです。次の例は、メッセージ ID が含まれるレスポンスを示しています。
{ "messageIds": [ "19916711285", ] }
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
auto message_id = publisher.Publish(
pubsub::MessageBuilder{}.SetData("Hello World!").Build());
auto done = message_id.then([](future<StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::move(id).status();
std::cout << "Hello World! published with id=" << *id << "\n";
});
// Block until the message is published
done.get();
}
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishMessagesAsyncSample
{
public async Task<int> PublishMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
int publishedMessageCount = 0;
var publishTasks = messageTexts.Select(async text =>
{
try
{
string message = await publisher.PublishAsync(text);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error ocurred when publishing message {text}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
import (
"context"
"fmt"
"io"
"strconv"
"sync"
"sync/atomic"
"cloud.google.com/go/pubsub"
)
func publishThatScales(w io.Writer, projectID, topicID string, n int) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
var wg sync.WaitGroup
var totalErrors uint64
t := client.Topic(topicID)
for i := 0; i < n; i++ {
result := t.Publish(ctx, &pubsub.Message{
Data: []byte("Message " + strconv.Itoa(i)),
})
wg.Add(1)
go func(i int, res *pubsub.PublishResult) {
defer wg.Done()
// The Get method blocks until a server-generated ID or
// an error is returned for the published message.
id, err := res.Get(ctx)
if err != nil {
// Error handling code can be added here.
fmt.Fprintf(w, "Failed to publish: %v", err)
atomic.AddUint64(&totalErrors, 1)
return
}
fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
}(i, result)
}
wg.Wait()
if totalErrors > 0 {
return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, n)
}
return nil
}
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class PublishWithErrorHandlerExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithErrorHandlerExample(projectId, topicId);
}
public static void publishWithErrorHandlerExample(String projectId, String topicId)
throws IOException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).build();
List<String> messages = Arrays.asList("first message", "second message");
for (final String message : messages) {
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Add an asynchronous callback to handle success / failure
ApiFutures.addCallback(
future,
new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
// details on the API exception
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
}
System.out.println("Error publishing message : " + message);
}
@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic)
System.out.println("Published message ID: " + messageId);
}
},
MoreExecutors.directExecutor());
}
} finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishMessage(topicNameOrId, data) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
try {
const messageId = await pubSubClient
.topic(topicNameOrId)
.publishMessage({data: dataBuffer});
console.log(`Message ${messageId} published.`);
} catch (error) {
console.error(`Received error while publishing: ${error.message}`);
process.exitCode = 1;
}
}
PHP
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の PHP の設定手順を実施してください。詳細については、Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。
use Google\Cloud\PubSub\MessageBuilder;
use Google\Cloud\PubSub\PubSubClient;
/**
* Publishes a message for a Pub/Sub topic.
*
* @param string $projectId The Google project ID.
* @param string $topicName The Pub/Sub topic name.
* @param string $message The message to publish.
*/
function publish_message($projectId, $topicName, $message)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$topic = $pubsub->topic($topicName);
$topic->publish((new MessageBuilder)->setData($message)->build());
print('Message published' . PHP_EOL);
}
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
"""Publishes multiple messages to a Pub/Sub topic with an error handler."""
from concurrent import futures
from google.cloud import pubsub_v1
from typing import Callable
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []
def get_callback(
publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
try:
# Wait 60 seconds for the publish call to succeed.
print(publish_future.result(timeout=60))
except futures.TimeoutError:
print(f"Publishing {data} timed out.")
return callback
for i in range(10):
data = str(i)
# When you publish a message, the client returns a future.
publish_future = publisher.publish(topic_path, data.encode("utf-8"))
# Non-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, data))
publish_futures.append(publish_future)
# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
print(f"Published messages with error handler to {topic_path}.")
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
# topic_id = "your-topic-id"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic topic_id
begin
topic.publish_async "This is a test message." do |result|
raise "Failed to publish the message." unless result.succeeded?
puts "Message published asynchronously."
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
rescue StandardError => e
puts "Received error while publishing: #{e.message}"
end
メッセージをパブリッシュすると、Pub/Sub サービスはパブリッシャーにメッセージ ID を返します。
属性を使用する
カスタム属性を Pub/Sub メッセージにメタデータとして埋め込むことができます。属性はテキスト文字列かバイト文字列で指定します。メッセージあたりの設定可能な属性の最大数は 100 です。属性キーは goog
で始まり、256 バイトを超えないようにする必要があります。属性値は 1,024 バイトを超えないようにする必要があります。メッセージ スキーマは次のように表します。
{ "data": string, "attributes": { string: string, ... }, "messageId": string, "publishTime": string, "orderingKey": string }
PubsubMessage
JSON スキーマは、REST および RPC ドキュメントの一部としてパブリッシュされます。
gcloud
gcloud pubsub topics publish my-topic --message="hello" \ --attribute="origin=gcloud-sample,username=gcp"
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
std::vector<future<void>> done;
for (int i = 0; i != 10; ++i) {
auto message_id = publisher.Publish(
pubsub::MessageBuilder{}
.SetData("Hello World! [" + std::to_string(i) + "]")
.SetAttribute("origin", "cpp-sample")
.SetAttribute("username", "gcp")
.Build());
done.push_back(message_id.then([i](future<StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::move(id).status();
std::cout << "Message " << i << " published with id=" << *id << "\n";
}));
}
publisher.Flush();
// Block until all the messages are published (optional)
for (auto& f : done) f.get();
}
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Threading.Tasks;
public class PublishMessageWithCustomAttributesAsyncSample
{
public async Task PublishMessageWithCustomAttributesAsync(string projectId, string topicId, string messageText)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
var pubsubMessage = new PubsubMessage
{
// The data is any arbitrary ByteString. Here, we're using text.
Data = ByteString.CopyFromUtf8(messageText),
// The attributes provide metadata in a string-to-string dictionary.
Attributes =
{
{ "year", "2020" },
{ "author", "unknown" }
}
};
string message = await publisher.PublishAsync(pubsubMessage);
Console.WriteLine($"Published message {message}");
}
}
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
)
func publishCustomAttributes(w io.Writer, projectID, topicID string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
t := client.Topic(topicID)
result := t.Publish(ctx, &pubsub.Message{
Data: []byte("Hello world!"),
Attributes: map[string]string{
"origin": "golang",
"username": "gcp",
},
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Get: %v", err)
}
fmt.Fprintf(w, "Published message with custom attributes; msg ID: %v\n", id)
return nil
}
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class PublishWithCustomAttributesExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithCustomAttributesExample(projectId, topicId);
}
public static void publishWithCustomAttributesExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).build();
String message = "first message";
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder()
.setData(data)
.putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
.build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get();
System.out.println("Published a message with custom attributes: " + messageId);
} finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishMessageWithCustomAttributes(topicNameOrId, data) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Add two custom attributes, origin and username, to the message
const customAttributes = {
origin: 'nodejs-sample',
username: 'gcp',
};
const messageId = await pubSubClient
.topic(topicNameOrId)
.publishMessage({data: dataBuffer, attributes: customAttributes});
console.log(`Message ${messageId} published.`);
}
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
for n in range(1, 10):
data_str = f"Message number {n}"
# Data must be a bytestring
data = data_str.encode("utf-8")
# Add two attributes, origin and username, to the message
future = publisher.publish(
topic_path, data, origin="python-sample", username="gcp"
)
print(future.result())
print(f"Published messages with custom attributes to {topic_path}.")
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
# topic_id = "your-topic-id"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic topic_id
# Add two attributes, origin and username, to the message
topic.publish_async "This is a test message.",
origin: "ruby-sample",
username: "gcp" do |result|
raise "Failed to publish the message." unless result.succeeded?
puts "Message with custom attributes published asynchronously."
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
順序指定キーを使用する
メッセージの順序指定キーが同じであり、同じリージョンにメッセージをパブリッシュする場合、サブスクライバーは順番にメッセージを受信できます。順序指定キーが含まれるメッセージをパブリッシュすると、レイテンシが増加する可能性があります。同じリージョンにメッセージをパブリッシュするには、リージョン エンドポイントを使用します。
Google Cloud コンソール、Google Cloud CLI、または Pub/Sub API を使用して、順序指定キーでメッセージをパブリッシュできます。
Console
Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。
トピック ID をクリックします。
[トピックの詳細] ページの [メッセージ] で、[メッセージをパブリッシュ] をクリックします。
[メッセージ本文] フィールドにメッセージのデータを入力します。
[メッセージの順序指定] フィールドに順序指定キーを入力します。
[発行] をクリックします。
gcloud
順序指定キーを使用してメッセージをパブリッシュするには、gcloud pubsub topics publish
コマンドと --ordering-key
フラグを使用します。
gcloud pubsub topics publish TOPIC_ID \ --message=MESSAGE_DATA \ --ordering-key=ORDERING_KEY
以下を置き換えます。
- TOPIC_ID: トピックの ID
- MESSAGE_DATA: メッセージ データを含む文字列
- ORDERING_KEY: 順序指定キーが含まれる文字列
REST
順序指定キーを使用してメッセージをパブリッシュするには、次のような POST リクエストを送信します。
POST https://pubsub.googleapis.com/v1/projects/PROJECT_ID/topics/TOPIC_ID:publish Content-Type: application/json Authorization: Bearer $(gcloud auth application-default print-access-token)
以下を置き換えます。
- PROJECT_ID: トピックがあるプロジェクトのプロジェクト ID
- TOPIC_ID: トピックの ID
リクエスト本文に次のフィールドを指定します。
{ "messages": [ { "attributes": { "KEY": "VALUE", ... }, "data": "MESSAGE_DATA", "ordering_key": "ORDERING_KEY", } ] }
以下を置き換えます。
- KEY: メッセージ属性のキー
- VALUE: メッセージ属性のキーの値
- MESSAGE_DATA: Base64 でエンコードされたメッセージ データの文字列
- ORDERING_KEY: 順序指定キーが含まれる文字列
メッセージには、空でないデータ フィールドか、少なくとも 1 つの属性が含まれている必要があります。
リクエストが成功した場合のレスポンスは、メッセージ ID が含まれる JSON オブジェクトです。次の例は、メッセージ ID が含まれるレスポンスを示しています。
{ "messageIds": [ "19916711285", ] }
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
struct SampleData {
std::string ordering_key;
std::string data;
} data[] = {
{"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
{"key1", "message4"}, {"key1", "message5"},
};
std::vector<future<void>> done;
for (auto& datum : data) {
auto message_id =
publisher.Publish(pubsub::MessageBuilder{}
.SetData("Hello World! [" + datum.data + "]")
.SetOrderingKey(datum.ordering_key)
.Build());
std::string ack_id = datum.ordering_key + "#" + datum.data;
done.push_back(message_id.then([ack_id](future<StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::move(id).status();
std::cout << "Message " << ack_id << " published with id=" << *id
<< "\n";
}));
}
publisher.Flush();
// Block until all the messages are published (optional)
for (auto& f : done) f.get();
}
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishOrderedMessagesAsyncSample
{
public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
var customSettings = new PublisherClient.Settings
{
EnableMessageOrdering = true
};
PublisherClient publisher = await new PublisherClientBuilder
{
TopicName = topicName,
// Sending messages to the same region ensures they are received in order even when multiple publishers are used.
Endpoint = "us-east1-pubsub.googleapis.com:443",
Settings = customSettings
}.BuildAsync();
int publishedMessageCount = 0;
var publishTasks = keysAndMessages.Select(async keyAndMessage =>
{
try
{
string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
)
func publishWithOrderingKey(w io.Writer, projectID, topicID string) {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
// Sending messages to the same region ensures they are received in order
// even when multiple publishers are used.
client, err := pubsub.NewClient(ctx, projectID,
option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
if err != nil {
fmt.Fprintf(w, "pubsub.NewClient: %v", err)
return
}
defer client.Close()
var wg sync.WaitGroup
var totalErrors uint64
t := client.Topic(topicID)
t.EnableMessageOrdering = true
messages := []struct {
message string
orderingKey string
}{
{
message: "message1",
orderingKey: "key1",
},
{
message: "message2",
orderingKey: "key2",
},
{
message: "message3",
orderingKey: "key1",
},
{
message: "message4",
orderingKey: "key2",
},
}
for _, m := range messages {
res := t.Publish(ctx, &pubsub.Message{
Data: []byte(m.message),
OrderingKey: m.orderingKey,
})
wg.Add(1)
go func(res *pubsub.PublishResult) {
defer wg.Done()
// The Get method blocks until a server-generated ID or
// an error is returned for the published message.
_, err := res.Get(ctx)
if err != nil {
// Error handling code can be added here.
fmt.Printf("Failed to publish: %s\n", err)
atomic.AddUint64(&totalErrors, 1)
return
}
}(res)
}
wg.Wait()
if totalErrors > 0 {
fmt.Fprintf(w, "%d of 4 messages did not publish successfully", totalErrors)
return
}
fmt.Fprint(w, "Published 4 messages with ordering keys successfully\n")
}
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class PublishWithOrderingKeys {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Choose an existing topic.
String topicId = "your-topic-id";
publishWithOrderingKeysExample(projectId, topicId);
}
public static void publishWithOrderingKeysExample(String projectId, String topicId)
throws IOException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
// Create a publisher and set message ordering to true.
Publisher publisher =
Publisher.newBuilder(topicName)
// Sending messages to the same region ensures they are received in order
// even when multiple publishers are used.
.setEndpoint("us-east1-pubsub.googleapis.com:443")
.setEnableMessageOrdering(true)
.build();
try {
Map<String, String> messages = new LinkedHashMap<String, String>();
messages.put("message1", "key1");
messages.put("message2", "key2");
messages.put("message3", "key1");
messages.put("message4", "key2");
for (Map.Entry<String, String> entry : messages.entrySet()) {
ByteString data = ByteString.copyFromUtf8(entry.getKey());
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Add an asynchronous callback to handle publish success / failure.
ApiFutures.addCallback(
future,
new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
// Details on the API exception.
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
}
System.out.println("Error publishing message : " + pubsubMessage.getData());
}
@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic).
System.out.println(pubsubMessage.getData() + " : " + messageId);
}
},
MoreExecutors.directExecutor());
}
} finally {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub({
// Sending messages to the same region ensures they are received in order
// even when multiple publishers are used.
apiEndpoint: 'us-east1-pubsub.googleapis.com:443',
});
async function publishOrderedMessage(topicNameOrId, data, orderingKey) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Be sure to set an ordering key that matches other messages
// you want to receive in order, relative to each other.
const message = {
data: dataBuffer,
orderingKey: orderingKey,
};
const publishOptions = {
messageOrdering: true,
};
// Publishes the message
const messageId = await pubSubClient
.topic(topicNameOrId, publishOptions)
.publishMessage(message);
console.log(`Message ${messageId} published.`);
return messageId;
}
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
from google.cloud import pubsub_v1
# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)
for message in [
("message1", "key1"),
("message2", "key2"),
("message3", "key1"),
("message4", "key2"),
]:
# Data must be a bytestring
data = message[0].encode("utf-8")
ordering_key = message[1]
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
print(future.result())
print(f"Published messages with ordering keys to {topic_path}.")
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
# topic_id = "your-topic-id"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
max_bytes: 1_000_000,
max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
topic.publish_async "This is message \##{i}.",
ordering_key: "ordering-key"
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!
puts "Messages published with ordering key."
順序指定キーを使用したパブリッシュが失敗すると、パブリッシャー内の同じ順序指定キーのキューに入れられたメッセージが失敗し、この順序指定キーの今後のパブリッシュ リクエストも失敗します。このような障害が発生した場合は、順序指定キーを使用してパブリッシュを再開する必要があります。パブリッシュ オペレーションを再開する例については、順序指定キーを使用したリクエストの再試行をご覧ください。
スキーマを使用する
スキーマに関連付けられているトピックにメッセージを公開できます。詳細については、スキーマの作成と管理をご覧ください。トピックの作成時に指定したスキーマと形式で、メッセージをエンコードする必要があります。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
Avronamespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
auto constexpr kNewYork =
R"js({ "name": "New York", "post_abbr": "NY" })js";
auto constexpr kPennsylvania =
R"js({ "name": "Pennsylvania", "post_abbr": "PA" })js";
std::vector<future<void>> done;
auto handler = [](future<StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::move(id).status();
};
for (auto const* data : {kNewYork, kPennsylvania}) {
done.push_back(
publisher.Publish(pubsub::MessageBuilder{}.SetData(data).Build())
.then(handler));
}
// Block until all messages are published.
for (auto& d : done) d.get();
}
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
std::vector<std::pair<std::string, std::string>> states{
{"New York", "NY"},
{"Pennsylvania", "PA"},
};
std::vector<future<void>> done;
auto handler = [](future<StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::move(id).status();
};
for (auto& data : states) {
google::cloud::pubsub::samples::State state;
state.set_name(data.first);
state.set_post_abbr(data.second);
done.push_back(publisher
.Publish(pubsub::MessageBuilder{}
.SetData(state.SerializeAsString())
.Build())
.then(handler));
}
// Block until all messages are published.
for (auto& d : done) d.get();
}
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Avro
using Avro.IO;
using Avro.Specific;
using Google.Cloud.PubSub.V1;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishAvroMessagesAsyncSample
{
public async Task<int> PublishAvroMessagesAsync(string projectId, string topicId, IEnumerable<AvroUtilities.State> messageStates)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
var topic = publishApi.GetTopic(topicName);
int publishedMessageCount = 0;
var publishTasks = messageStates.Select(async state =>
{
try
{
string messageId = null;
switch (topic.SchemaSettings.Encoding)
{
case Encoding.Binary:
using (var ms = new MemoryStream())
{
var encoder = new BinaryEncoder(ms);
var writer = new SpecificDefaultWriter(state.Schema);
writer.Write(state, encoder);
messageId = await publisher.PublishAsync(ms.ToArray());
}
break;
case Encoding.Json:
var jsonMessage = AvroUtilities.StateUtils.StateToJsonString(state);
messageId = await publisher.PublishAsync(jsonMessage);
break;
}
Console.WriteLine($"Published message {messageId}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error ocurred when publishing message {state}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
using Google.Cloud.PubSub.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishProtoMessagesAsyncSample
{
public async Task<int> PublishProtoMessagesAsync(string projectId, string topicId, IEnumerable<Utilities.State> messageStates)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
PublisherClient publisher = await PublisherClient.CreateAsync(topicName);
PublisherServiceApiClient publishApi = PublisherServiceApiClient.Create();
var topic = publishApi.GetTopic(topicName);
int publishedMessageCount = 0;
var publishTasks = messageStates.Select(async state =>
{
try
{
string messageId = null;
switch (topic.SchemaSettings.Encoding)
{
case Encoding.Binary:
var binaryMessage = state.ToByteString();
messageId = await publisher.PublishAsync(binaryMessage);
break;
case Encoding.Json:
var jsonMessage = JsonFormatter.Default.Format(state);
messageId = await publisher.PublishAsync(jsonMessage);
break;
}
Console.WriteLine($"Published message {messageId}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error ocurred when publishing message {state}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Avroimport (
"context"
"fmt"
"io"
"os"
"cloud.google.com/go/pubsub"
"github.com/linkedin/goavro/v2"
)
func publishAvroRecords(w io.Writer, projectID, topicID, avscFile string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
avroSource, err := os.ReadFile(avscFile)
if err != nil {
return fmt.Errorf("ioutil.ReadFile err: %v", err)
}
codec, err := goavro.NewCodec(string(avroSource))
if err != nil {
return fmt.Errorf("goavro.NewCodec err: %v", err)
}
record := map[string]interface{}{"name": "Alaska", "post_abbr": "AK"}
// Get the topic encoding type.
t := client.Topic(topicID)
cfg, err := t.Config(ctx)
if err != nil {
return fmt.Errorf("topic.Config err: %v", err)
}
encoding := cfg.SchemaSettings.Encoding
var msg []byte
switch encoding {
case pubsub.EncodingBinary:
msg, err = codec.BinaryFromNative(nil, record)
if err != nil {
return fmt.Errorf("codec.BinaryFromNative err: %v", err)
}
case pubsub.EncodingJSON:
msg, err = codec.TextualFromNative(nil, record)
if err != nil {
return fmt.Errorf("codec.TextualFromNative err: %v", err)
}
default:
return fmt.Errorf("invalid encoding: %v", encoding)
}
result := t.Publish(ctx, &pubsub.Message{
Data: msg,
})
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("result.Get: %v", err)
}
fmt.Fprintf(w, "Published avro record: %s\n", string(msg))
return nil
}
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
)
func publishProtoMessages(w io.Writer, projectID, topicID string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
state := &statepb.State{
Name: "Alaska",
PostAbbr: "AK",
}
// Get the topic encoding type.
t := client.Topic(topicID)
cfg, err := t.Config(ctx)
if err != nil {
return fmt.Errorf("topic.Config err: %v", err)
}
encoding := cfg.SchemaSettings.Encoding
var msg []byte
switch encoding {
case pubsub.EncodingBinary:
msg, err = proto.Marshal(state)
if err != nil {
return fmt.Errorf("proto.Marshal err: %v", err)
}
case pubsub.EncodingJSON:
msg, err = protojson.Marshal(state)
if err != nil {
return fmt.Errorf("protojson.Marshal err: %v", err)
}
default:
return fmt.Errorf("invalid encoding: %v", encoding)
}
result := t.Publish(ctx, &pubsub.Message{
Data: msg,
})
_, err = result.Get(ctx)
if err != nil {
return fmt.Errorf("result.Get: %v", err)
}
fmt.Fprintf(w, "Published proto message with %#v encoding: %s\n", encoding, string(msg))
return nil
}
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Avro
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import utilities.State;
public class PublishAvroRecordsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Use a topic created with an Avro schema.
String topicId = "your-topic-id";
publishAvroRecordsExample(projectId, topicId);
}
public static void publishAvroRecordsExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
Encoding encoding = null;
TopicName topicName = TopicName.of(projectId, topicId);
// Get the topic encoding type.
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
}
// Instantiate an avro-tools-generated class defined in `us-states.avsc`.
State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();
Publisher publisher = null;
block:
try {
publisher = Publisher.newBuilder(topicName).build();
// Prepare to serialize the object to the output stream.
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
Encoder encoder = null;
// Prepare an appropriate encoder for publishing to the topic.
switch (encoding) {
case BINARY:
System.out.println("Preparing a BINARY encoder...");
encoder = EncoderFactory.get().directBinaryEncoder(byteStream, /*reuse=*/ null);
break;
case JSON:
System.out.println("Preparing a JSON encoder...");
encoder = EncoderFactory.get().jsonEncoder(State.getClassSchema(), byteStream);
break;
default:
break block;
}
// Encode the object and write it to the output stream.
state.customEncode(encoder);
encoder.flush();
// Publish the encoded object as a Pub/Sub message.
ByteString data = ByteString.copyFrom(byteStream.toByteArray());
PubsubMessage message = PubsubMessage.newBuilder().setData(data).build();
System.out.println("Publishing message: " + message);
ApiFuture<String> future = publisher.publish(message);
System.out.println("Published message ID: " + future.get());
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.Encoding;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import utilities.StateProto.State;
public class PublishProtobufMessagesExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Use a topic created with a proto schema.
String topicId = "your-topic-id";
publishProtobufMessagesExample(projectId, topicId);
}
public static void publishProtobufMessagesExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
Encoding encoding = null;
TopicName topicName = TopicName.of(projectId, topicId);
// Get the topic encoding type.
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
encoding = topicAdminClient.getTopic(topicName).getSchemaSettings().getEncoding();
}
Publisher publisher = null;
// Instantiate a protoc-generated class defined in `us-states.proto`.
State state = State.newBuilder().setName("Alaska").setPostAbbr("AK").build();
block:
try {
publisher = Publisher.newBuilder(topicName).build();
PubsubMessage.Builder message = PubsubMessage.newBuilder();
// Prepare an appropriately formatted message based on topic encoding.
switch (encoding) {
case BINARY:
message.setData(state.toByteString());
System.out.println("Publishing a BINARY-formatted message:\n" + message);
break;
case JSON:
String jsonString = JsonFormat.printer().omittingInsignificantWhitespace().print(state);
message.setData(ByteString.copyFromUtf8(jsonString));
System.out.println("Publishing a JSON-formatted message:\n" + message);
break;
default:
break block;
}
// Publish the message.
ApiFuture<String> future = publisher.publish(message.build());
System.out.println("Published message ID: " + future.get());
} finally {
if (publisher != null) {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
Avro/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');
// And the Apache Avro library
const avro = require('avro-js');
const fs = require('fs');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishAvroRecords(topicNameOrId) {
// Get the topic metadata to learn about its schema encoding.
const topic = pubSubClient.topic(topicNameOrId);
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;
if (!topicSchemaMetadata) {
console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
return;
}
const schemaEncoding = topicSchemaMetadata.encoding;
// Make an encoder using the official avro-js library.
const definition = fs
.readFileSync('system-test/fixtures/provinces.avsc')
.toString();
const type = avro.parse(definition);
// Encode the message.
const province = {
name: 'Ontario',
post_abbr: 'ON',
};
let dataBuffer;
switch (schemaEncoding) {
case Encodings.Binary:
dataBuffer = type.toBuffer(province);
break;
case Encodings.Json:
dataBuffer = Buffer.from(type.toString(province));
break;
default:
console.log(`Unknown schema encoding: ${schemaEncoding}`);
break;
}
if (!dataBuffer) {
console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
return;
}
const messageId = await topic.publish(dataBuffer);
console.log(`Avro record ${messageId} published.`);
}
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
const {PubSub, Encodings} = require('@google-cloud/pubsub');
// And the protobufjs library
const protobuf = require('protobufjs');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishProtobufMessages(topicNameOrId) {
// Get the topic metadata to learn about its schema.
const topic = pubSubClient.topic(topicNameOrId);
const [topicMetadata] = await topic.getMetadata();
const topicSchemaMetadata = topicMetadata.schemaSettings;
if (!topicSchemaMetadata) {
console.log(`Topic ${topicNameOrId} doesn't seem to have a schema.`);
return;
}
const schemaEncoding = topicSchemaMetadata.encoding;
// Encode the message.
const province = {
name: 'Ontario',
postAbbr: 'ON',
};
// Make an encoder using the protobufjs library.
//
// Since we're providing the test message for a specific schema here, we'll
// also code in the path to a sample proto definition.
const root = await protobuf.load('system-test/fixtures/provinces.proto');
const Province = root.lookupType('utilities.Province');
const message = Province.create(province);
let dataBuffer;
switch (schemaEncoding) {
case Encodings.Binary:
dataBuffer = Buffer.from(Province.encode(message).finish());
break;
case Encodings.Json:
dataBuffer = Buffer.from(JSON.stringify(message.toJSON()));
break;
default:
console.log(`Unknown schema encoding: ${schemaEncoding}`);
break;
}
if (!dataBuffer) {
console.log(`Invalid encoding ${schemaEncoding} on the topic.`);
return;
}
const messageId = await topic.publish(dataBuffer);
console.log(`Protobuf message ${messageId} published.`);
}
PHP
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の PHP の設定手順を実施してください。詳細については、Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。
Avrouse Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;
use AvroStringIO;
use AvroSchema;
use AvroIODatumWriter;
use AvroDataIOWriter;
/**
* Publish a message using an AVRO schema.
*
* This sample uses `wikimedia/avro` for AVRO encoding.
*
* @param string $projectId
* @param string $topicId
* @param string $definitionFile
* @return void
*/
function publish_avro_records($projectId, $topicId, $definitionFile)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$definition = file_get_contents($definitionFile);
$messageData = [
'name' => 'Alaska',
'post_abbr' => 'AK',
];
$topic = $pubsub->topic($topicId);
// get the encoding type.
$topicInfo = $topic->info();
$encoding = '';
if (isset($topicInfo['schemaSettings']['encoding'])) {
$encoding = $topicInfo['schemaSettings']['encoding'];
}
// if encoding is not set, we can't continue.
if ($encoding === '') {
printf('Topic %s does not have schema enabled', $topicId);
return;
}
// If you are using gRPC, encoding may be an integer corresponding to an
// enum value on Google\Cloud\PubSub\V1\Encoding.
if (!is_string($encoding)) {
$encoding = Encoding::name($encoding);
}
$encodedMessageData = '';
if ($encoding == 'BINARY') {
// encode as AVRO binary.
$io = new AvroStringIO();
$schema = AvroSchema::parse($definition);
$writer = new AvroIODatumWriter($schema);
$dataWriter = new AvroDataIOWriter($io, $writer, $schema);
$dataWriter->append($messageData);
$dataWriter->close();
// AVRO binary data must be base64-encoded.
$encodedMessageData = base64_encode($io->string());
} else {
// encode as JSON.
$encodedMessageData = json_encode($messageData);
}
$topic->publish(['data' => $encodedMessageData]);
printf('Published message with %s encoding', $encoding);
}
use Google\Cloud\PubSub\PubSubClient;
use Google\Cloud\PubSub\V1\Encoding;
use Utilities\StateProto;
/**
* Publish a message using a protocol buffer schema.
*
* Relies on a proto message of the following form:
* ```
* syntax = "proto3";
*
* package utilities;
*
* message StateProto {
* string name = 1;
* string post_abbr = 2;
* }
* ```
*
* @param string $projectId
* @param string $topicId
* @return void
*/
function publish_proto_messages($projectId, $topicId)
{
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$messageData = new StateProto([
'name' => 'Alaska',
'post_abbr' => 'AK',
]);
$topic = $pubsub->topic($topicId);
// get the encoding type.
$topicInfo = $topic->info();
$encoding = '';
if (isset($topicInfo['schemaSettings']['encoding'])) {
$encoding = $topicInfo['schemaSettings']['encoding'];
}
// if encoding is not set, we can't continue.
if ($encoding === '') {
printf('Topic %s does not have schema enabled', $topicId);
return;
}
// If you are using gRPC, encoding may be an integer corresponding to an
// enum value on Google\Cloud\PubSub\V1\Encoding.
if (!is_string($encoding)) {
$encoding = Encoding::name($encoding);
}
$encodedMessageData = '';
if ($encoding == 'BINARY') {
// encode as protobuf binary.
$encodedMessageData = $messageData->serializeToString();
} else {
// encode as JSON.
$encodedMessageData = $messageData->serializeToJsonString();
}
$topic->publish(['data' => $encodedMessageData]);
printf('Published message with %s encoding', $encoding);
}
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Avrofrom avro.io import BinaryEncoder, DatumWriter
import avro.schema as schema
import io
import json
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.pubsub_v1.types import Encoding
# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)
# Prepare to write Avro records to the binary output stream.
avro_schema = schema.parse(open(avsc_file, "rb").read())
writer = DatumWriter(avro_schema)
bout = io.BytesIO()
# Prepare some data using a Python dictionary that matches the Avro schema
record = {"name": "Alaska", "post_abbr": "AK"}
try:
# Get the topic encoding type.
topic = publisher_client.get_topic(request={"topic": topic_path})
encoding = topic.schema_settings.encoding
# Encode the data according to the message serialization type.
if encoding == Encoding.BINARY:
encoder = BinaryEncoder(bout)
writer.write(record, encoder)
data = bout.getvalue()
print(f"Preparing a binary-encoded message:\n{data.decode()}")
elif encoding == Encoding.JSON:
data_str = json.dumps(record)
print(f"Preparing a JSON-encoded message:\n{data_str}")
data = data_str.encode("utf-8")
else:
print(f"No encoding specified in {topic_path}. Abort.")
exit(0)
future = publisher_client.publish(topic_path, data)
print(f"Published message ID: {future.result()}")
except NotFound:
print(f"{topic_id} not found.")
from google.api_core.exceptions import NotFound
from google.cloud.pubsub import PublisherClient
from google.protobuf.json_format import MessageToJson
from google.pubsub_v1.types import Encoding
from utilities import us_states_pb2 # type: ignore
# TODO(developer): Replace these variables before running the sample.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher_client = PublisherClient()
topic_path = publisher_client.topic_path(project_id, topic_id)
try:
# Get the topic encoding type.
topic = publisher_client.get_topic(request={"topic": topic_path})
encoding = topic.schema_settings.encoding
# Instantiate a protoc-generated class defined in `us-states.proto`.
state = us_states_pb2.StateProto()
state.name = "Alaska"
state.post_abbr = "AK"
# Encode the data according to the message serialization type.
if encoding == Encoding.BINARY:
data = state.SerializeToString()
print(f"Preparing a binary-encoded message:\n{data}")
elif encoding == Encoding.JSON:
json_object = MessageToJson(state)
data = str(json_object).encode("utf-8")
print(f"Preparing a JSON-encoded message:\n{data}")
else:
print(f"No encoding specified in {topic_path}. Abort.")
exit(0)
future = publisher_client.publish(topic_path, data)
print(f"Published message ID: {future.result()}")
except NotFound:
print(f"{topic_id} not found.")
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
Avro# topic_id = "your-topic-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic topic_id
record = { "name" => "Alaska", "post_abbr" => "AK" }
if topic.message_encoding_binary?
require "avro"
avro_schema = Avro::Schema.parse File.read(avsc_file)
writer = Avro::IO::DatumWriter.new avro_schema
buffer = StringIO.new
encoder = Avro::IO::BinaryEncoder.new buffer
writer.write record, encoder
topic.publish buffer
puts "Published binary-encoded AVRO message."
elsif topic.message_encoding_json?
require "json"
topic.publish record.to_json
puts "Published JSON-encoded AVRO message."
else
raise "No encoding specified in #{topic.name}."
end
# topic_id = "your-topic-id"
require "google/cloud/pubsub"
require_relative "utilities/us-states_pb"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic topic_id
state = Utilities::StateProto.new name: "Alaska", post_abbr: "AK"
if topic.message_encoding_binary?
topic.publish Utilities::StateProto.encode(state)
puts "Published binary-encoded protobuf message."
elsif topic.message_encoding_json?
topic.publish Utilities::StateProto.encode_json(state)
puts "Published JSON-encoded protobuf message."
else
raise "No encoding specified in #{topic.name}."
end
パブリッシュ リクエスト内のメッセージのバッチ処理
パブリッシャー用の Pub/Sub クライアント ライブラリを使用して、トピックにメッセージをパブリッシュできます。クライアント ライブラリはバッチ機能を使用して、1 つのサービス呼び出しで複数のメッセージを一緒にパブリッシュします。メッセージのバッチ処理やグループ化により、Pub/Sub のメッセージのスループットが向上します。バッチサイズはビジネスニーズに応じて調整できます。
バッチ メッセージングは、クライアント ライブラリでデフォルトで有効になっています。バッチ メッセージングは、個々のメッセージのレイテンシを作成します。個々のメッセージは、対応するバッチがいっぱいになるまでメモリにキューに入れる必要があります。この場合、メッセージはトピックにパブリッシュされます。
コストが問題にならない場合は、複数のパブリッシャー クライアントを作成して、バッチ メッセージを無効にすることができます。このプロセスでは、パブリッシャーの数を水平方向にスケーリングすることで、レイテンシを最小限に抑え、スループットを最大化します。ただし、コストを考慮する必要があります。より少ないパブリッシャーで同等のスループットを実現する方法の 1 つとして、単一のパブリッシュ リクエストで複数のメッセージを送信する方法があります。特にアプリケーションで短期間に大量のメッセージを扱う場合は、バッチ メッセージングを使用すると、レイテンシを短縮してレイテンシを抑えることができます。
バッチ メッセージングでは、バッチサイズ(バイト数またはメッセージ数)とバッチがパブリッシュされてからの時間を構成できます。パブリッシュ時間のピークでメッセージの小規模なバッチは、レイテンシを制御するのに役立ちます。
クライアント ライブラリでバッチ メッセージングを構成する
メッセージ リクエストのサイズ、メッセージの数、時間に基づいてメッセージをバッチ処理できます。バッチ メッセージ変数のデフォルト値と変数名は、クライアント ライブラリによって異なる場合があります。たとえば、Python クライアント ライブラリの次の変数は、バッチ メッセージングを制御します。
変数 | 説明 | 値 |
---|---|---|
max_messages | バッチ内のメッセージの数。 | デフォルトは 100 です |
max_bytes | バッチの最大サイズ(MB)。 | デフォルトは 1 MB です |
max_latency | バッチがいっぱいになったとしても、バッチがパブリッシュされてからの時間。 | デフォルトは 10 ms です |
クライアント ライブラリには、1 つまたはすべての値を指定できます。バッチ メッセージング変数のいずれかの値が満たされると、クライアント ライブラリは次のメッセージのバッチをパブリッシュします。
パブリッシャーのバッチ メッセージング設定の構成方法については、次のコードサンプルをご覧ください。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
// By default, the publisher will flush a batch after 10ms, after it
// contains more than 100 message, or after it contains more than 1MiB of
// data, whichever comes first. This changes those defaults.
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
std::move(topic),
Options{}
.set<pubsub::MaxHoldTimeOption>(std::chrono::milliseconds(20))
.set<pubsub::MaxBatchBytesOption>(4 * 1024 * 1024L)
.set<pubsub::MaxBatchMessagesOption>(200)));
std::vector<future<void>> ids;
for (char const* data : {"1", "2", "3", "go!"}) {
ids.push_back(
publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
.then([data](future<StatusOr<std::string>> f) {
auto s = f.get();
if (!s) return;
std::cout << "Sent '" << data << "' (" << *s << ")\n";
}));
}
publisher.Flush();
// Block until they are actually sent.
for (auto& id : ids) id.get();
}
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class PublishBatchedMessagesAsyncSample
{
public async Task<int> PublishBatchMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
// Default Settings:
// byteCountThreshold: 1000000
// elementCountThreshold: 100
// delayThreshold: 10 milliseconds
var customSettings = new PublisherClient.Settings
{
BatchingSettings = new BatchingSettings(
elementCountThreshold: 50,
byteCountThreshold: 10240,
delayThreshold: TimeSpan.FromMilliseconds(500))
};
PublisherClient publisher = await new PublisherClientBuilder
{
TopicName = topicName,
Settings = customSettings
}.BuildAsync();
int publishedMessageCount = 0;
var publishTasks = messageTexts.Select(async text =>
{
try
{
string message = await publisher.PublishAsync(text);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {text}: {exception.Message}");
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
import (
"context"
"fmt"
"io"
"strconv"
"time"
"cloud.google.com/go/pubsub"
)
func publishWithSettings(w io.Writer, projectID, topicID string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
var results []*pubsub.PublishResult
var resultErrors []error
t := client.Topic(topicID)
t.PublishSettings.ByteThreshold = 5000
t.PublishSettings.CountThreshold = 10
t.PublishSettings.DelayThreshold = 100 * time.Millisecond
for i := 0; i < 10; i++ {
result := t.Publish(ctx, &pubsub.Message{
Data: []byte("Message " + strconv.Itoa(i)),
})
results = append(results, result)
}
// The Get method blocks until a server-generated ID or
// an error is returned for the published message.
for i, res := range results {
id, err := res.Get(ctx)
if err != nil {
resultErrors = append(resultErrors, err)
fmt.Fprintf(w, "Failed to publish: %v", err)
continue
}
fmt.Fprintf(w, "Published message %d; msg ID: %v\n", i, id)
}
if len(resultErrors) != 0 {
return fmt.Errorf("Get: %v", resultErrors[len(resultErrors)-1])
}
fmt.Fprintf(w, "Published messages with batch settings.")
return nil
}
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;
public class PublishWithBatchSettingsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithBatchSettingsExample(projectId, topicId);
}
public static void publishWithBatchSettingsExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
try {
// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1000 bytes
long messageCountBatchSize = 100L; // default : 100 message
Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms
// Publish request get triggered based on request size, messages count & time since last
// publish, whichever condition is met first.
BatchingSettings batchingSettings =
BatchingSettings.newBuilder()
.setElementCountThreshold(messageCountBatchSize)
.setRequestByteThreshold(requestBytesThreshold)
.setDelayThreshold(publishDelayThreshold)
.build();
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();
// schedule publishing one message at a time : messages get automatically batched
for (int i = 0; i < 100; i++) {
String message = "message " + i;
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
messageIdFutures.add(messageIdFuture);
}
} finally {
// Wait on any pending publish requests.
List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();
System.out.println("Published " + messageIds.size() + " messages with batch settings.");
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicName = 'YOUR_TOPIC_NAME';
// const data = JSON.stringify({foo: 'bar'});
// const maxMessages = 10;
// const maxWaitTime = 10;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishBatchedMessages(
topicNameOrId,
data,
maxMessages,
maxWaitTime
) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const publishOptions = {
batching: {
maxMessages: maxMessages,
maxMilliseconds: maxWaitTime * 1000,
},
};
const batchPublisher = pubSubClient.topic(topicNameOrId, publishOptions);
const promises = [];
for (let i = 0; i < 10; i++) {
promises.push(
(async () => {
const messageId = await batchPublisher.publishMessage({
data: dataBuffer,
});
console.log(`Message ${messageId} published.`);
})()
);
}
await Promise.all(promises);
}
PHP
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の PHP の設定手順を実施してください。詳細については、Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。
use Google\Cloud\PubSub\PubSubClient;
/**
* Publishes a message for a Pub/Sub topic.
*
* The publisher should be used in conjunction with the `google-cloud-batch`
* daemon, which should be running in the background.
*
* To start the daemon, from your project root call `vendor/bin/google-cloud-batch daemon`.
*
* @param string $projectId The Google project ID.
* @param string $topicName The Pub/Sub topic name.
* @param string $message The message to publish.
*/
function publish_message_batch($projectId, $topicName, $message)
{
// Check if the batch daemon is running.
if (getenv('IS_BATCH_DAEMON_RUNNING') !== 'true') {
trigger_error(
'The batch daemon is not running. Call ' .
'`vendor/bin/google-cloud-batch daemon` from ' .
'your project root to start the daemon.',
E_USER_NOTICE
);
}
$batchOptions = [
'batchSize' => 100, // Max messages for each batch.
'callPeriod' => 0.01, // Max time in seconds between each batch publish.
];
$pubsub = new PubSubClient([
'projectId' => $projectId,
]);
$topic = $pubsub->topic($topicName);
$publisher = $topic->batchPublisher([
'batchOptions' => $batchOptions
]);
for ($i = 0; $i < 10; $i++) {
$publisher->publish(['data' => $message]);
}
print('Messages enqueued for publication.' . PHP_EOL);
}
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
from concurrent import futures
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# Configure the batch to publish as soon as there are 10 messages
# or 1 KiB of data, or 1 second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
max_messages=10, # default 100
max_bytes=1024, # default 1 MB
max_latency=1, # default 10 ms
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []
# Resolve the publish future in a separate thread.
def callback(future: pubsub_v1.publisher.futures.Future) -> None:
message_id = future.result()
print(message_id)
for n in range(1, 10):
data_str = f"Message number {n}"
# Data must be a bytestring
data = data_str.encode("utf-8")
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Allow the publisher client to batch multiple messages.
publish_future.add_done_callback(callback)
publish_futures.append(publish_future)
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
print(f"Published messages with batch settings to {topic_path}.")
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
# topic_id = "your-topic-id"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
max_bytes: 1_000_000,
max_messages: 20
}
10.times do |i|
topic.publish_async "This is message \##{i}."
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."
バッチ メッセージングを無効にする
クライアント ライブラリでバッチ処理をオフにするには、max_messages
の値を 1 に設定します。
バッチ メッセージングと順序が指定された配信
順序が指定された配信では、バッチ内のメッセージの確認応答に失敗すると、確認応答されなかったメッセージよりも前に送信されたメッセージを含め、バッチ内のすべてのメッセージが再配信されます。
バッチ メッセージングの割り当てと上限
バッチ メッセージングを構成する前に、パブリッシュのスループット割り当てやバッチの最大サイズなどの要素の影響を考慮してください。高レベルのクライアント ライブラリにより、バッチ リクエストが指定した上限内に収められます。
- 1,000 バイトは、実際のメッセージ サイズが 1,000 バイト未満であっても、コストの観点から考慮される最小リクエスト サイズです。
- Pub/Sub には、単一のバッチ パブリッシュ リクエストに対して 10 MB のサイズまたは 1,000 メッセージの上限があります。
詳細については、Pub/Sub の割り当てと上限をご覧ください。
メッセージを圧縮する
Pub/Sub を使用して大量のデータをパブリッシュする場合は、gRPC を使用してデータを圧縮することで、パブリッシャーのクライアントがパブリッシュ リクエストを送信する前にネットワーク費用を削減できます。 gRPC の Pub/Sub 圧縮では Gzip アルゴリズムが使用されます。
gRPC クライアント側の圧縮機能を使用するための圧縮率は、パブリッシャー クライアントおよび次の要因によって異なります。
データの量。ペイロードのサイズが数百バイトから数キロバイトに増加すると、圧縮率が向上します。パブリッシュ リクエストのバッチ設定によって、各パブリッシュ リクエストに含まれるデータの量が決まります。最適な結果を得るには、gRPC 圧縮と組み合わせてバッチ設定を有効にすることをおすすめします。
データの種類。JSON や XML などのテキストベースのデータは、画像などのバイナリデータよりも圧縮性が高くなります。
パブリッシャー クライアントが Google Cloud 上に存在する場合は、送信バイト数(instance/network/sent_bytes_count
)指標を使用して、パブリッシュ スループットをバイト単位で測定できます。パブリッシャー クライアントが別のアプリケーション上に存在する場合は、測定を行うためにクライアント固有のツールを使用する必要があります。
このセクションのコードサンプルは、gRPC 圧縮も含まれている Java クライアント ライブラリのコード スニペットの例を示しています。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
namespace g = ::google::cloud;
namespace pubsub = ::google::cloud::pubsub;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
std::move(topic),
g::Options{}
// Compress any batch of messages over 10 bytes. By default, no
// messages are compressed, set this to 0 to compress all batches,
// regardless of their size.
.set<pubsub::CompressionThresholdOption>(10)
// Compress using the GZIP algorithm. By default, the library uses
// GRPC_COMPRESS_DEFLATE.
.set<pubsub::CompressionAlgorithmOption>(GRPC_COMPRESS_GZIP)));
auto message_id = publisher.Publish(
pubsub::MessageBuilder{}.SetData("Hello World!").Build());
auto done = message_id.then([](g::future<g::StatusOr<std::string>> f) {
auto id = f.get();
if (!id) throw std::move(id).status();
std::cout << "Hello World! published with id=" << *id << "\n";
});
// Block until the message is published
done.get();
}
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class PublishWithGrpcCompressionExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Choose an existing topic.
String topicId = "your-topic-id";
publishWithGrpcCompressionExample(projectId, topicId);
}
public static void publishWithGrpcCompressionExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
// Create a publisher and set enable compression to true.
Publisher publisher = null;
try {
// Enable compression and configure the compression threshold to 10 bytes (default to 240 B).
// Publish requests of sizes > 10 B (excluding the request headers) will get compressed.
// The number of messages in a publish request is determined by publisher batch settings.
// Batching is turned off by default, i.e. each publish request contains only one message.
publisher =
Publisher.newBuilder(topicName)
.setEnableCompression(true)
.setCompressionBytesThreshold(10L)
.build();
byte[] bytes = new byte[1024];
ByteString data = ByteString.copyFrom(bytes);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic).
// You can look up the actual size of the outbound data using the Java Logging API.
// Configure logging properties as shown in
// https://github.com/googleapis/java-pubsub/tree/main/samples/snippets/src/main/resources/logging.properties
// and look for "OUTBOUND DATA" with "length=" in the output log.
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get();
System.out.println("Published a compressed message of message ID: " + messageId);
} finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
リクエストを再試行する
パブリッシュに失敗した場合、再試行を妨げるエラーが発生しない限り、自動的にリクエストが再試行されます。このサンプルコードは、再試行設定をカスタマイズしたパブリッシャーを作成しています(クライアント ライブラリによっては、再試行設定のカスタマイズがサポートされない場合があります。選択した言語の API リファレンス ドキュメントをご覧ください)。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
// By default a publisher will retry for 60 seconds, with an initial backoff
// of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by
// 30% after each attempt. This changes those defaults.
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
std::move(topic),
Options{}
.set<pubsub::RetryPolicyOption>(
pubsub::LimitedTimeRetryPolicy(
/*maximum_duration=*/std::chrono::minutes(10))
.clone())
.set<pubsub::BackoffPolicyOption>(
pubsub::ExponentialBackoffPolicy(
/*initial_delay=*/std::chrono::milliseconds(200),
/*maximum_delay=*/std::chrono::seconds(45),
/*scaling=*/2.0)
.clone())));
std::vector<future<bool>> done;
for (char const* data : {"1", "2", "3", "go!"}) {
done.push_back(
publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
.then([](future<StatusOr<std::string>> f) {
return f.get().ok();
}));
}
publisher.Flush();
int count = 0;
for (auto& f : done) {
if (f.get()) ++count;
}
std::cout << count << " messages sent successfully\n";
}
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
using Google.Api.Gax.Grpc;
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Threading.Tasks;
public class PublishMessageWithRetrySettingsAsyncSample
{
public async Task PublishMessageWithRetrySettingsAsync(string projectId, string topicId, string messageText)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
// Retry settings control how the publisher handles retry-able failures
var maxAttempts = 3;
var initialBackoff = TimeSpan.FromMilliseconds(110); // default: 100 ms
var maxBackoff = TimeSpan.FromSeconds(70); // default : 60 seconds
var backoffMultiplier = 1.3; // default: 1.0
var totalTimeout = TimeSpan.FromSeconds(100); // default: 600 seconds
var publisher = await new PublisherClientBuilder
{
TopicName = topicName,
ApiSettings = new PublisherServiceApiSettings
{
PublishSettings = CallSettings.FromRetry(RetrySettings.FromExponentialBackoff(
maxAttempts: maxAttempts,
initialBackoff: initialBackoff,
maxBackoff: maxBackoff,
backoffMultiplier: backoffMultiplier,
retryFilter: RetrySettings.FilterForStatusCodes(StatusCode.Unavailable)))
.WithTimeout(totalTimeout)
}
}.BuildAsync();
string message = await publisher.PublishAsync(messageText);
Console.WriteLine($"Published message {message}");
}
}
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.threeten.bp.Duration;
public class PublishWithRetrySettingsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithRetrySettingsExample(projectId, topicId);
}
public static void publishWithRetrySettingsExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
try {
// Retry settings control how the publisher handles retry-able failures
Duration initialRetryDelay = Duration.ofMillis(100); // default: 100 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures, default: 1.3
Duration maxRetryDelay = Duration.ofSeconds(60); // default : 60 seconds
Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 5 seconds
double rpcTimeoutMultiplier = 1.0; // default: 1.0
Duration maxRpcTimeout = Duration.ofSeconds(600); // default: 600 seconds
Duration totalTimeout = Duration.ofSeconds(600); // default: 600 seconds
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(initialRetryDelay)
.setRetryDelayMultiplier(retryDelayMultiplier)
.setMaxRetryDelay(maxRetryDelay)
.setInitialRpcTimeout(initialRpcTimeout)
.setRpcTimeoutMultiplier(rpcTimeoutMultiplier)
.setMaxRpcTimeout(maxRpcTimeout)
.setTotalTimeout(totalTimeout)
.build();
// Create a publisher instance with default settings bound to the topic
publisher = Publisher.newBuilder(topicName).setRetrySettings(retrySettings).build();
String message = "first message";
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
String messageId = messageIdFuture.get();
System.out.println("Published a message with retry settings: " + messageId);
} finally {
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const projectId = 'YOUR_PROJECT_ID'
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');
// Creates a publisher client.
const publisherClient = new v1.PublisherClient({
// optional auth parameters
});
async function publishWithRetrySettings(projectId, topicNameOrId, data) {
const formattedTopic = publisherClient.projectTopicPath(
projectId,
topicNameOrId
);
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const messagesElement = {
data: dataBuffer,
};
const messages = [messagesElement];
// Build the request
const request = {
topic: formattedTopic,
messages: messages,
};
// Retry settings control how the publisher handles retryable failures. Default values are shown.
// The `retryCodes` array determines which grpc errors will trigger an automatic retry.
// The `backoffSettings` object lets you specify the behaviour of retries over time.
const retrySettings = {
retryCodes: [
10, // 'ABORTED'
1, // 'CANCELLED',
4, // 'DEADLINE_EXCEEDED'
13, // 'INTERNAL'
8, // 'RESOURCE_EXHAUSTED'
14, // 'UNAVAILABLE'
2, // 'UNKNOWN'
],
backoffSettings: {
// The initial delay time, in milliseconds, between the completion
// of the first failed request and the initiation of the first retrying request.
initialRetryDelayMillis: 100,
// The multiplier by which to increase the delay time between the completion
// of failed requests, and the initiation of the subsequent retrying request.
retryDelayMultiplier: 1.3,
// The maximum delay time, in milliseconds, between requests.
// When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
maxRetryDelayMillis: 60000,
// The initial timeout parameter to the request.
initialRpcTimeoutMillis: 5000,
// The multiplier by which to increase the timeout parameter between failed requests.
rpcTimeoutMultiplier: 1.0,
// The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
// rpcTimeoutMultiplier will no longer be used to increase the timeout.
maxRpcTimeoutMillis: 600000,
// The total time, in milliseconds, starting from when the initial request is sent,
// after which an error will be returned, regardless of the retrying attempts made meanwhile.
totalTimeoutMillis: 600000,
},
};
const [response] = await publisherClient.publish(request, {
retry: retrySettings,
});
console.log(`Message ${response.messageIds} published.`);
}
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
from google import api_core
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# Configure the retry settings. Defaults shown in comments are values applied
# by the library by default, instead of default values in the Retry object.
custom_retry = api_core.retry.Retry(
initial=0.250, # seconds (default: 0.1)
maximum=90.0, # seconds (default: 60.0)
multiplier=1.45, # default: 1.3
deadline=300.0, # seconds (default: 60.0)
predicate=api_core.retry.if_exception_type(
api_core.exceptions.Aborted,
api_core.exceptions.DeadlineExceeded,
api_core.exceptions.InternalServerError,
api_core.exceptions.ResourceExhausted,
api_core.exceptions.ServiceUnavailable,
api_core.exceptions.Unknown,
api_core.exceptions.Cancelled,
),
)
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
for n in range(1, 10):
data_str = f"Message number {n}"
# Data must be a bytestring
data = data_str.encode("utf-8")
future = publisher.publish(topic=topic_path, data=data, retry=custom_retry)
print(future.result())
print(f"Published messages with retry settings to {topic_path}.")
再試行の設定では、Pub/Sub クライアント ライブラリがパブリッシュ リクエストを再試行する方法を制御できます。クライアント ライブラリには、次の再試行設定があります。
- 初期リクエストのタイムアウト: クライアント ライブラリが最初のパブリッシュ リクエスト完了の待機を停止するまでの時間。
- 再試行遅延: リクエストがタイムアウトしてからクライアント ライブラリがリクエストの再試行を待機する時間。
- 合計タイムアウト: クライアント ライブラリがパブリッシュ リクエストの再試行を停止してからの時間。
パブリッシュ リクエストを再試行するには、初期リクエスト タイムアウトが合計タイムアウトよりも短い必要があります。たとえば、指数バックオフを使用している場合、クライアント ライブラリは次のようにリクエスト タイムアウトと再試行遅延を計算します。
- 各パブリッシュ リクエストの後に、リクエスト タイムアウトは、最大リクエスト タイムアウトまでリクエスト タイムアウトの乗数で増加します。
- 再試行のたびに、再試行の遅延が最大再試行遅延まで再試行遅延の乗数で増加します。
順序指定キーを使用してリクエストを再試行する
クライアント ライブラリがリクエストを再試行し、メッセージに順序指定キーがある場合、クライアント ライブラリは再試行の設定にかかわらずリクエストを繰り返し再試行します。
再試行不可能なエラーが発生すると、クライアント ライブラリはメッセージをパブリッシュせず、同じ順序指定キーが含まれる他のメッセージのパブリッシュを停止します。たとえば、存在しないトピックにパブリッシャーがメッセージを送信すると、再試行不可能なエラーが発生します。同じ順序指定キーが含まれるメッセージのパブリッシュを続行するには、パブリッシュを再開するためのメソッドを呼び出してから、パブリッシュをもう一度開始してください。
次のサンプルは、同じ順序指定キーを使用してメッセージのパブリッシュを再開する方法を示しています。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
struct SampleData {
std::string ordering_key;
std::string data;
} data[] = {
{"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
{"key1", "message4"}, {"key1", "message5"},
};
std::vector<future<void>> done;
for (auto& datum : data) {
auto const& da = datum; // workaround MSVC lambda capture confusion
auto handler = [da, publisher](future<StatusOr<std::string>> f) mutable {
auto const msg = da.ordering_key + "#" + da.data;
auto id = f.get();
if (!id) {
std::cout << "An error has occurred publishing " << msg << "\n";
publisher.ResumePublish(da.ordering_key);
return;
}
std::cout << "Message " << msg << " published as id=" << *id << "\n";
};
done.push_back(
publisher
.Publish(pubsub::MessageBuilder{}
.SetData("Hello World! [" + datum.data + "]")
.SetOrderingKey(datum.ordering_key)
.Build())
.then(handler));
}
publisher.Flush();
// Block until all the messages are published (optional)
for (auto& f : done) f.get();
}
C#
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C# の設定手順を実施してください。詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class ResumePublishSample
{
public async Task<int> PublishOrderedMessagesAsync(string projectId, string topicId, IEnumerable<(string, string)> keysAndMessages)
{
TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
var customSettings = new PublisherClient.Settings
{
EnableMessageOrdering = true
};
PublisherClient publisher = await new PublisherClientBuilder
{
TopicName = topicName,
Settings = customSettings
}.BuildAsync();
int publishedMessageCount = 0;
var publishTasks = keysAndMessages.Select(async keyAndMessage =>
{
try
{
string message = await publisher.PublishAsync(keyAndMessage.Item1, keyAndMessage.Item2);
Console.WriteLine($"Published message {message}");
Interlocked.Increment(ref publishedMessageCount);
}
catch (Exception exception)
{
Console.WriteLine($"An error occurred when publishing message {keyAndMessage.Item2}: {exception.Message}");
publisher.ResumePublish(keyAndMessage.Item1);
}
});
await Task.WhenAll(publishTasks);
return publishedMessageCount;
}
}
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
"google.golang.org/api/option"
)
func resumePublishWithOrderingKey(w io.Writer, projectID, topicID string) {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
// Sending messages to the same region ensures they are received in order
// even when multiple publishers are used.
client, err := pubsub.NewClient(ctx, projectID,
option.WithEndpoint("us-east1-pubsub.googleapis.com:443"))
if err != nil {
fmt.Fprintf(w, "pubsub.NewClient: %v", err)
return
}
defer client.Close()
t := client.Topic(topicID)
t.EnableMessageOrdering = true
key := "some-ordering-key"
res := t.Publish(ctx, &pubsub.Message{
Data: []byte("some-message"),
OrderingKey: key,
})
_, err = res.Get(ctx)
if err != nil {
// Error handling code can be added here.
fmt.Printf("Failed to publish: %s\n", err)
// Resume publish on an ordering key that has had unrecoverable errors.
// After such an error publishes with this ordering key will fail
// until this method is called.
t.ResumePublish(key)
}
fmt.Fprint(w, "Published a message with ordering key successfully\n")
}
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class ResumePublishWithOrderingKeys {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
// Choose an existing topic.
String topicId = "your-topic-id";
resumePublishWithOrderingKeysExample(projectId, topicId);
}
public static void resumePublishWithOrderingKeysExample(String projectId, String topicId)
throws IOException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
// Create a publisher and set message ordering to true.
Publisher publisher =
Publisher.newBuilder(topicName)
.setEnableMessageOrdering(true)
.setEndpoint("us-east1-pubsub.googleapis.com:443")
.build();
try {
Map<String, String> messages = new LinkedHashMap<String, String>();
messages.put("message1", "key1");
messages.put("message2", "key2");
messages.put("message3", "key1");
messages.put("message4", "key2");
for (Map.Entry<String, String> entry : messages.entrySet()) {
ByteString data = ByteString.copyFromUtf8(entry.getKey());
PubsubMessage pubsubMessage =
PubsubMessage.newBuilder().setData(data).setOrderingKey(entry.getValue()).build();
ApiFuture<String> future = publisher.publish(pubsubMessage);
// Add an asynchronous callback to handle publish success / failure.
ApiFutures.addCallback(
future,
new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
// Details on the API exception.
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
}
System.out.println("Error publishing message : " + pubsubMessage.getData());
// (Beta) Must call resumePublish to reset key and continue publishing with order.
publisher.resumePublish(pubsubMessage.getOrderingKey());
}
@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic).
System.out.println(pubsubMessage.getData() + " : " + messageId);
}
},
MoreExecutors.directExecutor());
}
} finally {
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});
// const orderingKey = 'key1';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function resumePublish(topicNameOrId, data, orderingKey) {
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
const publishOptions = {
messageOrdering: true,
};
// Publishes the message
const publisher = pubSubClient.topic(topicNameOrId, publishOptions);
try {
const message = {
data: dataBuffer,
orderingKey: orderingKey,
};
const messageId = await publisher.publishMessage(message);
console.log(`Message ${messageId} published.`);
return messageId;
} catch (e) {
console.log(`Could not publish: ${e}`);
publisher.resumePublishing(orderingKey);
return null;
}
}
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
from google.cloud import pubsub_v1
# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
publisher_options = pubsub_v1.types.PublisherOptions(enable_message_ordering=True)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": "us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
publisher_options=publisher_options, client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)
for message in [
("message1", "key1"),
("message2", "key2"),
("message3", "key1"),
("message4", "key2"),
]:
# Data must be a bytestring
data = message[0].encode("utf-8")
ordering_key = message[1]
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data, ordering_key=ordering_key)
try:
print(future.result())
except RuntimeError:
# Resume publish on an ordering key that has had unrecoverable errors.
publisher.resume_publish(topic_path, ordering_key)
print(f"Resumed publishing messages with ordering keys to {topic_path}.")
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
# topic_id = "your-topic-id"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_id, async: {
max_bytes: 1_000_000,
max_messages: 20
}
topic.enable_message_ordering!
10.times do |i|
topic.publish_async "This is message \##{i}.",
ordering_key: "ordering-key" do |result|
if result.succeeded?
puts "Message \##{i} successfully published."
else
puts "Message \##{i} failed to publish"
# Allow publishing to continue on "ordering-key" after processing the
# failure.
topic.resume_publish "ordering-key"
end
end
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop!
フロー制御
パブリッシャー クライアントは、Pub/Sub サービスへのデータ送信よりも速くメッセージをパブリッシュしようとする場合があります。クライアントは、次のような多くの要因によって制限されます。
- マシンの CPU、RAM、ネットワーク容量
- ネットワーク設定(未処理のリクエストの数、使用可能な帯域幅など)
- 各パブリッシュ リクエストのレイテンシ。主に、Pub/Sub サービス、クライアント、Google Cloud 間のネットワーク接続によって決まります
パブリッシュ リクエストのレートがこれらの上限を超えると、リクエストは DEADLINE_EXCEEDED
エラーで失敗するまでメモリに蓄積されます。これは特に、数万のメッセージがループ内にパブリッシュされ、数千のリクエストがミリ秒単位で生成される場合に発生する可能性があります。
この問題を診断するには、Monitoring でサーバー側の指標を確認します。DEADLINE_EXCEEDED
で失敗したリクエストは表示されず、成功したリクエストのみが表示されます。成功したリクエストのレートによりクライアント マシンのスループット容量が分かります。また、フロー制御を構成するベースラインとなります。
モニタリング ページに移動
フローレートの問題を軽減するには、パブリッシャー クライアントにフロー制御を構成して、パブリッシュ リクエストのレートを制限します。未処理のリクエストに割り当てる最大バイト数と、許可される未処理メッセージの最大数を構成できます。これらの上限は、クライアント マシンのスループット容量に応じて設定します。
パブリッシャーのフロー制御は、以下の言語の Pub/Sub クライアント ライブラリを介して利用できます。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
// Configure the publisher to block if either (1) 100 or more messages, or
// (2) messages with 100MiB worth of data have not been acknowledged by the
// service. By default the publisher never blocks, and its capacity is only
// limited by the system's memory.
auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
std::move(topic),
Options{}
.set<pubsub::MaxPendingMessagesOption>(100)
.set<pubsub::MaxPendingBytesOption>(100 * 1024 * 1024L)
.set<pubsub::FullPublisherActionOption>(
pubsub::FullPublisherAction::kBlocks)));
std::vector<future<void>> ids;
for (char const* data : {"a", "b", "c"}) {
ids.push_back(
publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
.then([data](future<StatusOr<std::string>> f) {
auto s = f.get();
if (!s) return;
std::cout << "Sent '" << data << "' (" << *s << ")\n";
}));
}
publisher.Flush();
// Block until they are actually sent.
for (auto& id : ids) id.get();
}
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
import (
"context"
"fmt"
"io"
"strconv"
"sync"
"sync/atomic"
"cloud.google.com/go/pubsub"
)
func publishWithFlowControlSettings(w io.Writer, projectID, topicID string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
t := client.Topic(topicID)
t.PublishSettings.FlowControlSettings = pubsub.FlowControlSettings{
MaxOutstandingMessages: 100, // default 1000
MaxOutstandingBytes: 10 * 1024 * 1024, // default 0 (unlimited)
LimitExceededBehavior: pubsub.FlowControlBlock, // default Ignore, other options: Block and SignalError
}
var wg sync.WaitGroup
var totalErrors uint64
numMsgs := 1000
// Rapidly publishing 1000 messages in a loop may be constrained by flow control.
for i := 0; i < numMsgs; i++ {
wg.Add(1)
result := t.Publish(ctx, &pubsub.Message{
Data: []byte("message #" + strconv.Itoa(i)),
})
go func(i int, res *pubsub.PublishResult) {
fmt.Fprintf(w, "Publishing message %d\n", i)
defer wg.Done()
// The Get method blocks until a server-generated ID or
// an error is returned for the published message.
_, err := res.Get(ctx)
if err != nil {
// Error handling code can be added here.
fmt.Fprintf(w, "Failed to publish: %v", err)
atomic.AddUint64(&totalErrors, 1)
return
}
}(i, result)
}
wg.Wait()
if totalErrors > 0 {
return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, numMsgs)
}
return nil
}
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class PublishWithFlowControlExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithFlowControlExample(projectId, topicId);
}
public static void publishWithFlowControlExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
try {
// Configure how many messages the publisher client can hold in memory
// and what to do when messages exceed the limit.
FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
// Block more messages from being published when the limit is reached. The other
// options are Ignore (or continue publishing) and ThrowException (or error out).
.setLimitExceededBehavior(LimitExceededBehavior.Block)
.setMaxOutstandingRequestBytes(10 * 1024 * 1024L) // 10 MiB
.setMaxOutstandingElementCount(100L) // 100 messages
.build();
// By default, messages are not batched.
BatchingSettings batchingSettings =
BatchingSettings.newBuilder().setFlowControlSettings(flowControlSettings).build();
publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();
// Publish 1000 messages in quick succession may be constrained by publisher flow control.
for (int i = 0; i < 1000; i++) {
String message = "message " + i;
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
messageIdFutures.add(messageIdFuture);
}
} finally {
// Wait on any pending publish requests.
List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();
System.out.println(
"Published " + messageIds.size() + " messages with flow control settings.");
if (publisher != null) {
// When finished with the publisher, shut down to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Node.js
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。
/**
* TODO(developer): Uncomment this variable before running the sample.
*/
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function publishWithFlowControl(topicNameOrId) {
// Create publisher options
const options = {
flowControlOptions: {
maxOutstandingMessages: 50,
maxOutstandingBytes: 10 * 1024 * 1024, // 10 MB
},
};
// Get a publisher.
const topic = pubSubClient.topic(topicNameOrId, options);
// For flow controlled publishing, we'll use a publisher flow controller
// instead of `topic.publish()`.
const flow = topic.flowControlled();
// Publish messages in a fast loop.
const testMessage = {data: Buffer.from('test!')};
for (let i = 0; i < 1000; i++) {
// You can also just `await` on `publish()` unconditionally, but if
// you want to avoid pausing to the event loop on each iteration,
// you can manually check the return value before doing so.
const wait = flow.publish(testMessage);
if (wait) {
await wait;
}
}
// Wait on any pending publish requests. Note that you can call `all()`
// earlier if you like, and it will return a Promise for all messages
// that have been sent to `flowController.publish()` so far.
const messageIds = await flow.all();
console.log(`Published ${messageIds.length} with flow control settings.`);
}
Python
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Python の設定手順を実施してください。詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
from concurrent import futures
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import (
LimitExceededBehavior,
PublisherOptions,
PublishFlowControl,
)
# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# Configure how many messages the publisher client can hold in memory
# and what to do when messages exceed the limit.
flow_control_settings = PublishFlowControl(
message_limit=100, # 100 messages
byte_limit=10 * 1024 * 1024, # 10 MiB
limit_exceeded_behavior=LimitExceededBehavior.BLOCK,
)
publisher = pubsub_v1.PublisherClient(
publisher_options=PublisherOptions(flow_control=flow_control_settings)
)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []
# Resolve the publish future in a separate thread.
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
message_id = publish_future.result()
print(message_id)
# Publish 1000 messages in quick succession may be constrained by
# publisher flow control.
for n in range(1, 1000):
data_str = f"Message number {n}"
# Data must be a bytestring
data = data_str.encode("utf-8")
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Allow the publisher client to batch messages.
publish_future.add_done_callback(callback)
publish_futures.append(publish_future)
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
print(f"Published messages with flow control settings to {topic_path}.")
同時実行制御
同時実行のサポートは、プログラミング言語によって異なります。詳細については、API リファレンス ドキュメントをご覧ください。
以下のサンプルでは、パブリッシャーの同時実行を制御する方法を示しています。
C++
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の C++ の設定手順を実施してください。詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
// Override the default number of background (I/O) threads. By default the
// library uses `std::thread::hardware_concurrency()` threads.
auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
auto publisher = pubsub::Publisher(
pubsub::MakePublisherConnection(std::move(topic), std::move(options)));
std::vector<future<void>> ids;
for (char const* data : {"1", "2", "3", "go!"}) {
ids.push_back(
publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
.then([data](future<StatusOr<std::string>> f) {
auto s = f.get();
if (!s) return;
std::cout << "Sent '" << data << "' (" << *s << ")\n";
}));
}
publisher.Flush();
// Block until they are actually sent.
for (auto& id : ids) id.get();
}
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
)
func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
// projectID := "my-project-id"
// topicID := "my-topic"
// msg := "Hello World"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %v", err)
}
defer client.Close()
t := client.Topic(topicID)
t.PublishSettings.NumGoroutines = 1
result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
return fmt.Errorf("Get: %v", err)
}
fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
return nil
}
Java
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
public class PublishWithConcurrencyControlExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
publishWithConcurrencyControlExample(projectId, topicId);
}
public static void publishWithConcurrencyControlExample(String projectId, String topicId)
throws IOException, ExecutionException, InterruptedException {
TopicName topicName = TopicName.of(projectId, topicId);
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();
try {
// Provides an executor service for processing messages. The default
// `executorProvider` used by the publisher has a default thread count of
// 5 * the number of processors available to the Java virtual machine.
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();
// `setExecutorProvider` configures an executor for the publisher.
publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();
// schedule publishing one message at a time : messages get automatically batched
for (int i = 0; i < 100; i++) {
String message = "message " + i;
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
// Once published, returns a server-assigned message id (unique within the topic)
ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
messageIdFutures.add(messageIdFuture);
}
} finally {
// Wait on any pending publish requests.
List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();
System.out.println("Published " + messageIds.size() + " messages with concurrency control.");
if (publisher != null) {
// When finished with the publisher, shutdown to free up resources.
publisher.shutdown();
publisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
}
}
Ruby
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Ruby の設定手順を実施してください。詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
# topic_id = "your-topic-id"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
topic = pubsub.topic topic_id, async: {
threads: {
# Use exactly one thread for publishing message and exactly one thread
# for executing callbacks
publish: 1,
callback: 1
}
}
topic.publish_async "This is a test message." do |result|
raise "Failed to publish the message." unless result.succeeded?
puts "Message published asynchronously."
end
# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
パブリッシャーをモニタリングする
Cloud Monitoring には、トピックをモニタリングするための指標がいくつか用意されています。
トピックをモニタリングして正常なパブリッシャーを維持するには、正常なパブリッシャーの維持をご覧ください。
次のステップ
Pub/Sub がメッセージ データを保存するロケーションを制限するには、Pub/Sub リソースのロケーションの制限をご覧ください。
メッセージの受信の詳細については、サブスクリプション タイプの選択をご覧ください。