パブリッシャー ガイド

パブリッシャー アプリケーションによって、メッセージが作成され、トピックに送信されます。Pub/Sub では、既存のサブスクライバーに対して、最低 1 回のメッセージ配信と、ベストエフォートの配信を行うことができます(詳細については、サブスクライバーの概要をご覧ください)。パブリッシャー アプリケーションの一般的な流れは次のとおりです。

  1. データを含むメッセージを作成します。
  2. リクエストを Pub/Sub Server に送信し、目的のトピックにメッセージを公開します。

トピックの作成と管理の詳細については、トピックとサブスクリプションの管理をご覧ください。

セットアップ

ご使用のプログラミング言語で環境をセットアップする方法については、クライアント ライブラリのスタートガイドをご覧ください。

トピックへのメッセージの公開

REST で JSON を使用する場合、メッセージ データを base64 でエンコードする必要があります。デコード後のリクエスト全体のサイズ(メッセージを含む)は 10 MB 未満にする必要があります。メッセージ ペイロードは空にできません。空でないデータ フィールドか、1 つ以上の属性が含まれている必要があります。

クライアント ライブラリは、選択したプログラミング言語に応じて、メッセージを同期的または非同期的に公開できます。非同期的に公開する場合はアプリケーションのバッチ処理が可能になり、スループットが高まります。

すべてのクライアント ライブラリでは、非同期的なメッセージの公開がサポートされます。同期的にメッセージを公開したい場合に、選択したプログラミング言語のクライアント ライブラリでそれがサポートされているか確認するには、その言語の API リファレンス ドキュメントをご覧ください。メッセージの公開が成功すると、サーバーによって生成された ID(トピック内で一意)が返されます。

プロトコル

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/myproject/topics/mytopic:publish
{
  "messages": [
    {
      "attributes": {
        "key": "iana.org/language_tag",
        "value": "en"
      },
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ=="
    }
  ]
}

レスポンス:

200 OK
{
  "messageIds": [
    "19916711285"
  ]
}

C#

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

PublisherServiceApiClient publisherClient = PublisherServiceApiClient.Create();
PublisherClient publisher = PublisherClient.Create(
    new TopicName(projectId, topicId), new[] { publisherClient });
var publishTasks = new List<Task<string>>();
// SimplePublisher collects messages into appropriately sized
// batches.
foreach (string text in messageTexts)
{
    publishTasks.Add(publisher.PublishAsync(text));
}
foreach (var task in publishTasks)
{
    Console.WriteLine("Published message {0}", task.Result);
}

Go

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte(msg),
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

ProjectTopicName topicName = ProjectTopicName.of("my-project-id", "my-topic-id");
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

try {
  // Create a publisher instance with default settings bound to the topic
  publisher = Publisher.newBuilder(topicName).build();

  List<String> messages = Arrays.asList("first message", "second message");

  // schedule publishing one message at a time : messages get automatically batched
  for (String message : messages) {
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    // Once published, returns a server-assigned message id (unique within the topic)
    ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
    messageIdFutures.add(messageIdFuture);
  }
} finally {
  // wait on any pending publish requests.
  List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

  for (String messageId : messageIds) {
    System.out.println("published with message ID: " + messageId);
  }

  if (publisher != null) {
    // When finished with the publisher, shutdown to free up resources.
    publisher.shutdown();
  }
}

Node.js

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'your-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

pubsub
  .topic(topicName)
  .publisher()
  .publish(dataBuffer)
  .then(messageId => {
    console.log(`Message ${messageId} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });

PHP

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

def publish_messages(project, topic_name):
    """Publishes multiple messages to a Pub/Sub topic."""
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project, topic_name)

    for n in range(1, 10):
        data = u'Message number {}'.format(n)
        # Data must be a bytestring
        data = data.encode('utf-8')
        publisher.publish(topic_path, data=data)

    print('Published messages.')

Ruby

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish_async "This is a test message." do |result|
  if result.succeeded?
    puts "Message published asynchronously."
  else
    raise "Failed to publish the message."
  end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

バッチ処理によるレイテンシとスループットの調整

Pub/Sub クライアント ライブラリは、複数のメッセージをバッチにまとめて 1 回の呼び出しでサービスに送信します。バッチのサイズを大きくすると、メッセージのスループット(CPU あたりのメッセージ送信量)が向上します。バッチ処理の費用で問題になるのは、個々のメッセージのレイテンシです。バッチがいっぱいになり、ネットワーク経由で送信可能になるまで、メッセージはメモリ内のキューに格納されています。レイテンシを最小にするには、バッチ処理をオフにする必要があります。リクエスト / レスポンスのシーケンスで単一のメッセージを発行するアプリケーションの場合、この点は特に重要になります。たとえば、Cloud Functions や App Engine でサーバーレスのイベント駆動型アプリケーションで問題になります。

メッセージは、リクエストのサイズ(バイト単位)、メッセージの数、時間に応じてバッチ処理されます。このサンプルに示すように、デフォルト設定をオーバーライドできます。

C#

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

PublisherServiceApiClient publisherClient = PublisherServiceApiClient.Create();
PublisherClient publisher = PublisherClient.Create(
    new TopicName(projectId, topicId), new[] { publisherClient },
    new PublisherClient.Settings
    {
        BatchingSettings = new Google.Api.Gax.BatchingSettings(
            elementCountThreshold: 100,
            byteCountThreshold: 10240,
            delayThreshold: TimeSpan.FromSeconds(3))
    });
var publishTasks = new List<Task<string>>();
// SimplePublisher collects messages into appropriately sized
// batches.
foreach (string text in messageTexts)
{
    publishTasks.Add(publisher.PublishAsync(text));
}
foreach (var task in publishTasks)
{
    Console.WriteLine("Published message {0}", task.Result);
}

Go

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	ByteThreshold:  5000,
	CountThreshold: 10,
	DelayThreshold: 100 * time.Millisecond,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1kb
long messageCountBatchSize = 10L; // default : 100

Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

// Publish request get triggered based on request size, messages count & time since last publish
BatchingSettings batchingSettings = BatchingSettings.newBuilder()
    .setElementCountThreshold(messageCountBatchSize)
    .setRequestByteThreshold(requestBytesThreshold)
    .setDelayThreshold(publishDelayThreshold)
    .build();

Publisher publisher = Publisher.newBuilder(topicName)
    .setBatchingSettings(batchingSettings).build();

Node.js

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'your-topic';
// const data = JSON.stringify({ foo: 'bar' });
// const maxMessages = 10;
// const maxWaitTime = 10000;

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

pubsub
  .topic(topicName)
  .publisher({
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    },
  })
  .publish(dataBuffer)
  .then(results => {
    const messageId = results[0];
    console.log(`Message ${messageId} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });

Python

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

def publish_messages_with_batch_settings(project, topic_name):
    """Publishes multiple messages to a Pub/Sub topic with batch settings."""
    # Configure the batch to publish once there is one kilobyte of data or
    # 1 second has passed.
    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)
    topic_path = publisher.topic_path(project, topic_name)

    for n in range(1, 10):
        data = u'Message number {}'.format(n)
        # Data must be a bytestring
        data = data.encode('utf-8')
        publisher.publish(topic_path, data=data)

    print('Published messages.')

Ruby

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_name, async: {
  :max_bytes => 1000000,
  :max_messages => 20
}
10.times do |i|
  topic.publish_async "This is message \##{i}."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."

リクエストの再試行

公開に失敗した場合、再試行を妨げるエラーが発生しない限り自動的にリクエストが再試行されます。このサンプルコードは、再試行設定をカスタマイズしたパブリッシャーを作成しています(クライアント ライブラリによっては、再試行設定のカスタマイズがサポートされない場合があります。選択した言語の API リファレンス ドキュメントをご覧ください)。

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// Retry settings control how the publisher handles retryable failures
Duration retryDelay = Duration.ofMillis(100); // default : 1 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures
Duration maxRetryDelay = Duration.ofSeconds(5); // default : 10 seconds

RetrySettings retrySettings = RetrySettings.newBuilder()
    .setInitialRetryDelay(retryDelay)
    .setRetryDelayMultiplier(retryDelayMultiplier)
    .setMaxRetryDelay(maxRetryDelay)
    .build();

Publisher publisher = Publisher.newBuilder(topicName)
    .setRetrySettings(retrySettings).build();

同時実行制御

同時実行のサポートは、プログラミング言語によって異なります。詳しくは API リファレンス ドキュメントをご覧ください。

以下のサンプルでは、パブリッシャーの同時実行を制御する方法を示しています。

Go

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	NumGoroutines: 1,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// create a publisher with a single threaded executor
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
    .setExecutorThreadCount(1).build();
Publisher publisher = Publisher.newBuilder(topicName)
    .setExecutorProvider(executorProvider).build();

Ruby

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name, async: {
  :threads => {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    :publish => 1,
    :callback => 1
  }
}
topic.publish_async "This is a test message." do |result|
  if result.succeeded?
    puts "Message published asynchronously."
  else
    raise "Failed to publish the message."
  end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...