重放和完全清除 Pub/Sub 精简版消息

本页面介绍如何为精简版订阅启动和跟踪还原操作。

Pub/Sub 精简版还原功能允许您重放和完全清除消息。它具有与 Pub/Sub 还原功能相同的用例。 与 Pub/Sub 不同,您无需将精简版主题或订阅配置为使用还原功能,并且无需支付额外费用。

您可以使用长时间运行的操作跟踪还原功能传播给订阅者的情况。这是 Google Cloud 产品用来跟踪长时间运行的任务进度的 API 模式。

启动还原功能

Pub/Sub 精简版还原操作在带外(即通过 Google Cloud CLI 或单独的 Pub/Sub Lite API)启动并传播到订阅者。在线订阅者将在上线时收到还原通知并做出回应。离线订阅者会在处于在线状态后响应还原。

您必须为还原指定目标位置,该位置可以是下列位置的其中一个:

  • 消息积压开始:重放所有保留的消息。请注意,可用积压量由精简版主题的消息保留期限和存储容量决定。
  • 消息结束积压:通过跳过当前发布的所有消息来完全清除消息。
  • 发布时间戳:还原至(服务器生成的)发布时间戳晚于或等于指定时间戳的第一个消息。如果找不到此消息,则还原至消息积压的结尾。可保证后续消息的发布时间戳晚于或等于指定时间戳,但指定时间戳是将来时间的情况除外。
  • 事件时间戳:还原到(用户指定的)事件时间戳晚于或等于指定时间戳的第一个消息。如果找不到此消息,则还原至消息积压的结尾。由于事件时间戳由用户提供,因此后续消息的事件时间戳可能早于指定的事件时间,并且应由客户端根据需要过滤。如果消息未设置事件时间戳,则其发布时间戳将用作后备。

您可以使用 Google Cloud CLI 或 Pub/Sub Lite API 发起精简版订阅还原。

gcloud

如需还原精简版订阅,请使用 gcloud pubsub lite-subscriptions seek 命令:

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

替换以下内容:

  • SUBSCRIPTION_ID:精简版订阅的 ID

  • LITE_LOCATION:精简版订阅的位置

  • PUBLISH_TIME:要还原到的发布时间戳

  • EVENT_TIME:要还原到的事件时间戳

  • STARTING_OFFSETbeginningend

如需了解时间格式,请参阅 gcloud topic datetimes

如果您指定 --async 标志且请求成功,命令行会显示跳转操作的 ID:

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

使用 gcloud pubsub lite-operations describe 命令获取操作状态。

REST

要还原精简版订阅,请发送 POST 请求,如下所示:

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

替换以下内容:

  • REGION:精简版订阅所在的区域

  • PROJECT_NUMBER:具有精简版订阅的项目的项目编号

  • LITE_LOCATION:精简版订阅的位置

  • SUBSCRIPTION_ID:精简版订阅的 ID

要还原到消息积压项的开头或结尾,请在请求正文中设置以下字段:

{
  "namedTarget": NAMED_TARGET
}

替换以下内容:

  • NAMED_TARGET:消息积压的 TAIL(开始)或 HEAD(结束)。

如需还原到发布时间戳,请在请求正文中设置以下字段:

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

指定 "eventTime" 可还原至事件时间戳。

替换以下内容:

  • TIMESTAMP:采用 RFC 3339 世界协调时间 (UTC) 格式的时间戳,精度为纳秒,最多保留九位小数。示例:"2014-10-02T15:01:23Z""2014-10-02T15:01:23.045123456Z"

如果请求成功,则响应是采用 JSON 格式的长时间运行的操作:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Go

在尝试此示例之前,请按照《快速入门:使用客户端库》中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Java 设置说明进行操作。

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Python 设置说明进行操作。

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

如果还原请求成功,则响应为长时间运行的操作 ID。如果您需要了解订阅者何时对还原做出反应,请参阅有关跟踪还原传播的信息(见下文)。

支持的客户端

还原操作要求使用以下 Pub/Sub Lite 客户端库和最低版本的订阅者:

将 Pub/Sub Lite 与 Apache BeamApache Spark 搭配使用时,还原操作不起作用,因为这些系统会自行跟踪分区内偏移。解决方法是排空、还原和重启工作流。

Pub/Sub 精简版服务能够检测不支持还原操作(例如旧客户端库版本或不受支持的框架)的订阅者客户端,并通过 FAILED_PRECONDITION 错误状态取消还原操作。

跟踪还原传播

如果为初始还原请求返回长时间运行的操作 ID,则表示还原已成功在 Pub/Sub 精简版服务中注册,并且最终将传播到订阅者(如果客户端支持,则如上所述)。该操作会跟踪此传播,并在订阅者响应所有分区的还原后完成。

如果订阅者在线,则他们最多可能需要 30 秒才能收到还原通知。还原通知会针对每个分区单独发送,因此分区可能不会在同一时刻对还原操作做出反应。如果订阅者离线,则还原操作会在订阅者在线后完成。

如果之前的还原调用尚未完成传播到订阅者,则此调用会中止并被新的还原操作取代。还原操作元数据在 30 天后过期,这实际上会中止任何未完成的还原操作。

还原操作状态

您可以使用 Google Cloud CLI 或 Pub/Sub Lite API 获取还原操作的状态。

gcloud

要获取有关精简版操作的详细信息,请使用 gcloud pubsub lite-operations describe 命令:

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

替换以下内容:

  • OPERATION_ID:精简版操作的 ID

  • LITE_LOCATION:精简版操作的位置

如果请求成功,命令行会显示有关精简版操作的元数据:

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

如需获取有关精简版操作的详细信息,请发送 GET 请求,如下所示:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

替换以下内容:

  • REGION:精简版操作所在的区域

  • PROJECT_NUMBER:具有精简版操作的项目的项目编号

  • LITE_LOCATION:精简版操作的位置

  • OPERATION_ID:精简版操作的 ID

如果请求成功,则响应是采用 JSON 格式的长时间运行的操作:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

列出还原操作

可以使用 Google Cloud CLI 或 Pub/Sub Lite API 列出已完成和活跃的还原操作。

gcloud

如需列出项目中的精简版操作,请使用 gcloud pubsub lite-operations list 命令:

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

替换以下内容:

  • LITE_LOCATION:精简版操作所在的位置

  • SUBSCRIPTION:按精简版订阅过滤操作

  • DONEtrue 以仅包含完成的操作,false 以仅包含活跃的操作

  • LIMIT:此标志值为一个整数,用于限制返回的操作数

如果请求成功,命令行会显示精简版操作的摘要:

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

如需列出项目中的精简版操作,请发送 GET 请求,如下所示:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

替换以下内容:

  • REGION:精简版操作所在的区域

  • PROJECT_NUMBER:具有精简版操作的项目的项目编号

  • LITE_LOCATION:精简版操作所在的位置

如果请求成功,则响应是以 JSON 格式列出的精简版操作列表:

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}