구독에서 메시지를 수신하기 위한 빠른 시작 샘플입니다.
더 살펴보기
이 코드 샘플이 포함된 자세한 문서는 다음을 참조하세요.
코드 샘플
C++
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참조 문서를 확인하세요.
namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
return subscriber.Subscribe(
[&](pubsub::Message const& m, pubsub::AckHandler h) {
std::cout << "Received message " << m << "\n";
std::move(h).ack();
PleaseIgnoreThisSimplifiesTestingTheSamples();
});
};
C#
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참조 문서를 확인하세요.
using Google.Cloud.PubSub.V1;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Quickstart sample that receives messages from a Pub/Sub subscription for a
/// short, fixed period of time.
/// </summary>
public class PullMessagesAsyncSample
{
    /// <summary>
    /// Listens on the given subscription for five seconds, printing each message
    /// and replying to it with either Ack or Nack.
    /// </summary>
    /// <param name="projectId">ID of the project that owns the subscription.</param>
    /// <param name="subscriptionId">ID of the subscription to receive from.</param>
    /// <param name="acknowledge">
    /// <c>true</c> to reply <c>Ack</c> to every message; <c>false</c> to reply <c>Nack</c>.
    /// </param>
    /// <returns>The number of messages passed to the handler while the subscriber was running.</returns>
    public async Task<int> PullMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);

        // SubscriberClient runs your message handler function on multiple
        // threads to maximize throughput, so the counter is updated atomically.
        int messageCount = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // ByteString can decode itself as UTF-8; this avoids enumerating the
            // payload through LINQ into a temporary byte[] first.
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });

        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);

        // Let's make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}
Go
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참조 문서를 확인하세요.
import (
"context"
"fmt"
"io"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub"
)
// pullMsgs receives messages from the subscription subID in project projectID
// for up to 10 seconds. Each message's data is written to w and the message is
// acknowledged; a final line reports how many messages were handled.
//
// Errors from creating the client or from Receive are returned wrapped with
// %w, so callers can inspect the underlying cause with errors.Is / errors.As.
func pullMsgs(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// The counter is incremented atomically inside the callback.
	var received int32
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}
Java
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참조 문서를 확인하세요.
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** Sample that listens on a Pub/Sub subscription and acknowledges what it receives. */
public class SubscribeAsyncExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);
  }

  /**
   * Starts a subscriber on the given subscription, lets it run for up to 30 seconds, and then
   * asks it to stop.
   *
   * @param projectId ID of the project that owns the subscription
   * @param subscriptionId ID of the subscription to listen on
   */
  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // The receiver is supplied as a method reference to the handler below.
    MessageReceiver receiver = SubscribeAsyncExample::printAndAck;

    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      // Start the subscriber and block until it is running.
      subscriber.startAsync().awaitRunning();
      String listeningOn = subscriptionName.toString();
      System.out.printf("Listening for messages on %s:\n", listeningOn);
      // Give the subscriber 30s to run unless an unrecoverable error occurs first.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // The 30s window elapsed: request shutdown so no more messages are received.
      subscriber.stopAsync();
    }
  }

  /** Prints the ID and UTF-8 payload of an incoming message, then acks it. */
  private static void printAndAck(PubsubMessage message, AckReplyConsumer consumer) {
    System.out.println("Id: " + message.getMessageId());
    System.out.println("Data: " + message.getData().toStringUtf8());
    consumer.ack();
  }
}
Node.js
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');
// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Listens on an existing subscription, logging and acknowledging every
 * message, and stops listening once the timeout elapses.
 *
 * @param {string} subscriptionNameOrId Name or ID of an existing subscription.
 * @param {number} timeout How long to listen, in seconds.
 */
function listenForMessages(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    // Attributes is an object; interpolating it directly would print
    // "[object Object]", so serialize it explicitly.
    console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}
Node.js (TypeScript)
/**
* TODO(developer): Uncomment these variables before running the sample.
*/
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;
// Imports the Google Cloud client library
import {PubSub, Message} from '@google-cloud/pubsub';
// Creates a client; cache this for further use
const pubSubClient = new PubSub();

/**
 * Listens on an existing subscription, logging and acknowledging every
 * message, and stops listening once the timeout elapses.
 *
 * @param subscriptionNameOrId Name or ID of an existing subscription.
 * @param timeout How long to listen, in seconds.
 */
function listenForMessages(subscriptionNameOrId: string, timeout: number) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    // Attributes is an object; interpolating it directly would print
    // "[object Object]", so serialize it explicitly.
    console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}
PHP
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 PHP 설정 안내를 따르세요. 자세한 내용은 Pub/Sub PHP API 참조 문서를 확인하세요.
use Google\Cloud\PubSub\PubSubClient;
/**
 * Pulls messages from a Pub/Sub subscription, printing each message's data
 * and acknowledging it.
 *
 * @param string $projectId The Google project ID.
 * @param string $subscriptionName The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $client = new PubSubClient(['projectId' => $projectId]);
    $sub = $client->subscription($subscriptionName);

    $pulled = $sub->pull();
    foreach ($pulled as $msg) {
        echo sprintf('Message: %s' . PHP_EOL, $msg->data());
        // Acknowledge right after printing so the message is not pulled again.
        $sub->acknowledge($msg);
    }
}
Python
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 확인하세요.
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
subscriber = pubsub_v1.SubscriberClient()
# Build the fully qualified subscription identifier, i.e.
# `projects/{project_id}/subscriptions/{subscription_id}`.
subscription_path = subscriber.subscription_path(project_id, subscription_id)


def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Print the received message and acknowledge it."""
    print(f"Received {message}.")
    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Using the subscriber as a context manager calls close() automatically
# when the block exits.
with subscriber:
    try:
        # Without a `timeout`, result() blocks indefinitely unless an
        # exception is raised first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        # Trigger the shutdown, then block until it has completed.
        streaming_pull_future.cancel()
        streaming_pull_future.result()
Ruby
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참조 문서를 확인하세요.
# subscription_id = "your-subscription-id"
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new
subscription = pubsub.subscription(subscription_id)

# Register a listener that prints each message's data and acknowledges it.
subscriber = subscription.listen do |message|
  puts "Received message: #{message.data}"
  message.acknowledge!
end

subscriber.start

# Keep the main thread alive for 60 seconds so the listening thread
# does not quit early.
sleep(60)

# Ask the subscriber to stop and wait until it has done so.
stopping = subscriber.stop
stopping.wait!
다음 단계
다른 Google Cloud 제품의 코드 샘플을 검색하고 필터링하려면 Google Cloud 샘플 브라우저를 참조하세요.