Riproduzione ed eliminazione definitiva dei messaggi Pub/Sub Lite

Questa pagina mostra come avviare e monitorare le operazioni di ricerca per le sottoscrizioni Lite.

La funzionalità di ricerca Pub/Sub Lite consente di riprodurre di nuovo ed eliminare definitivamente i messaggi. Riguarda gli stessi casi d'uso della ricerca di Pub/Sub. A differenza di Pub/Sub, non è necessario configurare argomenti o sottoscrizioni Lite per utilizzare la ricerca e non sono previsti costi aggiuntivi.

La propagazione della ricerca nei sottoscrittori può essere monitorata utilizzando un'operazione a lunga esecuzione. Questo è un pattern API utilizzato dai prodotti Google Cloud per monitorare l'avanzamento delle attività a lunga esecuzione.

Avvio della ricerca

Le operazioni di ricerca di Pub/Sub Lite vengono avviate fuori banda (ovvero da Google Cloud CLI o dall'API Pub/Sub Lite separata) e vengono propagate ai sottoscrittori. Gli abbonati online riceveranno una notifica relativa alla ricerca e all'invio di una reazione mentre sono in diretta. Gli abbonati offline reagiranno alla ricerca quando saranno online.

Devi specificare una località di destinazione per la ricerca, che può essere una delle seguenti:

  • Inizio del backlog dei messaggi: riproduce tutti i messaggi conservati. Tieni presente che la quantità di backlog disponibile è determinata dal periodo di conservazione dei messaggi e dalla capacità di archiviazione dell'argomento Lite.
  • Fine del backlog dei messaggi: elimina definitivamente i messaggi saltando tutti i messaggi pubblicati attuali.
  • Timestamp pubblicazione: cerca il primo messaggio con un timestamp di pubblicazione (generato dal server) maggiore o uguale al timestamp specificato. Se questo messaggio non può essere trovato, cerca fino alla fine del backlog dei messaggi. Per i messaggi successivi è garantito un timestamp di pubblicazione maggiore o uguale al timestamp specificato, ad eccezione dei timestamp specificati che sono nel futuro.
  • Timestamp evento: vai al primo messaggio con un timestamp dell'evento (specificato dall'utente) maggiore o uguale al timestamp specificato. Se questo messaggio non può essere trovato, cerca fino alla fine del backlog dei messaggi. Poiché i timestamp degli eventi vengono forniti dall'utente, i messaggi successivi potrebbero avere timestamp degli eventi inferiori all'ora dell'evento specificata e devono essere filtrati dal client, se necessario. Se per i messaggi non è impostato un timestamp evento, i relativi timestamp di pubblicazione vengono utilizzati come riserva.

Puoi avviare una ricerca per una sottoscrizione Lite con Google Cloud CLI o l'API Pub/Sub Lite.

gcloud

Per cercare una sottoscrizione Lite, utilizza il comando gcloud pubsub lite-subscriptions seek:

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

Sostituisci quanto segue:

  • SUBSCRIPTION_ID: l'ID della sottoscrizione Lite

  • LITE_LOCATION: la località della sottoscrizione Lite

  • PUBLISH_TIME: il timestamp di pubblicazione da cercare

  • EVENT_TIME: il timestamp dell'evento in cui cercare

  • STARTING_OFFSET: beginning o end

Consulta gcloud topic datetimes per informazioni sui formati dell'ora.

Se specifichi il flag --async e la richiesta ha esito positivo, la riga di comando visualizza l'ID dell'operazione di ricerca:

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

Utilizza il comando gcloud pubsub lite-operations describe per ottenere lo stato dell'operazione.

REST

Per cercare un abbonamento Lite, invia una richiesta POST come la seguente:

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • REGION: la regione in cui si trova la sottoscrizione Lite

  • PROJECT_NUMBER: il numero del progetto con la sottoscrizione Lite

  • LITE_LOCATION: la località della sottoscrizione Lite

  • SUBSCRIPTION_ID: l'ID della sottoscrizione Lite

Per andare all'inizio o alla fine del backlog dei messaggi, imposta i seguenti campi nel corpo della richiesta:

{
  "namedTarget": NAMED_TARGET
}

Sostituisci quanto segue:

  • NAMED_TARGET: TAIL per l'inizio o HEAD per la fine del backlog dei messaggi.

Per cercare un timestamp di pubblicazione, imposta i seguenti campi nel corpo della richiesta:

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

Specifica "eventTime" per cercare un timestamp di un evento.

Sostituisci quanto segue:

  • TIMESTAMP: un timestamp in formato UTC RFC 3339, con risoluzione in nanosecondi e fino a nove cifre frazionarie. Esempi: "2014-10-02T15:01:23Z" e "2014-10-02T15:01:23.045123456Z".

Se la richiesta ha esito positivo, la risposta è un'operazione a lunga esecuzione in formato JSON:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Go Pub/Sub.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

Prima di eseguire questo esempio, segui le istruzioni di configurazione Java in Librerie client di Pub/Sub Lite.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Python in Librerie client di Pub/Sub Lite.

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

Se la richiesta di ricerca ha esito positivo, la risposta è un ID operazione a lunga esecuzione. Consulta le informazioni sulla propagazione della ricerca di monitoraggio di seguito se hai bisogno di sapere quando gli abbonati hanno aggiunto una reazione alla ricerca.

Client supportati

Le operazioni di ricerca richiedono sottoscrittori che utilizzano le seguenti librerie client Pub/Sub Lite e versioni minime:

Le operazioni di ricerca non funzionano quando Pub/Sub Lite viene utilizzato con Apache Beam o Apache Spark perché questi sistemi eseguono il proprio monitoraggio degli offset all'interno delle partizioni. La soluzione alternativa consiste nello svuotare, cercare e riavviare i flussi di lavoro.

Il servizio Pub/Sub Lite è in grado di rilevare un client abbonato che non supporta le operazioni di ricerca (ad esempio una versione precedente della libreria client o un framework non supportato) e interromperà la ricerca con uno stato di errore FAILED_PRECONDITION.

Propagazione della ricerca di monitoraggio

Se viene restituito un ID operazione a lunga esecuzione per la richiesta di ricerca iniziale, significa che la ricerca è stata registrata correttamente nel servizio Pub/Sub Lite e alla fine verrà propagata ai sottoscrittori (se il client è supportato, come spiegato in precedenza). L'operazione monitora questa propagazione e completa dopo che gli abbonati hanno reagito alla ricerca, per tutte le partizioni.

Se gli abbonati sono online, potrebbero essere necessari fino a 30 secondi prima che ricevano la notifica di richiesta. Le notifiche di ricerca vengono inviate in modo indipendente per ogni partizione, quindi le partizioni potrebbero non reagire alla ricerca contemporaneamente. Se gli abbonati sono offline, l'operazione di ricerca verrà completata quando saranno online.

Se la propagazione di una chiamata di ricerca precedente ai sottoscrittori non è terminata, viene interrotta e sostituita dalla nuova operazione di ricerca. I metadati dell'operazione di ricerca scadono dopo 30 giorni, interrompendo di fatto qualsiasi operazione di ricerca incomplete.

Cerca stato operazione

Puoi ottenere lo stato di un'operazione di ricerca utilizzando Google Cloud CLI o l'API Pub/Sub Lite.

gcloud

Per ottenere dettagli su un'operazione Lite, utilizza il comando gcloud pubsub lite-operations describe:

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

Sostituisci quanto segue:

  • OPERATION_ID: l'ID dell'operazione Lite

  • LITE_LOCATION: la località dell'operazione Lite

Se la richiesta ha esito positivo, la riga di comando visualizza i metadati relativi all'operazione Lite:

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

Per ottenere dettagli su un'operazione Lite, invia una richiesta GET come la seguente:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • REGION: la regione in cui si trova l'operazione Lite

  • PROJECT_NUMBER: il numero del progetto con l'operazione Lite

  • LITE_LOCATION: la località dell'operazione Lite

  • OPERATION_ID: l'ID dell'operazione Lite

Se la richiesta ha esito positivo, la risposta è un'operazione a lunga esecuzione in formato JSON:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Operazioni di ricerca elenco

Le operazioni di ricerca completate e attive possono essere elencate utilizzando Google Cloud CLI o l'API Pub/Sub Lite.

gcloud

Per elencare le operazioni Lite in un progetto, utilizza il comando gcloud pubsub lite-operations list:

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

Sostituisci quanto segue:

  • LITE_LOCATION: la località in cui si trovano le operazioni Lite

  • SUBSCRIPTION: operazioni di filtro per sottoscrizione Lite

  • DONE: true per includere solo le operazioni completate, false per includere solo le operazioni attive

  • LIMIT: un numero intero per limitare il numero di operazioni restituite

Se la richiesta ha esito positivo, la riga di comando visualizza un riepilogo delle operazioni Lite:

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

Per elencare le operazioni Lite in un progetto, invia una richiesta GET come la seguente:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • REGION: la regione in cui si trovano le operazioni Lite

  • PROJECT_NUMBER: il numero del progetto con le operazioni Lite

  • LITE_LOCATION: la località in cui si trovano le operazioni Lite

Se la richiesta ha esito positivo, la risposta sarà un elenco di operazioni Lite in formato JSON:

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}