Riproduzione ed eliminazione definitiva dei messaggi Pub/Sub Lite

Questa pagina mostra come avviare e monitorare le operazioni di ricerca per Abbonamenti Lite.

La funzionalità di ricerca di Pub/Sub Lite consente di ripetere ed eliminare definitivamente i messaggi. Ha gli stessi casi d'uso di ricerca Pub/Sub. A differenza di Pub/Sub, non è necessario configurare argomenti o sottoscrizioni Lite per utilizzare la ricerca senza dover configurare ad accesso meno frequente per ridurre i costi di archiviazione.

La propagazione della ricerca ai sottoscrittori può essere monitorata utilizzando un operazione a lunga esecuzione. Si tratta di un pattern API utilizzato dai prodotti Google Cloud per monitorare l'avanzamento per attività di lunga durata.

Avvio della ricerca

Le operazioni di ricerca di Pub/Sub Lite vengono avviate fuori banda (ovvero Google Cloud CLI o l'API Pub/Sub Lite separata) e propagati iscritti. Gli abbonati online saranno avvisati della ricerca e della reazione mentre sono in diretta. Gli abbonati offline reagire alla ricerca quando sono online.

Devi specificare una località di destinazione per la ricerca, che può essere una delle seguenti:

  • Inizio del backlog dei messaggi: riproduce tutti i messaggi conservati. Tieni presente che l'importo del backlog disponibile è determinato il periodo di conservazione dei messaggi di argomento Lite e e capacità di archiviazione.
  • Fine del backlog dei messaggi: elimina definitivamente i messaggi saltando tutti quelli attuali dei messaggi pubblicati.
  • Pubblica timestamp: rimanda al primo messaggio con un valore (generato dal server) timestamp di pubblicazione maggiore o uguale al timestamp specificato. In caso contrario, il messaggio può essere localizzato, si estende fino alla fine del backlog dei messaggi. Successiva i messaggi hanno la garanzia di un timestamp di pubblicazione maggiore o uguale a il timestamp specificato, ad eccezione dei timestamp specificati che sono in futuro.
  • Timestamp evento: rimanda al primo messaggio con un evento (specificato dall'utente). maggiore o uguale al timestamp specificato. Se il messaggio non è presente alla fine del backlog dei messaggi. Come timestamp degli eventi forniti dall'utente, i messaggi successivi potrebbero avere timestamp evento inferiori a all'ora specificata e devono essere filtrati dal cliente, se necessario. Se per i messaggi non è impostato un timestamp evento, pertanto i relativi timestamp di pubblicazione vengono utilizzati come alternativa.

Puoi avviare una ricerca di un abbonamento Lite con Google Cloud CLI o l'API Pub/Sub Lite.

gcloud

Per cercare una sottoscrizione Lite, utilizza gcloud pubsub lite-subscriptions seek :

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

Sostituisci quanto segue:

  • SUBSCRIPTION_ID: l'ID della sottoscrizione Lite

  • LITE_LOCATION: la località della sottoscrizione Lite

  • PUBLISH_TIME: il timestamp di pubblicazione per la ricerca

  • EVENT_TIME: il timestamp dell'evento su cui eseguire la ricerca

  • STARTING_OFFSET: beginning o end

Vedi gcloud topic datetimes per informazioni sui formati dell'ora.

Se specifichi il flag --async e la richiesta ha esito positivo, il comando visualizza l'ID dell'operazione di ricerca:

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

Utilizza la gcloud pubsub lite-operations describe per ottenere lo stato dell'operazione.

REST

Per cercare una sottoscrizione Lite, invia una richiesta POST come la seguente:

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • REGION: la regione in cui si trova la sottoscrizione Lite

  • PROJECT_NUMBER: il numero del progetto con la versione Lite abbonamento

  • LITE_LOCATION: la località della sottoscrizione Lite

  • SUBSCRIPTION_ID: l'ID della sottoscrizione Lite

Per andare all'inizio o alla fine del backlog dei messaggi, imposta quanto segue nel corpo della richiesta:

{
  "namedTarget": NAMED_TARGET
}

Sostituisci quanto segue:

  • NAMED_TARGET: TAIL per l'inizio o HEAD per la fine di il backlog dei messaggi.

Per eseguire la ricerca in base a un timestamp di pubblicazione, imposta i seguenti campi nel corpo della richiesta:

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

Specifica "eventTime" per eseguire la ricerca fino a un timestamp dell'evento.

Sostituisci quanto segue:

  • TIMESTAMP: un timestamp nel formato UTC RFC 3339, espresso in nanosecondi risoluzione e fino a nove cifre frazionarie. Esempi: "2014-10-02T15:01:23Z" e "2014-10-02T15:01:23.045123456Z".

Se la richiesta ha esito positivo, la risposta è un'operazione a lunga esecuzione in JSON formato:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Vai

Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Java in Librerie client Pub/Sub Lite.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Python in Librerie client Pub/Sub Lite.

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

Se la richiesta di ricerca ha esito positivo, la risposta è un ID operazione a lunga esecuzione. Leggi le informazioni sul monitoraggio della propagazione della ricerca qui sotto se vuoi sapere quando gli iscritti hanno reagito alla ricerca.

Client supportati

Le operazioni di ricerca richiedono sottoscrittori che utilizzano quanto segue Librerie client di Pub/Sub Lite e versioni minime:

Le operazioni di ricerca non funzionano quando si utilizza Pub/Sub Lite Apache Beam o Apache Spark perché questi che eseguono il proprio tracciamento degli offset all'interno delle partizioni. La soluzione alternativa consiste nello svuotare, cercare e riavviare i flussi di lavoro.

Il servizio Pub/Sub Lite è in grado di rilevare il client di un sottoscrittore che non supporta le operazioni di ricerca (ad esempio, una versione precedente della libreria client o framework non supportato) e interromperà la ricerca con un FAILED_PRECONDITION .

Monitoraggio della propagazione della ricerca

Se viene restituito un ID operazione a lunga esecuzione per la richiesta di ricerca iniziale, indica che la ricerca è stata registrata correttamente nel servizio Pub/Sub Lite e alla fine si propagherà ai sottoscrittori (se il client è supportato, sopra). L'operazione monitora questa propagazione e viene completata una volta sottoscrittori hanno reagito alla ricerca, per tutte le partizioni.

Se gli abbonati sono online, potrebbero trascorrere fino a 30 secondi prima che ricevano il cerca la notifica. Le notifiche di ricerca vengono inviate in modo indipendente per ogni partizione pertanto le partizioni non possono reagire alla ricerca nello stesso istante. Se gli abbonati sono offline, l'operazione di ricerca verrà completata non appena saranno online.

Se una chiamata di ricerca precedente non ha terminato la propagazione ai sottoscrittori, è stata interrotta e sostituita dalla nuova operazione di ricerca. Cerca metadati operazione scade dopo 30 giorni, interrompendo di fatto eventuali operazioni di ricerca incomplete.

Cerca stato dell'operazione

Puoi ottenere lo stato di un'operazione di ricerca utilizzando Google Cloud CLI oppure l'API Pub/Sub Lite.

gcloud

Per ottenere i dettagli di un'operazione Lite, utilizza gcloud pubsub lite-operations describe :

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

Sostituisci quanto segue:

  • OPERATION_ID: l'ID dell'operazione Lite

  • LITE_LOCATION: la posizione dell'operazione Lite

Se la richiesta ha esito positivo, la riga di comando visualizza i metadati relativi Operazione Lite:

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

Per ottenere i dettagli di un'operazione Lite, invia una richiesta GET come il seguenti:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • REGION: la regione in cui si trova l'operazione Lite

  • PROJECT_NUMBER: il numero del progetto con la versione Lite operazione

  • LITE_LOCATION: la posizione dell'operazione Lite

  • OPERATION_ID: l'ID dell'operazione Lite

Se la richiesta ha esito positivo, la risposta è un'operazione a lunga esecuzione in JSON formato:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Operazioni di ricerca elenco

Le operazioni di ricerca completate e attive possono essere elencate utilizzando Google Cloud CLI oppure l'API Pub/Sub Lite.

gcloud

Per elencare le operazioni Lite in un progetto, utilizza la classe gcloud pubsub lite-operations list :

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

Sostituisci quanto segue:

  • LITE_LOCATION: la località in cui si trovano le operazioni Lite

  • SUBSCRIPTION: filtra le operazioni per sottoscrizione Lite

  • DONE: true per includere solo le operazioni completate, false per includi solo le operazioni attive

  • LIMIT: un numero intero per limitare il numero di operazioni restituite

Se la richiesta ha esito positivo, la riga di comando visualizza un riepilogo operazioni:

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

Per elencare le operazioni Lite in un progetto, invia una richiesta GET come la seguente:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • REGION: la regione in cui si trovano le operazioni Lite

  • PROJECT_NUMBER: il numero del progetto con la versione Lite suite operativa

  • LITE_LOCATION: la località in cui si trovano le operazioni Lite

Se la richiesta ha esito positivo, la risposta è un elenco di operazioni Lite in Formato JSON:

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}