利用提取功能接收訊息

Cloud Pub/Sub 支援推送和提取訊息傳送。如要瞭解提取和推送訂閱的概況,並對這兩者進行比較,請參閱訂閱者總覽。 這份文件說明提取傳送。有關推送傳送的討論,請參閱推送訂閱者指南

非同步提取

非同步提取不需要應用程式封鎖新訊息,因此能在應用程式中提供更高的總處理量。在應用程式中可以使用長時間執行的訊息接聽程式接收訊息,而且一次可以確認一則訊息,如下文中的範例所示。Java、Python、.NET、Go 及 Ruby 用戶端使用 streamingPull 服務 API,有效實作非同步用戶端 API。

並非所有用戶端程式庫都支援以非同步方式提取訊息。 如要瞭解如何以同步方式提取訊息,請參閱同步提取一文。

詳情請參閱程式設計語言中的 API 參考說明文件

C#

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 C# 設定說明進行操作。詳情請參閱 Cloud Pub/Sub C# API 參考說明文件

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName);
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Go 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Go API 參考說明文件

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(subName)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	msg.Ack()
	fmt.Printf("Got message: %q\n", string(msg.Data))
	mu.Lock()
	defer mu.Unlock()
	received++
	if received == 10 {
		cancel()
	}
})
if err != nil {
	return err
}

Java

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Java 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Java API 參考說明文件

String projectId = "my-project-id";
String subscriptionId = "my-subscription-id";

ProjectSubscriptionName subscriptionName =
    ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver
MessageReceiver receiver =
    new MessageReceiver() {
      @Override
      public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // handle incoming message, then ack/nack the received message
        System.out.println("Id : " + message.getMessageId());
        System.out.println("Data : " + message.getData().toStringUtf8());
        consumer.ack();
      }
    };

Subscriber subscriber = null;
try {
  // Create a subscriber for "my-subscription-id" bound to the message receiver
  subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
  subscriber.startAsync();
  // ...
} finally {
  // stop receiving messages
  if (subscriber != null) {
    subscriber.stopAsync();
  }
}

Node.js

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Node.js 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Node.js API 參考說明文件

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

Python

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Python 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Python API 參考說明文件

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

處理自訂屬性

這個範例顯示如何以非同步方式提取訊息,以及如何擷取中繼資料中的自訂屬性:

Python

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Python 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Python API 參考說明文件

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    if message.attributes:
        print('Attributes:')
        for key in message.attributes:
            value = message.attributes.get(key)
            print('{}: {}'.format(key, value))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Ruby 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Ruby API 參考說明文件

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

接聽錯誤

這個範例顯示如何處理在訂閱訊息時發生的錯誤:

Go

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Go 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Go API 參考說明文件

// If the service returns a non-retryable error, Receive returns that error after
// all of the outstanding calls to the handler have returned.
err := client.Subscription(subName).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Java 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Java API 參考說明文件

subscriber.addListener(
    new Subscriber.Listener() {
      public void failed(Subscriber.State from, Throwable failure) {
        // Handle error.
      }
    },
    MoreExecutors.directExecutor());

Node.js

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Node.js 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Node.js API 參考說明文件

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
const messageHandler = function(message) {
  // Do something with the message
  console.log(`Message: ${message}`);

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Create an event handler to handle errors
const errorHandler = function(error) {
  // Do something with the error
  console.error(`ERROR: ${error}`);
};

// Listen for new messages/errors until timeout is hit
subscription.on(`message`, messageHandler);
subscription.on(`error`, errorHandler);

setTimeout(() => {
  subscription.removeListener(`message`, messageHandler);
  subscription.removeListener(`error`, errorHandler);
}, timeout * 1000);

Python

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Python 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Python API 參考說明文件

from google.cloud import pubsub_v1

# TODO project           = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pubsub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)

# Blocks the thread while messages are coming in through the stream. Any
# exceptions that crop up on the thread will be set on the future.
try:
    # When timeout is unspecified, the result method waits indefinitely.
    future.result(timeout=30)
except Exception as e:
    print(
        'Listening for messages on {} threw an Exception: {}.'.format(
            subscription_name, e))

Ruby

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Ruby 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Ruby API 參考說明文件

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

begin
  subscriber.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  subscriber.stop.wait!
rescue Exception => ex
  puts "Exception #{ex.inspect}: #{ex.message}"
  raise "Stopped listening for messages."
end

訊息流量控制

您的訂閱者用戶端在處理和確認訊息時,可能比 Cloud Pub/Sub 傳送訊息至用戶端更為緩慢。在這種情況下:

  • 一個用戶端可能有訊息的待處理作業,因為它沒有處理大量收到訊息的能力,但網路上的另一個用戶端擁有該能力。第二個用戶端可以減少訂閱待處理作業,但沒有機會進行,因為第一個用戶端仍維持其接收訊息的釋出期。這樣一來就會降低整體處理速率,因為訊息在第一個用戶端卡住了。
  • 由於用戶端程式庫重複延長待處理訊息的確認期限,導致這些訊息持續佔用 CPU 及頻寬資源。因此訂閱者用戶端可能會耗盡資源 (例如記憶體),對總處理量及處理訊息的延遲時間造成負面影響。

為了緩解以上問題,請使用訂閱者的流量控制功能,控制訂閱者接收訊息的速率。這些流量控制功能如以下範例所示:

C#

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 C# 設定說明進行操作。詳情請參閱 Cloud Pub/Sub C# API 參考說明文件

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberClient subscriber = await SubscriberClient.CreateAsync(
    subscriptionName,
    settings: new SubscriberClient.Settings()
    {
        AckExtensionWindow = TimeSpan.FromSeconds(4),
        Scheduler = Google.Api.Gax.SystemScheduler.Instance,
        AckDeadline = TimeSpan.FromSeconds(10),
        FlowControlSettings = new Google.Api.Gax
            .FlowControlSettings(
            maxOutstandingElementCount: 100,
            maxOutstandingByteCount: 10240)
    });
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Go 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Go API 參考說明文件

sub := client.Subscription(subName)
sub.ReceiveSettings.MaxOutstandingMessages = 10
err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
	fmt.Printf("Got message: %q\n", string(msg.Data))
	msg.Ack()
})
if err != nil {
	return err
}

Java

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Java 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Java API 參考說明文件

FlowControlSettings flowControlSettings =
    FlowControlSettings.newBuilder()
        .setMaxOutstandingElementCount(10_000L)
        .setMaxOutstandingRequestBytes(1_000_000_000L)
        .build();
Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setFlowControlSettings(flowControlSettings)
        .build();

Node.js

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Node.js 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Node.js API 參考說明文件

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const subscriptionName = 'my-sub';
// const maxInProgress = 5;
// const maxBytes = 10000;

const topic = pubsub.topic(topicName);

const options = {
  flowControl: {
    maxBytes: maxBytes,
    maxMessages: maxInProgress,
  },
};

const subscription = topic.subscription(subscriptionName, options);

// Creates a new subscription
// Note that flow control configurations are not persistent
const [newSubscription] = await subscription.get({
  autoCreate: true,
});
console.log(
  `Subscription ${
    newSubscription.name
  } created with a maximum of ${maxInProgress} unprocessed messages.`
);

Python

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Python 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Python API 參考說明文件

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message.data))
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Ruby 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Ruby API 參考說明文件

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
廣泛來說,需要流量控制就表示正在以比耗用訊息更高的速率發佈訊息。如果這是持續狀態,而不是訊息量一時達到最高峰,請考慮增加訂閱者用戶端執行個體數量。

並行控制

是否支援並行要視您的程式設計語言而定。對於支援平行執行緒的語言實作,例如 Java 與 Go,用戶端程式庫會對執行緒數量做出預設選擇。這項選擇對於您的應用程式而言可能不是最佳選擇。 例如,如果您發現訂閱者應用程式跟不上收到的訊息量,但不受 CPU 限制,您應該增加執行緒計數。 對於會耗用大量 CPU 資源的訊息處理作業,減少執行緒數量可能比較適當。

以下範例說明如何控制訂閱者中的並行:

Java

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Java 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Java API 參考說明文件

// provide a separate executor service for polling
ExecutorProvider executorProvider =
    InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();

Subscriber subscriber =
    Subscriber.newBuilder(subscriptionName, receiver)
        .setExecutorProvider(executorProvider)
        .build();

Ruby

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Ruby 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Ruby API 參考說明文件

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

是否支援並行取決於您的程式設計語言。 如需更多資訊,請參閱 API 參考說明文件

StreamingPull

Cloud Pub/Sub 服務有兩個 API 可用來擷取訊息:

在可能的情況下,Cloud 用戶端程式庫會使用 StreamingPull,這是一個雙向串流 RPC,可以使總處理量達到最高並使延遲時間降到最低。雖然您可能絕不會直接使用 StreamingPull API,但您需要瞭解 StreamingPull 的一些重要屬性以及它與更傳統提取方法之間的差異,這很重要。

提取方法依賴於要求/回應模型:

  1. 應用程式會傳送訊息的要求。
  2. 伺服器會回覆零或多個訊息並關閉連線。

StreamingPull 服務 API 依賴於持續的雙向連線來在訊息變為可用時接收多個訊息、傳送確認以及修改確認期限:

  1. 用戶端會傳送要求至服務,以建立連線。
  2. 用戶端會使用該連線交換訊息資料。
  3. 要求 (即雙向連線) 會由用戶端或伺服器終止。

StreamingPull 有 100% 的錯誤率 (這是預期情況)

StreamingPull 串流一律以可以重試的錯誤碼終止。請注意,與一般 RPC 不同,這裡的錯誤只是用來指示串流已中斷,而非要求失敗。因此,雖然 StreamingPull API 表面上的錯誤率可能達到了驚人的 100%,但這是設計使然。如要診斷 StreamingPull 錯誤,我們建議關注 StreamingPull 訊息作業指標,而不要關注 StreamingPull 要求指標。

StreamingPull:處理小型訊息的大型待處理作業

gRPC StreamingPull 堆疊已針對高總處理量最佳化,因此會緩衝訊息。 如果您嘗試處理小型訊息的大型待處理作業 (而非持續串流的新訊息),這可能會產生某些後果。在這些情況下,您可能會看到訊息被多次傳送,而且訊息可能不會跨用戶端有效達到負載平衡。

Cloud Pub/Sub 服務與用戶端程式庫使用者空間之間的緩衝區約為 10MB。如要瞭解此緩衝區對用戶端程式庫行為的影響,請考量下列範例:

  • 訂閱項目有 10000 個 1KB 訊息的待處理作業。
  • 每個訊息都會依單一執行緒用戶端執行個體,花費 1 秒鐘的時間來依序處理。
  • 用來針對該訂閱項目建立 StreamingPull 與服務間連線的第一個用戶端執行個體將會取得完整 10K 訊息的緩衝區。
  • 它會花費 10000 秒 (約 3 小時) 的時間來處理緩衝區。
  • 在此時間內,一些訊息會超出其確認期限,並會重新傳送至相同的用戶端,這樣就造成了重複。
  • 當多個用戶端執行個體在執行時,卡在緩衝區中的訊息無法用於第一個執行個體以外的任何執行個體。

如果訊息以固定速率到達,而不是一次到達一大批,就不會發生此情況。服務絕對不會一次擁有完整的 10MB 訊息,因此能夠跨多個訂閱者使訊息有效達到負載平衡。

如要解決此情況,請使用推送訂閱或提取 API,目前在一些 Cloud 用戶端程式庫 (請參閱「同步提取」部分) 及所有 API 用戶端程式庫中都有提供。詳情請參閱用戶端程式庫說明文件

同步提取

有時非同步提取並不是應用程式的最佳選擇。例如應用程式邏輯可能仰賴輪詢模式以擷取訊息,或需要針對用戶端在任何特定時間接收的訊息數量訂定精確限制。為了支援這類應用程式,服務及大部分用戶端程式庫都支援同步提取方法。

以下是提取確認固定數量訊息的部分程式碼範例:

通訊協定

要求:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull
{
  "returnImmediately": "false",
  "maxMessages": "1"
}

回應:

200 OK
{
  "receivedMessages": [
    {
      "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
      "message": {
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "19917247034"
      }
    }
  ]
}

要求:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge
{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

回應:

200 OK

C#

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 C# 設定說明進行操作。詳情請參閱 Cloud Pub/Sub C# API 參考說明文件

SubscriptionName subscriptionName = new SubscriptionName(projectId,
    subscriptionId);
SubscriberServiceApiClient subscriberClient =
    SubscriberServiceApiClient.Create();
// Pull messages from server,
// allowing an immediate response if there are no messages.
PullResponse response = subscriberClient.Pull(
    subscriptionName, returnImmediately: true, maxMessages: 20);
// Print out each received message.
foreach (ReceivedMessage msg in response.ReceivedMessages)
{
    string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
    Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
}
// If acknowledgement required, send to server.
if (acknowledge)
{
    subscriberClient.Acknowledge(subscriptionName,
        response.ReceivedMessages.Select(msg => msg.AckId));
}

Java

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Java 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Java API 參考說明文件

SubscriberStubSettings subscriberStubSettings =
    SubscriberStubSettings.newBuilder()
        .setTransportChannelProvider(
            SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                .setMaxInboundMessageSize(20 << 20) // 20MB
                .build())
        .build();

try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
  // String projectId = "my-project-id";
  // String subscriptionId = "my-subscription-id";
  // int numOfMessages = 10;   // max number of messages to be pulled
  String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
  PullRequest pullRequest =
      PullRequest.newBuilder()
          .setMaxMessages(numOfMessages)
          .setReturnImmediately(false) // return immediately if messages are not available
          .setSubscription(subscriptionName)
          .build();

  // use pullCallable().futureCall to asynchronously perform this operation
  PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
  List<String> ackIds = new ArrayList<>();
  for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
    // handle received message
    // ...
    ackIds.add(message.getAckId());
  }
  // acknowledge received messages
  AcknowledgeRequest acknowledgeRequest =
      AcknowledgeRequest.newBuilder()
          .setSubscription(subscriptionName)
          .addAllAckIds(ackIds)
          .build();
  // use acknowledgeCallable().futureCall to asynchronously perform this operation
  subscriber.acknowledgeCallable().call(acknowledgeRequest);
  return pullResponse.getReceivedMessagesList();
}

Node.js

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Node.js 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Node.js API 參考說明文件

// Imports the Google Cloud client library
const pubsub = require('@google-cloud/pubsub');

const client = new pubsub.v1.SubscriberClient();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const projectName = 'your-project';
// const subscriptionName = 'your-subscription';

const formattedSubscription = client.subscriptionPath(
  projectName,
  subscriptionName
);
// The maximum number of messages returned for this request.
// Pub/Sub may return fewer than the number specified.
const maxMessages = 1;
const newAckDeadlineSeconds = 30;
const request = {
  subscription: formattedSubscription,
  maxMessages: maxMessages,
};

let isProcessed = false;

// The worker function is meant to be non-blocking. It starts a long-
// running process, such as writing the message to a table, which may
// take longer than the default 10-sec acknowledge deadline.
function worker(message) {
  console.log(`Processing "${message.message.data}"...`);

  setTimeout(() => {
    console.log(`Finished procesing "${message.message.data}".`);
    isProcessed = true;
  }, 30000);
}

// The subscriber pulls a specified number of messages.
const [response] = await client.pull(request);
// Obtain the first message.
const message = response.receivedMessages[0];
// Send the message to the worker function.
worker(message);

let waiting = true;
while (waiting) {
  await new Promise(r => setTimeout(r, 10000));
  // If the message has been processed..
  if (isProcessed) {
    const ackRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
    };

    //..acknowledges the message.
    await client.acknowledge(ackRequest);
    console.log(`Acknowledged: "${message.message.data}".`);
    // Exit after the message is acknowledged.
    waiting = false;
    console.log(`Done.`);
  } else {
    // If the message is not yet processed..
    const modifyAckRequest = {
      subscription: formattedSubscription,
      ackIds: [message.ackId],
      ackDeadlineSeconds: newAckDeadlineSeconds,
    };

    //..reset its ack deadline.
    await client.modifyAckDeadline(modifyAckRequest);

    console.log(
      `Reset ack deadline for "${
        message.message.data
      }" for ${newAckDeadlineSeconds}s.`
    );
  }
}

PHP

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 PHP 設定說明進行操作。詳情請參閱 Cloud Pub/Sub PHP API 參考說明文件

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Python

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Python 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Python API 參考說明文件

import logging
import multiprocessing
import random
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

NUM_MESSAGES = 2
ACK_DEADLINE = 30
SLEEP_TIME = 10

# The subscriber pulls a specific number of messages.
response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
    """Simulates a long-running process."""
    RUN_TIME = random.randint(1, 60)
    logger.info('{}: Running {} for {}s'.format(
        time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME))
    time.sleep(RUN_TIME)

# `processes` stores process as key and ack id and message as values.
processes = dict()
for message in response.received_messages:
    process = multiprocessing.Process(target=worker, args=(message,))
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is still running, reset the ack deadline as
        # specified by ACK_DEADLINE once every while as specified
        # by SLEEP_TIME.
        if process.is_alive():
            # `ack_deadline_seconds` must be between 10 to 600.
            subscriber.modify_ack_deadline(
                subscription_path,
                [ack_id],
                ack_deadline_seconds=ACK_DEADLINE)
            logger.info('{}: Reset ack deadline for {} for {}s'.format(
                time.strftime("%X", time.gmtime()),
                msg_data, ACK_DEADLINE))

        # If the processs is finished, acknowledges using `ack_id`.
        else:
            subscriber.acknowledge(subscription_path, [ack_id])
            logger.info("{}: Acknowledged {}".format(
                time.strftime("%X", time.gmtime()), msg_data))
            processes.pop(process)

    # If there are still processes running, sleeps the thread.
    if processes:
        time.sleep(SLEEP_TIME)

print("Received and acknowledged {} messages. Done.".format(NUM_MESSAGES))

Ruby

在嘗試這個範例之前,請至 Cloud Pub/Sub 快速入門導覽課程:使用用戶端程式庫,按照 Ruby 設定說明進行操作。詳情請參閱 Cloud Pub/Sub Ruby API 參考說明文件

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

請注意,為了讓同步提取達到低延遲的訊息提交,請務必具備許多同時未解決的提取要求。隨著主題總處理量增加,就必須擁有更多提取要求。一般來說,對延遲時間敏感的應用程式,比較適合使用非同步提取

並非所有用戶端程式庫都支援以同步方式提取固定數量的訊息。關於您選擇之程式設計語言的詳情,請參閱 API 參考說明文件

資源調度

您可能需要為訂閱者應用程式實作配置機制,才能跟上訊息量。具體操作方式取決於您的環境,但通常以透過 Stackdriver Monitoring 服務提供的待處理作業指標為基礎。 如要進一步瞭解如何針對 Compute Engine 執行這項操作,請參閱以 Cloud Monitoring 指標為基礎的配置一文。

在 Cloud Monitoring 的支援的指標頁面搜尋「pubsub」,瞭解可以透過程式方式監控哪些指標。

最後,和所有已發佈服務一樣,預期會偶爾重試每個要求。

處理重複訊息

Cloud Pub/Sub 可傳送重複訊息。例如若您未在訊息確認期限到期之前確認訊息,Cloud Pub/Sub 就會再次傳送訊息。請使用 Stackdriver 監控確認操作,並利用 expired 回應碼偵測這個情況。取得這項資料的方式之一是「確認訊息作業」指標,以「response_code」進行分組

利用 Stackdriver 搜尋已過期的訊息確認期限

為了減少重複比率,請延長訊息期限:

  • 用戶端程式庫會自動處理期限延長,但請注意您可設定最大延長期限的預設限制。
  • 如果您正在建構自己的用戶端程式庫,請使用 modifyAckDeadline 方法延長確認期限。
本頁內容對您是否有任何幫助?請提供意見:

傳送您對下列選項的寶貴意見...

這個網頁
Cloud Pub/Sub