パブリッシャー ガイド

パブリッシャー アプリケーションによって、メッセージが作成され、トピックに送信されます。 Pub/Sub では、既存のサブスクライバーに対して、最低 1 回のメッセージ配信と、ベストエフォートの配信を行うことができます(詳しくは、サブスクライバーの概要をご覧ください)。パブリッシャー アプリケーションの一般的な流れは次のとおりです。

  1. データを含むメッセージを作成します。
  2. リクエストを Pub/Sub Server に送信し、目的のトピックにメッセージを公開します。

トピックの作成と管理の詳細については、トピックとサブスクリプションの管理をご覧ください。

セットアップ

ご使用のプログラミング言語で環境をセットアップする方法については、クライアント ライブラリのスタートガイドをご覧ください。

トピックへのメッセージの公開

REST で JSON を使用する場合、メッセージ データを base64 でエンコードする必要があります。デコード後のリクエスト全体のサイズ(メッセージを含む)は 10 MB 未満にする必要があります。メッセージ ペイロードは空にできません。空でないデータ フィールドか、1 つ以上の属性が含まれている必要があります。

クライアント ライブラリは、選択したプログラミング言語に応じて、メッセージを同期的または非同期的に公開できます。非同期的に公開する場合はアプリケーションのバッチ処理が可能になり、スループットが高まります。

すべてのクライアント ライブラリでは、非同期的なメッセージの公開がサポートされます。同期的にメッセージを公開したい場合に、選択したプログラミング言語のクライアント ライブラリでそれがサポートされているか確認するには、その言語の API リファレンス ドキュメントをご覧ください。メッセージの公開が成功すると、サーバーによって生成された ID(トピック内で一意)が返されます。

プロトコル

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/myproject/topics/mytopic:publish
{
  "messages": [
    {
      "attributes": {
        "key": "iana.org/language_tag",
        "value": "en"
      },
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ=="
    }
  ]
}

レスポンス:

200 OK
{
  "messageIds": [
    "19916711285"
  ]
}

C#

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

PublisherClient publisherClient = PublisherClient.Create();
SimplePublisher publisher = SimplePublisher.Create(
    new TopicName(projectId, topicId), new[] { publisherClient });
var publishTasks = new List<Task<string>>();
// SimplePublisher collects messages into appropriately sized
// batches.
foreach (string text in messageTexts)
{
    publishTasks.Add(publisher.PublishAsync(text));
}
foreach (var task in publishTasks)
{
    Console.WriteLine("Published message {0}", task.Result);
}

Go

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte(msg),
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

TopicName topicName = TopicName.create("my-project-id", "my-topic-id");
Publisher publisher = null;
List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

try {
  // Create a publisher instance with default settings bound to the topic
  publisher = Publisher.defaultBuilder(topicName).build();

  List<String> messages = Arrays.asList("first message", "second message");

  // schedule publishing one message at a time : messages get automatically batched
  for (String message : messages) {
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    // Once published, returns a server-assigned message id (unique within the topic)
    ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
    messageIdFutures.add(messageIdFuture);
  }
} finally {
  // wait on any pending publish requests.
  List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

  for (String messageId : messageIds) {
    System.out.println("published with message ID: " + messageId);
  }

  if (publisher != null) {
    // When finished with the publisher, shutdown to free up resources.
    publisher.shutdown();
  }
}

Node.js

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

function publishMessage (topicName, data) {
  // Instantiates a client
  const pubsub = PubSub();

  // References an existing topic, e.g. "my-topic"
  const topic = pubsub.topic(topicName);

  // Create a publisher for the topic (which can include additional batching configuration)
  const publisher = topic.publisher();

  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);
  return publisher.publish(dataBuffer)
    .then((results) => {
      const messageId = results[0];

      console.log(`Message ${messageId} published.`);

      return messageId;
    });
}

PHP

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

def publish_messages(project, topic_name):
    """Publishes multiple messages to a Pub/Sub topic."""
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project, topic_name)

    for n in range(1, 10):
        data = u'Message number {}'.format(n)
        # Data must be a bytestring
        data = data.encode('utf-8')
        publisher.publish(topic_path, data=data)

    print('Published messages.')

Ruby

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

pubsub = Google::Cloud::Pubsub.new project: "my-gcp-project-id"
topic  = pubsub.topic "my-topic"

topic.publish "A Message"

バッチ処理の設定

Pub/Sub クライアント ライブラリを使用すると、メッセージのバッチ処理方法を言語ごとに自動的に設定できます。アプリケーションの要件に合わせて値を設定し、メッセージのレイテンシとスループットを調整できます。

メッセージは、リクエストのサイズ(バイト単位)、メッセージの数、時間に応じてバッチ処理されます。このサンプルに示すように、デフォルト設定をオーバーライドできます。

C#

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

PublisherClient publisherClient = PublisherClient.Create();
SimplePublisher publisher = SimplePublisher.Create(
    new TopicName(projectId, topicId), new[] { publisherClient },
    new SimplePublisher.Settings()
    {
        BatchingSettings = new Google.Api.Gax.BatchingSettings(
            elementCountThreshold: 100,
            byteCountThreshold: 10240,
            delayThreshold: TimeSpan.FromSeconds(3))
    });
var publishTasks = new List<Task<string>>();
// SimplePublisher collects messages into appropriately sized
// batches.
foreach (string text in messageTexts)
{
    publishTasks.Add(publisher.PublishAsync(text));
}
foreach (var task in publishTasks)
{
    Console.WriteLine("Published message {0}", task.Result);
}

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1kb
long messageCountBatchSize = 10L; // default : 100

Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

// Publish request get triggered based on request size, messages count & time since last publish
BatchingSettings batchingSettings = BatchingSettings.newBuilder()
    .setElementCountThreshold(messageCountBatchSize)
    .setRequestByteThreshold(requestBytesThreshold)
    .setDelayThreshold(publishDelayThreshold)
    .build();

Publisher publisher = Publisher.defaultBuilder(topicName)
    .setBatchingSettings(batchingSettings).build();

Go

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	ByteThreshold:  5000,
	CountThreshold: 10,
	DelayThreshold: 100 * time.Millisecond,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Python

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

def publish_messages_with_batch_settings(project, topic_name):
    """Publishes multiple messages to a Pub/Sub topic with batch settings."""
    # Configure the batch to publish once there is one kilobyte of data or
    # 1 second has passed.
    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,  # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)
    topic_path = publisher.topic_path(project, topic_name)

    for n in range(1, 10):
        data = u'Message number {}'.format(n)
        # Data must be a bytestring
        data = data.encode('utf-8')
        publisher.publish(topic_path, data=data)

    print('Published messages.')

リクエストの再試行

公開に失敗した場合、再試行を妨げるエラーが発生しない限り自動的にリクエストが再試行されます。このサンプルコードは、再試行設定をカスタマイズしたパブリッシャーを作成しています(クライアント ライブラリによっては、再試行設定のカスタマイズがサポートされない場合があります。選択した言語の API リファレンス ドキュメントをご覧ください)。

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// Retry settings control how the publisher handles retryable failures
Duration retryDelay = Duration.ofMillis(100); // default : 1 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures
Duration maxRetryDelay = Duration.ofSeconds(5); // default : 10 seconds

RetrySettings retrySettings = RetrySettings.newBuilder()
    .setInitialRetryDelay(retryDelay)
    .setRetryDelayMultiplier(retryDelayMultiplier)
    .setMaxRetryDelay(maxRetryDelay)
    .build();

Publisher publisher = Publisher.defaultBuilder(topicName)
    .setRetrySettings(retrySettings).build();

同時実行制御

同時実行のサポートは、プログラミング言語によって異なります。詳しくは API リファレンス ドキュメントをご覧ください。

以下のサンプルでは、パブリッシャーの同時実行を制御する方法を示しています。

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// create a publisher with a single threaded executor
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
    .setExecutorThreadCount(1).build();
Publisher publisher = Publisher.defaultBuilder(topicName)
    .setExecutorProvider(executorProvider).build();

Go

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	NumGoroutines: 1,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

フィードバックを送信...