このページでは、Spring Framework で作成された Java アプリケーションで Pub/Sub を使用する方法について説明します。
Spring Cloud GCP には、Spring Framework を使用して Pub/Sub トピックにメッセージを送信し、Pub/Sub サブスクリプションからメッセージを受信するためのモジュールが複数あります。これらのモジュールは、異なるユースケースに対して個別に使用することも、組み合わせて使用することもできます。
- Spring Cloud GCP Pub/Sub Starter を使用すると、ヘルパークラスを使用してメッセージを送受信できます。より高度なシナリオが必要な場合は、Pub/Sub Java クライアント ライブラリを呼び出すこともできます。
- Pub/Sub の Spring 統合チャネル アダプタを使用すると、Spring Integration のメッセージ チャネルを Pub/Sub に接続できます。
- Pub/Sub 用 Spring Cloud Stream Binder を使用すると、Pub/Sub を、Spring Cloud Stream アプリケーションでメッセージング ミドルウェアとして使用できます。
注: Spring Cloud GCP ライブラリでは、Java クライアント ライブラリを使用して 1 回限りの機能を実装するための必須モジュールである AckReplyConsumerWithResponse にはアクセスできません。
準備
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Set up a Google Cloud console project.
Click to:
- Create or select a project.
- Enable the Pub/Sub API for that project.
- Create a service account.
- Download a private key as JSON.
You can view and manage these resources at any time in the Google Cloud console.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. -
Set up a Google Cloud console project.
Click to:
- Create or select a project.
- Enable the Pub/Sub API for that project.
- Create a service account.
- Download a private key as JSON.
You can view and manage these resources at any time in the Google Cloud console.
-
Set the environment variable
GOOGLE_APPLICATION_CREDENTIALS
to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again. - 環境変数
GOOGLE_CLOUD_PROJECT
に Google Cloud プロジェクト ID を設定します。
Spring Cloud GCP Pub/Sub Starter の使用
Spring Cloud GCP Pub/Sub Starter モジュールは、Spring Cloud GCP Pub/Sub モジュールを使用して Pub/Sub Java クライアント ライブラリをインストールします。Spring Cloud GCP Pub/Sub Starter が提供するクラスまたは Pub/Sub Java クライアント ライブラリを使用して、Spring アプリケーションから Pub/Sub API を呼び出すことができます。Spring Cloud GCP Pub/Sub Starter が提供するクラスを使用している場合は、デフォルトの Pub/Sub 構成をオーバーライドできます。
モジュールのインストール
Spring Cloud GCP Pub/Sub Starter モジュールをインストールするには、次の依存関係を pom.xml
ファイルに追加します。
Spring Cloud GCP Pub/Sub Starter アーティファクト:
サポートされているオペレーション
Spring Cloud GCP Pub/Sub Starter モジュールには、次のクラスが含まれています。
- 管理オペレーションのための
PubSubAdmin
:- トピックとサブスクリプションの作成
- トピックとサブスクリプションの取得
- トピックとサブスクリプションの一覧表示
- トピックとサブスクリプションの削除
- サブスクリプションの確認応答期限の取得と設定
- メッセージを送受信するための
PubSubTemplate
:- メッセージのトピックへのパブリッシュ
- サブスクリプションからメッセージを同期的に pull する
- サブスクリプションからメッセージを非同期で pull する
- メッセージの確認応答
- 確認応答期限の変更
- Pub/Sub メッセージをプレーン オールド Java オブジェクト(POJO)に変換する
Spring 統合チャネル アダプタの使用
Spring アプリケーションで Spring 統合メッセージ チャネルを使用する場合、チャネル アダプタを使用して、メッセージ チャネルと Pub/Sub の間でメッセージをルーティングできます。
- 受信チャネル アダプタは、メッセージを Pub/Sub サブスクリプションからメッセージ チャネルに転送します。
- 送信チャネル アダプタは、メッセージ チャネルから Pub/Sub トピックにメッセージをパブリッシュします。
モジュールのインストール
Spring Integration チャネル アダプタ用のモジュールをインストールするには、次のものを pom.xml
ファイルに追加します。
Spring Cloud GCP Pub/Sub Starter と Spring Integration Core のアーティファクト。
Pub/Sub からのメッセージの受信
Spring アプリケーションで Pub/Sub サブスクリプションからメッセージを受信するには、受信チャネル アダプタを使用します。受信チャネル アダプタは、受信した Pub/Sub メッセージを POJO に変換し、POJO をメッセージ チャネルに転送します。
上の例では、次の Spring Bean と Pub/Sub リソースを使用しています。
inputMessageChannel
という名前のメッセージ チャネル Bean- 型が
PubSubInboundChannelAdapter
でinboundChannelAdapter
という名前の受信チャネル アダプタ Bean。 sub-one
という名前の Pub/Sub サブスクリプション ID。
inboundChannelAdapter
は、PubSubTemplate
を使用して sub-one
から非同期でメッセージを pull し、inputMessageChannel
にメッセージを送信します。
inboundChannelAdapter
で確認応答モードを MANUAL
に設定すると、アプリケーションはメッセージの処理後にメッセージの確認応答ができます。PubSubInboundChannelAdapter
型のデフォルトの確認応答モードは、AUTO
です。
ServiceActivator
Bean messageReceiver
は、inputMessageChannel
に到着する各メッセージを標準出力に記録してから、メッセージを確認応答します。
Pub/Sub へのメッセージのパブリッシュ
メッセージ チャネルから Pub/Sub トピックにメッセージをパブリッシュするには、送信チャネル アダプタを使用します。送信チャネル アダプタは、POJO を Pub/Sub メッセージに変換し、メッセージを Pub/Sub トピックに送信します。
上の例では、次の Spring Bean と Pub/Sub リソースを使用しています。
inputMessageChannel
という名前のメッセージ チャネル Bean- 型が
PubSubMessageHandler
でmessageSender
という名前の発信チャネル アダプタ Bean。 topic-two
という名前の Pub/Sub トピック ID。
ServiceActivator
Bean は、messageSender
のロジックを inputMessageChannel
内の各メッセージに適用します。
messageSender
の PubSubMessageHandler
は、PubSubTemplate
を使用して inputMessageChannel
にメッセージをパブリッシュします。PubSubMessageHandler
は、Pub/Sub トピック topic-two
にメッセージをパブリッシュします。
Spring Cloud Stream Binder の使用
Spring Cloud Stream アプリケーションで Pub/Sub API を呼び出すには、Spring Cloud GCP Pub/Sub Stream Binder モジュールを使用します。
モジュールのインストール
Spring Cloud Stream Binder モジュールをインストールするには、pom.xml
ファイルに次の内容を追加します。
Spring Cloud Stream Binder アーティファクト。
Pub/Sub からのメッセージの受信
アプリケーションをイベントシンクとして使用するには、次のように入力バインダを構成します。
メッセージ処理ロジックを定義する
Consumer
Bean。たとえば、次のConsumer
Bean はreceiveMessageFromTopicTwo
という名前になっています。構成ファイル
application.properties
内の Pub/Sub トピック ID。たとえば、次の構成ファイルでは、topic-two
という名前の Pub/Sub トピック ID を使用しています。
このサンプルコードは、Pub/Sub からメッセージを受信します。この例では、次のことを行います。
application.properties
の入力バインディングの宛先で Pub/Sub トピック IDtopic-two
を検索します。topic-two
への Pub/Sub サブスクリプションを作成します。- バインディング名
receiveMessageFromTopicTwo-in-0
を使用して、receiveMessageFromTopicTwo
という名前のConsumer
Bean を見つけます。 - 着信したメッセージを標準出力に出力し、自動的に確認応答します。
Pub/Sub へのメッセージのパブリッシュ
アプリケーションをイベントソースとして使用するには、次のように出力バインダを構成します。
Supplier
Bean は、アプリケーション内にメッセージが作られる場所を定義します。たとえば、次のSupplier
Bean はsendMessageToTopicOne
という名前になっています。構成ファイル
application.properties
内の Pub/Sub トピック ID。たとえば、次の構成ファイルでは、topic-one
という名前の Pub/Sub トピック ID を使用しています。
このサンプルコードは、Pub/Sub にメッセージをパブリッシュします。この例では、次のことを行います。
application.properties
の出力バインディングの宛先で Pub/Sub トピック IDtopic-one
を検索します。- バインディング名
sendMessageToTopicOne-out-0
を使用して、sendMessageToTopicOne
という名前のSupplier
Bean を見つけます。 - 10 秒ごとに
topic-one
に番号付きメッセージを送信します。