Spring アプリケーションでの Pub/Sub の使用

このページでは、Spring Framework で作成された Java アプリケーションで Pub/Sub を使用する方法について説明します。

Spring Cloud GCP には、Spring Framework を使用して Pub/Sub トピックにメッセージを送信し、Pub/Sub サブスクリプションからメッセージを受信するためのモジュールが複数あります。これらのモジュールは、異なるユースケースに対して個別に使用することも、組み合わせて使用することもできます。

注: Spring Cloud GCP ライブラリでは、Java クライアント ライブラリを使用して 1 回限りの機能を実装するための必須モジュールである AckReplyConsumerWithResponse にはアクセスできません。


  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Set up a Google Cloud console project.

    Set up a project

    Click to:

    • Create or select a project.
    • Enable the Pub/Sub API for that project.
    • Create a service account.
    • Download a private key as JSON.

    You can view and manage these resources at any time in the Google Cloud console.

  3. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.


    Replace KEY_PATH with the path of the JSON file that contains your credentials.

    For example:

    export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/service-account-file.json"

    For PowerShell:


    Replace KEY_PATH with the path of the JSON file that contains your credentials.

    For example:


    For command prompt:


    Replace KEY_PATH with the path of the JSON file that contains your credentials.

  4. 環境変数 GOOGLE_CLOUD_PROJECT に Google Cloud プロジェクト ID を設定します。

    PROJECT_ID は、 Google Cloud プロジェクトの ID に置き換えます。


    PROJECT_ID は、 Google Cloud プロジェクトの ID に置き換えます。

Spring Cloud GCP Pub/Sub Starter の使用

Spring Cloud GCP Pub/Sub Starter モジュールは、Spring Cloud GCP Pub/Sub モジュールを使用して Pub/Sub Java クライアント ライブラリをインストールします。Spring Cloud GCP Pub/Sub Starter が提供するクラスまたは Pub/Sub Java クライアント ライブラリを使用して、Spring アプリケーションから Pub/Sub API を呼び出すことができます。Spring Cloud GCP Pub/Sub Starter が提供するクラスを使用している場合は、デフォルトの Pub/Sub 構成をオーバーライドできます。


Spring Cloud GCP Pub/Sub Starter モジュールをインストールするには、次の依存関係を pom.xml ファイルに追加します。

  1. Spring Cloud Bill of Materials(BOM):

  2. Spring Cloud GCP Pub/Sub Starter アーティファクト:



Spring Cloud GCP Pub/Sub Starter モジュールには、次のクラスが含まれています。

  • 管理オペレーションのための PubSubAdmin:
    • トピックとサブスクリプションの作成
    • トピックとサブスクリプションの取得
    • トピックとサブスクリプションの一覧表示
    • トピックとサブスクリプションの削除
    • サブスクリプションの確認応答期限の取得と設定
  • メッセージを送受信するためのPubSubTemplate:
    • メッセージのトピックへのパブリッシュ
    • サブスクリプションからメッセージを同期的に pull する
    • サブスクリプションからメッセージを非同期で pull する
    • メッセージの確認応答
    • 確認応答期限の変更
    • Pub/Sub メッセージをプレーン オールド Java オブジェクト(POJO)に変換する

Spring 統合チャネル アダプタの使用

Spring アプリケーションで Spring 統合メッセージ チャネルを使用する場合、チャネル アダプタを使用して、メッセージ チャネルと Pub/Sub の間でメッセージをルーティングできます。


Spring Integration チャネル アダプタ用のモジュールをインストールするには、次のものを pom.xml ファイルに追加します。

  1. Spring Cloud GCP BOM

  2. Spring Cloud GCP Pub/Sub Starter と Spring Integration Core のアーティファクト。


Pub/Sub からのメッセージの受信

Spring アプリケーションで Pub/Sub サブスクリプションからメッセージを受信するには、受信チャネル アダプタを使用します。受信チャネル アダプタは、受信した Pub/Sub メッセージを POJO に変換し、POJO をメッセージ チャネルに転送します。

// Create a message channel for messages arriving from the subscription `sub-one`.
public MessageChannel inputMessageChannel() {
  return new PublishSubscribeChannel();

// Create an inbound channel adapter to listen to the subscription `sub-one` and send
// messages to the input message channel.
public PubSubInboundChannelAdapter inboundChannelAdapter(
    @Qualifier("inputMessageChannel") MessageChannel messageChannel,
    PubSubTemplate pubSubTemplate) {
  PubSubInboundChannelAdapter adapter =
      new PubSubInboundChannelAdapter(pubSubTemplate, "sub-one");
  return adapter;

// Define what happens to the messages arriving in the message channel.
@ServiceActivator(inputChannel = "inputMessageChannel")
public void messageReceiver(
    String payload,
    @Header(GcpPubSubHeaders.ORIGINAL_MESSAGE) BasicAcknowledgeablePubsubMessage message) {
  LOGGER.info("Message arrived via an inbound channel adapter from sub-one! Payload: " + payload);

上の例では、次の Spring Bean と Pub/Sub リソースを使用しています。

  • inputMessageChannel という名前のメッセージ チャネル Bean
  • 型が PubSubInboundChannelAdapterinboundChannelAdapter という名前の受信チャネル アダプタ Bean。
  • sub-one という名前の Pub/Sub サブスクリプション ID。

inboundChannelAdapter は、PubSubTemplate を使用して sub-one から非同期でメッセージを pull し、inputMessageChannel にメッセージを送信します。

inboundChannelAdapter で確認応答モードを MANUAL に設定すると、アプリケーションはメッセージの処理後にメッセージの確認応答ができます。PubSubInboundChannelAdapter 型のデフォルトの確認応答モードは、AUTO です。

ServiceActivator Bean messageReceiverは、inputMessageChannel に到着する各メッセージを標準出力に記録してから、メッセージを確認応答します。

Pub/Sub へのメッセージのパブリッシュ

メッセージ チャネルから Pub/Sub トピックにメッセージをパブリッシュするには、送信チャネル アダプタを使用します。送信チャネル アダプタは、POJO を Pub/Sub メッセージに変換し、メッセージを Pub/Sub トピックに送信します。

// Create an outbound channel adapter to send messages from the input message channel to the
// topic `topic-two`.
@ServiceActivator(inputChannel = "inputMessageChannel")
public MessageHandler messageSender(PubSubTemplate pubsubTemplate) {
  PubSubMessageHandler adapter = new PubSubMessageHandler(pubsubTemplate, "topic-two");

      ((ackId, message) ->
          LOGGER.info("Message was sent via the outbound channel adapter to topic-two!")));

      (cause, message) -> LOGGER.info("Error sending " + message + " due to " + cause));

  return adapter;

上の例では、次の Spring Bean と Pub/Sub リソースを使用しています。

  • inputMessageChannel という名前のメッセージ チャネル Bean
  • 型が PubSubMessageHandlermessageSender という名前の発信チャネル アダプタ Bean。
  • topic-two という名前の Pub/Sub トピック ID。

ServiceActivator Bean は、messageSender のロジックを inputMessageChannel 内の各メッセージに適用します。

messageSenderPubSubMessageHandler は、PubSubTemplate を使用して inputMessageChannel にメッセージをパブリッシュします。PubSubMessageHandler は、Pub/Sub トピック topic-two にメッセージをパブリッシュします。

Spring Cloud Stream Binder の使用

Spring Cloud Stream アプリケーションで Pub/Sub API を呼び出すには、Spring Cloud GCP Pub/Sub Stream Binder モジュールを使用します。


Spring Cloud Stream Binder モジュールをインストールするには、pom.xml ファイルに次の内容を追加します。

  1. Spring Cloud GCP BOM

  2. Spring Cloud Stream Binder アーティファクト。


Pub/Sub からのメッセージの受信


  • メッセージ処理ロジックを定義する Consumer Bean。たとえば、次の Consumer Bean は receiveMessageFromTopicTwo という名前になっています。

    // Create an input binder to receive messages from `topic-two` using a Consumer bean.
    public Consumer<Message<String>> receiveMessageFromTopicTwo() {
      return message -> {
            "Message arrived via an input binder from topic-two! Payload: " + message.getPayload());
  • 構成ファイル application.properties 内の Pub/Sub トピック ID。たとえば、次の構成ファイルでは、topic-two という名前の Pub/Sub トピック ID を使用しています。

    # Bind the Pub/Sub topic `topic-two` to the Consumer bean
    # `receiveMessageFromTopicTwo`. Your Spring application will
    # automatically create and attach a subscription to the topic.

このサンプルコードは、Pub/Sub からメッセージを受信します。この例では、次のことを行います。

  1. application.properties の入力バインディングの宛先で Pub/Sub トピック ID topic-two を検索します。
  2. topic-two への Pub/Sub サブスクリプションを作成します。
  3. バインディング名 receiveMessageFromTopicTwo-in-0 を使用して、receiveMessageFromTopicTwo という名前の Consumer Bean を見つけます。
  4. 着信したメッセージを標準出力に出力し、自動的に確認応答します。

Pub/Sub へのメッセージのパブリッシュ


  • Supplier Bean は、アプリケーション内にメッセージが作られる場所を定義します。たとえば、次の Supplier Bean は sendMessageToTopicOne という名前になっています。

    // Create an output binder to send messages to `topic-one` using a Supplier bean.
    public Supplier<Flux<Message<String>>> sendMessageToTopicOne() {
      return () ->
                  sink -> {
                    try {
                    } catch (InterruptedException e) {
                      // Stop sleep earlier.
                    Message<String> message =
                        MessageBuilder.withPayload("message-" + rand.nextInt(1000)).build();
                        "Sending a message via the output binder to topic-one! Payload: "
                            + message.getPayload());
  • 構成ファイル application.properties 内の Pub/Sub トピック ID。たとえば、次の構成ファイルでは、topic-one という名前の Pub/Sub トピック ID を使用しています。

    # Bind the Supplier bean `sendMessageToTopicOne` to the Pub/Sub topic
    # `topic-one`. If the topic does not exist, one will be created.

このサンプルコードは、Pub/Sub にメッセージをパブリッシュします。この例では、次のことを行います。

  1. application.properties の出力バインディングの宛先で Pub/Sub トピック ID topic-one を検索します。
  2. バインディング名 sendMessageToTopicOne-out-0 を使用して、sendMessageToTopicOneという名前の Supplier Bean を見つけます。
  3. 10 秒ごとに topic-one に番号付きメッセージを送信します。