pull サブスクライバー ガイド

Cloud Pub/Sub は push と pull の両方のメッセージ配信をサポートします。pull サブスクリプションと push サブスクリプションの概要と比較については、サブスクライバーの概要をご覧ください。このドキュメントでは、pull 配信について説明します。push 配信の説明については、push サブスクライバー ガイドをご覧ください。

非同期 pull

非同期 pull を使用すると、アプリケーションのスループットを向上させることができます。アプリケーションで新しいメッセージをブロックする必要はありません。メッセージは、アプリケーション内で長時間実行されているメッセージ リスナーを使って受信できます。以下の例で示すとおり、同時に確認応答されるメッセージは 1 回に 1 つだけです。

クライアント ライブラリによっては、非同期のメッセージ pull に対応していない場合があります。同期 pull によるメッセージの取得については、同期 pull をご覧ください。

詳細については、ご使用のプログラミング言語の API リファレンス ドキュメントをご覧ください。

C#

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の C# の設定手順に従ってください。詳細については、Cloud Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
SubscriberClient subscriber = SubscriberClient.Create(
    subscriptionName, new[] { subscriberClient });
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
Thread.Sleep(3000);
subscriber.StopAsync(CancellationToken.None).Wait();

Go

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(subName)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	msg.Ack()
	fmt.Printf("Got message: %q\n", string(msg.Data))
	mu.Lock()
	defer mu.Unlock()
	received++
	if received == 10 {
		cancel()
	}
})
if err != nil {
	return err
}

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
  subscriber.startAsync();
  // ...
} finally {
  // stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Node.js

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);
setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

# project           = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

カスタム属性の処理

このサンプルでは、非同期でメッセージを pull し、メタデータからカスタム属性を取得する方法を示します。

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

# project           = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    if message.attributes:
        print('Attributes:')
        for key in message.attributes:
            value = message.attributes.get(key)
            print('{}: {}'.format(key, value))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  if !received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

エラーのリッスン

次のサンプルでは、メッセージの登録中に発生するエラーを処理します。

Go

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

// If the service returns a non-retryable error, Receive returns that error after
// all of the outstanding calls to the handler have returned.
err := client.Subscription(subName).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

subscriber.addListener(
    new Subscriber.Listener() {
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle error.
      }
    },
    MoreExecutors.directExecutor());

Node.js

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
const messageHandler = function(message) {
  // Do something with the message
  console.log(`Message: ${message}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Create an event handler to handle errors
const errorHandler = function(error) {
  // Do something with the error
  console.error(`ERROR: ${error}`);
};

// Listen for new messages/errors until timeout is hit
subscription.on(`message`, messageHandler);
subscription.on(`error`, errorHandler);
setTimeout(() => {
  subscription.removeListener(`message`, messageHandler);
  subscription.removeListener(`error`, errorHandler);
}, timeout * 1000);

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

# project           = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

# Blocks the thread while messages are coming in through the stream. Any
# exceptions that crop up on the thread will be set on the future.
try:
    # When timeout is unspecified, the result method waits indefinitely.
    future.result(timeout=30)
except Exception as e:
    print(
        'Listening for messages on {} threw an Exception: {}.'.format(
            subscription_name, e))

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception= true

begin
  subscriber.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  subscriber.stop.wait!
rescue Exception => ex
  puts "Exception #{ex.inspect}: #{ex.message}"
  raise "Stopped listening for messages."
end

メッセージのフロー制御

サブスクライバー クライアントのメッセージの処理と確認が遅く、Cloud Pub/Sub がクライアントにメッセージを送信できない場合があります。その場合、次の状況になります。

  • 受信メッセージの量がクライアントの処理能力を超えたときに、ネットワーク上の別のクライアントにその処理能力がある場合、メッセージのバックログが発生する可能性があります。2 番目のクライアントで全体のバックログを減らすことも可能ですが、最初のクライアントが 2 番目のクライアントにメッセージをすぐに送信できないため、バックログが減ることはありません。このため、最初のクライアントにメッセージが溜まり、全体の処理速度が低下します。
  • クライアント ライブラリは処理が遅れているメッセージに対する確認応答期限を繰り返し延長するため、こうしたメッセージによってメモリ、CPU、帯域幅のリソースが消費され続けます。そのため、メモリ不足の問題が発生する可能性があります。

この状況でスループットを最大にし、レイテンシを最小にするには、サブスクライバーのフロー制御機能を使用して、サブスクライバーのメッセージの取得速度を制御します。以下に、フロー制御機能のサンプルを示します。

C#

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の C# の設定手順に従ってください。詳細については、Cloud Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
SubscriberClient subscriber = SubscriberClient.Create(
    subscriptionName, new[] { subscriberClient },
    new SubscriberClient.Settings()
    {
        AckExtensionWindow = TimeSpan.FromSeconds(4),
        Scheduler = Google.Api.Gax.SystemScheduler.Instance,
        StreamAckDeadline = TimeSpan.FromSeconds(10),
        FlowControlSettings = new Google.Api.Gax
            .FlowControlSettings(
            maxOutstandingElementCount: 100,
            maxOutstandingByteCount: 10240)
    });
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
Thread.Sleep(3000);
subscriber.StopAsync(CancellationToken.None).Wait();

Go

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。詳細については、Cloud Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

sub := client.Subscription(subName)
sub.ReceiveSettings.MaxOutstandingMessages = 10
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

FlowControlSettings flowControlSettings =
    FlowControlSettings.newBuilder()
        .setMaxOutstandingElementCount(10_000L)
        .setMaxOutstandingRequestBytes(1_000_000_000L)
        .build();
Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setFlowControlSettings(flowControlSettings)
        .build();

Node.js

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

// Imports the Google Cloud client library
const PubSub = require(`@google-cloud/pubsub`);

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const subscriptionName = 'my-sub';
// const maxInProgress = 5;
// const maxBytes = 10000;

const topic = pubsub.topic(topicName);

const options = {
  flowControl: {
    maxBytes: maxBytes,
    maxMessages: maxInProgress,
  },
};

const subscription = topic.subscription(subscriptionName, options);

// Creates a new subscription
// Note that flow control configurations are not persistent
subscription
  .get({
    autoCreate: true,
  })
  .then(results => {
    const subscription = results[0];

    console.log(
      `Subscription ${
        subscription.name
      } created with a maximum of ${maxInProgress} unprocessed messages.`
    );
  })
  .catch(err => {
    console.error('ERROR:', err);
  });

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

# project           = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
通常、フロー制御が必要な場合、公開されるメッセージの量が消費されるメッセージの量を超えています。これが一時的な急増ではなく、永続的な状態である場合は、サブスクライバー クライアント インスタンスとマシンの数を増やすことを検討してください。

同時実行制御

同時実行のサポートは、プログラミング言語によって異なります。Java や Go など、並列スレッドに対応している言語の場合、クライアント ライブラリはデフォルトのスレッド数を選択します。しかし、この値がアプリケーションに最適ではない場合もあります。たとえば、サブスクライバー アプリケーションがメッセージの受信量に対応できず、CPU にバインドされていない場合は、スレッド数を増やす必要があります。CPU 使用率の高いメッセージ処理では、スレッド数を減らすと、問題が解決する場合があります。

次のサンプルでは、サブスクライバーで同時実行制御を行います。

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

// provide a separate executor service for polling
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setExecutorProvider(executorProvider)
        .build();

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  :callback => 4,
  :push => 2
} do |received_message |
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

同時実行のサポートは、プログラミング言語によって異なります。詳細については、API リファレンス ドキュメントをご覧ください。

同期 pull

新しいメッセージを常にポーリングする必要がある場合、非同期 pull(プログラミング言語で選択可能な場合)のほうがスループットの改善に役立つ可能性があります。同期 pull は、パブリッシュ後すぐにメッセージを処理する必要のないワークロードに適しています。同期サブスクライバーは接続を長時間維持する必要がないため、固定数のメッセージを pull して処理するように設定することもできます。現在処理するメッセージがない場合、サブスクライバーはタイムアウトします。

同期 pull でのレイテンシを短くするには、returnImmediately が false に設定された未処理の pull リクエストが多く存在している必要があります。トピックのスループットが増加するにつれ、より多くの pull リクエストが必要になります。同時に存在する未処理のリクエスト数は 10〜100 が適切です。ただし、レイテンシを最小限に抑えるには、非同期 pull の使用をおすすめします。

クライアント ライブラリによっては、固定数のメッセージの同期 pull に対応していないものもあります。詳細については、ご利用のプログラミング言語の API リファレンス ドキュメントをご覧ください。次に、固定数のメッセージを pull して確認応答するサンプルコードを記載します。

プロトコル

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
  "returnImmediately": "false",
  "maxMessages": "1"
}

レスポンス:

200 OK
{
  "receivedMessages": [
    {
      "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
      "message": {
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "19917247034"
      }
    }
  ]
}

リクエスト:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

レスポンス:

200 OK

C#

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の C# の設定手順に従ってください。詳細については、Cloud Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient =
    SubscriberServiceApiClient.Create();
// Pull messages from server,
// allowing an immediate response if there are no messages.
PullResponse response = subscriberClient.Pull(
    subscriptionName, returnImmediately: true, maxMessages: 20);
// Print out each received message.
foreach (ReceivedMessage msg in response.ReceivedMessages)
{
    string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
    Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
}
// If acknowledgement required, send to server.
if (acknowledge)
{
    subscriberClient.Acknowledge(subscriptionName,
        response.ReceivedMessages.Select(msg => msg.AckId));
}

Java

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Java の設定手順に従ってください。詳細については、Cloud Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

SubscriberStubSettings subscriberStubSettings =
    SubscriberStubSettings.newBuilder().build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
  // String projectId = "my-project-id";
  // String subscriptionId = "my-subscription-id";
  // int numOfMessages = 10;   // max number of messages to be pulled
  String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
  PullRequest pullRequest =
      PullRequest.newBuilder()
          .setMaxMessages(numOfMessages)
          .setReturnImmediately(false) // return immediately if messages are not available
          .setSubscription(subscriptionName)
          .build();

  // use pullCallable().futureCall to asynchronously perform this operation
  PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
  List<String> ackIds = new ArrayList<>();
  for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
    // handle received message
    // ...
    ackIds.add(message.getAckId());
  }
  // acknowledge received messages
  AcknowledgeRequest acknowledgeRequest =
      AcknowledgeRequest.newBuilder()
          .setSubscription(subscriptionName)
          .addAllAckIds(ackIds)
          .build();
  // use acknowledgeCallable().futureCall to asynchronously perform this operation
  subscriber.acknowledgeCallable().call(acknowledgeRequest);
  return pullResponse.getReceivedMessagesList();
}

Node.js

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、Cloud Pub/Sub Node.js API のリファレンス ドキュメントをご覧ください。

// Imports the Google Cloud client library
const pubsub = require('@google-cloud/pubsub');

const client = new pubsub.v1.SubscriberClient();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectName = 'your-project';
// const subscriptionName = 'your-subscription';

const formattedSubscription = client.subscriptionPath(
  projectName,
  subscriptionName
);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 3;
const ackDeadlineSeconds = 30;
const request = {
  subscription: formattedSubscription,
  maxMessages: maxMessages,
};
// `messages` is a dict that stores message ack ids as keys, and message
// data and the processing states (true if done, false if not) as values.
const messages = {};

// The worker function takes a message and starts a long-running process.
function worker(message) {
  const target = Math.floor(Math.random() * 1e5);
  console.log(`Processing "${message.message.data}" for ${target / 1e3}s...`);

  setTimeout(() => {
    console.log(`Finished procesing "${message.message.data}".`);
    // After the message has been processed, set its processing state to true.
    messages[message.ackId][1] = true;
  }, target);
}

// The subscriber pulls a specific number of messages.
client
  .pull(request)
  .then(responses => {
    // The first element of `responses` is a PullResponse object.
    const response = responses[0];

    // Initialize `messages` with message ackId, message data and `false` as
    // processing state. Then, start each message in a worker function.
    response.receivedMessages.forEach(message => {
      messages[message.ackId] = [message.message.data, false];
      worker(message);
    });

    let numProcessed = 0;

    // setInterval() gets run every 10s.
    const interval = setInterval(function() {
      // Every 10s, we do a check on the processing states of the messages.
      Object.keys(messages).forEach(ackId => {
        if (messages[ackId][1]) {
          // If the processing state for a particular message is true,
          // We will ack the message.
          const ackRequest = {
            subscription: formattedSubscription,
            ackIds: [ackId],
          };

          client.acknowledge(ackRequest).catch(err => {
            console.error(err);
          });

          console.log(`Acknowledged: "${messages[ackId][0]}".`);

          // Increment numProcessed by 1.
          numProcessed += 1;

          // Remove this message from `messages`.
          delete messages[ackId];
        } else {
          // If the processing state of a particular message remains false,
          // we will modify its ack deadline.
          const modifyAckRequest = {
            subscription: formattedSubscription,
            ackIds: [ackId],
            ackDeadlineSeconds: ackDeadlineSeconds,
          };

          client.modifyAckDeadline(modifyAckRequest).catch(err => {
            console.error(err);
          });

          console.log(
            `Reset ack deadline for "${
              messages[ackId][0]
            }" for ${ackDeadlineSeconds}s.`
          );
        }

        // If all messages have been processed, we clear out of the interval.
        if (numProcessed === response.receivedMessages.length) {
          clearInterval(interval);
          console.log(`Done.`);
        }
      });
    }, 10000);
  })
  .catch(err => {
    console.error(err);
  });

PHP

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の PHP の設定手順に従ってください。詳細については、Cloud Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Python

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Python の設定手順に従ってください。詳細については、Cloud Pub/Sub Python API のリファレンス ドキュメントをご覧ください。

# project           = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_name)

NUM_MESSAGES=2
ACK_DEADLINE=30
SLEEP_TIME=10

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
    """Simulates a long-running process."""
    RUN_TIME = random.randint(1,60)
    logger.info('{}: Running {} for {}s'.format(
        time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
    time.sleep(RUN_TIME)

# `processes` stores process as key and ack id and message as values.
processes = dict()
for message in response.received_messages:
    process = multiprocessing.Process(target=worker, args=(message,))
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    for process, (ack_id, msg_data) in processes.items():
        # If the process is still running, reset the ack deadline as
        # specified by ACK_DEADLINE once every while as specified
        # by SLEEP_TIME.
        if process.is_alive():
            # `ack_deadline_seconds` must be between 10 to 600.
            subscriber.modify_ack_deadline(subscription_path,
                [ack_id], ack_deadline_seconds=ACK_DEADLINE)
            logger.info('{}: Reset ack deadline for {} for {}s'.format(
                time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE))

        # If the processs is finished, acknowledges using `ack_id`.
        else:
            subscriber.acknowledge(subscription_path, [ack_id])
            logger.info("{}: Acknowledged {}".format(
                time.strftime("%X", time.gmtime()), msg_data))
            processes.pop(process)

    # If there are still processes running, sleeps the thread.
    if processes:
        time.sleep(SLEEP_TIME)

print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))

Ruby

このサンプルを試す前に、Cloud Pub/Sub クイックスタート: クライアント ライブラリの使用の Ruby の設定手順に従ってください。詳細については、Cloud Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

スケーリング

メッセージ量の変化に対応するため、サブスクライバー アプリケーションにスケーリング メカニズムの実装が必要になる場合があります。その方法は環境によって異なりますが、一般には Stackdriver Monitoring サービスを通じて提供されたバックログ指標に基づきます。Compute Engine に対してこれを行う方法について詳しくは、Cloud Monitoring 指標に基づくスケーリングをご覧ください。

Cloud Monitoring のサポートされている指標ページで「PubSub」を探し、プログラムを使用してモニタリングできる指標を確認してください。

最後に、すべての分散型サービスと同様に、すべてのリクエストが再試行される場合があることを想定してください。

StreamingPull

Cloud Pub/Sub サービスにはメッセージを取得するための 2 つの API があります。

Cloud Client ライブラリでは、使用可能な場合に、スループットを最大にし、レイテンシを最小にするために、双方向ストリーミング RPC である StreamingPull を使用します。StreamingPull API を直接使用することはない場合でも、StreamingPull のいくつかの重要なプロパティ、従来の pull メソッドとの違いを理解することは重要です。

pull メソッドは次のリクエスト / レスポンス モデルに依存しています。

  1. アプリケーションがメッセージのリクエストを送信します。
  2. サーバーが 0 または 1 つ以上のメッセージとともに応答し、接続を終了します。

StreamingPull サービス API は確認応答を送信し、確認応答期限を変更する際に、複数のメッセージを受信するために、次の永続的な双方向接続に依存します。

  1. クライアントが接続を確立するためにサービスへのリクエストを送信します。
  2. クライアントがメッセージ データを交換するためにこの接続を使用します。
  3. リクエスト(つまり、双方向接続)はクライアントまたはサーバーによって終了します。

StreamingPull のエラー率は 100%(予測された結果)

StreamingPull ストリームは常に再試行可能なエラーコードで終了します。通常の RPC とは異なり、ここでのエラーはリクエストが失敗したことではなく、単にストリームが破損したことを示しています。そのため、StreamingPull API のエラー率が驚異的な 100% になることがありますが、これは設計によるものです。StreamingPull エラーを診断するために、StreamingPull リクエスト指標ではなく、StreamingPull メッセージ オペレーション指標を確認することをおすすめします。

小規模なメッセージの大規模なバックログの処理

gRPC StreamingPull スタックは高スループットのために最適化されているため、メッセージをバッファリングします。(新しいメッセージの安定したストリームではなく)小規模なメッセージの大規模なバックログを処理しようとすると、なんらかの影響が出る可能性があります。このような状況下では、メッセージが何度も配信され、クライアント間で負荷が効率的に分散されない可能性があります。

Cloud Pub/Sub サービスとクライアント ライブラリ ユーザー スペースの間のバッファ容量は約 10 MB です。クライアント ライブラリの動作に対するこのバッファの影響を理解するために、次の例を見てみましょう。

  • サブスクリプションに 10,000 件の 1 KB メッセージのバックログがあります。
  • シングル スレッド クライアント インスタンスによって各メッセージを順次処理するために 1 秒ずつかかります。
  • そのサブスクリプションのためのサービスへの StreamingPull 接続を確立する最初のクライアント インスタンスは、10,000 件のメッセージ全体のバッファを受け取ります。
  • このバッファを処理するには、10,000 秒(約 3 時間)かかります。
  • その間、一部のメッセージが確認応答期限を超え、同じクライアントに再送信され、結果として重複が生じます。
  • 複数のクライアント インスタンスが実行されている場合、バッファ内のメッセージ スタックを、最初のインスタンス以外のインスタンスでは利用できません。

メッセージが単一の大規模なバッチとしてではなく、一定のレートで到着する場合には、この状況は発生しません。このサービスでは 10 MB のメッセージ全体が一度に処理されることはないため、複数のサブスクライバー間でメッセージの負荷を効率的に分散できます。

この状況を解決するには、現在、一部の Cloud クライアント ライブラリ(「同期 pull」セクションを参照)とすべての API クライアント ライブラリで利用できる push サブスクリプションまたは pull API のいずれかを使用します。詳細については、クライアント ライブラリのドキュメントをご覧ください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...