パブリッシャー ガイド

パブリッシャー アプリケーションによって、メッセージが作成され、トピックに送信されます。Cloud Pub/Sub では、既存のサブスクライバーに対して、最低 1 回のメッセージ配信と、ベスト エフォートの配信を行うことができます(詳細については、サブスクライバーの概要をご覧ください)。パブリッシャー アプリケーションの一般的な流れは次のとおりです。

  1. データを含むメッセージを作成します。
  2. Cloud Pub/Sub サーバーにリクエストを送信して、メッセージを目的のトピックに公開します。

トピックの作成と管理の詳細については、トピックとサブスクリプションの管理をご覧ください。

セットアップ

ご使用のプログラミング言語で環境をセットアップする方法については、クライアント ライブラリのスタートガイドをご覧ください。

トピックへのメッセージの公開

REST で JSON を使用する場合、メッセージ データを base64 でエンコードする必要があります。デコード後のリクエスト全体のサイズ(メッセージを含む)は 10 MB 未満にする必要があります。メッセージ ペイロードは空にできません。空でないデータ フィールドか、1 つ以上の属性が含まれている必要があります。

クライアント ライブラリは、選択したプログラミング言語に応じて、メッセージを同期的または非同期的に公開できます。非同期的に公開する場合はアプリケーションのバッチ処理が可能になり、スループットが高まります。

すべてのクライアント ライブラリでは、非同期的なメッセージの公開がサポートされます。同期的にメッセージを公開したい場合に、選択したプログラミング言語のクライアント ライブラリでそれがサポートされているか確認するには、その言語の API リファレンス ドキュメントをご覧ください。メッセージの公開が成功すると、サーバーによって生成された ID(トピック内で一意)が返されます。

プロトコル

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/myproject/topics/mytopic:publish
{
  "messages": [
    {
      "attributes": {
        "key": "iana.org/language_tag",
        "value": "en"
      },
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ=="
    }
  ]
}

レスポンス:

200 OK
{
  "messageIds": [
    "19916711285"
  ]
}

C#

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の C# の設定手順に従ってください。詳細については、Cloud Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

PublisherServiceApiClient publisherClient = PublisherServiceApiClient.Create();
PublisherClient publisher = PublisherClient.Create(
    new TopicName(projectId, topicId), new[] { publisherClient });

Go

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

var wg sync.WaitGroup
var totalErrors uint64
t := client.Topic(topic)

for i := 0; i < n; i++ {
	result := t.Publish(ctx, &pubsub.Message{
		// data must be a ByteString
		Data: []byte("Message " + strconv.Itoa(i)),
	})

	wg.Add(1)
	go func(i int, res *pubsub.PublishResult) {
		defer wg.Done()
		// The Get method blocks until a server-generated ID or
		// an error is returned for the published message.
		id, err := res.Get(ctx)
		if err != nil {
			// Error handling code can be added here.
			log.Output(1, fmt.Sprintf("Failed to publish: %v", err))
			atomic.AddUint64(&totalErrors, 1)
			return
		}
		fmt.Printf("Published message %d; msg ID: %v\n", i, id)
	}(i, result)
}

wg.Wait()

if totalErrors > 0 {
	return errors.New(
		fmt.Sprintf("%d of %d messages did not publish successfully",
			totalErrors, n))
}
return nil

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

ProjectTopicName topicName = ProjectTopicName.of("my-project-id", "my-topic-id");
Publisher publisher = null;

try {
  // Create a publisher instance with default settings bound to the topic
  publisher = Publisher.newBuilder(topicName).build();

  List<String> messages = Arrays.asList("first message", "second message");

  for (final String message : messages) {
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

    // Once published, returns a server-assigned message id (unique within the topic)
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Add an asynchronous callback to handle success / failure
    ApiFutures.addCallback(future, new ApiFutureCallback<String>() {

      @Override
      public void onFailure(Throwable throwable) {
        if (throwable instanceof ApiException) {
          ApiException apiException = ((ApiException) throwable);
          // details on the API exception
          System.out.println(apiException.getStatusCode().getCode());
          System.out.println(apiException.isRetryable());
        }
        System.out.println("Error publishing message : " + message);
      }

      @Override
      public void onSuccess(String messageId) {
        // Once published, returns server-assigned message ids (unique within the topic)
        System.out.println(messageId);
      }
    });
  }
} finally {
  if (publisher != null) {
    // When finished with the publisher, shutdown to free up resources.
    publisher.shutdown();
    publisher.awaitTermination(1, TimeUnit.MINUTES);
  }
}

Node.js

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

pubsub
  .topic(topicName)
  .publisher()
  .publish(dataBuffer)
  .then(messageId => {
    console.log(`Message ${messageId} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });

PHP

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の PHP の設定手順に従ってください。詳細については、Cloud Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

def callback(message_future):
    # When timeout is unspecified, the exception method waits indefinitely.
    if message_future.exception(timeout=30):
        print('Publishing message on {} threw an Exception {}.'.format(
            topic_name, message_future.exception()))
    else:
        print(message_future.result())

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # When you publish a message, the client returns a Future.
    message_future = publisher.publish(topic_path, data=data)
    message_future.add_done_callback(callback)

print('Published message IDs:')

# We must keep the main thread from exiting to allow it to process
# messages in the background.
while True:
    time.sleep(60)

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish_async "This is a test message." do |result|
  if result.succeeded?
    puts "Message published asynchronously."
  else
    raise "Failed to publish the message."
  end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

カスタム属性

カスタム属性を Pub/Sub メッセージにメタデータとして埋め込むことができます。属性はテキスト文字列またはバイト文字列にできます。

Node.js

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);
// Add two custom attributes, origin and username, to the message
const customAttributes = {
  origin: 'nodejs-sample',
  username: 'gcp',
};

pubsub
  .topic(topicName)
  .publisher()
  .publish(dataBuffer, customAttributes)
  .then(messageId => {
    console.log(`Message ${messageId} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # Add two attributes, origin and username, to the message
    publisher.publish(
        topic_path, data, origin='python-sample', username='gcp')

print('Published messages with custom attributes.')

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
# Add two attributes, origin and username, to the message
topic.publish_async "This is a test message.",
                    origin: "ruby-sample",
                    username: "gcp" do |result|
  if result.succeeded?
    puts "Message with custom attributes published asynchronously."
  else
    raise "Failed to publish the message."
  end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

バッチ処理によるレイテンシとスループットの調整

Cloud Pub/Sub クライアント ライブラリは、複数のメッセージをバッチにまとめて 1 回の呼び出しでサービスに送信します。バッチのサイズを大きくすると、メッセージのスループット(CPU あたりのメッセージ送信量)が向上します。バッチ処理の費用で問題になるのは、個々のメッセージのレイテンシです。バッチがいっぱいになり、ネットワーク経由で送信可能になるまで、メッセージはメモリ内のキューに格納されています。レイテンシを最小にするには、バッチ処理をオフにする必要があります。リクエスト / レスポンスのシーケンスで単一のメッセージを発行するアプリケーションの場合、この点は特に重要になります。このパターンの問題がよく起こるのは、Cloud Functions や App Engine を使用したサーバーレスのイベント駆動型アプリケーションです。

メッセージは、リクエストのサイズ(バイト単位)、メッセージの数、時間に応じてバッチ処理されます。このサンプルに示すように、デフォルト設定をオーバーライドできます。

C#

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の C# の設定手順に従ってください。詳細については、Cloud Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

PublisherServiceApiClient publisherClient = PublisherServiceApiClient.Create();
PublisherClient publisher = PublisherClient.Create(
    new TopicName(projectId, topicId), new[] { publisherClient },
    new PublisherClient.Settings
    {
        BatchingSettings = new Google.Api.Gax.BatchingSettings(
            elementCountThreshold: 100,
            byteCountThreshold: 10240,
            delayThreshold: TimeSpan.FromSeconds(3))
    });
var publishTasks = new List<Task<string>>();
// PublisherClient collects messages into appropriately sized
// batches.
foreach (string text in messageTexts)
{
    // Record all publishing Tasks. When each Task completes
    // without error, the message has been successfully published.
    // In real use don't simply store all publish Tasks forever;
    // it is usually appropriate to keep a reference to the Task
    // only until it has completed.
    publishTasks.Add(publisher.PublishAsync(text));
}
foreach (var task in publishTasks)
{
    Console.WriteLine("Published message {0}", task.Result);
}

Go

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	ByteThreshold:  5000,
	CountThreshold: 10,
	DelayThreshold: 100 * time.Millisecond,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

// Batch settings control how the publisher batches messages
long requestBytesThreshold = 5000L; // default : 1kb
long messageCountBatchSize = 10L; // default : 100

Duration publishDelayThreshold = Duration.ofMillis(100); // default : 1 ms

// Publish request get triggered based on request size, messages count & time since last publish
BatchingSettings batchingSettings = BatchingSettings.newBuilder()
    .setElementCountThreshold(messageCountBatchSize)
    .setRequestByteThreshold(requestBytesThreshold)
    .setDelayThreshold(publishDelayThreshold)
    .build();

Publisher publisher = Publisher.newBuilder(topicName)
    .setBatchingSettings(batchingSettings).build();

Node.js

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });
// const maxMessages = 10;
// const maxWaitTime = 10000;

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

pubsub
  .topic(topicName)
  .publisher({
    batching: {
      maxMessages: maxMessages,
      maxMilliseconds: maxWaitTime,
    },
  })
  .publish(dataBuffer)
  .then(results => {
    const messageId = results[0];
    console.log(`Message ${messageId} published.`);
  })
  .catch(err => {
    console.error('ERROR:', err);
  });

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

# Configure the batch to publish once there is one kilobyte of data or
# 1 second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024,  # One kilobyte
    max_latency=1,  # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    publisher.publish(topic_path, data=data)

print('Published messages.')

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish do |batch|
  10.times do |i|
    batch.publish "This is message \##{i}."
  end
end

puts "Messages published in batch."
# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

# Start sending messages in one request once the size of all queued messages
# reaches 1 MB or the number of queued messages reaches 20
topic = pubsub.topic topic_name, async: {
  :max_bytes => 1000000,
  :max_messages => 20
}
10.times do |i|
  topic.publish_async "This is message \##{i}."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Messages published asynchronously in batch."

リクエストの再試行

公開に失敗した場合、再試行を妨げるエラーが発生しない限り自動的にリクエストが再試行されます。このサンプルコードは、再試行設定をカスタマイズしたパブリッシャーを作成しています(クライアント ライブラリによっては、再試行設定のカスタマイズがサポートされない場合があります。選択した言語の API リファレンス ドキュメントをご覧ください)。

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

// Retry settings control how the publisher handles retryable failures
Duration retryDelay = Duration.ofMillis(100); // default : 1 ms
double retryDelayMultiplier = 2.0; // back off for repeated failures
Duration maxRetryDelay = Duration.ofSeconds(5); // default : 10 seconds
Duration totalTimeout = Duration.ofSeconds(1); // default: 0
Duration initialRpcTimeout = Duration.ofSeconds(1); // default: 0
Duration maxRpcTimeout = Duration.ofSeconds(10); // default: 0

RetrySettings retrySettings = RetrySettings.newBuilder()
    .setInitialRetryDelay(retryDelay)
    .setRetryDelayMultiplier(retryDelayMultiplier)
    .setMaxRetryDelay(maxRetryDelay)
    .setTotalTimeout(totalTimeout)
    .setInitialRpcTimeout(initialRpcTimeout)
    .setMaxRpcTimeout(maxRpcTimeout)
    .build();

Publisher publisher = Publisher.newBuilder(topicName)
    .setRetrySettings(retrySettings).build();

同時実行制御

同時実行のサポートは、プログラミング言語によって異なります。詳しくは API リファレンス ドキュメントをご覧ください。

以下のサンプルでは、パブリッシャーの同時実行を制御する方法を示しています。

Go

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

t := client.Topic(topic)
t.PublishSettings = pubsub.PublishSettings{
	NumGoroutines: 1,
}
result := t.Publish(ctx, &pubsub.Message{Data: msg})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

// create a publisher with a single threaded executor
ExecutorProvider executorProvider = InstantiatingExecutorProvider.newBuilder()
    .setExecutorThreadCount(1).build();
Publisher publisher = Publisher.newBuilder(topicName)
    .setExecutorProvider(executorProvider).build();

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

# When you publish a message, the client returns a Future. This Future
# can be used to track when the message is published.
futures = []

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    message_future = publisher.publish(topic_path, data=data)
    futures.append(message_future)

print('Published message IDs:')
for future in futures:
    # result() blocks until the message is published.
    print(future.result())

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name, async: {
  :threads => {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    :publish => 1,
    :callback => 1
  }
}
topic.publish_async "This is a test message." do |result|
  if result.succeeded?
    puts "Message published asynchronously."
  else
    raise "Failed to publish the message."
  end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...