가져오기 구독에서 메시지 수신

이 문서에서는 풀 구독에서 메시지를 수신하는 방법을 설명합니다. Google Cloud 콘솔, Google Cloud CLI, 클라이언트 라이브러리 또는 Pub/Sub API를 사용하여 pull 구독을 만들 수 있습니다.

시작하기 전에

필수 역할 및 권한

구독에서 메시지를 가져오고 관리하는 데 필요한 권한을 얻으려면 관리자에게 프로젝트에 대한 Pub/Sub 구독자 (roles/pubsub.subscriber) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

이 사전 정의된 역할에는 구독에서 메시지를 가져오고 관리하는 데 필요한 권한이 포함되어 있습니다. 필요한 정확한 권한을 보려면 필수 권한 섹션을 펼치세요.

필수 권한

구독에서 메시지를 가져오고 관리하려면 다음 권한이 필요합니다.

  • 구독에서 가져오기: pubsub.subscriptions.consume
  • 구독 만들기: pubsub.subscriptions.create
  • 구독 삭제: pubsub.subscriptions.delete
  • 구독 가져오기: pubsub.subscriptions.get
  • 구독 나열: pubsub.subscriptions.list
  • 구독 업데이트: pubsub.subscriptions.update
  • 주제에 구독 연결하기: pubsub.topics.attachSubscription
  • 구독의 IAM 정책 가져오기: pubsub.subscriptions.getIamPolicy
  • 구독의 IAM 정책 구성: pubsub.subscriptions.setIamPolicy
  • 가져오기 구독에 구독에서 메시지 사용 권한을 부여합니다. pubsub.subscriptions.consume

커스텀 역할이나 다른 사전 정의된 역할을 사용하여 이 권한을 부여받을 수도 있습니다.

구독에서 메시지 가져오기

다음 샘플은 StreamingPull API 또는 Pull API를 사용하여 구독에서 메시지를 가져오는 방법을 보여줍니다.

StreamingPull API

StreamingPull API를 사용하려면 클라이언트 라이브러리를 사용해야 합니다.

Google Cloud 콘솔과 Google Cloud CLI는 StreamingPull API를 지원하지 않습니다.

StreamingPull 및 상위 수준 클라이언트 라이브러리 코드 샘플

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참조 문서를 확인하세요.


using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesAsyncSample
{
    public async Task<int> PullMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        int messageCount = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Go

다음 샘플에서는 Go Pub/Sub 클라이언트 라이브러리의 주요 버전 (v2)을 사용합니다. 아직 v1 라이브러리를 사용하고 있다면 v2로의 마이그레이션 가이드를 참고하세요. v1 코드 샘플 목록을 보려면 지원 중단된 코드 샘플을 참고하세요.

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgs(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub 자바 API 참조 문서를 참조하세요.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeAsyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);
  }

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId, timeout) {
  // References an existing subscription; if you are unsure if the
  // subscription will exist, try the optimisticSubscribe sample.
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId: string, timeout: number) {
  // References an existing subscription; if you are unsure if the
  // subscription will exist, try the optimisticSubscribe sample.
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 참조하세요.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

다음 샘플에서는 Ruby Pub/Sub 클라이언트 라이브러리 v3을 사용합니다. 아직 v2 라이브러리를 사용하고 있다면 v3로의 마이그레이션 가이드를 참고하세요. Ruby v2 코드 샘플 목록을 보려면 지원 중단된 코드 샘플을 참고하세요.

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

listener = subscriber.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

상위 수준 클라이언트 라이브러리를 사용하여 커스텀 속성 검색

다음 샘플은 메시지를 비동기식으로 가져오고 메타데이터에서 커스텀 속성을 검색하는 방법을 보여줍니다.

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message with attributes:\n";
        for (auto& kv : m.attributes()) {
          std::cout << "  " << kv.first << ": " << kv.second << "\n";
        }
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참조 문서를 확인하세요.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesWithCustomAttributesAsyncSample
{
    public async Task<List<PubsubMessage>> PullMessagesWithCustomAttributesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        var messages = new List<PubsubMessage>();
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            messages.Add(message);
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            if (message.Attributes != null)
            {
                foreach (var attribute in message.Attributes)
                {
                    Console.WriteLine($"{attribute.Key} = {attribute.Value}");
                }
            }
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 7 seconds.
        await Task.Delay(7000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messages;
    }
}

Go

다음 샘플에서는 Go Pub/Sub 클라이언트 라이브러리의 주요 버전 (v2)을 사용합니다. 아직 v1 라이브러리를 사용하고 있다면 v2로의 마이그레이션 가이드를 참고하세요. v1 코드 샘플 목록을 보려면 지원 중단된 코드 샘플을 참고하세요.

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgsCustomAttributes(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
		fmt.Fprintln(w, "Attributes:")
		for key, value := range msg.Attributes {
			fmt.Fprintf(w, "%s = %s\n", key, value)
		}
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}

	return nil
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub 자바 API 참조 문서를 참조하세요.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithCustomAttributesExample(projectId, subscriptionId);
  }

  public static void subscribeWithCustomAttributesExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          // Print message attributes.
          message
              .getAttributesMap()
              .forEach((key, value) -> System.out.println(key + " = " + value));
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenWithCustomAttributes(subscriptionNameOrId, timeout) {
  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  const messageHandler = message => {
    console.log(
      `Received message: id ${message.id}, data ${
        message.data
      }, attributes: ${JSON.stringify(message.attributes)}`,
    );

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Wait a while for the subscription to run. (Part of the sample only.)
  subscription.on('message', messageHandler);
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
  }, timeout * 1000);
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 참조하세요.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data!r}.")
    if message.attributes:
        print("Attributes:")
        for key in message.attributes:
            value = message.attributes.get(key)
            print(f"{key}: {value}")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

다음 샘플에서는 Ruby Pub/Sub 클라이언트 라이브러리 v3을 사용합니다. 아직 v2 라이브러리를 사용하고 있다면 v3로의 마이그레이션 가이드를 참고하세요. Ruby v2 코드 샘플 목록을 보려면 지원 중단된 코드 샘플을 참고하세요.

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

listener = subscriber.listen do |received_message|
  puts "Received message: #{received_message.data}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"
    end
  end
  received_message.acknowledge!
end

listener.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
listener.stop.wait!

상위 수준 클라이언트 라이브러리를 사용하여 오류 처리

다음 샘플은 메시지를 구독할 때 발생하는 오류 처리 방법을 보여줍니다.

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber
      .Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      })
      // Setup an error handler for the subscription session
      .then([](future<google::cloud::Status> f) {
        std::cout << "Subscription session result: " << f.get() << "\n";
      });
};

Go

다음 샘플에서는 Go Pub/Sub 클라이언트 라이브러리의 주요 버전 (v2)을 사용합니다. 아직 v1 라이브러리를 사용하고 있다면 v2로의 마이그레이션 가이드를 참고하세요. v1 코드 샘플 목록을 보려면 지원 중단된 코드 샘플을 참고하세요.

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgsError(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// If the service returns a non-retryable error, Receive returns that error after
	// all of the outstanding calls to the handler have returned.
	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("Receive: %w", err)
	}
	return nil
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub 자바 API 참조 문서를 참조하세요.


import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithErrorListenerExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithErrorListenerExample(projectId, subscriptionId);
  }

  public static void subscribeWithErrorListenerExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setExecutorProvider(executorProvider)
              .build();

      // Listen for unrecoverable failures.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println("Unrecoverable subscriber failure:" + failure.getStackTrace());
            }
          },
          MoreExecutors.directExecutor());

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForErrors(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  const messageHandler = message => {
    // Do something with the message
    console.log(`Message: ${message}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Create an event handler to handle errors
  const errorHandler = error => {
    // Do something with the error
    console.error(`ERROR: ${error}`);
    throw error;
  };

  // Listen for new messages/errors until timeout is hit
  subscription.on('message', messageHandler);
  subscription.on('error', errorHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    subscription.removeListener('error', errorHandler);
  }, timeout * 1000);
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 참조하세요.

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    # When `timeout` is not set, result() will block indefinitely,
    # unless an exception is encountered first.
    try:
        streaming_pull_future.result(timeout=timeout)
    except Exception as e:
        print(
            f"Listening for messages on {subscription_path} threw an exception: {e}."
        )
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

다음 샘플에서는 Go Pub/Sub 클라이언트 라이브러리의 주요 버전 (v2)을 사용합니다. 아직 v1 라이브러리를 사용하고 있다면 v2로의 마이그레이션 가이드를 참고하세요. v1 코드 샘플 목록을 보려면 지원 중단된 코드 샘플을 참고하세요.

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

listener = subscriber.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

# Propagate exception from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

begin
  listener.start
  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
  listener.stop.wait!
rescue StandardError => e
  puts "Exception #{e.inspect}: #{e.message}"
  raise "Stopped listening for messages."
end

단항 가져오기

유니 API를 사용하려면 Google Cloud 콘솔, Google Cloud CLI 또는 클라이언트 라이브러리를 사용하면 됩니다.

고려사항

Pub/Sub는 메시지 목록을 전송합니다. 목록에 여러 메시지가 포함된 경우 Pub/Sub는 동일한 순서 키로 메시지를 정렬합니다. 다음은 몇 가지 중요한 주의사항입니다.

  • 요청에서 max_messages 값을 설정하면 백로그에 메시지가 많은 경우에도 max_messages가 반환되지 않습니다. Pub/Sub Pull API는 전송할 수 있는 메시지의 전송 지연 시간을 줄이기 위해 max_messages보다 적게 반환할 수 있습니다.

  • 메시지가 없는 pull 응답을 백로그에 메시지가 없음을 나타내는 지표로 사용해서는 안 됩니다. 메시지가 없는 응답이 수신되고 메시지를 반환하는 후속 요청이 있을 수 있습니다.

콘솔

  1. Google Cloud 콘솔에서 Pub/Sub 구독 페이지로 이동합니다.

    구독 페이지로 이동

  2. 가져올 구독의 이름을 클릭합니다.

  3. 메시지 탭에서 가져오기를 클릭합니다.

이 구독에 게시한 메시지와 게시 시간이 표시됩니다.

Google Cloud 콘솔을 사용할 때 낮은 메시지 볼륨에 대한 개별 가져오기에서 메시지를 0개 반환하는 경우가 많습니다. 메시지가 표시되지 않으면 가져오기를 여러 번 클릭하여 가져오기 요청을 여러 개 실행하세요. Pub/Sub 클라이언트 라이브러리에서는 이런 문제가 없습니다.

gcloud

구독에서 메시지를 가져오려면 gcloud pubsub subscriptions pull 명령어를 실행합니다. gcloud CLI는 명령줄에 메시지를 인쇄합니다.

gcloud pubsub subscriptions pull SUBSCRIPTION_NAME --auto-ack

다음과 같이 바꿉니다.

  • SUBSCRIPTION_NAME: 메시지를 가져올 구독의 이름입니다.

클라이언트 라이브러리

다음은 고정된 수의 메시지를 가져오고 확인하는 샘플 코드입니다.

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

[](google::cloud::pubsub::Subscriber subscriber) {
  auto response = subscriber.Pull();
  if (!response) throw std::move(response).status();
  std::cout << "Received message " << response->message << "\n";
  std::move(response->handler).ack();
}

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참조 문서를 확인하세요.


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Linq;
using System.Threading;

public class PullMessagesSyncSample
{
    public int PullMessagesSync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
        int messageCount = 0;
        try
        {
            // Pull messages from server,
            // allowing an immediate response if there are no messages.
            PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);
            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
            {
                string text = msg.Message.Data.ToStringUtf8();
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
                Interlocked.Increment(ref messageCount);
            }
            // If acknowledgement required, send to server.
            if (acknowledge && messageCount > 0)
            {
                subscriberClient.Acknowledge(subscriptionName, response.ReceivedMessages.Select(msg => msg.AckId));
            }
        }
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
        {
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
        }
        return messageCount;
    }
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub 자바 API 참조 문서를 참조하세요.


import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    subscribeSyncExample(projectId, subscriptionId, numOfMessages);
  }

  public static void subscribeSyncExample(
      String projectId, String subscriptionId, Integer numOfMessages) throws IOException {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setSubscription(subscriptionName)
              .build();

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

      // Stop the program if the pull response is empty to avoid acknowledging
      // an empty list of ack IDs.
      if (pullResponse.getReceivedMessagesList().isEmpty()) {
        System.out.println("No message was pulled. Exiting.");
        return;
      }

      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        // Handle received message
        // ...
        ackIds.add(message.getAckId());
      }

      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      System.out.println(pullResponse.getReceivedMessagesList());
    }
  }
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPull(projectId, subscriptionNameOrId) {
  // The low level API client requires a name only.
  const formattedSubscription =
    subscriptionNameOrId.indexOf('/') >= 0
      ? subscriptionNameOrId
      : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const request = {
    subscription: formattedSubscription,
    maxMessages: 10,
  };

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Process the messages.
  const ackIds = [];
  for (const message of response.receivedMessages || []) {
    console.log(`Received message: ${message.message.data}`);
    if (message.ackId) {
      ackIds.push(message.ackId);
    }
  }

  if (ackIds.length !== 0) {
    // Acknowledge all of the messages. You could also acknowledge
    // these individually, but this is more efficient.
    const ackRequest = {
      subscription: formattedSubscription,
      ackIds: ackIds,
    };

    await subClient.acknowledge(ackRequest);
  }

  console.log('Done.');
}

PHP

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참조 문서를 참조하세요.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Ruby

다음 샘플에서는 Ruby Pub/Sub 클라이언트 라이브러리 v3을 사용합니다. 아직 v2 라이브러리를 사용하고 있다면 v3로의 마이그레이션 가이드를 참고하세요. Ruby v2 코드 샘플 목록을 보려면 지원 중단된 코드 샘플을 참고하세요.

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

subscriber.pull(immediate: false).each do |message|
  puts "Message pulled: #{message.data}"
  message.acknowledge!
end

프로토콜

요청:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:pull

{
  "returnImmediately": "false",
  "maxMessages": "1"
}

응답:

200 OK

{
  "receivedMessages": [{
    "ackId": "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK...",
    "message": {
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
      "messageId": "19917247034"
    }
  }]
}

요청:

POST https://pubsub.googleapis.com/v1/projects/myproject/subscriptions/mysubscription:acknowledge

{
  "ackIds": [
    "dQNNHlAbEGEIBERNK0EPKVgUWQYyODM2LwgRHFEZDDsLRk1SK..."
  ]
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참조 문서를 참조하세요.

from google.api_core import retry
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

NUM_MESSAGES = 3

# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    # The subscriber pulls a specific number of messages. The actual
    # number of messages pulled may be smaller than max_messages.
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},
        retry=retry.Retry(deadline=300),
    )

    if len(response.received_messages) == 0:
        return

    ack_ids = []
    for received_message in response.received_messages:
        print(f"Received: {received_message.message.data}.")
        ack_ids.append(received_message.ack_id)

    # Acknowledges the received messages so they will not be sent again.
    subscriber.acknowledge(
        request={"subscription": subscription_path, "ack_ids": ack_ids}
    )

    print(
        f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
    )

다음 단계

  • gcloud 명령어로 구독을 만들거나 수정합니다.
  • REST API로 구독을 만들거나 수정하기
  • RPC API로 구독을 만들거나 수정하기