Pub/Sub Lite メッセージの再生と消去

このページでは、Lite サブスクリプションのシーク オペレーションを開始およびトラッキングする方法について説明します。

Pub/Sub Lite のシーク機能を使用すると、メッセージの再生と消去を行えます。ユースケースは Pub/Sub のシークと同じです。Pub/Sub とは異なり、シークを使用するために Lite トピックやサブスクリプションを構成する必要はなく、追加コストもかかりません。

シークのサブスクライバーへの伝播は、長時間実行オペレーションを使用して追跡できます。これは、Google Cloud プロダクトが長時間実行タスクの進行状況を追跡するために使用する API パターンです。

シークの開始

Pub/Sub Lite のシーク オペレーションは、帯域外で(Google Cloud CLI や個別の Pub/Sub Lite API から)開始され、サブスクライバーに伝播されます。オンラインのチャンネル登録者には、シークが通知され、ライブの間は反応します。オフラインのチャンネル登録者は、オンラインになるとシークに応答します。

シークの対象先を指定する必要があります。次のいずれかになります。

  • メッセージ バックログの先頭: 保持されているすべてのメッセージをリプレイします。なお、使用可能なバックログの量は、Lite トピックのメッセージ保持期間とストレージ容量によって決まります。
  • メッセージ バックログの終了: 現在公開されているすべてのメッセージをスキップして、メッセージを消去します。
  • 公開タイムスタンプ:(サーバーで生成された)公開タイムスタンプが指定したタイムスタンプ以上である最初のメッセージまでシークします。そのようなメッセージが見つからない場合は、メッセージ バックログの終了までシークします。後続のメッセージは、指定したタイムスタンプ以降のパブリッシュ タイムスタンプが保証されます。ただし、未来のタイムスタンプは除きます。
  • イベント タイムスタンプ: (ユーザー指定の)イベント タイムスタンプが、指定したタイムスタンプ以上となる最初のメッセージまでシークします。そのようなメッセージが見つからない場合は、メッセージ バックログの終了までシークします。イベントのタイムスタンプはユーザーが指定するため、後続のメッセージには指定されたイベント時間より小さいイベント タイムスタンプが含まれる可能性があり、必要に応じてクライアントでフィルタリングする必要があります。メッセージにイベント タイムスタンプが設定されていない場合は、そのパブリッシュ タイムスタンプがフォールバックとして使用されます。

Lite サブスクリプションのシークは、Google Cloud CLI か Pub/Sub Lite API を使用して開始できます。

gcloud

Lite サブスクリプションをシークするには、gcloud pubsub lite-subscriptions seek コマンドを使用します。

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

以下を置き換えます。

  • SUBSCRIPTION_ID: Lite サブスクリプションの ID

  • LITE_LOCATION: Lite サブスクリプションのロケーション

  • PUBLISH_TIME: シークする公開タイムスタンプ

  • EVENT_TIME: シークするイベント タイムスタンプ

  • STARTING_OFFSET: beginningまたはend

時間形式に関する詳細は、gcloud topic datetimes をご覧ください。

--async フラグを指定し、リクエストが成功すると、コマンドラインによってシーク オペレーションの ID が表示されます。

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

オペレーションのステータスを取得するには、gcloud pubsub lite-operations describe コマンドを使用します。

REST

Lite サブスクリプションをシークするには、次のような POST リクエストを送信します。

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

次のように置き換えます。

  • REGION: Lite サブスクリプションが含まれているリージョン

  • PROJECT_NUMBER: Lite サブスクリプションを持つプロジェクトのプロジェクト番号

  • LITE_LOCATION: Lite サブスクリプションのロケーション

  • SUBSCRIPTION_ID: Lite サブスクリプションの ID

メッセージ バックログの先頭または終了までシークするには、リクエスト本文に次のフィールドを設定します。

{
  "namedTarget": NAMED_TARGET
}

以下を置き換えます。

  • NAMED_TARGET: メッセージ バックログの先頭の場合は TAIL、末尾の場合は HEAD

公開タイムスタンプまでシークするには、リクエスト本文に次のフィールドを設定します。

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

イベント タイムスタンプまでシークするには、"eventTime" を指定します。

以下を置き換えます。

  • TIMESTAMP: RFC 3339 UTC 形式のタイムスタンプ。精度はナノ秒であり、小数点以下は 9 桁までです。例: "2014-10-02T15:01:23Z""2014-10-02T15:01:23.045123456Z"

リクエストが成功した場合、レスポンスは JSON 形式の長時間実行オペレーションとなります。

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

シーク リクエストが成功した場合、レスポンスは長時間実行オペレーション ID です。サブスクライバーがシークに反応したタイミングを知る必要がある場合は、以下のシークの伝播のトラッキングをご覧ください。

サポートされているクライアント

シーク オペレーションには、次の Pub/Sub Lite クライアント ライブラリと最小バージョンを使用するチャンネル登録者が必要です。

Pub/Sub Lite を Apache Beam または Apache Spark と併用すると、これらのシステムではパーティション内でオフセットが独自に追跡されるため、シーク オペレーションは機能しません。回避策は、ワークフローをドレイン、シークして再起動することです。

Pub/Sub Lite サービスは、シーク オペレーションをサポートしていないチャンネル登録者のクライアント(古いクライアント ライブラリのバージョンやサポートされていないフレームワークなど)を検出でき、FAILED_PRECONDITION エラー ステータスによりシークが中止されます。

シーク伝播のトラッキング

最初のシーク リクエストに対して長時間実行オペレーション ID が返された場合、そのシークは Pub/Sub Lite サービスに正しく登録され、最終的にチャンネル登録者に伝播されます(クライアントがサポートされている場合は、上記のとおりです)。オペレーションはこの伝播を追跡し、すべてのパーティションでチャンネル登録者がシークに応答すると完了します。

サブスクライバーがオンラインの場合は、シーク通知の受信に最大 30 秒かかることがあります。シーク通知は、パーティションごとに独立して送信されるため、これらのパーティションが同時にシークに応答するとは限りません。サブスクライバーがオフラインの場合は、それがオンラインになるとシーク オペレーションが完了します。

前のシーク呼び出しがチャンネル登録者への電波を完了していない場合は、中止され、新しいシーク オペレーションに置き換えられます。シーク オペレーションのメタデータは 30 日後に期限切れとなり、不完全なシーク オペレーションが実質的に中止されます。

シーク オペレーションのステータス

シーク オペレーションのステータスは、Google Cloud CLI か Pub/Sub Lite API を使用して取得できます。

gcloud

Lite オペレーションの詳細を取得するには、gcloud pubsub lite-operations describe コマンドを使用します。

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

以下を置き換えます。

  • OPERATION_ID: Lite オペレーションの ID

  • LITE_LOCATION: Lite オペレーションのロケーション

リクエストが成功すると、コマンドラインによって Lite オペレーションに関するメタデータが表示されます。

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

Lite オペレーションの詳細を取得するには、次のような GET リクエストを送信します。

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

次のように置き換えます。

  • REGION: Lite オペレーションが含まれているリージョン

  • PROJECT_NUMBER: Lite オペレーションを持つプロジェクトのプロジェクト番号

  • LITE_LOCATION: Lite オペレーションのロケーション

  • OPERATION_ID: Lite オペレーションの ID

リクエストが成功した場合、レスポンスは JSON 形式の長時間実行オペレーションとなります。

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

シーク オペレーションの一覧表示

完了したシーク オペレーションとアクティブなシーク オペレーションは、Google Cloud CLI か Pub/Sub Lite API を使用して表示できます。

gcloud

プロジェクト内の Lite オペレーションを一覧表示するには、gcloud pubsub lite-operations list コマンドを使用します。

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

次のように置き換えます。

  • LITE_LOCATION: Lite オペレーションが含まれているロケーション

  • SUBSCRIPTION: Lite サブスクリプションでオペレーションをフィルタする

  • DONE: 完了したオペレーションのみを含めるには true、アクティブなオペレーションのみを含めるには false

  • LIMIT: 返されるオペレーションの数を制限する整数

リクエストが成功すると、コマンドラインによって Lite オペレーションの概要が表示されます。

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

プロジェクト内の Lite オペレーションを一覧表示するには、次のような GET リクエストを送信します。

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

次のように置き換えます。

  • REGION: Lite オペレーションが含まれているリージョン

  • PROJECT_NUMBER: Lite オペレーションを持つプロジェクトのプロジェクト番号

  • LITE_LOCATION: Lite オペレーションが含まれているロケーション

リクエストが成功した場合、レスポンスは JSON 形式の Lite オペレーションのリストになります。

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}