このページでは、Lite トピックにメッセージをパブリッシュする方法について説明します。メッセージは、Java 用の Pub/Sub Lite クライアント ライブラリを使用してパブリッシュできます。
Lite トピックにメッセージをパブリッシュして、Lite サブスクリプションを作成すると、Lite サブスクリプションからメッセージを受信できます。
メッセージの形式
メッセージは、メッセージ データとメタデータを含むフィールドで構成されます。 メッセージに指定できる項目は、次のとおりです。
クライアント ライブラリによってメッセージがパーティションに自動的に割り当てられ、Pub/Sub Lite サービスによってメッセージに次のフィールドが追加されます。
- パーティション内で一意のメッセージ ID
- Pub/Sub Lite サービスでパーティションにメッセージが格納されるタイムスタンプ。
メッセージの公開
メッセージをパブリッシュするには、Lite トピックへのストリーミング接続をリクエストし、ストリーミング接続を介してメッセージを送信します。
次のサンプルは、Lite トピックにメッセージをパブリッシュする方法を示しています。
gcloud
このコマンドを使用するには、Python 3.6 以降と grpcio Python パッケージのインストールが必要です。MacOS、Linux、Cloud Shell のユーザーの場合、以下を実行します。
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
メッセージを公開するには、gcloud beta pubsub lite-topics publish コマンドを使用します。
gcloud pubsub lite-topics publish TOPIC_ID \
--location=LITE_LOCATION \
--message=MESSAGE_DATA
以下を置き換えます。
- TOPIC_ID: Lite トピックの ID
- LITE_LOCATION: Lite トピックのロケーション
- MESSAGE_DATA: メッセージ データを含む文字列
Go
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Go の設定手順を実施してください。
Java
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。
Python
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。
クライアント ライブラリは非同期でメッセージを送信し、エラーを処理します。エラーが発生すると、クライアント ライブラリによりメッセージが再送されます。
- Pub/Sub Lite サービスはストリームを閉じます。
- クライアント ライブラリはメッセージをバッファして、Lite トピックへの接続を再確立します。
- クライアント ライブラリはメッセージを順番に送信します。
メッセージをパブリッシュすると、Pub/Sub Lite サービスによってメッセージはパーティションに格納され、メッセージ ID がパブリッシャーに返されます。
順序指定キーの使用
メッセージの順序指定キーが同じ場合、クライアント ライブラリは同じパーティションにメッセージを割り当てます。順序指定キーは最大 1,024 バイトの文字列にする必要があります。
順序指定キーは、メッセージの key
フィールドにあり、クライアント ライブラリを使って設定できます。
gcloud
このコマンドを使用するには、Python 3.6 以降と grpcio Python パッケージのインストールが必要です。MacOS、Linux、Cloud Shell のユーザーの場合、以下を実行します。
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
メッセージを公開するには、gcloud beta pubsub lite-topics publish コマンドを使用します。
gcloud pubsub lite-topics publish TOPIC_ID \
--location=LITE_LOCATION \
--ordering-key=ORDERING_KEY \
--message=MESSAGE_DATA
以下を置き換えます。
- TOPIC_ID: Lite トピックの ID
- LITE_LOCATION: Lite トピックのロケーション
- ORDERING_KEY: パーティションにメッセージを割り当てるために使用される文字列
- MESSAGE_DATA: メッセージ データを含む文字列
Go
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Go の設定手順を実施してください。
Java
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。
Python
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。
順序指定キーを使用して、複数のメッセージを同じパーティションに送信できます。これにより、サブスクライバーはメッセージを順番に受信します。クライアント ライブラリによって、同じパーティションに複数の順序指定キーが割り当てられることがあります。
イベント時間を設定する
イベント時間を使用して Lite メッセージをパブリッシュできます。イベント時間は、メッセージに追加できるカスタム属性です。
イベントのタイムスタンプは、クライアント ライブラリまたは gcloud CLI を使用して設定できます。
このコマンドを使用するには、Python 3.6 以降と grpcio Python パッケージのインストールが必要です。MacOS、Linux、Cloud Shell のユーザーの場合、以下を実行します。
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
メッセージを公開するには、gcloud beta pubsub lite-topics publish コマンドを使用します。
gcloud pubsub lite-topics publish TOPIC_ID \
--location=LITE_LOCATION \
--event-time=EVENT_TIME \
--message=MESSAGE_DATA
以下を置き換えます。
TOPIC_ID: Lite トピックの ID
LITE_LOCATION: Lite トピックのロケーション
EVENT_TIME: ユーザー指定のイベント時間。時間形式の詳細については、
gcloud topic datetimes
を実行してください。MESSAGE_DATA: メッセージ データを含む文字列
属性の使用
メッセージ属性は、メッセージに関するメタデータが含まれる Key-Value ペアです。属性はテキスト文字列かバイト文字列で指定します。
属性はメッセージの attributes
フィールドにあります。クライアント ライブラリを使用して属性を設定できます。
gcloud
このコマンドを使用するには、Python 3.6 以降と grpcio Python パッケージのインストールが必要です。MacOS、Linux、Cloud Shell のユーザーの場合、以下を実行します。
sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1
メッセージを公開するには、gcloud beta pubsub lite-topics publish コマンドを使用します。
gcloud pubsub lite-topics publish TOPIC_ID \
--location=LITE_LOCATION \
--message=MESSAGE_DATA \
--attribute=KEY=VALUE,...
以下を置き換えます。
Go
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Go の設定手順を実施してください。
Java
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。
Python
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。
属性によってメッセージの処理方法を知ることが可能です。サブスクライバーは、メッセージの attributes
フィールドを解析し、その属性に基づいてメッセージを処理できます。
メッセージのバッチ処理
クライアント ライブラリはメッセージをバッチで公開します。バッチが大きいほど、コンピューティング リソースの使用量は少なくなりますが、レイテンシが増加します。バッチサイズはバッチ設定で変更できます。
次の表に、構成できるバッチ処理の設定を一覧表示します。
設定 | 説明 | デフォルト |
---|---|---|
リクエスト サイズ | バッチの最大サイズ(バイト単位)。 | 3.5 MiB |
メッセージの数 | バッチ内のメッセージの最大数。 | メッセージ 1,000 件 |
パブリッシュの遅延 | メッセージをバッチに追加してからバッチを Lite トピックに送信するまでの時間(ミリ秒単位)。 | 50 ミリ秒 |
クライアント ライブラリでバッチ処理の設定を構成できます。
Go
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Go の設定手順を実施してください。
Java
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。
Python
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。
パブリッシャー アプリケーションが開始されると、クライアント ライブラリによって Lite トピックの各パーティションのバッチが作成されます。たとえば、Lite トピックに 2 つのパーティションがある場合、パブリッシャーは 2 つのバッチを作成し、各バッチをパーティションに送信します。
メッセージをパブリッシュした後、クライアント ライブラリは、バッチが最大リクエスト サイズ、メッセージの最大数、またはパブリッシュの遅延を超えるまでメッセージをバッファします。
メッセージの順序指定
Lite トピックは、メッセージをパブリッシュするときに各パーティション内のメッセージを並べ替えます。同じパーティションにメッセージを割り当てるには、順序指定キーを使用します。
Pub/Sub Lite はパーティションのメッセージを順番に配信し、サブスクライバーは順番にメッセージを処理できます。詳細については、メッセージの受信をご覧ください。
パブリッシュのべき等性
Pub/Sub Lite クライアント ライブラリは、次のバージョンからのべき等性のあるパブリッシュをサポートします。
- java-pubsublite: バージョン 0.10.0
- python-pubsublite: バージョン 1.8.0
- google-cloud-go: pubsublite バージョン 1.7.0
ネットワーク エラーまたはサーバー エラーが原因でメッセージのパブリッシュが再試行された場合、メッセージは 1 回だけ保存されます。べき等性は、同じセッション内でのみ保証されます。新しいパブリッシャー クライアントを使用して同じメッセージを再パブリッシュした場合、べき等性は保証されません。これにより追加のサービス費用が発生したり、パブリッシュのレイテンシが増加することはありません。
べき等性のあるパブリッシュを有効または無効にする
Pub/Sub Lite クライアント ライブラリでは、べき等性のあるパブリッシュがデフォルトで有効になります。べき等性のあるパブリッシュは、それぞれのクライアント ライブラリのパブリッシャー クライアント設定を使用して無効にできます。
べき等性のあるパブリッシングが有効になっている場合、パブリッシュ結果で返されるオフセットは -1
になります。この値が返されるのは、メッセージがすでに正常にパブリッシュされているメッセージの複製として識別されているにもかかわらず、パブリッシュ時にメッセージのオフセットを返すのに必要な情報がサーバーに存在しない場合です。サブスクライバーが受信したメッセージには常に、有効なオフセットが設定されています。
トラブルシューティング
複製を受信した
べき等性は 1 つのセッションに限定されるため、同じメッセージをパブリッシュするためにパブリッシャー クライアントを再作成すると、重複が発生する可能性があります。
Pub/Sub Lite サービスによってパーティションがサブスクライバーに自動的に割り当てられる場合(デフォルト設定)、サブスクライバー クライアントが同じメッセージを複数回受信することがあります。再割り当てが発生すると、メッセージが別のサブスクライバー クライアントに再配信されることがあります。
パブリッシャー エラー
パブリッシャー セッションが非アクティブな状態が 7 日間続いた場合、セッションの状態はサーバー内のガベージ コレクションの対象になります。この期間の後にセッションが再開されると、パブリッシャー クライアントは「Failed Precondition: 想定されるメッセージのパブリッシュ シーケンス番号は...」のようなエラー メッセージで終了し、新しいメッセージを受け入れません。このエラーを解決するには、パブリッシャー クライアントを再作成します。