Spring 애플리케이션에서 Pub/Sub 사용

이 페이지에서는 Spring Framework로 빌드된 Java 애플리케이션에서 Pub/Sub를 사용하는 방법을 설명합니다.

Spring Cloud GCP에는 Spring Framework를 사용하여 Pub/Sub 구독에서 메시지를 수신하고 Pub/Sub 주제로 메시지를 전송하기 위한 몇 가지 모듈이 포함되어 있습니다. 이러한 모듈은 여러 사용 사례에 맞게 개별적으로 또는 결합된 형태로 사용할 수 있습니다.

참고: Spring Cloud GCP 라이브러리에서는 Java 클라이언트 라이브러리를 사용하여 1회만 구현하는 기능을 구현하는 데 필요한 모듈인 AckReplyConsumerWithResponse에 대해 액세스 권한을 제공하지 않습니다.

시작하기 전에

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  4. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  5. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  6. GOOGLE_CLOUD_PROJECT 환경 변수를 Google Cloud 프로젝트 ID로 설정합니다.

Spring Cloud GCP Pub/Sub Starter 사용

Spring Cloud GCP Pub/Sub Starter 모듈은 Spring Cloud GCP Pub/Sub 모듈을 사용하여 Pub/Sub Java 클라이언트 라이브러리를 설치합니다. Spring Cloud GCP Pub/Sub Starter가 제공하는 클래스 또는 Pub/Sub Java 클라이언트 라이브러리를 사용하여 Spring 애플리케이션에서 Pub/Sub API를 호출할 수 있습니다. Spring Cloud GCP Pub/Sub Starter가 제공하는 클래스를 사용하는 경우 기본 Pub/Sub 구성을 무효화할 수 있습니다.

모듈 설치

Spring Cloud GCP Pub/Sub Starter 모듈을 설치하려면 해당 종속 항목을 pom.xml 파일에 추가합니다.

  1. Spring Cloud 명세서(BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Spring Cloud GCP Pub/Sub Starter 아티팩트:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

지원되는 작업

Spring Cloud GCP Pub/Sub Starter 모듈에는 다음 클래스가 포함되어 있습니다.

  • 관리 작업을 위한 PubSubAdmin:
    • 주제 및 구독을 만듭니다.
    • 주제 및 구독을 가져옵니다.
    • 주제 및 구독을 나열합니다.
    • 주제 및 구독을 삭제합니다.
    • 구독에 대한 확인 기한을 가져오고 설정합니다.
  • 메시지 송신 및 수신을 위한 PubSubTemplate:
    • 주제에 메시지를 게시합니다.
    • 구독에서 동기식으로 메시지를 가져옵니다.
    • 구독에서 비동기식으로 메시지를 가져옵니다.
    • 메시지를 확인합니다.
    • 확인 기한을 수정합니다.
    • Pub/Sub 메시지를 일반 구형 자바 객체(POJO)로 변환합니다.

Spring Integration 채널 어댑터 사용

Spring 애플리케이션에 Spring 통합 메시지 채널이 사용되는 경우 채널 어댑터를 사용하여 메시지 채널과 Pub/Sub 사이에 메시지를 라우팅할 수 있습니다.

모듈 설치

Spring 통합 채널 어댑터에 대해 모듈을 설치하려면 pom.xml 파일에 다음을 추가합니다.

  1. Spring Cloud GCP BOM.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Spring Cloud GCP Pub/Sub Starter 및 Spring 통합 코어 아티팩트:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Pub/Sub에서 메시지 수신

Spring 애플리케이션에서 Pub/Sub 구독으로부터 메시지를 수신하려면 인바운드 채널 어댑터를 사용합니다. 인바운드 채널 어댑터는 수신되는 Pub/Sub 메시지를 POJO로 변환하고 POJO를 메시지 채널로 전달합니다.

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

위 예시에서는 다음 Spring bean 및 Pub/Sub 리소스가 사용됩니다.

  • 이름이 inputMessageChannel인 메시지 채널 bean
  • 이름이 inboundChannelAdapter이고 유형이 PubSubInboundChannelAdapter인 인바운드 채널 어댑터 bean
  • 이름이 sub-one인 Pub/Sub 구독 ID

inboundChannelAdapterPubSubTemplate를 사용하여 sub-one에서 메시지를 비동기식으로 가져오고 메시지를 inputMessageChannel로 전송합니다.

inboundChannelAdapter는 애플리케이션이 메시지 처리 후 이를 확인할 수 있도록 확인 모드를 MANUAL로 설정합니다. PubSubInboundChannelAdapter 유형의 기본 확인 모드는 AUTO입니다.

ServiceActivator bean messageReceiverinputMessageChannel에서 도착하는 각 메시지를 표준 출력에 로깅한 후 메시지를 확인합니다.

Pub/Sub에 메시지 게시

메시지 채널에서 Pub/Sub 주제에 메시지를 게시하려면 아웃바운드 채널 어댑터를 사용합니다. 아웃바운드 채널 어댑터는 POJO를 Pub/Sub 메시지로 변환한 후 메시지를 Pub/Sub 주제로 전송합니다.

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

위 예시에서는 다음 Spring bean 및 Pub/Sub 리소스가 사용됩니다.

  • 이름이 inputMessageChannel인 메시지 채널 bean
  • 이름이 messageSender이고 유형이 PubSubMessageHandler인 아웃바운드 채널 어댑터 bean
  • 이름이 topic-two인 Pub/Sub 주제 ID

ServiceActivator bean은 messageSender의 논리를 inputMessageChannel의 각 메시지에 적용합니다.

messageSenderPubSubMessageHandlerPubSubTemplate을 사용하여 inputMessageChannel의 메시지를 게시합니다. PubSubMessageHandler는 메시지를 Pub/Sub 주제 topic-two에 게시합니다.

Spring Cloud Stream Binder 사용

Spring Cloud Stream 애플리케이션에서 Pub/Sub API를 호출하려면 Spring Cloud GCP Pub/Sub Stream Binder 모듈을 사용합니다.

모듈 설치

Spring Cloud Stream Binder 모듈을 설치하려면 pom.xml 파일에 다음을 추가합니다.

  1. Spring Cloud GCP BOM.

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Spring Cloud Stream Binder 아티팩트:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Pub/Sub에서 메시지 수신

애플리케이션을 이벤트 싱크로 사용하려면 다음을 지정하여 입력 바인더를 구성합니다.

  • 메시지 처리 논리를 정의하는 Consumer bean. 예를 들어 다음 Consumer bean은 이름이 receiveMessageFromTopicTwo입니다.

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • 구성 파일 application.properties의 Pub/Sub 주제 ID. 예를 들어 다음 구성 파일은 topic-two라는 Pub/Sub 주제 ID를 사용합니다.

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

이 예시 코드는 Pub/Sub에서 메시지를 수신합니다. 이 예시는 다음을 수행합니다.

  1. application.properties의 입력 바인딩 대상에서 Pub/Sub 주제 ID topic-two를 찾습니다.
  2. topic-two에 대해 Pub/Sub 구독을 만듭니다.
  3. 바인딩 이름 receiveMessageFromTopicTwo-in-0을 사용하여 receiveMessageFromTopicTwo라는 Consumer bean을 찾습니다.
  4. 수신 메시지를 표준 출력으로 출력하고 이를 자동으로 확인합니다.

Pub/Sub에 메시지 게시

애플리케이션을 이벤트 소스로 사용하려면 다음을 지정하여 출력 바인더를 구성합니다.

  • 애플리케이션 내에서 메시지가 수신되는 위치를 정의하는 Supplier bean. 예를 들어 다음 Supplier bean은 이름이 sendMessageToTopicOne입니다.

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • 구성 파일 application.properties의 Pub/Sub 주제 ID. 예를 들어 다음 구성 파일은 topic-one라는 Pub/Sub 주제 ID를 사용합니다.

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

이 예시 코드는 Pub/Sub에 메시지를 게시합니다. 이 예시는 다음을 수행합니다.

  1. application.properties의 출력 바인딩 대상에서 Pub/Sub 주제 ID topic-one를 찾습니다.
  2. 바인딩 이름 sendMessageToTopicOne-out-0을 사용하여 sendMessageToTopicOne라는 Supplier bean을 찾습니다.
  3. 10초마다 topic-one으로 번호가 지정된 메시지를 전송합니다.