Spring アプリケーションでの Pub/Sub の使用

このページでは、Spring Framework で作成された Java アプリケーションで Pub/Sub を使用する方法について説明します。

Spring Cloud GCP には、Spring Framework を使用して Pub/Sub トピックにメッセージを送信し、Pub/Sub サブスクリプションからメッセージを受信するためのモジュールが複数あります。これらのモジュールは、異なるユースケースに対して個別に使用することも、組み合わせて使用することもできます。

注: Spring Cloud GCP ライブラリでは、Java クライアント ライブラリを使用して 1 回限りの機能を実装するための必須モジュールである AckReplyConsumerWithResponse にはアクセスできません。

準備

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  4. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  5. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  6. 環境変数 GOOGLE_CLOUD_PROJECT に Google Cloud プロジェクト ID を設定します。

Spring Cloud GCP Pub/Sub Starter の使用

Spring Cloud GCP Pub/Sub Starter モジュールは、Spring Cloud GCP Pub/Sub モジュールを使用して Pub/Sub Java クライアント ライブラリをインストールします。Spring Cloud GCP Pub/Sub Starter が提供するクラスまたは Pub/Sub Java クライアント ライブラリを使用して、Spring アプリケーションから Pub/Sub API を呼び出すことができます。Spring Cloud GCP Pub/Sub Starter が提供するクラスを使用している場合は、デフォルトの Pub/Sub 構成をオーバーライドできます。

モジュールのインストール

Spring Cloud GCP Pub/Sub Starter モジュールをインストールするには、次の依存関係を pom.xml ファイルに追加します。

  1. Spring Cloud Bill of Materials(BOM):

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Spring Cloud GCP Pub/Sub Starter アーティファクト:

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>

サポートされているオペレーション

Spring Cloud GCP Pub/Sub Starter モジュールには、次のクラスが含まれています。

  • 管理オペレーションのための PubSubAdmin:
    • トピックとサブスクリプションの作成
    • トピックとサブスクリプションの取得
    • トピックとサブスクリプションの一覧表示
    • トピックとサブスクリプションの削除
    • サブスクリプションの確認応答期限の取得と設定
  • メッセージを送受信するためのPubSubTemplate:
    • メッセージのトピックへのパブリッシュ
    • サブスクリプションからメッセージを同期的に pull する
    • サブスクリプションからメッセージを非同期で pull する
    • メッセージの確認応答
    • 確認応答期限の変更
    • Pub/Sub メッセージをプレーン オールド Java オブジェクト(POJO)に変換する

Spring 統合チャネル アダプタの使用

Spring アプリケーションで Spring 統合メッセージ チャネルを使用する場合、チャネル アダプタを使用して、メッセージ チャネルと Pub/Sub の間でメッセージをルーティングできます。

モジュールのインストール

Spring Integration チャネル アダプタ用のモジュールをインストールするには、次のものを pom.xml ファイルに追加します。

  1. Spring Cloud GCP BOM

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Spring Cloud GCP Pub/Sub Starter と Spring Integration Core のアーティファクト。

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.integration</groupId>
      <artifactId>spring-integration-core</artifactId>
    </dependency>

Pub/Sub からのメッセージの受信

Spring アプリケーションで Pub/Sub サブスクリプションからメッセージを受信するには、受信チャネル アダプタを使用します。受信チャネル アダプタは、受信した Pub/Sub メッセージを POJO に変換し、POJO をメッセージ チャネルに転送します。

// Create a message channel for messages arriving from the subscription `sub-one`.
@Bean
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();
}

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
@Bean
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  adapter.setOutputChannel(messageChannel);
  adapter.setAckMode(AckMode.MANUAL);
  adapter.setPayloadType(String.class);
  return adapter;
}

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);
  message.ack();
}

上の例では、次の Spring Bean と Pub/Sub リソースを使用しています。

  • inputMessageChannel という名前のメッセージ チャネル Bean
  • 型が PubSubInboundChannelAdapterinboundChannelAdapter という名前の受信チャネル アダプタ Bean。
  • sub-one という名前の Pub/Sub サブスクリプション ID。

inboundChannelAdapter は、PubSubTemplate を使用して sub-one から非同期でメッセージを pull し、inputMessageChannel にメッセージを送信します。

inboundChannelAdapter で確認応答モードを MANUAL に設定すると、アプリケーションはメッセージの処理後にメッセージの確認応答ができます。PubSubInboundChannelAdapter 型のデフォルトの確認応答モードは、AUTO です。

ServiceActivator Bean messageReceiverは、inputMessageChannel に到着する各メッセージを標準出力に記録してから、メッセージを確認応答します。

Pub/Sub へのメッセージのパブリッシュ

メッセージ チャネルから Pub/Sub トピックにメッセージをパブリッシュするには、送信チャネル アダプタを使用します。送信チャネル アダプタは、POJO を Pub/Sub メッセージに変換し、メッセージを Pub/Sub トピックに送信します。

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@Bean
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

  adapter.setSuccessCallback(
      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

  adapter.setFailureCallback(
      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;
}

上の例では、次の Spring Bean と Pub/Sub リソースを使用しています。

  • inputMessageChannel という名前のメッセージ チャネル Bean
  • 型が PubSubMessageHandlermessageSender という名前の発信チャネル アダプタ Bean。
  • topic-two という名前の Pub/Sub トピック ID。

ServiceActivator Bean は、messageSender のロジックを inputMessageChannel 内の各メッセージに適用します。

messageSenderPubSubMessageHandler は、PubSubTemplate を使用して inputMessageChannel にメッセージをパブリッシュします。PubSubMessageHandler は、Pub/Sub トピック topic-two にメッセージをパブリッシュします。

Spring Cloud Stream Binder の使用

Spring Cloud Stream アプリケーションで Pub/Sub API を呼び出すには、Spring Cloud GCP Pub/Sub Stream Binder モジュールを使用します。

モジュールのインストール

Spring Cloud Stream Binder モジュールをインストールするには、pom.xml ファイルに次の内容を追加します。

  1. Spring Cloud GCP BOM

    <dependencyManagement>
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>spring-cloud-gcp-dependencies</artifactId>
          <version>3.7.7</version>
          <type>pom</type>
          <scope>import</scope>
        </dependency>
      </dependencies>
    </dependencyManagement>
  2. Spring Cloud Stream Binder アーティファクト。

    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>spring-cloud-gcp-pubsub-stream-binder</artifactId>
    </dependency>

Pub/Sub からのメッセージの受信

アプリケーションをイベントシンクとして使用するには、次のように入力バインダを構成します。

  • メッセージ処理ロジックを定義する Consumer Bean。たとえば、次の Consumer Bean は receiveMessageFromTopicTwo という名前になっています。

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    @Bean
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
        LOGGER.info(
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
      };
    }
  • 構成ファイル application.properties 内の Pub/Sub トピック ID。たとえば、次の構成ファイルでは、topic-two という名前の Pub/Sub トピック ID を使用しています。

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.
    spring.cloud.stream.bindings.receiveMessageFromTopicTwo-in-0.destination=topic-two

このサンプルコードは、Pub/Sub からメッセージを受信します。この例では、次のことを行います。

  1. application.properties の入力バインディングの宛先で Pub/Sub トピック ID topic-two を検索します。
  2. topic-two への Pub/Sub サブスクリプションを作成します。
  3. バインディング名 receiveMessageFromTopicTwo-in-0 を使用して、receiveMessageFromTopicTwo という名前の Consumer Bean を見つけます。
  4. 着信したメッセージを標準出力に出力し、自動的に確認応答します。

Pub/Sub へのメッセージのパブリッシュ

アプリケーションをイベントソースとして使用するには、次のように出力バインダを構成します。

  • Supplier Bean は、アプリケーション内にメッセージが作られる場所を定義します。たとえば、次の Supplier Bean は sendMessageToTopicOne という名前になっています。

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    @Bean
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
          Flux.<Message<String>>generate(
                  sink -> {
                    try {
                      Thread.sleep(10000);
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    }
    
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                    LOGGER.info(
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
                    sink.next(message);
                  })
              .subscribeOn(Schedulers.boundedElastic());
    }
  • 構成ファイル application.properties 内の Pub/Sub トピック ID。たとえば、次の構成ファイルでは、topic-one という名前の Pub/Sub トピック ID を使用しています。

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.
    spring.cloud.stream.bindings.sendMessageToTopicOne-out-0.destination=topic-one

このサンプルコードは、Pub/Sub にメッセージをパブリッシュします。この例では、次のことを行います。

  1. application.properties の出力バインディングの宛先で Pub/Sub トピック ID topic-one を検索します。
  2. バインディング名 sendMessageToTopicOne-out-0 を使用して、sendMessageToTopicOneという名前の Supplier Bean を見つけます。
  3. 10 秒ごとに topic-one に番号付きメッセージを送信します。