このページでは、Lite サブスクリプションのシーク オペレーションを開始およびトラッキングする方法について説明します。
Pub/Sub Lite のシーク機能を使用すると、メッセージの再生と消去を行えます。ユースケースは Pub/Sub のシークと同じです。Pub/Sub とは異なり、シークを使用するために Lite トピックやサブスクリプションを構成する必要はなく、追加コストもかかりません。
シークのサブスクライバーへの伝播は、長時間実行オペレーションを使用して追跡できます。これは、Google Cloud プロダクトが長時間実行タスクの進行状況を追跡するために使用する API パターンです。
シーク開始
Pub/Sub Lite のシーク オペレーションは、帯域外で(Google Cloud CLI や個別の Pub/Sub Lite API から)開始され、サブスクライバーに伝播されます。オンラインのチャンネル登録者には、シークが通知され、ライブの間は反応します。オフラインのチャンネル登録者は、オンラインになるとシークに応答します。
シークの対象先を指定する必要があります。次のいずれかになります。
- メッセージ バックログの先頭: 保持されているすべてのメッセージをリプレイします。なお、使用可能なバックログの量は、Lite トピックのメッセージ保持期間とストレージ容量によって決まります。
- メッセージ バックログの終了: 現在公開されているすべてのメッセージをスキップして、メッセージを消去します。
- 公開タイムスタンプ:(サーバーで生成された)公開タイムスタンプが指定したタイムスタンプ以上である最初のメッセージまでシークします。そのようなメッセージが見つからない場合は、メッセージ バックログの終了までシークします。後続のメッセージは、指定したタイムスタンプ以降のパブリッシュ タイムスタンプが保証されます。ただし、未来のタイムスタンプは除きます。
- イベント タイムスタンプ: (ユーザー指定の)イベント タイムスタンプが、指定したタイムスタンプ以上となる最初のメッセージまでシークします。そのようなメッセージが見つからない場合は、メッセージ バックログの終了までシークします。イベントのタイムスタンプはユーザーが指定するため、後続のメッセージには指定されたイベント時間より小さいイベント タイムスタンプが含まれる可能性があり、必要に応じてクライアントでフィルタリングする必要があります。メッセージにイベント タイムスタンプが設定されていない場合は、そのパブリッシュ タイムスタンプがフォールバックとして使用されます。
Lite サブスクリプションのシークは、Google Cloud CLI か Pub/Sub Lite API を使用して開始できます。
gcloud
Lite サブスクリプションをシークするには、gcloud pubsub lite-subscriptions seek
コマンドを使用します。
gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \ --location=LITE_LOCATION \ (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \ --starting-offset=STARTING_OFFSET) \ [--async]
以下を置き換えます。
SUBSCRIPTION_ID: Lite サブスクリプションの ID
LITE_LOCATION: Lite サブスクリプションのロケーション
PUBLISH_TIME: シークする公開タイムスタンプ
EVENT_TIME: シークするイベント タイムスタンプ
STARTING_OFFSET:
beginning
またはend
時間形式に関する詳細は、gcloud topic datetimes
をご覧ください。
--async
フラグを指定し、リクエストが成功すると、コマンドラインによってシーク オペレーションの ID が表示されます。
Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.
オペレーションのステータスを取得するには、gcloud pubsub lite-operations describe
コマンドを使用します。
REST
Lite サブスクリプションをシークするには、次のような POST
リクエストを送信します。
POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek Authorization: Bearer $(gcloud auth print-access-token)
次のように置き換えます。
REGION: Lite サブスクリプションが含まれているリージョン
PROJECT_NUMBER: Lite サブスクリプションを持つプロジェクトのプロジェクト番号
LITE_LOCATION: Lite サブスクリプションのロケーション
SUBSCRIPTION_ID: Lite サブスクリプションの ID
メッセージ バックログの先頭または終了までシークするには、リクエスト本文に次のフィールドを設定します。
{ "namedTarget": NAMED_TARGET }
以下を置き換えます。
- NAMED_TARGET: メッセージ バックログの先頭の場合は
TAIL
、末尾の場合はHEAD
。
公開タイムスタンプまでシークするには、リクエスト本文に次のフィールドを設定します。
{ "timeTarget": { "publishTime": TIMESTAMP } }
イベント タイムスタンプまでシークするには、"eventTime"
を指定します。
以下を置き換えます。
- TIMESTAMP: RFC 3339 UTC 形式のタイムスタンプ。精度はナノ秒であり、小数点以下は 9 桁までです。例:
"2014-10-02T15:01:23Z"
、"2014-10-02T15:01:23.045123456Z"
。
リクエストが成功した場合、レスポンスは JSON 形式の長時間実行オペレーションとなります。
{ "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID, ... }
Go
このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。
Java
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。
Python
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。
シーク リクエストが成功した場合、レスポンスは長時間実行オペレーション ID です。サブスクライバーがシークに反応したタイミングを知る必要がある場合は、以下のシークの伝播のトラッキングをご覧ください。
サポートされているクライアント
シーク オペレーションには、次の Pub/Sub Lite クライアント ライブラリと最小バージョンを使用するチャンネル登録者が必要です。
- java-pubsublite: バージョン 0.15.0
- java-pubsublite-kafka: バージョン 0.6.0。コンシューマーでも自動 commit を有効にする必要があります。
- python-pubsublite: バージョン 0.6.0
- google-cloud-go: pubsublite バージョン 0.10.0
Pub/Sub Lite を Apache Beam または Apache Spark と併用すると、これらのシステムではパーティション内でオフセットが独自に追跡されるため、シーク オペレーションは機能しません。回避策は、ワークフローをドレイン、シークして再起動することです。
Pub/Sub Lite サービスは、シーク オペレーションをサポートしていないチャンネル登録者のクライアント(古いクライアント ライブラリのバージョンやサポートされていないフレームワークなど)を検出でき、FAILED_PRECONDITION
エラー ステータスによりシークが中止されます。
シーク伝播のトラッキング
最初のシーク リクエストに対して長時間実行オペレーション ID が返された場合、そのシークは Pub/Sub Lite サービスに正しく登録され、最終的にチャンネル登録者に伝播されます(クライアントがサポートされている場合は、上記のとおりです)。オペレーションはこの伝播を追跡し、すべてのパーティションでチャンネル登録者がシークに応答すると完了します。
サブスクライバーがオンラインの場合は、シーク通知の受信に最大 30 秒かかることがあります。シーク通知は、パーティションごとに独立して送信されるため、これらのパーティションが同時にシークに応答するとは限りません。サブスクライバーがオフラインの場合は、それがオンラインになるとシーク オペレーションが完了します。
前のシーク呼び出しがチャンネル登録者への電波を完了していない場合は、中止され、新しいシーク オペレーションに置き換えられます。シーク オペレーションのメタデータは 30 日後に期限切れとなり、不完全なシーク オペレーションが実質的に中止されます。
オペレーション ステータスをシークする
シーク オペレーションのステータスは、Google Cloud CLI か Pub/Sub Lite API を使用して取得できます。
gcloud
Lite オペレーションの詳細を取得するには、gcloud pubsub lite-operations describe
コマンドを使用します。
gcloud pubsub lite-operations describe OPERATION_ID \ --location=LITE_LOCATION
以下を置き換えます。
OPERATION_ID: Lite オペレーションの ID
LITE_LOCATION: Lite オペレーションのロケーション
リクエストが成功すると、コマンドラインによって Lite オペレーションに関するメタデータが表示されます。
metadata: '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata createTime: '2021-01-02T03:04:05Z' target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID verb: seek name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
REST
Lite オペレーションの詳細を取得するには、次のような GET
リクエストを送信します。
GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID Authorization: Bearer $(gcloud auth print-access-token)
次のように置き換えます。
REGION: Lite オペレーションが含まれているリージョン
PROJECT_NUMBER: Lite オペレーションを持つプロジェクトのプロジェクト番号
LITE_LOCATION: Lite オペレーションのロケーション
OPERATION_ID: Lite オペレーションの ID
リクエストが成功した場合、レスポンスは JSON 形式の長時間実行オペレーションとなります。
{ "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID, ... }
リスト内のシーク オペレーション
完了したシーク オペレーションとアクティブなシーク オペレーションは、Google Cloud CLI か Pub/Sub Lite API を使用して表示できます。
gcloud
プロジェクト内の Lite オペレーションを一覧表示するには、gcloud pubsub lite-operations list
コマンドを使用します。
gcloud pubsub lite-operations list \
--location=LITE_LOCATION \
[--subscription=SUBSCRIPTION] \
[--done=DONE] \
[--limit=LIMIT]
次のように置き換えます。
LITE_LOCATION: Lite オペレーションが含まれているロケーション
SUBSCRIPTION: Lite サブスクリプションでオペレーションをフィルタする
DONE: 完了したオペレーションのみを含めるには
true
、アクティブなオペレーションのみを含めるにはfalse
LIMIT: 返されるオペレーションの数を制限する整数
リクエストが成功すると、コマンドラインによって Lite オペレーションの概要が表示されます。
OPERATION_ID TARGET CREATE_TIME DONE ERROR_CODE MESSAGE operation2 projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID 2021-05-06T07:08:00Z True operation1 projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID 2021-01-02T03:04:00Z True
REST
プロジェクト内の Lite オペレーションを一覧表示するには、次のような GET
リクエストを送信します。
GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations Authorization: Bearer $(gcloud auth print-access-token)
次のように置き換えます。
REGION: Lite オペレーションが含まれているリージョン
PROJECT_NUMBER: Lite オペレーションを持つプロジェクトのプロジェクト番号
LITE_LOCATION: Lite オペレーションが含まれているロケーション
リクエストが成功した場合、レスポンスは JSON 形式の Lite オペレーションのリストになります。
{ "operations": [ { "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID", ... }, { "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID", ... } ] }