pull サブスクライバー ガイド

Google Cloud Pub/Sub は、push と pull の両方のメッセージ配信をサポートします。pull サブスクリプションと push サブスクリプションの概要と比較については、サブスクライバーの概要をご覧ください。このドキュメントでは、pull 配信について説明します。push 配信の説明については、push サブスクライバー ガイドをご覧ください。

非同期 pull

非同期 pull を使用すると、アプリケーションのスループットを向上させることができます。アプリケーションで新しいメッセージをブロックする必要はありません。メッセージは、アプリケーション内で長時間実行されているメッセージ リスナーを使って受信できます。以下の例で示すとおり、同時に確認応答されるメッセージは 1 回に 1 つだけです。

クライアント ライブラリによっては、非同期のメッセージ pull に対応していない場合があります。同期 pull によるメッセージの取得については、同期 pull をご覧ください。

詳細については、ご使用のプログラミング言語の API リファレンス ドキュメントをご覧ください。

Go

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(name)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	mu.Lock()
	defer mu.Unlock()
	received++
	if received >= 10 {
		cancel()
		msg.Nack()
		return
	}
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
  subscriber.startAsync();
  // ...
} finally {
  // stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Node.js

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'your-subscription';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);
setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

Python

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

def receive_messages(project, subscription_name):
    """Receives messages from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    subscriber.subscribe(subscription_path, callback=callback)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

エラーのリッスン

次のサンプルでは、メッセージの登録中に発生するエラーを処理します。

Go

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// If the service returns a non-retryable error, Receive returns that error after
// all of the outstanding calls to the handler have returned.
err := client.Subscription(name).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

subscriber.addListener(
    new Subscriber.Listener() {
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle error.
      }
    },
    MoreExecutors.directExecutor());

Node.js

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'your-subscription';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
const messageHandler = function(message) {
  // Do something with the message
  console.log(`Message: ${message}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Create an event handler to handle errors
const errorHandler = function(error) {
  // Do something with the error
  console.error(`ERROR: ${error}`);
};

// Listen for new messages/errors until timeout is hit
subscription.on(`message`, messageHandler);
subscription.on(`error`, errorHandler);
setTimeout(() => {
  subscription.removeListener(`message`, messageHandler);
  subscription.removeListener(`error`, errorHandler);
}, timeout * 1000);

Python

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

def listen_for_errors(project, subscription_name):
    """Receives messages and catches errors from a pull subscription."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    subscription = subscriber.subscribe(subscription_path, callback=callback)

    # Blocks the thread while messages are coming in through the stream. Any
    # exceptions that crop up on the thread will be set on the future.
    try:
        subscription.future.result()
    except Exception as e:
        print(
            'Listening for messages on {} threw an Exception: {}.'.format(
                subscription_name, e))
        raise

メッセージのフロー制御

サブスクライバー クライアントのメッセージの処理と確認が遅く、Pub/Sub がクライアントにメッセージを送信できない場合があります。これにより、次の 2 つの問題が発生する可能性があります。

  • 受信メッセージの量がクライアントの処理能力を超えたときに、ネットワーク上の別のクライアントにその処理能力がある場合、メッセージのバックログが発生する可能性があります。2 番目のクライアントで全体のバックログを減らすことも可能ですが、最初のクライアントが 2 番目のクライアントにメッセージをすぐに送信できないため、バックログが減ることはありません。このため、最初のクライアントにメッセージが溜まり、全体の処理速度が低下します。
  • クライアント ライブラリがメッセージの承認期限を延長し続けるため、バックログとなったメッセージはメモリ、CPU、帯域幅を消費します。

この状況でスループットを最大にし、レイテンシを最小にするには、サブスクライバーのフロー制御機能を使用して、サブスクライバーのメッセージの取得速度を制御します。以下に、フロー制御機能のサンプルを示します。

C#

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
SubscriberClient subscriber = SubscriberClient.Create(
    subscriptionName, new[] { subscriberClient },
    new SubscriberClient.Settings()
    {
        AckExtensionWindow = TimeSpan.FromSeconds(4),
        Scheduler = Google.Api.Gax.SystemScheduler.Instance,
        StreamAckDeadline = TimeSpan.FromSeconds(10),
        FlowControlSettings = new Google.Api.Gax
            .FlowControlSettings(
            maxOutstandingElementCount: 100,
            maxOutstandingByteCount: 10240)
    });
// SimpleSubscriber runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
Thread.Sleep(3000);
subscriber.StopAsync(CancellationToken.None).Wait();

Go

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

sub := client.Subscription(name)
sub.ReceiveSettings.MaxOutstandingMessages = 10
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

long maxMessageCount = 10L;
// Configure max number of messages to be pulled
FlowControlSettings flowControlSettings =
    FlowControlSettings.newBuilder().setMaxOutstandingElementCount(maxMessageCount).build();
Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setFlowControlSettings(flowControlSettings)
        .build();

Node.js

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'your-topic';
// const subscriptionName = 'your-subscription';
// const maxInProgress = 5;
// const maxBytes = 10000;

const topic = pubsub.topic(topicName);

const options = {
  flowControl: {
    maxBytes: maxBytes,
    maxMessages: maxInProgress,
  },
};

const subscription = topic.subscription(subscriptionName, options);

// Creates a new subscription
// Note that flow control configurations are not persistent
subscription
  .get({
    autoCreate: true,
  })
  .then(results => {
    const subscription = results[0];

    console.log(
      `Subscription ${
        subscription.name
      } created with a maximum of ${maxInProgress} unprocessed messages.`
    );
  })
  .catch(err => {
    console.error('ERROR:', err);
  });

Python

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

def receive_messages_with_flow_control(project, subscription_name):
    """Receives messages from a pull subscription with flow control."""
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message))
        message.ack()

    # Limit the subscriber to only have ten outstanding messages at a time.
    flow_control = pubsub_v1.types.FlowControl(max_messages=10)
    subscriber.subscribe(
        subscription_path, callback=callback, flow_control=flow_control)

    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)

Ruby

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
通常、フロー制御が必要な場合、公開されるメッセージの量が消費されるメッセージの量を超えています。これが一時的な急増ではなく、永続的な状態である場合は、メッセージ量に対応できるように、サブスクライバー クライアント インスタンスとマシンの数を増やすことを検討してください。

同時実行制御

同時実行のサポートは、プログラミング言語によって異なります。Java や Go など、並列スレッドに対応している言語の場合、クライアント ライブラリはデフォルトのスレッド数を選択します。しかし、この値がアプリケーションに最適ではない場合もあります。たとえば、サブスクライバー アプリケーションがメッセージの受信量に対応できず、CPU にバインドされていない場合は、スレッド数を増やす必要があります。CPU 使用率の高いメッセージ処理では、スレッド数を減らすと、問題が解決する場合があります。

次のサンプルでは、サブスクライバーで同時実行制御を行います。

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

// provide a separate executor service for polling
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setExecutorProvider(executorProvider)
        .build();

Ruby

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  :callback => 4,
  :push => 2
} do |received_message |
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

同時実行のサポートは、プログラミング言語によって異なります。詳細については、API リファレンス ドキュメントをご覧ください。

同期 pull

新しいメッセージを常にポーリングする必要がある場合、非同期 pull(プログラミング言語で選択可能な場合)のほうがスループットの改善に役立つ可能性があります。同期 pull は、パブリッシュ後すぐにメッセージを処理する必要のないワークロードに適しています。同期サブスクライバーは接続を長時間維持する必要がないため、固定数のメッセージを pull して処理するように設定することもできます。現在処理するメッセージがない場合、サブスクライバーはタイムアウトします。クライアント ライブラリによっては、固定数のメッセージの同期 pull に対応していないものもあります。詳細については、ご利用のプログラミング言語の API リファレンス ドキュメントをご覧ください。

次に、固定数のメッセージを pull して確認応答するサンプルコードを記載します。

プロトコル

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
  "returnImmediately": "false",
  "maxMessages": "1"
}

レスポンス:

200 OK
{
  "receivedMessages": [
    {
      "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
      "message": {
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "19917247034"
      }
    }
  ]
}

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

レスポンス:

200 OK

C#

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
SubscriberClient subscriber = SubscriberClient.Create(
    subscriptionName, new[] { subscriberClient });
// SimpleSubscriber runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
Thread.Sleep(3000);
subscriber.StopAsync(CancellationToken.None).Wait();

Java

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

SubscriberStubSettings subscriberStubSettings =
    SubscriberStubSettings.newBuilder().build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
  // String projectId = "my-project-id";
  // String subscriptionId = "my-subscription-id";
  // int numOfMessages = 10;   // max number of messages to be pulled
  String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
  PullRequest pullRequest =
      PullRequest.newBuilder()
          .setMaxMessages(numOfMessages)
          .setReturnImmediately(false) // return immediately if messages are not available
          .setSubscription(subscriptionName)
          .build();

  // use pullCallable().futureCall to asynchronously perform this operation
  PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
  List<String> ackIds = new ArrayList<>();
  for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
    // handle received message
    // ...
    ackIds.add(message.getAckId());
  }
  // acknowledge received messages
  AcknowledgeRequest acknowledgeRequest =
      AcknowledgeRequest.newBuilder()
          .setSubscription(subscriptionName)
          .addAllAckIds(ackIds)
          .build();
  // use acknowledgeCallable().futureCall to asynchronously perform this operation
  subscriber.acknowledgeCallable().call(acknowledgeRequest);
  return pullResponse.getReceivedMessagesList();
}

PHP

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Ruby

Cloud Pub/Sub クライアントのインストールと作成について、詳しくは Cloud Pub/Sub クライアント ライブラリをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

スケーリング

メッセージ量の変化に対応するため、サブスクライバー アプリケーションにスケーリング メカニズムの実装が必要になる場合があります。その方法は環境によって異なりますが、一般には Stackdriver Monitoring サービスを通じて提供されたバックログ指標に基づきます。Compute Engine に対してこれを行う方法について詳しくは、Cloud Monitoring 指標に基づくスケーリングをご覧ください。

Cloud Monitoring のサポートされている指標ページで「PubSub」を探し、プログラムを使用してモニタリングできる指標を確認してください。

最後に、すべての分散型サービスと同様に、すべてのリクエストが再試行される場合があることを想定してください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...