フロー制御設定を使用したサブスクライバーを作成して、メッセージを受信します。
もっと見る
このコードサンプルを含む詳細なドキュメントについては、以下をご覧ください。
コードサンプル
C++
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C++ 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
// Change the flow control watermarks, by default the client library uses
// 0 and 1,000 for the message count watermarks, and 0 and 10MiB for the
// size watermarks. Recall that the library stops requesting messages if
// any of the high watermarks are reached, and the library resumes
// requesting messages when *both* low watermarks are reached.
auto constexpr kMiB = 1024 * 1024L;
auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
Options{}
.set<pubsub::MaxOutstandingMessagesOption>(1000)
.set<pubsub::MaxOutstandingBytesOption>(8 * kMiB)));
auto session = subscriber.Subscribe(
[](pubsub::Message const& m, pubsub::AckHandler h) {
std::move(h).ack();
std::cout << "Received message " << m << "\n";
PleaseIgnoreThisSimplifiesTestingTheSamples();
});
return std::make_pair(subscriber, std::move(session));
};
C#
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C# 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;
public class PullMessagesWithFlowControlAsyncSample
{
public async Task<int> PullMessagesWithFlowControlAsync(string projectId, string subscriptionId, bool acknowledge)
{
SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
int messageCount = 0;
SubscriberClient subscriber = await new SubscriberClientBuilder
{
SubscriptionName = subscriptionName,
Settings = new SubscriberClient.Settings
{
AckExtensionWindow = TimeSpan.FromSeconds(4),
AckDeadline = TimeSpan.FromSeconds(10),
FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
}
}.BuildAsync();
// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
{
string text = message.Data.ToStringUtf8();
Console.WriteLine($"Message {message.MessageId}: {text}");
Interlocked.Increment(ref messageCount);
return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
});
// Run for 5 seconds.
await Task.Delay(5000);
await subscriber.StopAsync(CancellationToken.None);
// Lets make sure that the start task finished successfully after the call to stop.
await startTask;
return messageCount;
}
}
Go
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Go 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
import (
"context"
"fmt"
"io"
"cloud.google.com/go/pubsub"
)
func pullMsgsFlowControlSettings(w io.Writer, projectID, subID string) error {
// projectID := "my-project-id"
// subID := "my-sub"
ctx := context.Background()
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
return fmt.Errorf("pubsub.NewClient: %w", err)
}
defer client.Close()
sub := client.Subscription(subID)
// MaxOutstandingMessages is the maximum number of unprocessed messages the
// subscriber client will pull from the server before pausing. This also configures
// the maximum number of concurrent handlers for received messages.
//
// For more information, see https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages.
sub.ReceiveSettings.MaxOutstandingMessages = 100
// MaxOutstandingBytes is the maximum size of unprocessed messages,
// that the subscriber client will pull from the server before pausing.
sub.ReceiveSettings.MaxOutstandingBytes = 1e8
err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
msg.Ack()
})
if err != nil {
return fmt.Errorf("sub.Receive: %w", err)
}
return nil
}
Java
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class SubscribeWithFlowControlSettingsExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
subscribeWithFlowControlSettingsExample(projectId, subscriptionId);
}
public static void subscribeWithFlowControlSettingsExample(
String projectId, String subscriptionId) {
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
// Handle incoming message, then ack the received message.
System.out.println("Id: " + message.getMessageId());
System.out.println("Data: " + message.getData().toStringUtf8());
consumer.ack();
};
Subscriber subscriber = null;
// The subscriber will pause the message stream and stop receiving more messsages from the
// server if any one of the conditions is met.
FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
// 1,000 outstanding messages. Must be >0. It controls the maximum number of messages
// the subscriber receives before pausing the message stream.
.setMaxOutstandingElementCount(1000L)
// 100 MiB. Must be >0. It controls the maximum size of messages the subscriber
// receives before pausing the message stream.
.setMaxOutstandingRequestBytes(100L * 1024L * 1024L)
.build();
try {
subscriber =
Subscriber.newBuilder(subscriptionName, receiver)
.setFlowControlSettings(flowControlSettings)
.build();
// Start the subscriber.
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} catch (TimeoutException timeoutException) {
// Shut down the subscriber after 30s. Stop receiving messages.
subscriber.stopAsync();
}
}
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const maxInProgress = 5;
// const timeout = 10;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function subscribeWithFlowControlSettings(
subscriptionNameOrId,
maxInProgress,
timeout
) {
const subscriberOptions = {
flowControl: {
maxMessages: maxInProgress,
},
};
// References an existing subscription.
// Note that flow control settings are not persistent across subscribers.
const subscription = pubSubClient.subscription(
subscriptionNameOrId,
subscriberOptions
);
console.log(
`Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
);
const messageHandler = message => {
console.log(`Received message: ${message.id}`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
// "Ack" (acknowledge receipt of) the message
message.ack();
};
subscription.on('message', messageHandler);
// Wait a while for the subscription to run. (Part of the sample only.)
setTimeout(() => {
subscription.close();
}, timeout * 1000);
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const maxInProgress = 5;
// const timeout = 10;
// Imports the Google Cloud client library
import {Message, PubSub, SubscriberOptions} from '@google-cloud/pubsub';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();
async function subscribeWithFlowControlSettings(
subscriptionNameOrId: string,
maxInProgress: number,
timeout: number
) {
const subscriberOptions: SubscriberOptions = {
flowControl: {
maxMessages: maxInProgress,
},
};
// References an existing subscription.
// Note that flow control settings are not persistent across subscribers.
const subscription = pubSubClient.subscription(
subscriptionNameOrId,
subscriberOptions
);
console.log(
`Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
);
const messageHandler = (message: Message) => {
console.log(`Received message: ${message.id}`);
console.log(`\tData: ${message.data}`);
console.log(`\tAttributes: ${message.attributes}`);
// "Ack" (acknowledge receipt of) the message
message.ack();
};
subscription.on('message', messageHandler);
// Wait a while for the subscription to run. (Part of the sample only.)
setTimeout(() => {
subscription.close();
}, timeout * 1000);
}
Python
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Python 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
print(f"Received {message.data!r}.")
message.ack()
# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback, flow_control=flow_control
)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=timeout)
except TimeoutError:
streaming_pull_future.cancel() # Trigger the shutdown.
streaming_pull_future.result() # Block until the shutdown is complete.
Ruby
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Ruby 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。
# subscription_id = "your-subscription-id"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription subscription_id
subscriber = subscription.listen inventory: 10 do |received_message|
puts "Received message: #{received_message.data}"
received_message.acknowledge!
end
subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!
次のステップ
他の Google Cloud プロダクトに関連するコードサンプルの検索およびフィルタ検索を行うには、Google Cloud のサンプルをご覧ください。