pull 구독

이 문서에서는 가져오기 구독, 워크플로, 관련 속성을 간략하게 설명합니다.

pull 구독에서 구독자 클라이언트는 Pub/Sub 서버에서 메시지를 요청합니다.

pull 모드는 두 개의 서비스 API인 Pull 또는 StreamingPull 중 하나를 사용할 수 있습니다. 선택한 API를 실행하려면 Google에서 제공하는 상위 수준의 클라이언트 라이브러리나 하위 수준의 자동 생성 클라이언트 라이브러리를 선택하면 됩니다. 비동기 메시지나 동기 메시지 처리 중에서 선택할 수도 있습니다.

시작하기 전에

이 문서를 읽기 전 다음 내용을 숙지해야 합니다.

  • Pub/Sub 작동 방법과 여러 가지 Pub/Sub 용어

  • Pub/Sub가 지원하는 다양한 종류의 구독과 pull 구독을 사용해야 하는 이유

Pull 구독 워크플로

Pull 구독의 경우 구독자 클라이언트가 Pub/Sub 서버에 메시지 검색을 요청하기 시작합니다. 구독자 클라이언트는 다음 API 중 하나를 사용합니다.

대부분의 구독자 클라이언트는 이러한 요청을 직접 수행하지 않습니다. 대신 클라이언트는 스트리밍 pull 요청을 내부적으로 수행하고 메시지를 비동기식으로 전송하는 Google Cloud가 제공하는 높은 수준의 클라이언트 라이브러리를 사용합니다. 메시지 가져오기 방법을 더 잘 제어해야 하는 구독자 클라이언트의 경우 Pub/Sub는 자동으로 생성되는 낮은 수준의 gRPC 라이브러리를 사용합니다. 이 라이브러리는 pull 또는 스트리밍 pull 요청을 직접 수행합니다. 이러한 요청은 동기식 또는 비동기식일 수 있습니다.

다음 두 이미지는 구독자 클라이언트와 pull 구독 간의 워크플로를 보여줍니다.

pull 구독의 메시지 흐름
그림 1. pull 구독 워크플로



streamingPull 구독의 메시지 흐름
그림 2. 스트리밍 pull 구독 워크플로

워크플로 가져오기

pull 워크플로는 다음과 같으며 그림 1을 참조합니다.

  1. 구독자 클라이언트는 pull 방법을 명시적으로 호출하여 메시지 전달을 요청합니다. 이 요청은 이미지에 표시된 PullRequest입니다.
  2. Pub/Sub 서버는 0개 이상의 메시지와 확인 ID로 응답합니다. 메시지가 0개이거나 오류가 있는 응답이 반드시 수신 가능한 메시지가 없음을 나타내는 것은 아닙니다. 이 응답은 이미지에 표시된 PullResponse입니다.

  3. 구독자 클라이언트는 acknowledge 메서드를 명시적으로 호출합니다. 클라이언트는 반환된 확인 ID를 사용하여 메시지가 처리되었으며 다시 전달할 필요가 없음을 확인합니다.

단일 streaming pull 요청의 경우 구독자 클라이언트가 열려 있는 연결 때문에 여러 개의 응답을 반환할 수 있습니다. 반대로 각 pull 요청에는 하나의 응답만 반환됩니다.

pull 구독의 속성

Pull 구독으로 구성하는 속성은 구독에 메시지를 쓰는 방법을 결정합니다. 자세한 내용은 구독 속성을 참조하세요.

Pub/Sub 서비스 API

Pub/Sub pull 구독은 다음 두 API 중 하나를 사용하여 메시지를 검색할 수 있습니다.

  • Pull
  • StreamingPull

이러한 API를 사용하여 메시지를 수신할 때는 단항 확인 및 ModifyAckDeadline RPC를 사용합니다. 두 가지 Pub/Sub API는 다음 탭에 설명되어 있습니다.

StreamingPull API

가능한 경우 Pub/Sub 클라이언트 라이브러리는 최대 처리량과 최저 지연 시간을 제공하는 StreamingPull을 사용합니다. StreamingPull API를 직접 사용할 일이 없다 하더라도, Pull API와 어떻게 다른지 알아야 합니다.

StreamingPull API는 사용 가능한 여러 메시지를 수신하기 위해 다음과 같이 지속적인 양방향 연결을 이용합니다. 워크플로는 다음과 같습니다.

  1. 클라이언트는 연결을 설정하라는 요청을 서비스에 보냅니다. 연결 할당량이 초과되면 서버에서 리소스 소진 오류를 반환합니다. 클라이언트 라이브러리는 할당량 부족 오류를 자동으로 재시도합니다.

  2. 오류가 없거나 연결 할당량을 다시 사용할 수 있으면 서버가 연결된 클라이언트에 메시지를 계속 전송합니다.

  3. 처리량 할당량이 초과되면 서버에서 메시지 전송을 중지합니다. 하지만 연결이 끊어지지는 않습니다. 다시 사용 가능한 처리량 할당량이 충분할 때마다 스트림이 재개됩니다.

  4. 클라이언트 또는 서버는 결국 연결을 닫습니다.

StreamingPull API는 열린 연결을 유지합니다. Pub/Sub 서버는 장기 실행 고정 연결을 방지하기 위해 일정 기간이 지나면 연결을 반복적으로 닫습니다. 클라이언트 라이브러리는 StreamingPull 연결을 자동으로 다시 엽니다.

메시지를 사용할 수 있을 때 연결로 전송됩니다. 따라서 StreamingPull API는 지연 시간을 최소화하고 메시지 처리량을 극대화합니다.

StreamingPull REST 메서드인 StreamingPullRequestStreamingPullResponse에 대해 자세히 알아보세요.

StreamingPull RPC 메서드인 StreamingPullRequestStreamingPullResponse에 대해 자세히 알아보세요.

Pull API

이 API는 요청 및 응답 모델을 기반으로 하는 기존의 단항 RPC입니다. 단일 pull 응답은 단일 pull 요청에 해당합니다. 워크플로는 다음과 같습니다.

  1. 클라이언트는 서버에 메시지 요청을 보냅니다. 처리량 할당량이 초과되면 서버에서 리소스 소진 오류가 반환됩니다.

  2. 오류가 없거나 처리량 할당량을 다시 사용할 수 있는 경우 서버는 0개 이상의 메시지와 확인 ID로 응답합니다.

단항 Pull API를 사용할 때 메시지가 없거나 오류가 있다는 응답이 수신 가능한 메시지가 없다는 뜻은 아닙니다.

Pull API를 사용해도 지연 시간이 짧고 메시지 처리량이 많다고 보장할 수 없습니다. Pull API를 통해 높은 처리량과 짧은 지연 시간을 달성하려면 동시에 여러 개의 미해결 요청이 있어야 합니다. 이전 요청이 응답을 받으면 새 요청이 생성됩니다. 이러한 솔루션의 아키텍처 설계는 오류가 발생하기 쉬우며 유지관리하기 어렵습니다. 이러한 사용 사례에는 StreamingPull API를 사용하는 것이 좋습니다.

다음을 엄격하게 제어해야 하는 경우에만 StreamingPull API 대신 Pull API를 사용합니다.

  • 구독자 클라이언트가 처리할 수 있는 메시지 수
  • 클라이언트 메모리 및 리소스

구독자가 Pub/Sub와 보다 pull 중심적인 방식으로 작동하는 다른 서비스 사이의 프록시인 경우에도 이 API를 사용할 수 있습니다.

Pull REST 메서드에 대한 자세한 내용은 메서드: projects.subscriptions.pull을 참조하세요.

Pull RPC 메서드에 대한 자세한 내용은 PullRequestPullResponse를 참조하세요.

메시지 처리 모드 유형

구독자 클라이언트에서 다음 pull 모드 중 하나를 선택합니다.

비동기식 pull 모드

비동기식 pull 모드는 구독자 클라이언트의 메시지 처리에서 메시지 수신을 분리합니다. 이 모드는 대부분의 구독자 클라이언트에서 기본값입니다. 비동기식 pull 모드는 StreamingPull API 또는 단항 Pull API를 사용할 수 있습니다. 비동기식 pull은 상위 수준의 클라이언트 라이브러리 또는 하위 수준의 자동 생성 클라이언트 라이브러리를 사용할 수도 있습니다.

클라이언트 라이브러리에 대한 자세한 내용은 이 문서 뒷부분을 참조하세요.

동기식 pull 모드

동기식 pull 모드에서는 메시지 수신 및 처리가 순차적으로 수행되며 서로 분리되지 않습니다. 따라서 StreamingPull과 단항 Pull API와 비슷하지만, 비동기식 처리는 동기식 처리보다 지연 시간이 짧고 처리량이 높습니다.

다른 요구사항에 비해 짧은 지연 시간과 높은 처리량이 가장 중요한 요소가 아닌 애플리케이션에만 동기식 pull 모드를 사용하세요. 예를 들어 애플리케이션이 동기식 프로그래밍 모델만 사용하도록 제한될 수 있습니다. 또는 리소스 제약조건이 있는 애플리케이션에는 메모리, 네트워크, CPU를 더 정확하게 제어해야 할 수 있습니다. 이러한 경우 단항 Pull API에 동기식 모드를 사용합니다.

Pub/Sub 클라이언트 라이브러리

Pub/Sub는 높은 수준 및 낮은 수준의 자동 생성 클라이언트 라이브러리를 제공합니다.

상위 수준 Pub/Sub 클라이언트 라이브러리

상위 수준 클라이언트 라이브러리는 임대 관리를 사용하여 확인 기한을 제어할 수 있는 옵션을 제공합니다. 이러한 옵션은 구독 수준에서 콘솔 또는 CLI를 사용하여 확인 기한을 구성할 때보다 더 세분화됩니다. 상위 수준 클라이언트 라이브러리에서는 순서가 지정된 전송, 1회만 전송, 흐름 제어와 같은 기능도 지원합니다.

상위 수준 클라이언트 라이브러리와 함께 비동기식 pull 및 StreamingPull API를 사용하는 것이 좋습니다. Google Cloud에서 지원하는 모든 언어가 상위 수준 클라이언트 라이브러리의 Pull API를 지원하는 것은 아닙니다.

상위 수준 클라이언트 라이브러리를 사용하려면 Pub/Sub 클라이언트 라이브러리를 참조하세요.

하위 수준 자동 생성 Pub/Sub 클라이언트 라이브러리

Pull API를 직접 사용해야 하는 경우 하위 수준 클라이언트 라이브러리를 사용할 수 있습니다. 하위 수준 자동 생성 클라이언트 라이브러리에서 동기 또는 비동기 처리를 사용할 수 있습니다. 하위 수준 자동 생성 클라이언트 라이브러리를 사용할 때는 정렬된 전송, 1회만 전송, 흐름 제어, 임대 관리와 같은 기능을 수동으로 코딩해야 합니다.

지원되는 모든 언어에 하위 수준의 자동 생성 클라이언트 라이브러리를 사용할 때 동기 처리 모델을 사용할 수 있습니다. Pull API를 직접 사용하는 것이 적절한 경우 하위 수준 자동 생성 클라이언트 라이브러리와 동기식 pull을 사용할 수 있습니다. 예를 들어 이 모델을 사용하는 기존 애플리케이션 로직이 있을 수 있습니다.

하위 수준 자동 생성 클라이언트 라이브러리를 직접 사용하려면 Pub/Sub API 개요를 참조하세요.

클라이언트 라이브러리 코드 샘플

StreamingPull 및 상위 수준 클라이언트 라이브러리 코드 샘플

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참고 문서를 확인하세요.


using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesAsyncSample
{
    public async Task<int> PullMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        int messageCount = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgs(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 참조하세요.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeAsyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);
  }

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId: string, timeout: number) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 참조하세요.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

상위 수준 클라이언트 라이브러리를 사용하여 커스텀 속성 검색

다음 샘플은 메시지를 비동기식으로 가져오고 메타데이터에서 커스텀 속성을 검색하는 방법을 보여줍니다.

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message with attributes:\n";
        for (auto& kv : m.attributes()) {
          std::cout << "  " << kv.first << ": " << kv.second << "\n";
        }
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참고 문서를 확인하세요.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesWithCustomAttributesAsyncSample
{
    public async Task<List<PubsubMessage>> PullMessagesWithCustomAttributesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        var messages = new List<PubsubMessage>();
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            messages.Add(message);
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            if (message.Attributes != null)
            {
                foreach (var attribute in message.Attributes)
                {
                    Console.WriteLine($"{attribute.Key} = {attribute.Value}");
                }
            }
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 7 seconds.
        await Task.Delay(7000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messages;
    }
}

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgsCustomAttributes(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
		fmt.Fprintln(w, "Attributes:")
		for key, value := range msg.Attributes {
			fmt.Fprintf(w, "%s = %s\n", key, value)
		}
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}

	return nil
}

Java

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 참조하세요.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithCustomAttributesExample(projectId, subscriptionId);
  }

  public static void subscribeWithCustomAttributesExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          // Print message attributes.
          message
              .getAttributesMap()
              .forEach((key, value) -> System.out.println(key + " = " + value));
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenWithCustomAttributes(subscriptionNameOrId, timeout) {
  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  const messageHandler = message => {
    console.log(
      `Received message: id ${message.id}, data ${
        message.data
      }, attributes: ${JSON.stringify(message.attributes)}`
    );

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Wait a while for the subscription to run. (Part of the sample only.)
  subscription.on('message', messageHandler);
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
  }, timeout * 1000);
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 참조하세요.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data!r}.")
    if message.attributes:
        print("Attributes:")
        for key in message.attributes:
            value = message.attributes.get(key)
            print(f"{key}: {value}")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

상위 수준 클라이언트 라이브러리를 사용하여 오류 처리

다음 샘플은 메시지를 구독할 때 발생하는 오류 처리 방법을 보여줍니다.

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber
      .Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      })
      // Setup an error handler for the subscription session
      .then([](future<google::cloud::Status> f) {
        std::cout << "Subscription session result: " << f.get() << "\n";
      });
};

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func pullMsgsError(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// If the service returns a non-retryable error, Receive returns that error after
	// all of the outstanding calls to the handler have returned.
	err = client.Subscription(subID).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %w", err)
	}
	return nil
}

Java

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 참조하세요.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithErrorListenerExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithErrorListenerExample(projectId, subscriptionId);
  }

  public static void subscribeWithErrorListenerExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setExecutorProvider(executorProvider)
              .build();

      // Listen for unrecoverable failures. Rebuild a subscriber and restart subscribing
      // when the current subscriber encounters permanent errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (!executorProvider.getExecutor().isShutdown()) {
                subscribeWithErrorListenerExample(projectId, subscriptionId);
              }
            }
          },
          MoreExecutors.directExecutor());

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForErrors(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  const messageHandler = message => {
    // Do something with the message
    console.log(`Message: ${message}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Create an event handler to handle errors
  const errorHandler = error => {
    // Do something with the error
    console.error(`ERROR: ${error}`);
    throw error;
  };

  // Listen for new messages/errors until timeout is hit
  subscription.on('message', messageHandler);
  subscription.on('error', errorHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    subscription.removeListener('error', errorHandler);
  }, timeout * 1000);
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 참조하세요.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    # When `timeout` is not set, result() will block indefinitely,
    # unless an exception is encountered first.
    try:
        streaming_pull_future.result(timeout=timeout)
    except Exception as e:
        print(
            f"Listening for messages on {subscription_path} threw an exception: {e}."
        )
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

begin
  subscriber.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  subscriber.stop.wait!
rescue StandardError => e
  puts "Exception #{e.inspect}: #{e.message}"
  raise "Stopped listening for messages."
end

단항 pull 코드 샘플

다음은 고정된 수의 메시지를 가져오고 확인하는 샘플 코드입니다.

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

[](google::cloud::pubsub::Subscriber subscriber) {
  auto response = subscriber.Pull();
  if (!response) throw std::move(response).status();
  std::cout << "Received message " << response->message << "\n";
  std::move(response->handler).ack();
}

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

[](google::cloud::pubsub::Subscriber subscriber) {
  auto response = subscriber.Pull();
  if (!response) throw std::move(response).status();
  std::cout << "Received message " << response->message << "\n";
  std::move(response->handler).ack();
}

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참고 문서를 확인하세요.


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Linq;
using System.Threading;

public class PullMessagesSyncSample
{
    public int PullMessagesSync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
        int messageCount = 0;
        try
        {
            // Pull messages from server,
            // allowing an immediate response if there are no messages.
            PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);
            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
            {
                string text = msg.Message.Data.ToStringUtf8();
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
                Interlocked.Increment(ref messageCount);
            }
            // If acknowledgement required, send to server.
            if (acknowledge && messageCount > 0)
            {
                subscriberClient.Acknowledge(subscriptionName, response.ReceivedMessages.Select(msg => msg.AckId));
            }
        }
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
        {
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
        }
        return messageCount;
    }
}

Java

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 참조하세요.


import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    subscribeSyncExample(projectId, subscriptionId, numOfMessages);
  }

  public static void subscribeSyncExample(
      String projectId, String subscriptionId, Integer numOfMessages) throws IOException {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setSubscription(subscriptionName)
              .build();

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

      // Stop the program if the pull response is empty to avoid acknowledging
      // an empty list of ack IDs.
      if (pullResponse.getReceivedMessagesList().isEmpty()) {
        System.out.println("No message was pulled. Exiting.");
        return;
      }

      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        // Handle received message
        // ...
        ackIds.add(message.getAckId());
      }

      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      System.out.println(pullResponse.getReceivedMessagesList());
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPull(projectId, subscriptionNameOrId) {
  // The low level API client requires a name only.
  const formattedSubscription =
    subscriptionNameOrId.indexOf('/') >= 0
      ? subscriptionNameOrId
      : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const request = {
    subscription: formattedSubscription,
    maxMessages: 10,
  };

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Process the messages.
  const ackIds = [];
  for (const message of response.receivedMessages || []) {
    console.log(`Received message: ${message.message.data}`);
    if (message.ackId) {
      ackIds.push(message.ackId);
    }
  }

  if (ackIds.length !== 0) {
    // Acknowledge all of the messages. You could also acknowledge
    // these individually, but this is more efficient.
    const ackRequest = {
      subscription: formattedSubscription,
      ackIds: ackIds,
    };

    await subClient.acknowledge(ackRequest);
  }

  console.log('Done.');
}

PHP

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
subscription.pull(immediate: false).each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

프로토콜

요청:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull

{
  "returnImmediately": "false",
  "maxMessages": "1"
}

응답:

200 OK

{
  "receivedMessages": [{
    "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
    "message": {
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
      "messageId": "19917247034"
    }
  }]
}

요청:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge

{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 참조하세요.

from google.api_core import retry
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

NUM_MESSAGES = 3

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    # The subscriber pulls a specific number of messages. The actual
    # number of messages pulled may be smaller than max_messages.
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
        retry=retry.Retry(deadline=300),
    )

    if len(response.received_messages) == 0:
        return

    ack_ids = []
    for received_message in response.received_messages:
        print(f"Received: {received_message.message.data}.")
        ack_ids.append(received_message.ack_id)

    # Acknowledges the received messages so they will not be sent again.
    subscriber.acknowledge(
        request={"subscription": subscription_path, "ack_ids": ack_ids}
    )

    print(
        f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
    )

Pub/Sub는 메시지 목록을 전송합니다. 목록에 여러 메시지가 포함된 경우 Pub/Sub는 동일한 순서 키로 메시지를 정렬합니다. 다음은 몇 가지 중요한 주의사항입니다.

  • 요청에서 max_messages 값을 설정하면 백로그에 메시지가 많은 경우에도 max_messages가 반환되지 않습니다. Pub/Sub Pull API는 전송할 수 있는 메시지의 전송 지연 시간을 줄이기 위해 max_messages보다 적게 반환할 수 있습니다.

  • 메시지가 없는 pull 응답을 백로그에 메시지가 없음을 나타내는 지표로 사용해서는 안 됩니다. 메시지가 없는 응답이 수신되고 메시지를 반환하는 후속 요청이 있을 수 있습니다.

  • 단항 pull 모드로 메시지 전송 지연 시간을 짧게 하려면 동시에 많은 미해결 pull 요청을 하는 것이 중요합니다. 주제의 처리량이 증가함에 따라 더 많은 pull 요청이 필요합니다. 일반적으로 StreamingPull 모드는 지연 시간에 민감한 애플리케이션에 사용하는 것이 좋습니다.

할당량 및 한도

Pull 및 StreamingPull 연결에는 할당량과 한도가 적용됩니다. 자세한 내용은 Pub/Sub 할당량 및 한도를 참조하세요.

다음 단계

  • 주제에 대한 pull 구독을 만듭니다.

  • gcloud CLI를 사용하여 구독을 만들거나 수정합니다.

  • REST API로 구독을 만들거나 수정하기

  • RPC API로 구독을 만들거나 수정하기