Spring 애플리케이션에서 Pub/Sub 사용

이 페이지에서는 Spring Framework로 빌드된 Java 애플리케이션에서 Pub/Sub를 사용하는 방법을 설명합니다.

Spring Cloud GCP에는 Spring Framework를 사용하여 Pub/Sub 구독에서 메시지를 수신하고 Pub/Sub 주제로 메시지를 전송하기 위한 몇 가지 모듈이 포함되어 있습니다. 이러한 모듈은 여러 사용 사례에 맞게 개별적으로 또는 결합된 형태로 사용할 수 있습니다.

참고: Spring Cloud GCP 라이브러리에서는 Java 클라이언트 라이브러리를 사용하여 1회만 구현하는 기능을 구현하는 데 필요한 모듈인 AckReplyConsumerWithResponse에 대해 액세스 권한을 제공하지 않습니다.

시작하기 전에

  4. GOOGLE_CLOUD_PROJECT 환경 변수를 Google Cloud 프로젝트 ID로 설정합니다.

    PROJECT_ID를 Google Cloud 프로젝트의 ID로 바꿉니다.

Spring Cloud GCP Pub/Sub Starter 사용

Spring Cloud GCP Pub/Sub Starter 모듈은 Spring Cloud GCP Pub/Sub 모듈을 사용하여 Pub/Sub Java 클라이언트 라이브러리를 설치합니다. Spring Cloud GCP Pub/Sub Starter가 제공하는 클래스 또는 Pub/Sub Java 클라이언트 라이브러리를 사용하여 Spring 애플리케이션에서 Pub/Sub API를 호출할 수 있습니다. Spring Cloud GCP Pub/Sub Starter가 제공하는 클래스를 사용하는 경우 기본 Pub/Sub 구성을 무효화할 수 있습니다.

모듈 설치

Spring Cloud GCP Pub/Sub Starter 모듈을 설치하려면 해당 종속 항목을 pom.xml 파일에 추가합니다.

  1. Spring Cloud 명세서(BOM):

  2. Spring Cloud GCP Pub/Sub Starter 아티팩트:


지원되는 작업

Spring Cloud GCP Pub/Sub Starter 모듈에는 다음 클래스가 포함되어 있습니다.

  • 관리 작업을 위한 PubSubAdmin:
    • 주제 및 구독을 만듭니다.
    • 주제 및 구독을 가져옵니다.
    • 주제 및 구독을 나열합니다.
    • 주제 및 구독을 삭제합니다.
    • 구독에 대한 확인 기한을 가져오고 설정합니다.
  • 메시지 송신 및 수신을 위한 PubSubTemplate:
    • 주제에 메시지를 게시합니다.
    • 구독에서 동기식으로 메시지를 가져옵니다.
    • 구독에서 비동기식으로 메시지를 가져옵니다.
    • 메시지를 확인합니다.
    • 확인 기한을 수정합니다.
    • Pub/Sub 메시지를 일반 구형 자바 객체(POJO)로 변환합니다.

Spring Integration 채널 어댑터 사용

Spring 애플리케이션에 Spring 통합 메시지 채널이 사용되는 경우 채널 어댑터를 사용하여 메시지 채널과 Pub/Sub 사이에 메시지를 라우팅할 수 있습니다.

모듈 설치

Spring 통합 채널 어댑터에 대해 모듈을 설치하려면 pom.xml 파일에 다음을 추가합니다.

  1. Spring Cloud GCP BOM.

  2. Spring Cloud GCP Pub/Sub Starter 및 Spring 통합 코어 아티팩트:


Pub/Sub에서 메시지 수신

Spring 애플리케이션에서 Pub/Sub 구독으로부터 메시지를 수신하려면 인바운드 채널 어댑터를 사용합니다. 인바운드 채널 어댑터는 수신되는 Pub/Sub 메시지를 POJO로 변환하고 POJO를 메시지 채널로 전달합니다.

// Create a message channel for messages arriving from the subscription `sub-one`.
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  return adapter;

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);

위 예시에서는 다음 Spring bean 및 Pub/Sub 리소스가 사용됩니다.

  • 이름이 inputMessageChannel인 메시지 채널 bean
  • 이름이 inboundChannelAdapter이고 유형이 PubSubInboundChannelAdapter인 인바운드 채널 어댑터 bean
  • 이름이 sub-one인 Pub/Sub 구독 ID

inboundChannelAdapterPubSubTemplate를 사용하여 sub-one에서 메시지를 비동기식으로 가져오고 메시지를 inputMessageChannel로 전송합니다.

inboundChannelAdapter는 애플리케이션이 메시지 처리 후 이를 확인할 수 있도록 확인 모드를 MANUAL로 설정합니다. PubSubInboundChannelAdapter 유형의 기본 확인 모드는 AUTO입니다.

ServiceActivator bean messageReceiverinputMessageChannel에서 도착하는 각 메시지를 표준 출력에 로깅한 후 메시지를 확인합니다.

Pub/Sub에 메시지 게시

메시지 채널에서 Pub/Sub 주제에 메시지를 게시하려면 아웃바운드 채널 어댑터를 사용합니다. 아웃바운드 채널 어댑터는 POJO를 Pub/Sub 메시지로 변환한 후 메시지를 Pub/Sub 주제로 전송합니다.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;

위 예시에서는 다음 Spring bean 및 Pub/Sub 리소스가 사용됩니다.

  • 이름이 inputMessageChannel인 메시지 채널 bean
  • 이름이 messageSender이고 유형이 PubSubMessageHandler인 아웃바운드 채널 어댑터 bean
  • 이름이 topic-two인 Pub/Sub 주제 ID

ServiceActivator bean은 messageSender의 논리를 inputMessageChannel의 각 메시지에 적용합니다.

messageSenderPubSubMessageHandlerPubSubTemplate을 사용하여 inputMessageChannel의 메시지를 게시합니다. PubSubMessageHandler는 메시지를 Pub/Sub 주제 topic-two에 게시합니다.

Spring Cloud Stream Binder 사용

Spring Cloud Stream 애플리케이션에서 Pub/Sub API를 호출하려면 Spring Cloud GCP Pub/Sub Stream Binder 모듈을 사용합니다.

모듈 설치

Spring Cloud Stream Binder 모듈을 설치하려면 pom.xml 파일에 다음을 추가합니다.

  1. Spring Cloud GCP BOM.

  2. Spring Cloud Stream Binder 아티팩트:


Pub/Sub에서 메시지 수신

애플리케이션을 이벤트 싱크로 사용하려면 다음을 지정하여 입력 바인더를 구성합니다.

  • 메시지 처리 논리를 정의하는 Consumer bean. 예를 들어 다음 Consumer bean은 이름이 receiveMessageFromTopicTwo입니다.

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
  • 구성 파일 application.properties의 Pub/Sub 주제 ID. 예를 들어 다음 구성 파일은 topic-two라는 Pub/Sub 주제 ID를 사용합니다.

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.

이 예시 코드는 Pub/Sub에서 메시지를 수신합니다. 이 예시는 다음을 수행합니다.

  1. application.properties의 입력 바인딩 대상에서 Pub/Sub 주제 ID topic-two를 찾습니다.
  2. topic-two에 대해 Pub/Sub 구독을 만듭니다.
  3. 바인딩 이름 receiveMessageFromTopicTwo-in-0을 사용하여 receiveMessageFromTopicTwo라는 Consumer bean을 찾습니다.
  4. 수신 메시지를 표준 출력으로 출력하고 이를 자동으로 확인합니다.

Pub/Sub에 메시지 게시

애플리케이션을 이벤트 소스로 사용하려면 다음을 지정하여 출력 바인더를 구성합니다.

  • 애플리케이션 내에서 메시지가 수신되는 위치를 정의하는 Supplier bean. 예를 들어 다음 Supplier bean은 이름이 sendMessageToTopicOne입니다.

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
                  sink -> {
                    try {
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
  • 구성 파일 application.properties의 Pub/Sub 주제 ID. 예를 들어 다음 구성 파일은 topic-one라는 Pub/Sub 주제 ID를 사용합니다.

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.

이 예시 코드는 Pub/Sub에 메시지를 게시합니다. 이 예시는 다음을 수행합니다.

  1. application.properties의 출력 바인딩 대상에서 Pub/Sub 주제 ID topic-one를 찾습니다.
  2. 바인딩 이름 sendMessageToTopicOne-out-0을 사용하여 sendMessageToTopicOne라는 Supplier bean을 찾습니다.
  3. 10초마다 topic-one으로 번호가 지정된 메시지를 전송합니다.