Pull을 사용하여 메시지 수신

Pub/Sub는 내보내기와 가져오기 메시지 전송을 모두 지원합니다. 가져오기/내보내기 구독에 관한 설명과 비교는 구독자 개요에서 확인할 수 있습니다. 이 문서에서는 가져오기 전달을 설명합니다. 내보내기 전달 관련 내용은 내보내기 구독자 가이드를 참조하세요.

비동기식 가져오기

비동기식 가져오기를 사용하면 애플리케이션이 새 메시지를 차단할 필요가 없어 애플리케이션의 처리량이 증가합니다. 메시지는 장기 실행 메시지 리스너를 통해 애플리케이션에서 수신하며, 아래 예시처럼 한 번에 하나씩 메시지를 확인합니다. 자바, Python, .NET, Go, Ruby 클라이언트는 StreamingPull 서비스 API를 사용하여 비동기 클라이언트 API를 효율적으로 구현합니다.

비동기식 메시지 가져오기를 지원하지 않는 클라이언트 라이브러리도 있습니다. 비동기식 메시지 가져오기에 관한 자세한 내용은 동기식 가져오기를 참조하세요.

자세한 내용은 해당하는 프로그래밍 언어의 API 참조 문서에서 확인할 수 있습니다.

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참조 문서를 확인하세요.

namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](pubsub::Subscriber subscriber, pubsub::Subscription const& subscription) {
  std::mutex mu;
  std::condition_variable cv;
  int message_count = 0;
  auto session = subscriber.Subscribe(
      subscription, [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::unique_lock<std::mutex> lk(mu);
        ++message_count;
        lk.unlock();
        cv.notify_one();
        std::move(h).ack();
      });
  // Wait until at least one message has been received.
  std::unique_lock<std::mutex> lk(mu);
  cv.wait(lk, [&message_count] { return message_count > 0; });
  lk.unlock();
  // Cancel the subscription session.
  session.cancel();
  // Wait for the session to complete, no more callbacks can happen after this
  // point.
  auto status = session.get();
  // Report any final status, blocking.
  std::cout << "Message count: " << message_count << ", status: " << status
            << "\n";
}

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참조 문서를 확인하세요.


using Google.Cloud.PubSub.V1;
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesAsyncSample
{
    public async Task<int> PullMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        int messageCount = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = Encoding.UTF8.GetString(message.Data.ToArray());
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참조 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"sync"

	"cloud.google.com/go/pubsub"
)

func pullMsgs(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	// Consume 10 messages.
	var mu sync.Mutex
	received := 0
	sub := client.Subscription(subID)
	cctx, cancel := context.WithCancel(ctx)
	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
		received++
		if received == 10 {
			cancel()
		}
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub 자바 API 참조 문서를 참조하세요.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeAsyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);
  }

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages() {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionName);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

listenForMessages();

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 참조하세요.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참조 문서를 참조하세요.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

커스텀 속성 처리

이 샘플은 메시지를 비동기식으로 가져오고 메타데이터에서 커스텀 속성을 검색하는 방법을 보여줍니다.

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참조 문서를 확인하세요.

namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](pubsub::Subscriber subscriber, pubsub::Subscription const& subscription) {
  std::mutex mu;
  std::condition_variable cv;
  int message_count = 0;
  auto session = subscriber.Subscribe(
      subscription, [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message with attributes:\n";
        for (auto const& kv : m.attributes()) {
          std::cout << "  " << kv.first << ": " << kv.second << "\n";
        }
        std::unique_lock<std::mutex> lk(mu);
        ++message_count;
        lk.unlock();
        cv.notify_one();
        std::move(h).ack();
      });
  // Most applications would just release the `session` object at this point,
  // but we want to gracefully close down this example.
  std::unique_lock<std::mutex> lk(mu);
  cv.wait(lk, [&message_count] { return message_count > 0; });
  lk.unlock();
  session.cancel();
  auto status = session.get();
  std::cout << "Message count: " << message_count << ", status: " << status
            << "\n";
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub 자바 API 참조 문서를 참조하세요.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithCustomAttributesExample(projectId, subscriptionId);
  }

  public static void subscribeWithCustomAttributesExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          // Print message attributes.
          message
              .getAttributesMap()
              .forEach((key, value) -> System.out.println(key + " = " + value));
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 참조하세요.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print("Received message: {}".format(message.data))
    if message.attributes:
        print("Attributes:")
        for key in message.attributes:
            value = message.attributes.get(key)
            print("{}: {}".format(key, value))
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참조 문서를 참조하세요.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

오류 수신 대기

이 샘플은 메시지를 구독할 때 발생하는 오류 처리 방법을 보여줍니다.

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참조 문서를 확인하세요.

namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](pubsub::Subscriber subscriber, pubsub::Subscription const& subscription) {
  std::mutex mu;
  std::condition_variable cv;
  bool done = false;
  int message_count = 0;
  auto session =
      subscriber
          .Subscribe(subscription,
                     [&](pubsub::Message const& m, pubsub::AckHandler h) {
                       std::cout << "Received message " << m << "\n";
                       std::unique_lock<std::mutex> lk(mu);
                       ++message_count;
                       done = true;
                       lk.unlock();
                       cv.notify_one();
                       std::move(h).ack();
                     })
          // Setup an error handler for the subscription session
          .then([&](future<google::cloud::Status> f) {
            std::cout << "Subscription session result: " << f.get() << "\n";
            std::unique_lock<std::mutex> lk(mu);
            done = true;
            cv.notify_one();
          });
  // Most applications would just release the `session` object at this point,
  // but we want to gracefully close down this example.
  std::unique_lock<std::mutex> lk(mu);
  cv.wait(lk, [&done] { return done; });
  lk.unlock();
  session.cancel();
  session.get();
  std::cout << "Message count:" << message_count << "\n";
}

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참조 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func pullMsgsError(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	// If the service returns a non-retryable error, Receive returns that error after
	// all of the outstanding calls to the handler have returned.
	err = client.Subscription(subID).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참조 문서를 참조하세요.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithErrorListenerExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithErrorListenerExample(projectId, subscriptionId);
  }

  public static void subscribeWithErrorListenerExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setExecutorProvider(executorProvider)
              .build();

      // Listen for unrecoverable failures. Rebuild a subscriber and restart subscribing
      // when the current subscriber encounters permanent errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (!executorProvider.getExecutor().isShutdown()) {
                subscribeWithErrorListenerExample(projectId, subscriptionId);
              }
            }
          },
          MoreExecutors.directExecutor());

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const timeout = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForErrors() {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionName);

  // Create an event handler to handle messages
  const messageHandler = function (message) {
    // Do something with the message
    console.log(`Message: ${message}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Create an event handler to handle errors
  const errorHandler = function (error) {
    // Do something with the error
    console.error(`ERROR: ${error}`);
    throw error;
  };

  // Listen for new messages/errors until timeout is hit
  subscription.on('message', messageHandler);
  subscription.on('error', errorHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    subscription.removeListener('error', errorHandler);
  }, timeout * 1000);
}

listenForErrors();

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 참조하세요.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print("Received message: {}".format(message))
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    # When `timeout` is not set, result() will block indefinitely,
    # unless an exception is encountered first.
    try:
        streaming_pull_future.result(timeout=timeout)
    except Exception as e:
        streaming_pull_future.cancel()
        print(
            "Listening for messages on {} threw an exception: {}.".format(
                subscription_id, e
            )
        )

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참조 문서를 참조하세요.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

begin
  subscriber.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  subscriber.stop.wait!
rescue Exception => e
  puts "Exception #{e.inspect}: #{e.message}"
  raise "Stopped listening for messages."
end

메시지 흐름 제어

구독자 클라이언트의 메시지 처리 및 확인 속도가 Pub/Sub가 클라이언트에 전송하는 속도보다 느릴 수도 있습니다. 주요 내용은 다음과 같습니다.

  • 받은 메시지의 볼륨을 처리할 용량이 없어 한 클라이언트에 메시지 백로그가 생기지만, 네트워크상의 다른 클라이언트는 이를 처리할 용량이 있습니다. 두 번째 클라이언트는 구독의 백로그를 줄일 수 있지만 첫 번째 클라이언트가 받는 메시지에 대한 임대를 유지하기 때문에 기회를 얻을 수 없습니다. 메시지가 첫 번째 클라이언트에서 정체되어 전체 처리 속도가 감소하게 됩니다.

  • 클라이언트 라이브러리는 백로그된 메시지에 대한 확인 기한을 반복적으로 연장하기 때문에 이러한 메시지는 메모리, CPU, 대역폭 리소스를 계속 소비합니다. 따라서 구독자 클라이언트는 메모리와 같은 리소스가 부족할 수 있습니다. 이는 처리 메시지의 처리량과 지연 시간에 부정적인 영향을 미칠 수 있습니다.

위의 문제를 완화하려면 구독자의 흐름 제어 기능을 사용하여 구독자가 메시지를 받는 속도를 제어합니다. 이러한 흐름 제어 기능은 아래 샘플에서 설명합니다.

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참조 문서를 확인하세요.

namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](pubsub::Subscriber subscriber, pubsub::Subscription const& subscription) {
  std::mutex mu;
  std::condition_variable cv;
  int count = 0;
  auto constexpr kExpectedMessageCount = 4;
  auto handler = [&](pubsub::Message const& m, pubsub::AckHandler h) {
    std::move(h).ack();
    {
      std::lock_guard<std::mutex> lk(mu);
      std::cout << "Received message [" << count << "] " << m.data() << "\n";
      if (++count < kExpectedMessageCount) return;
    }
    cv.notify_one();
  };

  // Change the flow control watermarks, by default the client library uses
  // 0 and 1,000 for the message count watermarks, and 0 and 10MiB for the
  // size watermarks. Recall that the library stops requesting messages if
  // any of the high watermarks are reached, and the library resumes
  // requesting messages when *both* low watermarks are reached.
  auto constexpr kMiB = 1024 * 1024L;
  auto session = subscriber.Subscribe(
      subscription, std::move(handler),
      pubsub::SubscriptionOptions{}
          .set_message_count_watermarks(/*lwm=*/800, /*hwm=*/1000)
          .set_message_size_watermarks(/*lwm=*/4 * kMiB, /*hwm=*/8 * kMiB));
  {
    std::unique_lock<std::mutex> lk(mu);
    cv.wait(lk, [&] { return count >= kExpectedMessageCount; });
  }
  session.cancel();
  auto status = session.get();
  std::cout << "Message count: " << count << ", status: " << status << "\n";
}

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참조 문서를 확인하세요.


using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesWithFlowControlAsyncSample
{
    public async Task<int> PullMessagesWithFlowControlAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName,
            settings: new SubscriberClient.Settings()
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            });
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = Encoding.UTF8.GetString(message.Data.ToArray());
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참조 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func pullMsgsSettings(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}

	sub := client.Subscription(subID)
	sub.ReceiveSettings.Synchronous = true
	// MaxOutstandingMessages is the maximum number of unprocessed messages the
	// client will pull from the server before pausing.
	//
	// This is only guaranteed when ReceiveSettings.Synchronous is set to true.
	// When Synchronous is set to false, the StreamingPull RPC is used which
	// can pull a single large batch of messages at once that is greater than
	// MaxOustandingMessages before pausing. For more info, see
	// https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages.
	sub.ReceiveSettings.MaxOutstandingMessages = 10
	// MaxOutstandingBytes is the maximum size of unprocessed messages,
	// that the client will pull from the server before pausing. Similar
	// to MaxOutstandingMessages, this may be exceeded with a large batch
	// of messages since we cannot control the size of a batch of messages
	// from the server (even with the synchronous Pull RPC).
	sub.ReceiveSettings.MaxOutstandingBytes = 1e10
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}
	return nil
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub 자바 API 참조 문서를 참조하세요.


import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithFlowControlSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithFlowControlSettingsExample(projectId, subscriptionId);
  }

  public static void subscribeWithFlowControlSettingsExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;

    // The subscriber will pause the message stream and stop receiving more messsages from the
    // server if any one of the conditions is met.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.newBuilder()
            // 1,000 outstanding messages. Must be >0. It controls the maximum number of messages
            // the subscriber receives before pausing the message stream.
            .setMaxOutstandingElementCount(1000L)
            // 100 MiB. Must be >0. It controls the maximum size of messages the subscriber
            // receives before pausing the message stream.
            .setMaxOutstandingRequestBytes(100L * 1024L * 1024L)
            .build();

    try {
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setFlowControlSettings(flowControlSettings)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
// const maxInProgress = 5;
// const timeout = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function subscribeWithFlowControlSettings() {
  const subscriberOptions = {
    flowControl: {
      maxMessages: maxInProgress,
    },
  };

  // References an existing subscription.
  // Note that flow control settings are not persistent across subscribers.
  const subscription = pubSubClient.subscription(
    subscriptionName,
    subscriberOptions
  );

  console.log(
    `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
  );

  const messageHandler = message => {
    console.log(`Received message: ${message.id}`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.close();
  }, timeout * 1000);
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 참조하세요.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print("Received message: {}".format(message.data))
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)

streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control
)
print("Listening for messages on {}..\n".format(subscription_path))

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참조 문서를 참조하세요.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

대부분의 경우 흐름 제어가 필요하다는 사실은 메시지의 게시 속도가 소비 속도보다 빠르다는 것을 의미합니다. 메시지 볼륨의 일시적인 증가가 아닌 지속적 상태인 경우 구독자 클라이언트 인스턴스 수를 늘려보세요.

동시 실행 제어

동시 실행 지원 여부는 프로그래밍 언어에 따라 다릅니다. 자바, Go와 같은 동시 스레드를 지원하는 언어 구현의 경우 클라이언트 라이브러리에서 스레드 수를 기본으로 선택합니다. 이는 애플리케이션에 가장 적합한 선택이 아닐 수도 있습니다. 예를 들어 구독자 애플리케이션이 받은 메시지 볼륨은 따라잡지 못하지만 CPU의 제한은 받지 않는 경우, 스레드 수를 늘려야 합니다. CPU 집약적인 메시지 처리 작업이라면 스레드 수를 줄여야 할 수도 있습니다.

다음 샘플은 구독자의 동시 실행을 제어하는 방법을 보여줍니다.

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참조 문서를 확인하세요.

namespace pubsub = google::cloud::pubsub;
using google::cloud::future;
using google::cloud::StatusOr;
[](std::string project_id, std::string subscription_id) {
  // Create a subscriber with 16 threads handling I/O work, by default the
  // library creates `std::thread::hardware_concurrency()` threads.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::ConnectionOptions{}.set_background_thread_pool_size(16)));
  auto subscription =
      pubsub::Subscription(std::move(project_id), std::move(subscription_id));

  std::mutex mu;
  std::condition_variable cv;
  int count = 0;
  auto constexpr kExpectedMessageCount = 4;
  auto handler = [&](pubsub::Message const& m, pubsub::AckHandler h) {
    // This handler executes in the I/O threads, applications could use,
    // std::async(), a thread-pool,
    // google::cloud::CompletionQueue::RunAsync(), or any other mechanism to
    // transfer the execution to other threads.
    std::cout << "Received message " << m << "\n";
    std::move(h).ack();
    {
      std::lock_guard<std::mutex> lk(mu);
      if (++count < kExpectedMessageCount) return;
    }
    cv.notify_one();
  };

  // Create a subscription where up to 8 messages are handled concurrently. By
  // default the library uses `0` and `std::thread::hardwarde_concurrency()`
  // for the concurrency watermarks.
  auto session = subscriber.Subscribe(
      subscription, std::move(handler),
      pubsub::SubscriptionOptions{}.set_concurrency_watermarks(
          /*lwm=*/4, /*hwm=*/8));
  {
    std::unique_lock<std::mutex> lk(mu);
    cv.wait(lk, [&] { return count >= kExpectedMessageCount; });
  }
  session.cancel();
  auto status = session.get();
  std::cout << "Message count: " << count << ", status: " << status << "\n";
}

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참조 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"runtime"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgsConcurrenyControl(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Must set ReceiveSettings.Synchronous to false (or leave as default) to enable
	// concurrency settings. Otherwise, NumGoroutines will be set to 1.
	sub.ReceiveSettings.Synchronous = false
	// NumGoroutines is the number of goroutines sub.Receive will spawn to pull messages concurrently.
	sub.ReceiveSettings.NumGoroutines = runtime.NumCPU()

	// Receive messages for 10 seconds.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Create a channel to handle messages to as they come in.
	cm := make(chan *pubsub.Message)
	defer close(cm)
	// Handle individual messages in a goroutine.
	go func() {
		for msg := range cm {
			fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
			msg.Ack()
		}
	}()

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		cm <- msg
	})
	if err != nil {
		return fmt.Errorf("Receive: %v", err)
	}

	return nil
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub 자바 API 참조 문서를 참조하세요.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithConcurrencyControlExample(projectId, subscriptionId);
  }

  public static void subscribeWithConcurrencyControlExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages. The default `executorProvider` used
      // by the subscriber has a default thread count of 5.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
      // to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
      // subscriber to process messages. Here, the subscriber is configured to open 2 streams for
      // receiving messages, each stream creates a new executor with 4 threads to help process the
      // message callbacks. In total 2x4=8 threads are used for message processing.
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setParallelPullCount(2)
              .setExecutorProvider(executorProvider)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참조 문서를 참조하세요.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
# Use 2 threads for streaming, 4 threads for executing callbacks and 2 threads
# for sending acknowledgements and/or delays
subscriber   = subscription.listen streams: 2, threads: {
  callback: 4,
  push:     2
} do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

동시 실행 지원 여부는 프로그래밍 언어에 따라 다릅니다. 자세한 내용은 API 참조 문서를 참조하세요.

StreamingPull

Pub/Sub 서비스는 2가지 API를 이용해 메시지를 검색합니다.

가능한 경우 Cloud Client 라이브러리는 최대 처리량과 최저 지연 시간을 제공하는 StreamingPull을 사용합니다. StreamingPull API를 직접 사용할 일이 없다 하더라도, StreamingPull의 주요 특성과 기본 Pull 메서드와의 차이점은 반드시 이해해야 합니다.

Pull 메서드는 요청/응답 모델을 활용합니다.

  1. 클라이언트는 서버에 메시지 요청을 보냅니다.
  2. 서버가 0개 이상의 메시지로 응답하고 연결을 닫습니다.

StreamingPull 서비스 API는 사용 가능한 여러 메시지를 수신하기 위해 다음과 같이 지속적인 양방향 연결을 이용합니다.

  1. 클라이언트는 연결을 설정하라는 요청을 서비스에 보냅니다.
  2. 서버는 연결된 클라이언트에 메시지를 계속 보냅니다.
  3. 결국 클라이언트 또는 서버에 의해 연결이 종료됩니다.

구독자에게 콜백을 제공하면 구독자가 각 메시지에 대한 콜백을 비동기적으로 실행합니다. 구독자가 동일한 순서 키가 포함된 메시지를 수신하면 클라이언트 라이브러리가 콜백을 순차적으로 실행합니다. Pub/Sub 서비스는 이러한 메시지를 동일한 구독자에게 최선의 방식으로 전달합니다.

StreamingPull의 오류율은 100%(예상)

StreamingPull 스트림은 항상 OK(확인)가 아닌 상태로 종료됩니다. 일반 RPC와는 달리, 이 상태는 단순히 스트림이 손상되었다는 의미일 뿐 요청 실패를 의미하지는 않습니다. 따라서 StreamingPull API가 100%라는 놀라운 오류율을 보이더라도, 이는 의도적으로 설계된 것입니다.

StreamingPull 오류 진단

StreamingPull 스트림은 항상 오류로 종료되므로 오류를 진단하는 동안 스트림 종료 측정항목을 검사하는 것은 도움이 되지 않습니다. 그 대신 StreamingPull 메시지 작업 측정항목(subscription/streaming_pull_message_operation_count)에 초점을 맞춥니다. 다음 오류를 찾습니다.

  • 다음의 경우 FAILED_PRECONDITION 오류가 발생할 수 있습니다.
    • Pub/Sub는 사용 중지된 Cloud KMS 키가 있는 메시지의 복호화를 시도합니다.
    • 사용 중지된 Cloud KMS 키로 암호화되는 구독 백로그에 메시지가 있는 경우 구독을 일시적으로 정지할 수 있습니다.
  • 오류 UNAVAILABLE

StreamingPull: 작은 메시지의 큰 백로그 처리하기

gRPC StreamingPull 스택은 높은 처리량에 최적화되어 있어 메시지 버퍼링이 발생합니다. 새 메시지의 지속적인 스트림이 아닌 작은 메시지의 큰 백로그를 처리하려는 경우, 이러한 특성에 영향을 받을 수도 있습니다. 이러한 조건에서는 메시지가 여러 번 전달되지만 클라이언트 간에 부하 분산이 효율적으로 되지 않을 수 있습니다.

Pub/Sub 서비스와 클라이언트 라이브러리 사용자 공간 사이의 버퍼는 약 10MB입니다. 이 버퍼가 클라이언트 라이브러리 동작에 미치는 영향을 이해하려면 다음 예시를 살펴보세요.

  • 구독에 1KB 메시지 10,000개의 백로그가 존재합니다.
  • 단일 스레드 클라이언트 인스턴스로 각 메시지를 순차적으로 처리하는 데 1초가 걸립니다.
  • 해당 구독에 대한 서비스와 StreamingPull 연결을 설정하는 첫 번째 클라이언트 인스턴스는 메시지 1만 개로 버퍼를 모두 채웁니다.
  • 버퍼를 처리하는 데 10,000초(약 3시간)가 걸립니다.
  • 이때 일부 메시지가 확인 기한을 초과해 같은 클라이언트로 재전송되어 중복 메시지가 발생합니다.
  • 여러 클라이언트 인스턴스가 실행중인 경우, 한 클라이언트의 버퍼에 갇힌 메시지는 모든 클라이언트 인스턴스에서 사용할 수 없습니다.

메시지가 단 하나의 대규모 배치가 아닌 일정한 속도로 수신되는 경우에는 이러한 상황이 발생하지 않습니다. 즉, 서비스가 한 번에 10MB의 전체 메시지를 갖지는 않으므로 여러 구독자 간에 메시지 부하를 효과적으로 분산할 수 있습니다.

이 같은 상황을 해결하려면 내보내기 구독이나 Pull API를 이용하세요. 현재 일부 Cloud 클라이언트 라이브러리(동기식 가져오기 섹션 참조)와 모든 API 클라이언트 라이브러리에서 사용할 수 있습니다. 자세한 내용은 클라이언트 라이브러리 문서를 참조하세요.

동기식 가져오기

비동기식 가져오기가 사용자의 애플리케이션에 완벽하게 적합하지 않은 경우가 있습니다. 예를 들어 애플리케이션 로직이 메시지를 검색하기 위해 폴링 패턴에 의존하거나 특정 시간에 클라이언트가 검색하는 메시지의 개수에 정확한 상한을 요구할 수 있습니다. 이러한 애플리케이션을 지원하기 위해 서비스는 동기식 Pull 메서드를 지원합니다.

다음은 고정된 수의 메시지를 가져오고 확인하는 샘플 코드입니다.

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참조 문서를 확인하세요.


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Linq;
using System.Text;
using System.Threading;

public class PullMessagesSyncSample
{
    public int PullMessagesSync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
        int messageCount = 0;
        try
        {
            // Pull messages from server,
            // allowing an immediate response if there are no messages.
            PullResponse response = subscriberClient.Pull(subscriptionName, returnImmediately: false, maxMessages: 20);
            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
            {
                string text = Encoding.UTF8.GetString(msg.Message.Data.ToArray());
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
                Interlocked.Increment(ref messageCount);
            }
            // If acknowledgement required, send to server.
            if (acknowledge && messageCount > 0)
            {
                subscriberClient.Acknowledge(subscriptionName, response.ReceivedMessages.Select(msg => msg.AckId));
            }
        }
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
        {
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
        }
        return messageCount;
    }
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub 자바 API 참조 문서를 참조하세요.


import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    subscribeSyncExample(projectId, subscriptionId, numOfMessages);
  }

  public static void subscribeSyncExample(
      String projectId, String subscriptionId, Integer numOfMessages) throws IOException {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setSubscription(subscriptionName)
              .build();

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        // Handle received message
        // ...
        ackIds.add(message.getAckId());
      }
      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      System.out.println(pullResponse.getReceivedMessagesList());
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPull() {
  const formattedSubscription = subClient.subscriptionPath(
    projectId,
    subscriptionName
  );

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const request = {
    subscription: formattedSubscription,
    maxMessages: 10,
  };

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Process the messages.
  const ackIds = [];
  for (const message of response.receivedMessages) {
    console.log(`Received message: ${message.message.data}`);
    ackIds.push(message.ackId);
  }

  // Acknowledge all of the messages. You could also ackknowledge
  // these individually, but this is more efficient.
  const ackRequest = {
    subscription: formattedSubscription,
    ackIds: ackIds,
  };
  await subClient.acknowledge(ackRequest);

  console.log('Done.');
}

synchronousPull().catch(console.error);

PHP

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 확인하세요.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

프로토콜

요청:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull

{
  "returnImmediately": "false",
  "maxMessages": "1"
}

응답:

200 OK

{
  "receivedMessages": [{
    "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
    "message": {
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
      "messageId": "19917247034"
    }
  }]
}

요청:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge

{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 참조하세요.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

NUM_MESSAGES = 3

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    # The subscriber pulls a specific number of messages.
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": NUM_MESSAGES}
    )

    ack_ids = []
    for received_message in response.received_messages:
        print("Received: {}".format(received_message.message.data))
        ack_ids.append(received_message.ack_id)

    # Acknowledges the received messages so they will not be sent again.
    subscriber.acknowledge(
        request={"subscription": subscription_path, "ack_ids": ack_ids}
    )

    print(
        "Received and acknowledged {} messages. Done.".format(
            len(response.received_messages)
        )
    )

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참조 문서를 참조하세요.

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscription.pull.each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

Pub/Sub는 메시지 목록을 전송합니다. 목록에 여러 메시지가 포함된 경우 Pub/Sub는 동일한 순서 키로 메시지를 정렬합니다.

동기식 가져오기로 메시지 전송 지연 시간을 짧게 하려면 동시에 많은 미해결 가져오기 요청을 하는 것이 중요합니다. 주제의 처리량이 증가함에 따라 더 많은 가져오기 요청이 필요합니다. 일반적으로 비동기식 가져오기는 지연 시간에 민감한 애플리케이션에 사용하는 것이 좋습니다.

동기식 가져오기 및 임대 관리

개별 메시지 처리가 임대라고도 하는 미리 구성된 확인 기한을 초과할 수 있습니다. 이러한 메시지의 재전송을 피하기 위해 클라이언트 라이브러리는 다음 샘플에 표시된 대로 확인 기한을 재설정(폴링된 메시지의 확인 기한을 자동으로 수정하는 Go 클라이언트 라이브러리 제외)하는 방법을 제공합니다.

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참조 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"cloud.google.com/go/pubsub"
)

func pullMsgsSync(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Turn on synchronous mode. This makes the subscriber use the Pull RPC rather
	// than the StreamingPull RPC, which is useful for guaranteeing MaxOutstandingMessages,
	// the max number of messages the client will hold in memory at a time.
	sub.ReceiveSettings.Synchronous = true
	sub.ReceiveSettings.MaxOutstandingMessages = 10

	// Receive messages for 5 seconds.
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	// Create a channel to handle messages to as they come in.
	cm := make(chan *pubsub.Message)
	defer close(cm)
	// Handle individual messages in a goroutine.
	go func() {
		for msg := range cm {
			fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
			msg.Ack()
		}
	}()

	// Receive blocks until the passed in context is done.
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		cm <- msg
	})
	if err != nil && status.Code(err) != codes.Canceled {
		return fmt.Errorf("Receive: %v", err)
	}

	return nil
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub 자바 API 참조 문서를 참조하세요.


import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncWithLeaseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    projectId = "tz-playground-bigdata";
    subscriptionId = "uno";

    subscribeSyncWithLeaseExample(projectId, subscriptionId, numOfMessages);
  }

  public static void subscribeSyncWithLeaseExample(
      String projectId, String subscriptionId, Integer numOfMessages)
      throws IOException, InterruptedException {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20 MB
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {

      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);

      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setSubscription(subscriptionName)
              .build();

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

      List<String> ackIds = new ArrayList<>();

      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        ackIds.add(message.getAckId());

        // Modify the ack deadline of each received message from the default 10 seconds to 30.
        // This prevents the server from redelivering the message after the default 10 seconds
        // have passed.
        ModifyAckDeadlineRequest modifyAckDeadlineRequest =
            ModifyAckDeadlineRequest.newBuilder()
                .setSubscription(subscriptionName)
                .addAckIds(message.getAckId())
                .setAckDeadlineSeconds(30)
                .build();

        subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
      }

      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      System.out.println(pullResponse.getReceivedMessagesList());
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPullWithLeaseManagement() {
  const formattedSubscription = subClient.subscriptionPath(
    projectId,
    subscriptionName
  );

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const maxMessages = 1;
  const newAckDeadlineSeconds = 30;
  const request = {
    subscription: formattedSubscription,
    maxMessages: maxMessages,
  };

  let isProcessed = false;

  // The worker function is meant to be non-blocking. It starts a long-
  // running process, such as writing the message to a table, which may
  // take longer than the default 10-sec acknowledge deadline.
  function worker(message) {
    console.log(`Processing "${message.message.data}"...`);

    setTimeout(() => {
      console.log(`Finished procesing "${message.message.data}".`);
      isProcessed = true;
    }, 30000);
  }

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Obtain the first message.
  const message = response.receivedMessages[0];

  // Send the message to the worker function.
  worker(message);

  let waiting = true;
  while (waiting) {
    await new Promise(r => setTimeout(r, 10000));
    // If the message has been processed..
    if (isProcessed) {
      const ackRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
      };

      //..acknowledges the message.
      await subClient.acknowledge(ackRequest);
      console.log(`Acknowledged: "${message.message.data}".`);
      // Exit after the message is acknowledged.
      waiting = false;
      console.log('Done.');
    } else {
      // If the message is not yet processed..
      const modifyAckRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
        ackDeadlineSeconds: newAckDeadlineSeconds,
      };

      //..reset its ack deadline.
      await subClient.modifyAckDeadline(modifyAckRequest);

      console.log(
        `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
      );
    }
  }
}

synchronousPullWithLeaseManagement().catch(console.error);

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 참조하세요.

import logging
import multiprocessing
import random
import time

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

NUM_MESSAGES = 2
ACK_DEADLINE = 30
SLEEP_TIME = 10

# The subscriber pulls a specific number of messages.
response = subscriber.pull(
    request={"subscription": subscription_path, "max_messages": NUM_MESSAGES}
)

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)

def worker(msg):
    """Simulates a long-running process."""
    RUN_TIME = random.randint(1, 60)
    logger.info(
        "{}: Running {} for {}s".format(
            time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME
        )
    )
    time.sleep(RUN_TIME)

# `processes` stores process as key and ack id and message as values.
processes = dict()
for message in response.received_messages:
    process = multiprocessing.Process(target=worker, args=(message,))
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is still running, reset the ack deadline as
        # specified by ACK_DEADLINE once every while as specified
        # by SLEEP_TIME.
        if process.is_alive():
            # `ack_deadline_seconds` must be between 10 to 600.
            subscriber.modify_ack_deadline(
                request={
                    "subscription": subscription_path,
                    "ack_ids": [ack_id],
                    "ack_deadline_seconds": ACK_DEADLINE,
                }
            )
            logger.info(
                "{}: Reset ack deadline for {} for {}s".format(
                    time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE,
                )
            )

        # If the processs is finished, acknowledges using `ack_id`.
        else:
            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": [ack_id]}
            )
            logger.info(
                "{}: Acknowledged {}".format(
                    time.strftime("%X", time.gmtime()), msg_data
                )
            )
            processes.pop(process)

    # If there are still processes running, sleeps the thread.
    if processes:
        time.sleep(SLEEP_TIME)

print(
    "Received and acknowledged {} messages. Done.".format(
        len(response.received_messages)
    )
)

# Close the underlying gPRC channel. Alternatively, wrap subscriber in
# a 'with' block to automatically call close() when done.
subscriber.close()

확장

메시지 볼륨을 처리하려면 구독자 애플리케이션에 확장 메커니즘을 구현해야 할 수도 있습니다. 이 작업을 수행하는 방법은 환경에 따라 다르며 다만 Google Cloud의 작업 제품군 모니터링 서비스를 통해 제공되는 백로그 측정항목을 기반으로 합니다. Compute Engine에서 이 작업을 수행하는 방법을 알고 싶다면 Cloud Monitoring 측정항목을 바탕으로 한 확장을 참조하세요.

GCP 측정항목 목록의 Pub/Sub 섹션 페이지로 이동하여 프로그래매틱 방식으로 모니터링할 수 있는 측정항목에 대해 알아보세요.

마지막으로 모든 분산 시스템과 마찬가지로, 각 요청을 종종 재시도해야 한다는 사실을 잊지 마세요.

중복 처리 및 강제 재시도

확인 기한이 만료되기 전에 메시지를 확인하지 않으면 Pub/Sub에서 메시지를 다시 보냅니다. 따라서 Pub/Sub는 중복 메시지를 보낼 수 있습니다. Google Cloud의 작업 제품군을 사용하여 확인 작업을 모니터링하고 expired 응답 코드를 사용해 이러한 조건을 감지합니다. 이 데이터를 가져오려면 메시지 확인 작업 측정항목을 선택한 다음 response_code 라벨로 그룹화하거나 필터링합니다. response_code는 측정항목의 시스템 라벨이며 측정항목이 아닙니다.

Stackdriver를 사용하여 만료된 메시지 확인 기한 검색

중복률을 줄이려면 메시지 기한을 연장하세요.

  • 클라이언트 라이브러리는 기한 연장을 자동으로 처리하지만, 구성할 수 있는 최대 연장 기한에 기본 한도가 있습니다.
  • 자체 클라이언트 라이브러리를 빌드하는 경우 modifyAckDeadline 메서드를 사용하여 확인 기한을 연장합니다.

또는 Pub/Sub가 메시지를 다시 시도하도록 하려면 modifyAckDeadline을 0으로 설정하세요.