Replaying and purging Pub/Sub Lite messages

This page shows how to initiate and track seek operations for Lite subscriptions.

The Pub/Sub Lite seek feature allows you to replay and purge messages. It has the same use cases as Pub/Sub seek. Unlike Pub/Sub, you do not need to configure Lite topics or subscriptions to use seek and there is no additional cost.

The propagation of the seek to subscribers can be tracked using a long-running operation. This is an API pattern used by Google Cloud products to track the progress of long-running tasks.

Initiating seek

Pub/Sub Lite seek operations are initiated out-of-band (that is, from the Google Cloud CLI or separate Pub/Sub Lite API) and propagated to subscribers. Online subscribers will be notified of the seek and react while they are live. Offline subscribers will react to the seek once they are online.

You must specify a target location for the seek, which may be one of the following:

  • Beginning of message backlog: Replays all retained messages. Note that the amount of available backlog is determined by the Lite topic's message retention period and storage capacity.
  • End of message backlog: Purges messages by skipping past all current published messages.
  • Publish timestamp: Seeks to the first message with a (server-generated) publish timestamp greater than or equal to the specified timestamp. If no such message can be located, seeks to the end of the message backlog. Subsequent messages are guaranteed to have a publish timestamp greater than or equal to the specified timestamp, with the exception of specified timestamps that are in the future.
  • Event timestamp: Seeks to the first message with an (user-specified) event timestamp greater than or equal to the specified timestamp. If no such message can be located, seeks to the end of the message backlog. As event timestamps are user supplied, subsequent messages may have event timestamps less than the specified event time and should be filtered by the client, if necessary. If messages do not have an event timestamp set, their publish timestamps are used as a fallback.

You can initiate a seek for a Lite subscription with the Google Cloud CLI or the Pub/Sub Lite API.

gcloud

To seek a Lite subscription, use the gcloud pubsub lite-subscriptions seek command:

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

Replace the following:

  • SUBSCRIPTION_ID: the ID of the Lite subscription

  • LITE_LOCATION: the location of the Lite subscription

  • PUBLISH_TIME: the publish timestamp to seek to

  • EVENT_TIME: the event timestamp to seek to

  • STARTING_OFFSET: beginning or end

See gcloud topic datetimes for information on time formats.

If you specify the --async flag and the request is successful, the command line displays the ID of the seek operation:

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

Use the gcloud pubsub lite-operations describe command to get the operation status.

REST

To seek a Lite subscription, send a POST request like the following:

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

Replace the following:

  • REGION: the region that the Lite subscription is in

  • PROJECT_NUMBER: the project number of the project with the Lite subscription

  • LITE_LOCATION: the location of the Lite subscription

  • SUBSCRIPTION_ID: the ID of the Lite subscription

To seek to the beginning or end of the message backlog, set the following fields in the request body:

{
  "namedTarget": NAMED_TARGET
}

Replace the following:

  • NAMED_TARGET: TAIL for the beginning or HEAD for the end of the message backlog.

To seek to a publish timestamp, set the following fields in the request body:

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

Specify "eventTime" to seek to an event timestamp.

Replace the following:

  • TIMESTAMP: A timestamp in RFC 3339 UTC format, with nanosecond resolution and up to nine fractional digits. Examples: "2014-10-02T15:01:23Z" and "2014-10-02T15:01:23.045123456Z".

If the request is successful, the response is a long-running operation in JSON format:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Go

Before trying this sample, follow the Go setup instructions in Quickstart: Using Client Libraries. For more information, see the Pub/Sub Go API reference documentation.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

Before running this sample, follow the Java setup instructions in Pub/Sub Lite Client Libraries.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

Before running this sample, follow the Python setup instructions in Pub/Sub Lite Client Libraries.

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

If the seek request is successful, the response is a long-running operation ID. See information about tracking seek propagation below if you need to know when subscribers have reacted to the seek.

Supported clients

Seek operations require subscribers that use the following Pub/Sub Lite client libraries and minimum versions:

Seek operations do not work when Pub/Sub Lite is used with Apache Beam or Apache Spark because these systems perform their own tracking of offsets within partitions. The workaround is to drain, seek and restart the workflows.

The Pub/Sub Lite service is able to detect a subscriber client that does not support seek operations (for example, an old client library version or unsupported framework) and will abort the seek with a FAILED_PRECONDITION error status.

Tracking seek propagation

If a long-running operation ID is returned for the initial seek request, this means the seek was successfully registered in the Pub/Sub Lite service and will eventually propagate to subscribers (if the client is supported, as above). The operation tracks this propagation and completes once subscribers have reacted to the seek, for all partitions.

If subscribers are online, it may take up to 30 seconds for them to receive the seek notification. Seek notifications are sent independently for each partition, thus partitions may not react to the seek at the same instant. If subscribers are offline, the seek operation will complete once they are online.

If a previous seek invocation hasn't finished propagating to subscribers, it is aborted and superseded by the new seek operation. Seek operation metadata expires after 30 days, which effectively aborts any incomplete seek operations.

Seek operation status

You can get the status of a seek operation using the Google Cloud CLI, or the Pub/Sub Lite API.

gcloud

To get details about a Lite operation, use the gcloud pubsub lite-operations describe command:

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

Replace the following:

  • OPERATION_ID: the ID of the Lite operation

  • LITE_LOCATION: the location of the Lite operation

If the request is successful, the command line displays metadata about the Lite operation:

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

To get details about a Lite operations, send a GET request like the following:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Replace the following:

  • REGION: the region that the Lite operation is in

  • PROJECT_NUMBER: the project number of the project with the Lite operation

  • LITE_LOCATION: the location of the Lite operation

  • OPERATION_ID: the ID of the Lite operation

If the request is successful, the response is a long-running operation in JSON format:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Listing seek operations

Completed and active seek operations can be listed using the Google Cloud CLI, or the Pub/Sub Lite API.

gcloud

To list Lite operations in a project, use the gcloud pubsub lite-operations list command:

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

Replace the following:

  • LITE_LOCATION: the location that the Lite operations are in

  • SUBSCRIPTION: filter operations by Lite subscription

  • DONE: true to include only complete operations, false to include only active operations

  • LIMIT: an integer to limit the number of operations returned

If the request is successful, the command line displays a summary of the Lite operations:

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

To list Lite operations in a project, send a GET request like the following:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

Replace the following:

  • REGION: the region that the Lite operations are in

  • PROJECT_NUMBER: the project number of the project with the Lite operations

  • LITE_LOCATION: the location that the Lite operations are in

If the request is successful, the response is a list of Lite operations in JSON format:

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}