Vuelve a reproducir y borra definitivamente mensajes de Pub/Sub Lite

En esta página, se muestra cómo iniciar operaciones de búsqueda y hacerles un seguimiento para Suscripciones Lite.

La función de búsqueda de Pub/Sub Lite te permite volver a reproducir y borrar definitivamente mensajes. Tiene los mismos casos de uso que búsqueda de Pub/Sub. A diferencia de Pub/Sub, no necesitas configurar temas o suscripciones Lite para usar la búsqueda y no hay el costo.

Se puede hacer un seguimiento de la propagación de la búsqueda a los suscriptores con un operación de larga duración. Este es un patrón de API que usan los productos de Google Cloud para seguir el progreso de tareas de larga duración.

Iniciando la búsqueda

Las operaciones de búsqueda de Pub/Sub Lite se inician fuera de banda (es decir, desde (Google Cloud CLI o una API Lite independiente de Pub/Sub) y se propagan a suscriptores. Los suscriptores en línea serán notificaciones del salto y de la reacción mientras están en vivo. Los suscriptores sin conexión harán lo siguiente reaccionan a la solicitud una vez que están en línea.

Debes especificar una ubicación de destino para el salto, que puede ser una de las lo siguiente:

  • Inicio de la lista de mensajes pendientes: repite todos los mensajes retenidos. Ten en cuenta que la cantidad de trabajo pendiente disponible está determinada por el El período de retención de mensajes del tema Lite y de almacenamiento de Google.
  • Fin de las tareas pendientes del mensaje: Borra definitivamente los mensajes omitiendo todos los mensajes actuales. mensajes publicados.
  • Marca de tiempo de publicación: Busca el primer mensaje con un (generado por el servidor). una marca de tiempo de publicación mayor o igual que la marca de tiempo especificada. Si no existe el mensaje, busca hasta el final de las tareas pendientes de mensajes. Posterior los mensajes tengan una marca de tiempo de publicación mayor o igual que la marca de tiempo especificada, con la excepción de las marcas de tiempo especificadas que se en el futuro.
  • Marca de tiempo del evento: Busca el primer mensaje con un evento (especificado por el usuario). La marca de tiempo es mayor o igual que la marca de tiempo especificada. Si el mensaje no existe busca hasta el final de las tareas pendientes de mensajes. Como marcas de tiempo de eventos son proporcionadas por el usuario, es posible que los mensajes posteriores tengan marcas de tiempo de eventos menores que el la hora del evento especificada y el cliente debe filtrarla, de ser necesario. Si mensajes no tienen establecida una marca de tiempo de evento, se usan sus marcas de tiempo de publicación como resguardo.

Puedes iniciar la búsqueda de una suscripción Lite con Google Cloud CLI o API de Pub/Sub Lite.

gcloud

Para encontrar una suscripción Lite, usa el comando gcloud pubsub lite-subscriptions seek:

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

Reemplaza lo siguiente:

  • SUBSCRIPTION_ID: Es el ID de la suscripción Lite.

  • LITE_LOCATION: Es la ubicación de la suscripción a Lite.

  • PUBLISH_TIME: Es la marca de tiempo de publicación que se buscará.

  • EVENT_TIME: Es la marca de tiempo del evento que se debe buscar.

  • STARTING_OFFSET: beginning o end

Consulta gcloud topic datetimes para información sobre los formatos de hora.

Si especificas la marca --async y la solicitud se realiza de forma correcta, el comando muestra el ID de la operación de búsqueda:

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

Usa el comando gcloud pubsub lite-operations describe para obtener el estado operativo.

REST

Para buscar una suscripción Lite, envía una solicitud POST como la siguiente:

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

Reemplaza lo siguiente:

  • REGION: Es la región en la que se encuentra la suscripción Lite.

  • PROJECT_NUMBER: Es el número de proyecto con la suscripción Lite.

  • LITE_LOCATION: Es la ubicación de la suscripción a Lite.

  • SUBSCRIPTION_ID: Es el ID de la suscripción Lite.

Para buscar el principio o el final de las tareas pendientes de mensajes, establece los siguientes parámetros en el cuerpo de la solicitud:

{
  "namedTarget": NAMED_TARGET
}

Reemplaza lo siguiente:

  • NAMED_TARGET: TAIL para el principio o HEAD para el final de la lista de mensajes pendientes.

Para buscar una marca de tiempo de publicación, configura los siguientes campos en el cuerpo de la solicitud:

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

Especifica "eventTime" para buscar una marca de tiempo de evento.

Reemplaza lo siguiente:

  • TIMESTAMP: Una marca de tiempo en formato RFC 3339 UTC, con nanosegundos resolución y hasta nueve dígitos fraccionarios. Ejemplos: "2014-10-02T15:01:23Z" y "2014-10-02T15:01:23.045123456Z".

Si la solicitud se realiza correctamente, la respuesta es una operación de larga duración en JSON formato:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Comienza a usarlo

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

Antes de ejecutar esta muestra, sigue las instrucciones de configuración de Java en las bibliotecas cliente de Pub/Sub Lite.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

Antes de ejecutar esta muestra, sigue las instrucciones de configuración de Python en las bibliotecas cliente de Pub/Sub Lite.

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

Si la solicitud de búsqueda se realiza correctamente, la respuesta es un ID de operación de larga duración. Obtener información sobre la propagación de la búsqueda de seguimiento a continuación si necesitas saber cuándo reaccionaron los suscriptores a la búsqueda.

Clientes admitidos

Las operaciones de búsqueda requieren suscriptores que utilicen lo siguiente: Bibliotecas cliente de Pub/Sub Lite y versiones mínimas:

Las operaciones de búsqueda no funcionan cuando se usa Pub/Sub Lite con Apache Beam o Apache Spark porque estos los sistemas realizan su propio seguimiento de las compensaciones dentro de las particiones. La solución alternativa es desviar, buscar y reiniciar los flujos de trabajo.

El servicio de Pub/Sub Lite puede detectar un cliente suscriptor que no admita operaciones de búsqueda (por ejemplo, una versión anterior de biblioteca cliente o framework no compatible) y anulará la búsqueda con un estado de error FAILED_PRECONDITION.

Seguimiento de la propagación de búsqueda

Si se devuelve un ID de operación de larga duración para la solicitud de búsqueda inicial, este Significa que la búsqueda se registró correctamente en el servicio de Pub/Sub Lite. y, con el tiempo, se propagarán a los suscriptores (si el cliente es compatible, arriba). La operación hace un seguimiento de esta propagación y se completa una vez que los suscriptores reaccionaron a la búsqueda, para todas las particiones.

Si los suscriptores están en línea, pueden tardar hasta 30 segundos en recibir la notificación de salto. Las notificaciones de búsqueda se envían independientemente para cada partición por lo que es posible que las particiones no reaccionen a la búsqueda en el mismo instante. Si los suscriptores están sin conexión, la operación de búsqueda se completará cuando estén en línea.

Si una invocación de búsqueda anterior aún no termina de propagarse a los suscriptores, es anulada y sustituida por la nueva operación de búsqueda. Buscar metadatos de operaciones vence luego de 30 días, lo que anula efectivamente cualquier operación de búsqueda incompleta.

Buscar el estado de la operación

Puedes obtener el estado de una operación de búsqueda con Google Cloud CLI API de Pub/Sub Lite.

gcloud

Para obtener detalles sobre una operación de Lite, usa el comando gcloud pubsub lite-operations describe:

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

Reemplaza lo siguiente:

  • OPERATION_ID: Es el ID de la operación de Lite.

  • LITE_LOCATION: Es la ubicación de la operación de Lite.

Si la solicitud se realiza correctamente, la línea de comandos mostrará los metadatos sobre el Operación Lite:

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

Para obtener detalles sobre operaciones de Lite, envía una solicitud GET como la siguiente:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Reemplaza lo siguiente:

  • REGION: Es la región en la que se encuentra la operación Lite.

  • PROJECT_NUMBER: el número del proyecto con la operación de Lite

  • LITE_LOCATION: Es la ubicación de la operación de Lite.

  • OPERATION_ID: Es el ID de la operación de Lite.

Si la solicitud se realiza correctamente, la respuesta es una operación de larga duración en JSON formato:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Cómo enumerar operaciones de búsqueda

Las operaciones de búsqueda completadas y activas pueden mostrarse en una lista con Google Cloud CLI. la API de Pub/Sub Lite.

gcloud

Para enumerar las operaciones Lite de un proyecto, usa gcloud pubsub lite-operations list :

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

Reemplaza lo siguiente:

  • LITE_LOCATION: Es la ubicación en la que se encuentran las operaciones de Lite.

  • SUBSCRIPTION: Filtra operaciones por suscripción Lite.

  • DONE: Es true para incluir solo las operaciones completas, de false a incluir solo operaciones activas

  • LIMIT: Es un número entero para limitar la cantidad de operaciones que se muestran.

Si la solicitud se realiza correctamente, la línea de comandos mostrará un resumen del operaciones:

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

Para enumerar las operaciones Lite de un proyecto, envía una solicitud GET como la siguiente:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

Reemplaza lo siguiente:

  • REGION: Es la región en la que se encuentran las operaciones de Lite.

  • PROJECT_NUMBER: el número del proyecto con las operaciones de Lite

  • LITE_LOCATION: Es la ubicación en la que se encuentran las operaciones de Lite.

Si la solicitud se realiza correctamente, la respuesta es una lista de operaciones de Lite en formato JSON:

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}