Rouvrir et supprimer définitivement des messages Pub/Sub Lite

Cette page explique comment lancer et suivre les opérations de recherche pour les abonnements Lite.

La fonctionnalité de recherche Pub/Sub Lite vous permet de rouvrir et de supprimer définitivement des messages. Ses cas d'utilisation sont identiques à ceux de la recherche Pub/Sub. Contrairement à Pub/Sub, vous n'avez pas besoin de configurer de sujets ou d'abonnements Lite pour utiliser la fonctionnalité de recherche, et celle-ci n'entraîne aucun coût supplémentaire.

La propagation de la recherche aux abonnés peut être suivie à l'aide d'une opération de longue durée. Il s'agit d'un modèle d'API utilisé par les produits Google Cloud pour suivre la progression des tâches de longue durée.

Lancer une recherche

Les opérations de recherche Pub/Sub Lite sont lancées hors bande (c'est-à-dire à partir de la Google Cloud CLI ou de l'API Pub/Sub Lite distincte) et propagées aux abonnés. Les abonnés en ligne sont informés de la recherche et peuvent y réagir en direct. Les abonnés hors connexion réagissent à la recherche une fois qu'ils sont en ligne.

Vous devez spécifier un emplacement cible pour la recherche, lequel peut être l'un des éléments suivants :

  • Début du traitement des messages en attente : rouvre tous les messages conservés. Notez que la quantité de messages en attente disponibles est déterminée par la période de conservation des messages et la capacité de stockage du sujet Lite.
  • Fin des messages en attente : supprime définitivement les messages en ignorant tous les messages publiés.
  • Horodatage de publication : lance la recherche jusqu'au premier message présentant un horodatage de publication (généré par le serveur) ultérieur ou égal à l'horodatage spécifié. Si aucun message de ce type ne peut être localisé, la recherche est lancée jusqu'à la fin des messages en attente. Les messages suivants sont assurés d'avoir un horodatage de publication ultérieur ou égal à l'horodatage spécifié, sauf lorsque les horodatages spécifiés se situent dans le futur.
  • Horodatage de l'événement : lance la recherche jusqu'au premier message présentant un horodatage (spécifié par l'utilisateur) ultérieur ou égal à l'horodatage spécifié. Si aucun message de ce type ne peut être localisé, la recherche est lancée jusqu'à la fin des messages en attente. Étant donné que les horodatages des événements sont fournis par l'utilisateur, les messages suivants peuvent présenter des horodatages antérieurs à l'heure de l'événement spécifiée et doivent être filtrés par le client, si nécessaire. Si aucun horodatage d'événement n'est défini pour les messages, leurs horodatages de publication sont utilisés comme solution de remplacement.

Vous pouvez rechercher un abonnement Lite à l'aide de la Google Cloud CLI ou de la l'API Pub/Sub Lite ;

gcloud

Pour rechercher un abonnement Lite, exécutez la commande gcloud pubsub lite-subscriptions seek :

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

Remplacez les éléments suivants :

  • SUBSCRIPTION_ID : ID de l'abonnement Lite

  • LITE_LOCATION : emplacement de l'abonnement Lite

  • PUBLISH_TIME : horodatage de publication à rechercher

  • EVENT_TIME : horodatage d'événement à rechercher.

  • STARTING_OFFSET : beginning ou end

Pour en savoir plus sur les formats de date et d'heure, consultez la page gcloud topic datetimes.

Si vous spécifiez l'option --async et que la requête aboutit, la commande affiche l'ID de l'opération de recherche:

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

Exécutez la commande gcloud pubsub lite-operations describe pour obtenir l'ID de l'opération.

REST

Pour rechercher un abonnement Lite, envoyez une requête POST comme suit :

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

Remplacez les éléments suivants :

  • REGION: région dans laquelle se trouve l'abonnement Lite

  • PROJECT_NUMBER: numéro du projet associé à l'abonnement Lite

  • LITE_LOCATION : emplacement de l'abonnement Lite

  • SUBSCRIPTION_ID : ID de l'abonnement Lite

Pour rechercher le début ou la fin des messages en attente, définissez les champs suivants dans le corps de la requête :

{
  "namedTarget": NAMED_TARGET
}

Remplacez les éléments suivants :

  • NAMED_TARGET : TAIL pour le début ou HEAD pour la fin des messages en attente.

Pour rechercher un horodatage de publication, définissez les champs suivants dans le corps de la requête :

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

Spécifiez "eventTime" pour rechercher un horodatage d'événement.

Remplacez les éléments suivants :

  • TIMESTAMP : horodatage au format RFC 3339 UTC, avec une précision de l'ordre de la nanoseconde et jusqu'à neuf chiffres décimaux. Exemples : "2014-10-02T15:01:23Z" et "2014-10-02T15:01:23.045123456Z".

Si la requête aboutit, la réponse est une opération de longue durée au format JSON :

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java dans la section Bibliothèques clientes de Pub/Sub Lite.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

Avant d'exécuter cet exemple, suivez les instructions de configuration de Python dans la section Bibliothèques clientes de Pub/Sub Lite.

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

Si la requête de recherche aboutit, la réponse est un ID d'opération de longue durée. Consultez les informations ci-après sur le suivi de la propagation de la recherche si vous devez connaître le moment auquel les abonnés ont réagi à la recherche.

Clients compatibles

Les opérations de recherche nécessitent des abonnés qui utilisent les bibliothèques clientes Pub/Sub Lite suivantes et les versions minimales :

Les opérations de recherche ne fonctionnent pas lorsque Pub/Sub Lite est utilisé avec Apache Beam ou Apache Spark, car ces systèmes effectuent leur propre suivi des décalages au sein des partitions. La solution consiste à drainer, rechercher et redémarrer les workflows.

Le service Pub/Sub Lite est capable de détecter un client abonné qui n'est pas compatible avec les opérations de recherche (par exemple, une ancienne version de bibliothèque cliente ou un framework non compatible) et annule la recherche avec un état d'erreur FAILED_PRECONDITION.

Suivre la propagation de la recherche

Si un ID d'opération de longue durée est renvoyé pour la requête de recherche initiale, cela signifie que la recherche a bien été enregistrée dans le service Pub/Sub Lite et qu'elle finira par se propager aux abonnés (si le client est compatible, comme ci-dessus). L'opération suit cette propagation et se termine une fois que les abonnés ont réagi à la recherche, pour toutes les partitions.

Si les abonnés sont en ligne, la réception de la notification de recherche peut prendre jusqu'à 30 secondes. Les notifications de recherche sont envoyées indépendamment pour chaque partition. Par conséquent, les partitions peuvent ne pas réagir à la recherche au même moment. Si les abonnés sont hors connexion, l'opération de recherche se termine une fois qu'ils sont en ligne.

Si la propagation d'un appel de recherche précédent auprès des abonnés n'est pas terminée, cet appel est annulé et remplacé par la nouvelle opération de recherche. Les métadonnées de l'opération de recherche expirent au bout de 30 jours, ce qui entraîne l'abandon effectif des opérations de recherche incomplètes.

État de l'opération de recherche

Vous pouvez obtenir l'état d'une opération de recherche à l'aide de la Google Cloud CLI ou de l'API Pub/Sub Lite.

gcloud

Pour obtenir des détails sur une opération Lite, utilisez la commande gcloud pubsub lite-operations describe :

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

Remplacez les éléments suivants :

  • OPERATION_ID : ID de l'opération Lite

  • LITE_LOCATION: emplacement de l'opération Lite

Si la requête aboutit, la ligne de commande affiche des métadonnées sur l'opération Lite :

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

Pour obtenir des détails sur une opération Lite, envoyez une requête GET comme suit :

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Remplacez les éléments suivants :

  • REGION: région dans laquelle se trouve l'opération Lite

  • PROJECT_NUMBER : numéro du projet contenant l'opération Lite

  • LITE_LOCATION: emplacement de l'opération Lite

  • OPERATION_ID : ID de l'opération Lite

Si la requête aboutit, la réponse est une opération de longue durée au format JSON :

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Lister les opérations de recherche

Vous pouvez répertorier les opérations de recherche terminées et actives à l'aide de la Google Cloud CLI. l'API Pub/Sub Lite.

gcloud

Pour répertorier les opérations Lite d'un projet, exécutez la commande gcloud pubsub lite-operations list :

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

Remplacez les éléments suivants :

  • LITE_LOCATION: emplacement dans lequel se trouvent les opérations Lite

  • SUBSCRIPTION : filtrer les opérations par abonnement Lite

  • DONE : true pour n'inclure que les opérations complètes et false pour n'inclure que les opérations actives.

  • LIMIT : entier permettant de limiter le nombre d'opérations renvoyées.

Si la requête aboutit, la ligne de commande affiche un résumé des opérations Lite :

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

Pour répertorier les opérations Lite dans un projet, envoyez une requête GET comme suit :

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

Remplacez les éléments suivants :

  • REGION: région dans laquelle se trouvent les opérations Lite

  • PROJECT_NUMBER : numéro du projet contenant les opérations Lite

  • LITE_LOCATION: emplacement dans lequel se trouvent les opérations Lite

Si la requête aboutit, la réponse est une liste d'opérations Lite au format JSON :

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}