데드 레터 구독에서 메시지의 전송 시도 필드를 출력합니다.
이 코드 샘플이 포함된 문서 페이지
이 코드 샘플이 사용되는 전체 맥락을 보려면 다음 문서를 참조하세요.
코드 샘플
C#
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참조 문서를 확인하세요.
using Google.Cloud.PubSub.V1;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Sample that pulls messages from a subscription and prints each message's
/// delivery attempt value.
/// </summary>
public class PullMessagesAsyncWithDeliveryAttemptsSample
{
    /// <summary>
    /// Pulls messages for roughly seven seconds, printing the delivery attempt
    /// of every message received.
    /// </summary>
    /// <param name="projectId">Project that owns the subscription.</param>
    /// <param name="subscriptionId">
    /// ID of an existing subscription with a dead letter policy.
    /// </param>
    /// <param name="acknowledge">
    /// When true each message is acked; otherwise it is nacked.
    /// </param>
    /// <returns>
    /// The delivery attempt value most recently recorded by the handler, or 0
    /// if no received message carried one.
    /// </returns>
    public async Task<int> PullMessagesAsyncWithDeliveryAttempts(string projectId, string subscriptionId, bool acknowledge)
    {
        // This is an existing subscription with a dead letter policy.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        int deliveryAttempt = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            // Query the delivery attempt once and reuse it for both the log
            // line and the recorded value.
            var attempt = message.GetDeliveryAttempt();
            System.Console.WriteLine($"Delivery Attempt: {attempt}");
            if (attempt != null)
            {
                deliveryAttempt = attempt.Value;
            }
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 7 seconds.
        await Task.Delay(7000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await startTask;
        return deliveryAttempt;
    }
}
C++
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참조 문서를 참조하세요.
namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
[](pubsub::Subscriber subscriber) {
  // `received` is written by the subscription callback and read by this
  // thread, so every access before the session completes goes through
  // `guard`.
  std::mutex guard;
  std::condition_variable received_cv;
  int received = 0;

  auto session = subscriber.Subscribe(
      [&](pubsub::Message const& msg, pubsub::AckHandler handler) {
        std::cout << "Received message " << msg << "\n";
        std::cout << "Delivery attempt: " << handler.delivery_attempt()
                  << "\n";
        {
          // Release the lock before notifying the waiting thread.
          std::lock_guard<std::mutex> lock(guard);
          ++received;
        }
        received_cv.notify_one();
        std::move(handler).ack();
      });

  // Block until the callback has seen at least one message.
  {
    std::unique_lock<std::mutex> lock(guard);
    received_cv.wait(lock, [&received] { return received > 0; });
  }

  // Ask the subscription session to stop.
  session.cancel();

  // Wait for the session to complete; no more callbacks can happen after
  // this point, so `received` can be read without the lock.
  auto status = session.get();

  // Report the final count and status.
  std::cout << "Message count: " << received << ", status: " << status
            << "\n";
}(std::move(subscriber));
Go
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참조 문서를 참조하세요.
import (
"context"
"fmt"
"io"
"time"
"cloud.google.com/go/pubsub"
)
// pullMsgsDeadLetterDeliveryAttempt receives messages from subscription subID
// in project projectID for up to 10 seconds and writes the data and delivery
// attempt count of each message that carries a delivery attempt to w. Every
// received message is acknowledged.
func pullMsgsDeadLetterDeliveryAttempt(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	// Release the client's resources once receiving is done.
	defer client.Close()

	// Receive messages for 10 seconds.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	sub := client.Subscription(subID)
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// When dead lettering is enabled, the delivery attempt field is a
		// pointer to the number of times the service has attempted to deliver
		// a message. Otherwise, the field is nil.
		if msg.DeliveryAttempt != nil {
			fmt.Fprintf(w, "message: %s, delivery attempts: %d", msg.Data, *msg.DeliveryAttempt)
		}
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("got error in Receive: %w", err)
	}
	return nil
}
Java
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참조 문서를 참조하세요.
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/** Sample that prints the delivery attempt of every message received on a subscription. */
public class ReceiveMessagesWithDeliveryAttemptsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing subscription with a dead letter policy.
    String subscriptionId = "your-subscription-id";

    receiveMessagesWithDeliveryAttemptsExample(projectId, subscriptionId);
  }

  /**
   * Listens on the given subscription for up to 30 seconds, printing the id, payload and
   * delivery attempt of each message before acknowledging it.
   *
   * @param projectId project that owns the subscription
   * @param subscriptionId id of an existing subscription with a dead letter policy
   */
  public static void receiveMessagesWithDeliveryAttemptsExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Asynchronous handler: print the message details, then ack the message.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          System.out.println("Delivery Attempt: " + Subscriber.getDeliveryAttempt(message));
          consumer.ack();
        };

    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
    try {
      // Start the subscriber and wait until it is running.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Let the subscriber run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // The 30s window elapsed: shut the subscriber down and stop receiving messages.
      subscriber.stopAsync();
    }
  }
}
Node.js
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.
/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

/**
 * Pulls up to 10 messages from the subscription in a single request, logs
 * each message's data and delivery attempt, then acknowledges everything
 * that was received.
 */
async function synchronousPullWithDeliveryAttempts() {
  const formattedSubscription = subClient.subscriptionPath(
    projectId,
    subscriptionName
  );

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const request = {
    subscription: formattedSubscription,
    maxMessages: 10,
  };

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Process the messages.
  const ackIds = [];
  for (const message of response.receivedMessages) {
    console.log(`Received message: ${message.message.data}`);
    console.log(`Delivery Attempt: ${message.deliveryAttempt}`);
    ackIds.push(message.ackId);
  }

  // Skip the acknowledge call when nothing was pulled, so a request with an
  // empty ackIds list is never sent.
  if (ackIds.length !== 0) {
    // Acknowledge all of the messages. You could also acknowledge
    // these individually, but this is more efficient.
    const ackRequest = {
      subscription: formattedSubscription,
      ackIds: ackIds,
    };
    await subClient.acknowledge(ackRequest);
  }

  console.log('Done.');
}

synchronousPullWithDeliveryAttempts().catch(console.error);
PHP
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 PHP 설정 안내를 따르세요. 자세한 내용은 Pub/Sub PHP API 참조 문서를 참조하세요.
use Google\Cloud\PubSub\Message;
use Google\Cloud\PubSub\PubSubClient;
/**
 * Publish a test message to a topic, then pull from one of its subscriptions
 * and print the data and delivery attempt of every message returned.
 *
 * @param string $projectId The Google project ID.
 * @param string $topicName The Pub/Sub topic name.
 * @param string $subscriptionName The Pub/Sub subscription name.
 * @param string $message The contents of a pubsub message data field.
 */
function dead_letter_delivery_attempt($projectId, $topicName, $subscriptionName, $message)
{
    $client = new PubSubClient(['projectId' => $projectId]);
    $topic = $client->topic($topicName);

    // Publish the test message so there is something to pull.
    $testMessage = new Message(['data' => $message]);
    $topic->publish($testMessage);

    // Pull once and report each message that came back.
    $pulled = $topic->subscription($subscriptionName)->pull();
    foreach ($pulled as $received) {
        printf('Received message %s' . PHP_EOL, $received->data());
        printf('Delivery attempt %d' . PHP_EOL, $received->deliveryAttempt());
    }

    print('Done' . PHP_EOL);
}
Python
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 참조하세요.
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds to listen for messages; None blocks indefinitely.
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)


def callback(message):
    """Print the received message and its delivery attempt, then ack it."""
    print(f"Received {message}.")
    print(f"With delivery attempts: {message.delivery_attempt}.")
    message.ack()


streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    # When `timeout` is not set, result() will block indefinitely,
    # unless an exception is encountered first.
    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
Ruby
이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참조 문서를 참조하세요.
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"
# Look up the subscription, pull once, and report each message's delivery
# attempt before acknowledging it.
client = Google::Cloud::Pubsub.new
target_subscription = client.subscription(subscription_name)

received_messages = target_subscription.pull
received_messages.each do |received|
  puts "Received message: #{received.data}"
  puts "Delivery Attempt: #{received.delivery_attempt}"
  received.acknowledge!
end