同期 pull を使用してメッセージを受信します。
もっと見る
このコードサンプルを含む詳細なドキュメントについては、以下をご覧ください。
コードサンプル
C++
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C++ 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C++ API のリファレンス ドキュメントをご覧ください。
[](google::cloud::pubsub::Subscriber subscriber) {
  // Pull once; the result converts to false when it carries an error status.
  auto pulled = subscriber.Pull();
  if (!pulled) {
    throw std::move(pulled).status();
  }
  std::cout << "Received message " << pulled->message << "\n";
  // ack() is invoked on an rvalue handler, hence the std::move.
  std::move(pulled->handler).ack();
}
C#
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C# 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。
using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Linq;
using System.Text;
using System.Threading;
/// <summary>
/// Sample that receives messages from a Pub/Sub subscription with a single Pull request.
/// </summary>
public class PullMessagesSyncSample
{
    /// <summary>
    /// Pulls up to 20 messages from the subscription, prints each one to the console and
    /// optionally acknowledges all of them in one request.
    /// </summary>
    /// <param name="projectId">Project ID used to build the subscription name.</param>
    /// <param name="subscriptionId">Subscription ID used to build the subscription name.</param>
    /// <param name="acknowledge">When true, the pulled messages are acknowledged.</param>
    /// <returns>The number of messages that were printed.</returns>
    public int PullMessagesSync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
        int messageCount = 0;
        try
        {
            // Pull at most 20 messages from the server in a single request.
            PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);

            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
            {
                // Decode the payload as UTF-8 directly instead of copying it into a byte array first.
                string text = msg.Message.Data.ToStringUtf8();
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
                // This loop runs on the calling thread only, so a plain increment is sufficient.
                messageCount++;
            }

            // If acknowledgement required, send to server.
            if (acknowledge && messageCount > 0)
            {
                subscriberClient.Acknowledge(subscriptionName, response.ReceivedMessages.Select(msg => msg.AckId));
            }
        }
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
        {
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
            // Deliberately not rethrown: the caller still receives the count gathered so far.
        }
        return messageCount;
    }
}
Go
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Go 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
import (
"context"
"fmt"
"io"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub"
)
// pullMsgsSync receives messages from the subscription subID in project
// projectID for 10 seconds, writes each payload to w, acknowledges it, and
// finally writes the number of messages received. It returns a wrapped error
// if the client cannot be created or Receive fails.
func pullMsgsSync(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		// %w keeps the underlying error available to errors.Is / errors.As.
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Turn on synchronous mode. This makes the subscriber use the Pull RPC rather
	// than the StreamingPull RPC, which is useful for guaranteeing MaxOutstandingMessages,
	// the max number of messages the client will hold in memory at a time.
	sub.ReceiveSettings.Synchronous = true
	sub.ReceiveSettings.MaxOutstandingMessages = 10

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Updated atomically inside the callback.
	var received int32
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}
Java
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class SubscribeSyncExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
Integer numOfMessages = 10;
subscribeSyncExample(projectId, subscriptionId, numOfMessages);
}
public static void subscribeSyncExample(
String projectId, String subscriptionId, Integer numOfMessages) throws IOException {
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).
.build())
.build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
PullRequest pullRequest =
PullRequest.newBuilder()
.setMaxMessages(numOfMessages)
.setSubscription(subscriptionName)
.build();
// Use pullCallable().futureCall to asynchronously perform this operation.
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
// Stop the program if the pull response is empty to avoid acknowledging
// an empty list of ack IDs.
if (pullResponse.getReceivedMessagesList().isEmpty()) {
System.out.println("No message was pulled. Exiting.");
return;
}
List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
// Handle received message
// ...
ackIds.add(message.getAckId());
}
// Acknowledge received messages.
AcknowledgeRequest acknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.build();
// Use acknowledgeCallable().futureCall to asynchronously perform this operation.
subscriber.acknowledgeCallable().call(acknowledgeRequest);
System.out.println(pullResponse.getReceivedMessagesList());
}
}
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();
/**
 * Pulls up to 10 messages from a subscription with one request, logs each
 * payload and acknowledges every message that carries an ack ID.
 *
 * @param {string} projectId Project ID, used only when a bare subscription ID is given.
 * @param {string} subscriptionNameOrId Full subscription name (contains '/') or a bare ID.
 */
async function synchronousPull(projectId, subscriptionNameOrId) {
  // The low level API client requires a full name, so expand a bare ID.
  const subscription = subscriptionNameOrId.includes('/')
    ? subscriptionNameOrId
    : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // maxMessages is an upper bound; Pub/Sub may return fewer messages.
  const [pullResult] = await subClient.pull({subscription, maxMessages: 10});

  // Log every message and remember the ack IDs that are present.
  const ackIds = [];
  const received = pullResult.receivedMessages || [];
  for (const {message: payload, ackId} of received) {
    console.log(`Received message: ${payload.data}`);
    if (ackId) {
      ackIds.push(ackId);
    }
  }

  // Acknowledge everything in one request rather than one call per message;
  // skip the call entirely when there is nothing to acknowledge.
  if (ackIds.length > 0) {
    await subClient.acknowledge({subscription, ackIds});
  }

  console.log('Done.');
}
Node.js (TypeScript)
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
import {v1} from '@google-cloud/pubsub';
// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();
/**
 * Pulls up to 10 messages from a subscription with one request, logs each
 * payload and acknowledges every message that carries an ack ID.
 *
 * @param projectId Project ID, used only when a bare subscription ID is given.
 * @param subscriptionNameOrId Full subscription name (contains '/') or a bare ID.
 */
async function synchronousPull(
  projectId: string,
  subscriptionNameOrId: string
) {
  // The low level API client requires a full name, so expand a bare ID.
  const subscription = subscriptionNameOrId.includes('/')
    ? subscriptionNameOrId
    : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // maxMessages is an upper bound; Pub/Sub may return fewer messages.
  const [pullResult] = await subClient.pull({subscription, maxMessages: 10});

  // Log every message and remember the ack IDs that are present.
  const ackIds: string[] = [];
  const received = pullResult.receivedMessages ?? [];
  for (const {message: payload, ackId} of received) {
    console.log(`Received message: ${payload?.data}`);
    if (ackId) {
      ackIds.push(ackId);
    }
  }

  // Acknowledge everything in one request rather than one call per message;
  // skip the call entirely when there is nothing to acknowledge.
  if (ackIds.length > 0) {
    await subClient.acknowledge({subscription, ackIds});
  }

  console.log('Done.');
}
PHP
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある PHP 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub PHP API のリファレンス ドキュメントをご覧ください。
use Google\Cloud\PubSub\PubSubClient;
/**
 * Pulls messages from a Pub/Sub subscription, prints each payload and
 * acknowledges every message individually.
 *
 * @param string $projectId The Google project ID.
 * @param string $subscriptionName The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $client = new PubSubClient(['projectId' => $projectId]);
    $subscription = $client->subscription($subscriptionName);

    // Issue the pull first, then walk over whatever it returned.
    $messages = $subscription->pull();
    foreach ($messages as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the message so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}
Python
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Python 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Python API のリファレンス ドキュメントをご覧ください。
from google.api_core import retry
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Upper bound on the number of messages requested in the single pull below.
NUM_MESSAGES = 3

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    # The subscriber pulls a specific number of messages. The actual
    # number of messages pulled may be smaller than max_messages.
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
        retry=retry.Retry(deadline=300),
    )

    # This snippet runs at module level, where `return` is a syntax error, so an
    # empty response is handled by skipping the processing block instead.
    if len(response.received_messages) > 0:
        ack_ids = []
        for received_message in response.received_messages:
            print(f"Received: {received_message.message.data}.")
            ack_ids.append(received_message.ack_id)

        # Acknowledges the received messages so they will not be sent again.
        subscriber.acknowledge(
            request={"subscription": subscription_path, "ack_ids": ack_ids}
        )

        print(
            f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
        )
Ruby
このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Ruby 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Ruby API のリファレンス ドキュメントをご覧ください。
# subscription_id = "your-subscription-id"
require "google/cloud/pubsub"

client = Google::Cloud::Pubsub.new
subscription = client.subscription subscription_id

# Pull with immediate: false, then print and acknowledge each message in turn.
pulled_messages = subscription.pull immediate: false
pulled_messages.each do |pulled|
  puts "Message pulled: #{pulled.data}"
  pulled.acknowledge!
end
次のステップ
他の Google Cloud プロダクトに関連するコードサンプルの検索およびフィルタ検索を行うには、Google Cloud のサンプルをご覧ください。