pull を使用したメッセージの受信

Cloud Pub/Sub は push と pull の両方のメッセージ配信をサポートします。pull サブスクリプションと push サブスクリプションの概要と比較については、サブスクライバーの概要をご覧ください。このドキュメントでは、pull 配信について説明します。push 配信の説明については、push サブスクライバー ガイドをご覧ください。

非同期 pull

非同期 pull を使用すると、アプリケーションのスループットを向上させることができます。アプリケーションで新しいメッセージをブロックする必要はありません。メッセージは、アプリケーション内で長時間実行されているメッセージ リスナーを使って受信できます。以下の例で示すとおり、同時に確認応答されるメッセージは 1 回に 1 つだけです。Java、Python、.NET、Go、Ruby クライアントでは、streamingPull サービス API を使用して、非同期クライアント API を効率的に実装します。

クライアント ライブラリによっては、非同期のメッセージ pull に対応していない場合があります。同期 pull によるメッセージの取得については、同期 pull をご覧ください。

詳細については、ご使用のプログラミング言語の API リファレンス ドキュメントをご覧ください。

C#

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の C# の設定手順に従ってください。詳細については、Cloud Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName);
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。 詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(subName)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	msg.Ack()
	fmt.Printf("Got message: %q\n", string(msg.Data))
	mu.Lock()
	defer mu.Unlock()
	received++
	if received == 10 {
		cancel()
	}
})
if err != nil {
	return err
}

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

ProjectSubscriptionName subscriptionName =
    ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
  subscriber.startAsync();
  // ...
} finally {
  // stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Node.js

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

カスタム属性の処理

このサンプルでは、非同期でメッセージを pull し、メタデータからカスタム属性を取得する方法を示します。

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    if message.attributes:
        print('Attributes:')
        for key in message.attributes:
            value = message.attributes.get(key)
            print('{}: {}'.format(key, value))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

エラーのリッスン

次のサンプルでは、メッセージの登録中に発生するエラーを処理します。

Go

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。 詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

// If the service returns a non-retryable error, Receive returns that error after
// all of the outstanding calls to the handler have returned.
err := client.Subscription(subName).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

subscriber.addListener(
    new Subscriber.Listener() {
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle error.
      }
    },
    MoreExecutors.directExecutor());

Node.js

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
const messageHandler = function(message) {
  // Do something with the message
  console.log(`Message: ${message}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Create an event handler to handle errors
const errorHandler = function(error) {
  // Do something with the error
  console.error(`ERROR: ${error}`);
};

// Listen for new messages/errors until timeout is hit
subscription.on(`message`, messageHandler);
subscription.on(`error`, errorHandler);

setTimeout(() => {
  subscription.removeListener(`message`, messageHandler);
  subscription.removeListener(`error`, errorHandler);
}, timeout * 1000);

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

from google.cloud import pubsub_v1

# TODO project           = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pubsub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

# Blocks the thread while messages are coming in through the stream. Any
# exceptions that crop up on the thread will be set on the future.
try:
    # When timeout is unspecified, the result method waits indefinitely.
    future.result(timeout=30)
except Exception as e:
    print(
        'Listening for messages on {} threw an Exception: {}.'.format(
            subscription_name, e))

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

begin
  subscriber.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  subscriber.stop.wait!
rescue Exception => ex
  puts "Exception #{ex.inspect}: #{ex.message}"
  raise "Stopped listening for messages."
end

メッセージのフロー制御

Cloud Pub/Sub がクライアントにメッセージを送信するよりも、サブスクライバー クライアントがメッセージを処理して確認応答するほうが遅くなる場合があります。その場合、次の状況になります。

  • 受信メッセージの量がクライアントの処理能力を超えたときに、ネットワーク上の別のクライアントにその処理能力がある場合、メッセージのバックログが発生する可能性があります。2 番目のクライアントによってサブスクリプションのバックログが少なくなりますが、最初のクライアントが受信メッセージのリースを保持しているため、この処理を行う機会がありません。このため、最初のクライアントにメッセージが溜まり、全体の処理速度が低下します。
  • クライアント ライブラリは処理が遅れているメッセージに対する確認応答期限を繰り返し延長するため、こうしたメッセージによってメモリ、CPU、帯域幅のリソースが消費され続けます。このため、サブスクライバー クライアントのリソース(メモリなど)が不足する可能性があります。メッセージ処理のスループットが低下し、レイテンシが長くなる可能性があります。

この問題を軽減するには、サブスクライバーのフロー制御機能を使用して、サブスクライバーがメッセージを受信する速度を制御します。以下に、フロー制御機能のサンプルを示します。

C#

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の C# の設定手順に従ってください。詳細については、Cloud Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName,
    settings: new SubscriberClient.Settings()
    {
        AckExtensionWindow = TimeSpan.FromSeconds(4),
        Scheduler = Google.Api.Gax.SystemScheduler.Instance,
        AckDeadline = TimeSpan.FromSeconds(10),
        FlowControlSettings = new Google.Api.Gax
            .FlowControlSettings(
            maxOutstandingElementCount: 100,
            maxOutstandingByteCount: 10240)
    });
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。 詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

sub := client.Subscription(subName)
sub.ReceiveSettings.MaxOutstandingMessages = 10
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

FlowControlSettings flowControlSettings =
    FlowControlSettings.newBuilder()
        .setMaxOutstandingElementCount(10_000L)
        .setMaxOutstandingRequestBytes(1_000_000_000L)
        .build();
Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setFlowControlSettings(flowControlSettings)
        .build();

Node.js

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const subscriptionName = 'my-sub';
// const maxInProgress = 5;
// const maxBytes = 10000;

const topic = pubsub.topic(topicName);

const options = {
  flowControl: {
    maxBytes: maxBytes,
    maxMessages: maxInProgress,
  },
};

const subscription = topic.subscription(subscriptionName, options);

// Creates a new subscription
// Note that flow control configurations are not persistent
const [newSubscription] = await subscription.get({
  autoCreate: true,
});
console.log(
  `Subscription ${
    newSubscription.name
  } created with a maximum of ${maxInProgress} unprocessed messages.`
);

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
通常、フロー制御が必要な場合、公開されるメッセージの量が消費されるメッセージの量を超えています。メッセージ量の急増が一時的なものではなく、永続的な状態である場合は、サブスクライバー クライアントのインスタンス数を増やすことを検討してください。

同時実行制御

同時実行のサポートは、プログラミング言語によって異なります。Java や Go など、並列スレッドに対応している言語の場合、クライアント ライブラリはデフォルトのスレッド数を選択します。しかし、この値がアプリケーションに最適ではない場合もあります。たとえば、サブスクライバー アプリケーションがメッセージの受信量に対応できず、CPU にバインドされていない場合は、スレッド数を増やす必要があります。CPU 使用率の高いメッセージ処理では、スレッド数を減らすと、問題が解決する場合があります。

次のサンプルでは、サブスクライバーで同時実行制御を行います。

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

// provide a separate executor service for polling
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setExecutorProvider(executorProvider)
        .build();

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

同時実行のサポートは、プログラミング言語によって異なります。詳細については、API リファレンス ドキュメントをご覧ください。

StreamingPull

Cloud Pub/Sub サービスにはメッセージを取得するための 2 つの API があります。

Cloud Client ライブラリでは、使用可能な場合に、スループットを最大にし、レイテンシを最小にするために、双方向ストリーミング RPC である StreamingPull を使用します。StreamingPull API を直接使用することはない場合でも、StreamingPull のいくつかの重要なプロパティ、従来の pull メソッドとの違いを理解することは重要です。

pull メソッドは次のリクエスト / レスポンス モデルに依存しています。

  1. アプリケーションがメッセージのリクエストを送信します。
  2. サーバーが 0 または 1 つ以上のメッセージとともに応答し、接続を終了します。

StreamingPull サービス API は確認応答を送信し、確認応答期限を変更する際に、複数のメッセージを受信するために、次の永続的な双方向接続に依存します。

  1. クライアントが接続を確立するためにサービスへのリクエストを送信します。
  2. クライアントがメッセージ データを交換するためにこの接続を使用します。
  3. リクエスト(つまり、双方向接続)はクライアントまたはサーバーによって終了します。

StreamingPull のエラー率は 100%(予測された結果)

StreamingPull ストリームは常に再試行可能なエラーコードで終了します。通常の RPC とは異なり、ここでのエラーはリクエストが失敗したことではなく、単にストリームが破損したことを示しています。そのため、StreamingPull API のエラー率が驚異的な 100% になることがありますが、これは設計によるものです。StreamingPull エラーを診断するために、StreamingPull リクエスト指標ではなく、StreamingPull メッセージ オペレーション指標を確認することをおすすめします。

StreamingPull: 小規模なメッセージの大規模なバックログの処理

gRPC StreamingPull スタックは高スループットが得られるように最適化されているため、メッセージをバッファリングします。(新しいメッセージの安定したストリームではなく)小規模なメッセージの大規模なバックログを処理しようとすると、なんらかの影響が出る可能性があります。このような状況下では、メッセージが何度も配信され、クライアント間で負荷が効率的に分散されない可能性があります。

Cloud Pub/Sub サービスとクライアント ライブラリ ユーザー スペースの間のバッファ容量は約 10 MB です。クライアント ライブラリの動作に対するこのバッファの影響を理解するために、次の例を見てみましょう。

  • サブスクリプションに 10,000 件の 1 KB メッセージのバックログがあります。
  • シングル スレッド クライアント インスタンスによって各メッセージを順次処理するために 1 秒ずつかかります。
  • そのサブスクリプションのためのサービスへの StreamingPull 接続を確立する最初のクライアント インスタンスは、10,000 件のメッセージ全体のバッファを受け取ります。
  • このバッファを処理するには、10,000 秒(約 3 時間)かかります。
  • その間、一部のメッセージが確認応答期限を超え、同じクライアントに再送信され、結果として重複が生じます。
  • 複数のクライアント インスタンスが実行されている場合、バッファ内のメッセージ スタックを、最初のインスタンス以外のインスタンスでは利用できません。

メッセージが単一の大規模なバッチとしてではなく、一定のレートで到着する場合には、この状況は発生しません。このサービスでは 10 MB のメッセージ全体が一度に処理されることはないため、複数のサブスクライバー間でメッセージの負荷を効率的に分散できます。

この状況を解決するには、現在、一部の Cloud クライアント ライブラリ(「同期 pull」セクションを参照)とすべての API クライアント ライブラリで利用できる push サブスクリプションまたは pull API のいずれかを使用します。詳細については、クライアント ライブラリのドキュメントをご覧ください。

同期 pull

非同期 pull が、アプリケーションにとって最適ではない場合もあります。その例として、アプリケーション ロジックがメッセージを取得する際にポーリング パターンを使用する場合や、クライアントによって取得された複数のメッセージに対して正確な上限の設定が常に必要な場合などが挙げられます。このようなアプリケーションをサポートするため、サービスやクライアント ライブラリの大半で同期 pull メソッドがサポートされています。

固定数のメッセージを pull して確認応答するサンプルコードを次に示します。

プロトコル

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
  "returnImmediately": "false",
  "maxMessages": "1"
}

レスポンス:

200 OK
{
  "receivedMessages": [
    {
      "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
      "message": {
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "19917247034"
      }
    }
  ]
}

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

レスポンス:

200 OK

C#

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の C# の設定手順に従ってください。詳細については、Cloud Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient =
    SubscriberServiceApiClient.Create();
// Pull messages from server,
// allowing an immediate response if there are no messages.
PullResponse response = subscriberClient.Pull(
    subscriptionName, returnImmediately: true, maxMessages: 20);
// Print out each received message.
foreach (ReceivedMessage msg in response.ReceivedMessages)
{
    string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
    Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
}
// If acknowledgement required, send to server.
if (acknowledge)
{
    subscriberClient.Acknowledge(subscriptionName,
        response.ReceivedMessages.Select(msg => msg.AckId));
}

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

SubscriberStubSettings subscriberStubSettings =
    SubscriberStubSettings.newBuilder()
        .setTransportChannelProvider(
            SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                .setMaxInboundMessageSize(20 << 20) // 20MB
                .build())
        .build();

try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
  // String projectId = "my-project-id";
  // String subscriptionId = "my-subscription-id";
  // int numOfMessages = 10;   // max number of messages to be pulled
  String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
  PullRequest pullRequest =
      PullRequest.newBuilder()
          .setMaxMessages(numOfMessages)
          .setReturnImmediately(false) // return immediately if messages are not available
          .setSubscription(subscriptionName)
          .build();

  // use pullCallable().futureCall to asynchronously perform this operation
  PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
  List<String> ackIds = new ArrayList<>();
  for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
    // handle received message
    // ...
    ackIds.add(message.getAckId());
  }
  // acknowledge received messages
  AcknowledgeRequest acknowledgeRequest =
      AcknowledgeRequest.newBuilder()
          .setSubscription(subscriptionName)
          .addAllAckIds(ackIds)
          .build();
  // use acknowledgeCallable().futureCall to asynchronously perform this operation
  subscriber.acknowledgeCallable().call(acknowledgeRequest);
  return pullResponse.getReceivedMessagesList();
}

Node.js

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

// Imports the Google Cloud client library
const pubsub = require('@google-cloud/pubsub');

const client = new pubsub.v1.SubscriberClient();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectName = 'your-project';
// const subscriptionName = 'your-subscription';

const formattedSubscription = client.subscriptionPath(
  projectName,
  subscriptionName
);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 1;
const newAckDeadlineSeconds = 30;
const request = {
  subscription: formattedSubscription,
  maxMessages: maxMessages,
};

let isProcessed = false;

// The worker function is meant to be non-blocking. It starts a long-
// running process, such as writing the message to a table, which may
// take longer than the default 10-sec acknowledge deadline.
function worker(message) {
  console.log(`Processing "${message.message.data}"...`);

  setTimeout(() => {
    console.log(`Finished procesing "${message.message.data}".`);
    isProcessed = true;
  }, 30000);
}

// The subscriber pulls a specified number of messages.
const [response] = await client.pull(request);
// Obtain the first message.
const message = response.receivedMessages[0];
// Send the message to the worker function.
worker(message);

let waiting = true;
while (waiting) {
  await new Promise(r => setTimeout(r, 10000));
  // If the message has been processed..
  if (isProcessed) {
    const ackRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
    };

    //..acknowledges the message.
    await client.acknowledge(ackRequest);
    console.log(`Acknowledged: "${message.message.data}".`);
    // Exit after the message is acknowledged.
    waiting = false;
    console.log(`Done.`);
  } else {
    // If the message is not yet processed..
    const modifyAckRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
      ackDeadlineSeconds: newAckDeadlineSeconds,
    };

    //..reset its ack deadline.
    await client.modifyAckDeadline(modifyAckRequest);

    console.log(
      `Reset ack deadline for "${
        message.message.data
      }" for ${newAckDeadlineSeconds}s.`
    );
  }
}

PHP

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の PHP の設定手順に従ってください。詳細については、Cloud Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

import logging
import multiprocessing
import random
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

NUM_MESSAGES = 2
ACK_DEADLINE = 30
SLEEP_TIME = 10

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
    """Simulates a long-running process."""
    RUN_TIME = random.randint(1, 60)
    logger.info('{}: Running {} for {}s'.format(
        time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
    time.sleep(RUN_TIME)

# `processes` stores process as key and ack id and message as values.
processes = dict()
for message in response.received_messages:
    process = multiprocessing.Process(target=worker, args=(message,))
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is still running, reset the ack deadline as
        # specified by ACK_DEADLINE once every while as specified
        # by SLEEP_TIME.
        if process.is_alive():
            # `ack_deadline_seconds` must be between 10 to 600.
            subscriber.modify_ack_deadline(
                subscription_path,
                [ack_id],
                ack_deadline_seconds=ACK_DEADLINE)
            logger.info('{}: Reset ack deadline for {} for {}s'.format(
                time.strftime("%X", time.gmtime()),
                msg_data, ACK_DEADLINE))

        # If the processs is finished, acknowledges using `ack_id`.
        else:
            subscriber.acknowledge(subscription_path, [ack_id])
            logger.info("{}: Acknowledged {}".format(
                time.strftime("%X", time.gmtime()), msg_data))
            processes.pop(process)

    # If there are still processes running, sleeps the thread.
    if processes:
        time.sleep(SLEEP_TIME)

print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

同期 pull でのメッセージ配信のレイテンシを短くするには、未処理の pull リクエストが多く存在している必要があります。トピックのスループットが増加するにつれ、より多くの pull リクエストが必要になります。一般に、非同期 pull はレイテンシの影響を受けやすいアプリケーションに適しています。

クライアント ライブラリによっては、固定数のメッセージの同期 pull に対応していないものもあります。詳細については、選択したプログラミング言語の API リファレンス ドキュメントをご覧ください。

スケーリング

メッセージ量の変化に対応するため、サブスクライバー アプリケーションにスケーリング メカニズムの実装が必要になる場合があります。その方法は環境によって異なりますが、一般には Stackdriver Monitoring サービスを通じて提供されたバックログ指標に基づきます。Compute Engine に対してこれを行う方法について詳しくは、Cloud Monitoring 指標に基づくスケーリングをご覧ください。

Cloud Monitoring のサポートされている指標ページで「PubSub」を探し、プログラムを使用してモニタリングできる指標を確認してください。

最後に、すべての分散型サービスと同様に、すべてのリクエストが再試行される場合があることを想定してください。

重複メッセージへの対処

Cloud Pub/Sub は、重複するメッセージを送信することがあります。たとえば、確認応答期限が切れる前にメッセージの確認応答を行わないと、Cloud Pub/Sub がメッセージを再送信します。Stackdriver で expired レスポンス コードを使用して確認応答オペレーションをモニタリングし、この状態を検出します。このデータを取得する方法の 1 つに、response_code でグループ化された確認応答メッセージ オペレーション指標を使用する方法があります。

Stackdriver で確認応答期限が切れたメッセージを検索する

メッセージの期限を延長すると、重複率を低下させることができます。

  • 期限の延長はクライアント ライブラリが自動的に行いますが、設定可能な延長期限の最大値にはデフォルトの制限があります。
  • クライアント ライブラリを独自に構築している場合は、確認応答期限の延長に modifyAckDeadline メソッドを使用します。
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...