Riproduzione ed eliminazione definitiva dei messaggi Pub/Sub Lite

Questa pagina mostra come avviare e monitorare le operazioni di ricerca per gli abbonamenti Lite.

La funzionalità di ricerca di Pub/Sub Lite consente di ripetere ed eliminare definitivamente i messaggi. Ha gli stessi casi d'uso della ricerca Pub/Sub. A differenza di Pub/Sub, non è necessario configurare gli argomenti o le sottoscrizioni Lite per utilizzare la ricerca e non sono previsti costi aggiuntivi.

La propagazione della ricerca ai sottoscrittori può essere monitorata utilizzando un'operazione a lunga esecuzione. Si tratta di un pattern API utilizzato dai prodotti Google Cloud per monitorare l'avanzamento delle attività a lunga esecuzione.

Avvio della ricerca

Le operazioni di ricerca di Pub/Sub Lite vengono avviate fuori banda (ovvero da Google Cloud CLI o dall'API Pub/Sub Lite separata) e propagate ai sottoscrittori. Gli abbonati online riceveranno una notifica in merito alla ricerca e alla reazione durante la trasmissione dal vivo. Gli abbonati offline reagiranno alla ricerca una volta che saranno online.

Devi specificare una località di destinazione per la ricerca, che può essere una delle seguenti:

  • Inizio del backlog dei messaggi: riproduce tutti i messaggi conservati. Tieni presente che la quantità di backlog disponibile è determinata dal periodo di conservazione dei messaggi e dalla capacità di archiviazione dell'argomento Lite.
  • Fine del backlog dei messaggi: elimina definitivamente i messaggi saltando tutti i messaggi pubblicati attuali.
  • Timestamp pubblicazione: cerca il primo messaggio con un timestamp di pubblicazione (generato dal server) maggiore o uguale al timestamp specificato. Se il messaggio non è disponibile, si va alla fine del backlog dei messaggi. Per i messaggi successivi è garantito che avranno un timestamp di pubblicazione maggiore o uguale al timestamp specificato, ad eccezione dei timestamp specificati futuri.
  • Timestamp evento: cerca il primo messaggio con un timestamp evento (specificato dall'utente) maggiore o uguale al timestamp specificato. Se questo messaggio non è presente, si va alla fine del backlog dei messaggi. Poiché i timestamp degli eventi vengono forniti dall'utente, i messaggi successivi potrebbero avere timestamp degli eventi inferiori all'ora dell'evento specificata e devono essere filtrati dal client, se necessario. Se per i messaggi non è impostato un timestamp evento, i relativi timestamp di pubblicazione vengono utilizzati come fallback.

Puoi avviare una ricerca di un abbonamento Lite con Google Cloud CLI o l'API Pub/Sub Lite.

gcloud

Per cercare una sottoscrizione Lite, utilizza il comando gcloud pubsub lite-subscriptions seek:

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

Sostituisci quanto segue:

  • SUBSCRIPTION_ID: l'ID della sottoscrizione Lite

  • LITE_LOCATION: la località della sottoscrizione Lite

  • PUBLISH_TIME: il timestamp di pubblicazione per la ricerca

  • EVENT_TIME: il timestamp dell'evento su cui eseguire la ricerca

  • STARTING_OFFSET: beginning o end

Per informazioni sui formati dell'ora, consulta gcloud topic datetimes.

Se specifichi il flag --async e la richiesta ha esito positivo, la riga di comando visualizza l'ID dell'operazione di ricerca:

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

Usa il comando gcloud pubsub lite-operations describe per ottenere lo stato dell'operazione.

REST

Per cercare una sottoscrizione Lite, invia una richiesta POST come la seguente:

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • REGION: la regione in cui si trova la sottoscrizione Lite

  • PROJECT_NUMBER: il numero del progetto con la sottoscrizione Lite

  • LITE_LOCATION: la località della sottoscrizione Lite

  • SUBSCRIPTION_ID: l'ID della sottoscrizione Lite

Per andare all'inizio o alla fine del backlog dei messaggi, imposta i seguenti campi nel corpo della richiesta:

{
  "namedTarget": NAMED_TARGET
}

Sostituisci quanto segue:

  • NAMED_TARGET: TAIL per l'inizio o HEAD per la fine del backlog dei messaggi.

Per eseguire la ricerca in base a un timestamp di pubblicazione, imposta i seguenti campi nel corpo della richiesta:

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

Specifica "eventTime" per eseguire la ricerca fino a un timestamp dell'evento.

Sostituisci quanto segue:

  • TIMESTAMP: un timestamp nel formato UTC RFC 3339, con risoluzione in nanosecondi e fino a nove cifre frazionarie. Esempi: "2014-10-02T15:01:23Z" e "2014-10-02T15:01:23.045123456Z".

Se la richiesta ha esito positivo, la risposta è un'operazione a lunga esecuzione in formato JSON:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Guida rapida: utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Java nelle librerie client di Pub/Sub Lite.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Python nelle librerie client di Pub/Sub Lite.

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

Se la richiesta di ricerca ha esito positivo, la risposta è un ID operazione a lunga esecuzione. Consulta le informazioni sul monitoraggio della propagazione della ricerca di seguito se hai bisogno di sapere quando i sottoscrittori hanno reagito alla ricerca.

Client supportati

Le operazioni di ricerca richiedono ai sottoscrittori che utilizzano le seguenti librerie client Pub/Sub Lite e versioni minime:

Le operazioni di ricerca non funzionano quando Pub/Sub Lite viene utilizzato con Apache Beam o Apache Spark, perché questi sistemi eseguono il monitoraggio degli offset all'interno delle partizioni. La soluzione alternativa è svuotare, cercare e riavviare i flussi di lavoro.

Il servizio Pub/Sub Lite è in grado di rilevare un client sottoscrittore che non supporta le operazioni di ricerca (ad esempio, una vecchia versione della libreria client o un framework non supportato) e interrompe la ricerca con uno stato di errore FAILED_PRECONDITION.

Monitoraggio della propagazione della ricerca

Se viene restituito un ID operazione a lunga esecuzione per la richiesta di ricerca iniziale, significa che la ricerca è stata registrata correttamente nel servizio Pub/Sub Lite e alla fine verrà propagata ai sottoscrittori (se il client è supportato, come sopra). L'operazione monitora questa propagazione e viene completata una volta che i sottoscrittori hanno reagito alla ricerca, per tutte le partizioni.

Se gli abbonati sono online, potrebbero essere necessari fino a 30 secondi prima che ricevano la notifica di ricerca. Le notifiche di ricerca vengono inviate in modo indipendente per ogni partizione, perciò le partizioni potrebbero non reagire alla ricerca nello stesso istante. Se gli abbonati sono offline, l'operazione di ricerca verrà completata una volta online.

Se una chiamata di ricerca precedente non ha terminato la propagazione ai sottoscrittori, viene interrotta e sostituita dalla nuova operazione di ricerca. I metadati dell'operazione di ricerca scadono dopo 30 giorni, interrompendo di fatto tutte le operazioni di ricerca incomplete.

Cerca stato dell'operazione

Puoi ottenere lo stato di un'operazione di ricerca utilizzando Google Cloud CLI o l'API Pub/Sub Lite.

gcloud

Per ottenere i dettagli su un'operazione Lite, utilizza il comando gcloud pubsub lite-operations describe:

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

Sostituisci quanto segue:

  • OPERATION_ID: l'ID dell'operazione Lite

  • LITE_LOCATION: la posizione dell'operazione Lite

Se la richiesta ha esito positivo, la riga di comando visualizza i metadati sull'operazione Lite:

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

Per ottenere i dettagli di un'operazione Lite, invia una richiesta GET come la seguente:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • REGION: la regione in cui si trova l'operazione Lite

  • PROJECT_NUMBER: il numero del progetto con l'operazione Lite

  • LITE_LOCATION: la posizione dell'operazione Lite

  • OPERATION_ID: l'ID dell'operazione Lite

Se la richiesta ha esito positivo, la risposta è un'operazione a lunga esecuzione in formato JSON:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Operazioni di ricerca elenco

Le operazioni di ricerca completate e attive possono essere elencate utilizzando Google Cloud CLI o l'API Pub/Sub Lite.

gcloud

Per elencare le operazioni Lite in un progetto, utilizza il comando gcloud pubsub lite-operations list:

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

Sostituisci quanto segue:

  • LITE_LOCATION: la località in cui si trovano le operazioni Lite

  • SUBSCRIPTION: filtra le operazioni per sottoscrizione Lite

  • DONE: true per includere solo le operazioni complete, false per includere solo le operazioni attive

  • LIMIT: un numero intero per limitare il numero di operazioni restituite

Se la richiesta ha esito positivo, la riga di comando visualizza un riepilogo delle operazioni Lite:

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

Per elencare le operazioni Lite in un progetto, invia una richiesta GET come la seguente:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • REGION: la regione in cui si trovano le operazioni Lite

  • PROJECT_NUMBER: il numero del progetto con le operazioni Lite

  • LITE_LOCATION: la località in cui si trovano le operazioni Lite

Se la richiesta ha esito positivo, la risposta è un elenco di operazioni Lite in formato JSON:

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}