Pub/Sub Lite-Nachrichten wiedergeben und dauerhaft löschen

Auf dieser Seite wird gezeigt, wie Sie Suchvorgänge für Lite-Abos initiieren und verfolgen.

Mit der Suchfunktion von Pub/Sub Lite können Sie Nachrichten wiedergeben und dauerhaft löschen. Es gibt dieselben Anwendungsfälle wie die Pub/Sub-Suche. Im Gegensatz zu Pub/Sub müssen Sie keine Lite-Themen oder -Abos für Suchvorgänge konfigurieren und es entstehen keine zusätzlichen Kosten für die Nutzung.

Die Weitergabe der Suche an Abonnenten kann mit einem lang andauernden Vorgang verfolgt werden. Dies ist ein API-Muster, das von Google Cloud-Produkten verwendet wird, um den Fortschritt von lang andauernden Aufgaben zu verfolgen.

Suchvorgang initiieren

Pub/Sub Lite-Suchvorgänge werden Out-of-Band (d. h. von der Google Cloud CLI oder der separaten Pub/Sub Lite API) initiiert und an die Abonnenten weitergegeben. Online-Abonnenten werden über die Suche benachrichtigt und reagieren, während sie aktiv sind. Offline-Abonnenten reagieren auf die Suche, sobald sie online sind.

Sie müssen einen Zielspeicherort für die Suche angeben. Dies kann einer der folgenden sein:

  • Anfang des Nachrichtenrückstands: Gibt alle aufbewahrten Nachrichten noch einmal aus. Der Umfang des verfügbaren Rückstands hängt von der Aufbewahrungsdauer und der Speicherkapazität des Lite-Themas ab.
  • Ende des Nachrichtenrückstands: Löscht Nachrichten dauerhaft, indem alle aktuell veröffentlichten Nachrichten übersprungen werden.
  • Veröffentlichungszeitstempel: Sucht bis zur ersten Nachricht mit einem (servergenerierten) Veröffentlichungszeitstempel, der größer oder gleich dem angegebenen Zeitstempel ist. Wenn keine solche Nachricht gefunden werden kann, wird bis zum Ende des Nachrichtenrückstands gesucht. Nachfolgende Nachrichten haben garantiert einen Veröffentlichungszeitstempel, der größer oder gleich dem angegebenen Zeitstempel ist, mit Ausnahme von angegebenen Zeitstempeln, die in der Zukunft liegen.
  • Ereigniszeitstempel: Sucht bis zur ersten Nachricht mit einem (benutzerdefinierten) Ereigniszeitstempel, der größer oder gleich dem angegebenen Zeitstempel ist. Wenn keine solche Nachricht gefunden werden kann, wird bis zum Ende des Nachrichtenrückstands gesucht. Da Ereigniszeitstempel vom Nutzer angegeben werden, können nachfolgende Nachrichten Ereigniszeitstempel haben, die kürzer als die angegebene Ereigniszeit sind, und sollten vom Client bei Bedarf gefiltert werden. Wenn für Nachrichten kein Ereigniszeitstempel festgelegt ist, werden ihre Veröffentlichungszeitstempel als Fallback verwendet.

Sie können mit der Google Cloud CLI oder der Pub/Sub Lite API eine Suche nach einem Lite-Abo starten.

gcloud

Verwenden Sie den Befehl gcloud pubsub lite-subscriptions seek, um ein Lite-Abo zu suchen:

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

Ersetzen Sie Folgendes:

  • SUBSCRIPTION_ID: die ID des Lite-Abos

  • LITE_LOCATION: der Standort des Lite-Abos

  • PUBLISH_TIME: der Veröffentlichungszeitstempel, zu dem gesucht werden soll

  • EVENT_TIME: der Ereigniszeitstempel, zu dem gesucht werden soll

  • STARTING_OFFSET: beginning oder end

Weitere Informationen zu Zeitformaten finden Sie unter gcloud topic datetimes.

Wenn Sie das Flag --async angeben und die Anfrage erfolgreich ist, wird in der Befehlszeile die ID des Suchvorgangs angezeigt:

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

Verwenden Sie den Befehl gcloud pubsub lite-operations describe, um den Vorgangsstatus abzurufen.

REST

Senden Sie zum Suchen eines Lite-Abos eine POST-Anfrage wie die folgende:

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

Ersetzen Sie Folgendes:

  • REGION: die Region, in der sich das Lite-Abo befindet

  • PROJECT_NUMBER: die Projektnummer des Projekts mit dem Lite-Abo

  • LITE_LOCATION: der Standort des Lite-Abos

  • SUBSCRIPTION_ID: die ID des Lite-Abos

Legen Sie die folgenden Felder im Anfragetext fest, um zum Anfang oder Ende des Nachrichtenrückstands zu suchen:

{
  "namedTarget": NAMED_TARGET
}

Ersetzen Sie Folgendes:

  • NAMED_TARGET: TAIL für den Beginn oder HEAD für das Ende des Nachrichtenrückstands.

Um nach einem Veröffentlichungszeitstempel zu suchen, legen Sie die folgenden Felder im Anfragetext fest:

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

Geben Sie "eventTime" an, um zu einem Ereigniszeitstempel zu suchen.

Ersetzen Sie Folgendes:

  • TIMESTAMP: Ein Zeitstempel im Format RFC 3339 UTC mit einer Auflösung im Nanosekundenbereich und bis zu neun Nachkommastellen. Beispiele: "2014-10-02T15:01:23Z" und "2014-10-02T15:01:23.045123456Z".

Wenn die Anfrage erfolgreich ist, ist die Antwort ein Vorgang mit langer Ausführungszeit im JSON-Format:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Einfach loslegen (Go)

Bevor Sie dieses Beispiel testen, folgen Sie der Einrichtungsanleitung für Go in der Schnellstart-Anleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

Folgen Sie der Einrichtungsanleitung für Java unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

Folgen Sie der Einrichtungsanleitung für Python unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

Wenn die Suchanfrage erfolgreich ist, ist die Antwort eine Vorgangs-ID mit langer Ausführungszeit. Lesen Sie die Informationen zum Thema Such-Weitergabe verfolgen weiter unten, wenn Sie wissen möchten, wann Abonnenten auf die Suche reagiert haben.

Unterstützte Clients

Für Such-Vorgänge sind Abonnenten erforderlich, die die folgenden Pub/Sub Lite-Clientbibliotheken und Mindestversionen verwenden:

Suchvorgänge funktionieren nicht, wenn Pub/Sub Lite mit Apache Beam oder Apache Spark verwendet wird, da diese Systeme selbst Offsets innerhalb von Partitionen verfolgen. Um dieses Problem zu umgehen, leeren Sie die Workflows, suchen und starten Sie diese neu.

Der Pub/Sub Lite-Dienst kann einen Abonnentenclient erkennen, der keine Suchvorgänge unterstützt (z. B. eine alte Version einer Clientbibliothek oder ein nicht unterstütztes Framework) und den Suchvorgang mit einem FAILED_PRECONDITION Fehlerstatus abbrechen.

Such-Weitergabe verfolgen

Wenn für die erste Suchanfrage eine ID für einen Vorgang mit langer Ausführungszeit zurückgegeben wird, wurde die Suche erfolgreich im Pub/Sub Lite-Dienst registriert und wird letztendlich an die Abonnenten weitergegeben, sofern der Client unterstützt wird (siehe oben). Der Vorgang verfolgt diese Weitergabe und wird abgeschlossen, sobald bei allen Partitionen Abonnenten auf den Suchvorgang reagiert haben.

Wenn Abonnenten online sind, kann es bis zu 30 Sekunden dauern, bis sie die Suchbenachrichtigung erhalten. Suchbenachrichtigungen werden für jede Partition unabhängig gesendet, sodass Partitionen nicht gleichzeitig auf die Suche reagieren. Wenn Abonnenten offline sind, wird der Suchvorgang abgeschlossen, sobald sie online sind.

Wenn ein vorheriger Suchaufruf noch nicht vollständig an Abonnenten weitergegeben wurde, wird er abgebrochen und durch den neuen Suchvorgang ersetzt. Die Metadaten des Suchvorgangs laufen nach 30 Tagen ab, wodurch unvollständige Suchvorgänge abgebrochen werden.

Status des Suchvorgangs

Sie können den Status eines Suchvorgangs über die Google Cloud CLI oder die Pub/Sub Lite API abrufen.

gcloud

Mit dem Befehl gcloud pubsub lite-operations describe können Sie Details zu einem Lite-Vorgang abrufen:

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

Ersetzen Sie Folgendes:

  • OPERATION_ID: die ID des Lite-Vorgangs

  • LITE_LOCATION: der Speicherort des Lite-Vorgangs

Wenn die Anfrage erfolgreich ist, zeigt die Befehlszeile Metadaten zum Lite-Vorgang an:

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

Wenn Sie Details zu einem Lite-Vorgang abrufen möchten, senden Sie eine GET-Anfrage wie die folgende:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Ersetzen Sie Folgendes:

  • REGION: die Region, in der sich der Lite-Vorgang befindet

  • PROJECT_NUMBER: die Projektnummer des Projekts mit dem Lite-Vorgang

  • LITE_LOCATION: der Speicherort des Lite-Vorgangs

  • OPERATION_ID: die ID des Lite-Vorgangs

Wenn die Anfrage erfolgreich ist, ist die Antwort ein Vorgang mit langer Ausführungszeit im JSON-Format:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Suchvorgänge auflisten

Abgeschlossene und aktive Suchvorgänge können über die Google Cloud CLI oder die Pub/Sub Lite API aufgelistet werden.

gcloud

Verwenden Sie den Befehl gcloud pubsub lite-operations list, um Lite-Vorgänge in einem Projekt aufzulisten:

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

Ersetzen Sie Folgendes:

  • LITE_LOCATION: Standort, an dem sich die Lite-Vorgänge befinden

  • SUBSCRIPTION: Vorgänge nach Lite-Abo filtern

  • DONE: true, um nur vollständige Vorgänge einzubeziehen, false, um nur aktive Vorgänge einzuschließen

  • LIMIT: eine Ganzzahl, um die Anzahl der zurückgegebenen Vorgänge zu begrenzen

Wenn die Anfrage erfolgreich ist, wird in der Befehlszeile eine Zusammenfassung der Lite-Vorgänge angezeigt:

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

Senden Sie eine GET-Anfrage wie die folgende, um Lite-Vorgänge in einem Projekt aufzulisten:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

Ersetzen Sie Folgendes:

  • REGION: die Region, in der sich die Lite-Vorgänge befinden

  • PROJECT_NUMBER: die Projektnummer des Projekts mit den Lite-Vorgängen

  • LITE_LOCATION: Standort, an dem sich die Lite-Vorgänge befinden

Wenn die Anfrage erfolgreich ist, ist die Antwort eine Liste von Lite-Vorgängen im JSON-Format:

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}