Receive messages using synchronous pull and modify their acknowledgement deadlines.
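To make the idea of an acknowledgement deadline concrete, here is a minimal sketch of how extending a deadline keeps a message from becoming due for redelivery. It is a toy in-memory model, not the Pub/Sub API: the `LeaseTable` class, its method names, and the explicit `now` timestamps are all assumptions made for illustration. Only the 10-second default and the 30-second extension come from the samples on this page.

```python
from dataclasses import dataclass, field


@dataclass
class LeaseTable:
    """Toy in-memory model of per-message ack deadlines (not the Pub/Sub API)."""

    default_deadline: float = 10.0
    deadlines: dict = field(default_factory=dict)  # ack_id -> absolute expiry time

    def deliver(self, ack_id: str, now: float) -> None:
        # A delivered message starts with the default deadline.
        self.deadlines[ack_id] = now + self.default_deadline

    def modify_ack_deadline(self, ack_id: str, seconds: float, now: float) -> None:
        # The new deadline counts from the time of the request, not from delivery.
        if ack_id in self.deadlines:
            self.deadlines[ack_id] = now + seconds

    def acknowledge(self, ack_id: str) -> None:
        # An acknowledged message is no longer tracked, so it is never due again.
        self.deadlines.pop(ack_id, None)

    def due_for_redelivery(self, now: float) -> list:
        # Messages whose deadline passed without an acknowledgement.
        return sorted(a for a, t in self.deadlines.items() if t <= now)


table = LeaseTable()
table.deliver("ack-1", now=0.0)
table.deliver("ack-2", now=0.0)
table.modify_ack_deadline("ack-1", seconds=30, now=5.0)  # lease now ends at t=35
redeliver_at_12 = table.due_for_redelivery(now=12.0)  # only ack-2 has expired
table.acknowledge("ack-1")
redeliver_at_40 = table.due_for_redelivery(now=40.0)  # ack-1 was acked in time
print(redeliver_at_12, redeliver_at_40)
```

In this model the message whose deadline was extended is never listed as due, while the untouched message becomes due once the default 10 seconds have passed.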
Code sample
C#
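Before trying this sample, follow the C# setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub C# API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.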
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Collections.Generic;

public class PullMessageWithLeaseManagementSample
{
    public int PullMessageWithLeaseManagement(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
        var ackIds = new List<string>();
        try
        {
            PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);

            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
            {
                ackIds.Add(msg.AckId);
                string text = msg.Message.Data.ToStringUtf8();
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");

                // Modify the ack deadline of each received message from the default 10 seconds to 30.
                // This prevents the server from redelivering the message after the default 10 seconds
                // have passed.
                subscriberClient.ModifyAckDeadline(subscriptionName, new List<string> { msg.AckId }, 30);
            }
            // If acknowledgement required, send to server.
            if (acknowledge && ackIds.Count > 0)
            {
                subscriberClient.Acknowledge(subscriptionName, ackIds);
            }
        }
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
        {
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
        }
        return ackIds.Count;
    }
}
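The C# sample above sends one ModifyAckDeadline request per message, passing a single-element list of ack IDs each time. Because the request takes a list, the extensions could be grouped into fewer calls. The following is a minimal sketch of that grouping only. The helper name `chunk_ack_ids`, the batch size of 2, and the `ack-N` identifiers are hypothetical, and no Pub/Sub call is made here.

```python
from typing import Iterator, List


def chunk_ack_ids(ack_ids: List[str], max_per_request: int) -> Iterator[List[str]]:
    """Yield consecutive slices of ack_ids, each at most max_per_request long."""
    if max_per_request < 1:
        raise ValueError("max_per_request must be at least 1")
    for start in range(0, len(ack_ids), max_per_request):
        yield ack_ids[start:start + max_per_request]


# Hypothetical ack IDs; real ones come from each received message.
ack_ids = [f"ack-{i}" for i in range(5)]
batches = list(chunk_ack_ids(ack_ids, max_per_request=2))
print(batches)
```

Each yielded batch would then be passed as the ack ID list of a single deadline-modification request, in place of one request per message.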
Java
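Before trying this sample, follow the Java setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Java API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.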
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncWithLeaseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    subscribeSyncWithLeaseExample(projectId, subscriptionId, numOfMessages);
  }

  public static void subscribeSyncWithLeaseExample(
      String projectId, String subscriptionId, Integer numOfMessages)
      throws IOException, InterruptedException {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20 MB
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {

      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);

      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setSubscription(subscriptionName)
              .build();

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

      // Stop the program if the pull response is empty to avoid acknowledging
      // an empty list of ack IDs.
      if (pullResponse.getReceivedMessagesList().isEmpty()) {
        System.out.println("No message was pulled. Exiting.");
        return;
      }

      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        ackIds.add(message.getAckId());

        // Modify the ack deadline of each received message from the default 10 seconds to 30.
        // This prevents the server from redelivering the message after the default 10 seconds
        // have passed.
        ModifyAckDeadlineRequest modifyAckDeadlineRequest =
            ModifyAckDeadlineRequest.newBuilder()
                .setSubscription(subscriptionName)
                .addAckIds(message.getAckId())
                .setAckDeadlineSeconds(30)
                .build();

        subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
      }

      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      System.out.println(pullResponse.getReceivedMessagesList());
    }
  }
}
Node.js
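Before trying this sample, follow the Node.js setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Node.js API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.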
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPullWithLeaseManagement() {
  // The low level API client requires a name only.
  const formattedSubscription =
    subscriptionNameOrId.indexOf('/') >= 0
      ? subscriptionNameOrId
      : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const maxMessages = 1;
  const newAckDeadlineSeconds = 30;
  const request = {
    subscription: formattedSubscription,
    maxMessages: maxMessages,
    allowExcessMessages: false,
  };

  let isProcessed = false;

  // The worker function is meant to be non-blocking. It starts a long-
  // running process, such as writing the message to a table, which may
  // take longer than the default 10-sec acknowledge deadline.
  function worker(message) {
    console.log(`Processing "${message.message.data}"...`);

    setTimeout(() => {
      console.log(`Finished processing "${message.message.data}".`);
      isProcessed = true;
    }, 30000);
  }

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Obtain the first message.
  const message = response.receivedMessages[0];

  // Send the message to the worker function.
  worker(message);

  let waiting = true;
  while (waiting) {
    await new Promise(r => setTimeout(r, 10000));
    // If the message has been processed..
    if (isProcessed) {
      const ackRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
      };

      //..acknowledges the message.
      await subClient.acknowledge(ackRequest);
      console.log(`Acknowledged: "${message.message.data}".`);
      // Exit after the message is acknowledged.
      waiting = false;
      console.log('Done.');
    } else {
      // If the message is not yet processed..
      const modifyAckRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
        ackDeadlineSeconds: newAckDeadlineSeconds,
      };

      //..reset its ack deadline.
      await subClient.modifyAckDeadline(modifyAckRequest);

      console.log(
        `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
      );
    }
  }
}

synchronousPullWithLeaseManagement().catch(console.error);
Python
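Before trying this sample, follow the Python setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Python API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.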
import logging
import multiprocessing
import sys
import time

from google.api_core import retry
from google.cloud import pubsub_v1


# The excerpt uses `return`, so it is wrapped in a function here to be valid Python.
def synchronous_pull_with_lease_management(project_id: str, subscription_id: str) -> None:
    # TODO(developer)
    # project_id = "your-project-id"
    # subscription_id = "your-subscription-id"

    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    processes = dict()

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": 3},
        retry=retry.Retry(deadline=300),
    )

    if len(response.received_messages) == 0:
        return

    # Start a process for each message based on its size modulo 10.
    for message in response.received_messages:
        process = multiprocessing.Process(
            target=time.sleep, args=(sys.getsizeof(message) % 10,)
        )
        processes[process] = (message.ack_id, message.message.data)
        process.start()

    while processes:
        # Take a break every second.
        time.sleep(1)
        for process in list(processes):
            ack_id, msg_data = processes[process]
            # If the process is running, reset the ack deadline.
            if process.is_alive():
                subscriber.modify_ack_deadline(
                    request={
                        "subscription": subscription_path,
                        "ack_ids": [ack_id],
                        # Maximum is 600 seconds. A value of 0 may make the
                        # message available for redelivery immediately.
                        "ack_deadline_seconds": 15,
                    }
                )
                logger.debug(f"Reset ack deadline for {msg_data}.")

            # If the process is complete, acknowledge the message.
            else:
                subscriber.acknowledge(
                    request={"subscription": subscription_path, "ack_ids": [ack_id]}
                )
                logger.debug(f"Acknowledged {msg_data}.")
                processes.pop(process)
    print(
        f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
    )

    # Close the underlying gRPC channel. Alternatively, wrap subscriber in
    # a 'with' block to automatically call close() when done.
    subscriber.close()
Ruby
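Before trying this sample, follow the Ruby setup instructions in the Pub/Sub quickstart using client libraries. For more information, see the Pub/Sub Ruby API reference documentation.
To authenticate to Pub/Sub, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.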
require "google/cloud/pubsub"

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
new_ack_deadline = 30
processed = false

# The subscriber pulls a specified number of messages.
received_messages = subscription.pull immediate: false, max: 1

# Obtain the first message.
message = received_messages.first

# Send the message to a non-blocking worker that starts a long-running process, such as writing
# the message to a table, which may take longer than the default 10-sec acknowledge deadline.
Thread.new do
  sleep 15
  processed = true
  puts "Finished processing \"#{message.data}\"."
end

loop do
  sleep 1
  if processed
    # If the message has been processed, acknowledge the message.
    message.acknowledge!
    puts "Done."
    # Exit after the message is acknowledged.
    break
  else
    # If the message has not yet been processed, reset its ack deadline.
    message.modify_ack_deadline! new_ack_deadline
    puts "Reset ack deadline for \"#{message.data}\" for #{new_ack_deadline} seconds."
  end
end
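The Node.js, Python, and Ruby samples above share one pattern: start the work in the background, keep resetting the ack deadline while the work is running, and acknowledge once it finishes. The sketch below isolates that control flow with plain callables so it can run without a Pub/Sub client. The function name `run_with_lease` and its parameters are assumptions made for illustration, and the `extend` and `acknowledge` callables stand in for the real deadline-modification and acknowledge requests.

```python
import threading
import time
from typing import Callable


def run_with_lease(
    work: Callable[[], None],
    extend: Callable[[], None],
    acknowledge: Callable[[], None],
    poll_interval: float,
) -> int:
    """Run work in a thread, calling extend until it finishes, then acknowledge.

    Returns the number of times the lease was extended.
    """
    worker = threading.Thread(target=work)
    worker.start()
    extensions = 0
    while True:
        # Wait up to poll_interval seconds for the worker to finish.
        worker.join(timeout=poll_interval)
        if not worker.is_alive():
            break
        extend()  # still busy: push the ack deadline out again
        extensions += 1
    acknowledge()  # work is done: safe to acknowledge
    return extensions


# Stand-ins for the real requests; they only record that they were called.
calls = []
extensions = run_with_lease(
    work=lambda: time.sleep(0.2),
    extend=lambda: calls.append("extend"),
    acknowledge=lambda: calls.append("ack"),
    poll_interval=0.05,
)
print(extensions, calls[-1])
```

For the pattern to hold, the polling interval has to be shorter than the deadline being requested. Otherwise the lease can lapse between two extensions.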
What's next
To search and filter code samples for other Google Cloud products, see the Google Cloud sample browser.