Como repetir e limpar mensagens do Pub/Sub Lite

Nesta página, você aprenderá a iniciar e rastrear as operações de busca de assinaturas do Lite.

O recurso de busca do Pub/Sub Lite permite reproduzir e limpar mensagens. Ele tem os mesmos casos de uso que a busca do Pub/Sub. Ao contrário do Pub/Sub, não é necessário configurar tópicos ou assinaturas do Lite para usar a busca, e não há custo extra.

É possível rastrear a propagação da busca entre os assinantes usando uma operação de longa duração. Trata-se de um padrão de API usado pelos produtos do Google Cloud para rastrear o progresso das tarefas de longa duração.

Como iniciar a busca

As operações de busca do Pub/Sub Lite são iniciadas fora de banda (ou seja, a Google Cloud CLI ou a API Pub/Sub Lite separada) e propagadas para inscritos. Os assinantes on-line serão notificados sobre a busca e poderão reagir enquanto estiverem ativos. A reação só é possível quando os assinantes estão on-line.

Você precisa especificar um local de destino para a busca, que pode ser um dos seguintes:

  • Início do backlog da mensagem: reproduz todas as mensagens retidas. A quantidade de backlog disponível é determinada pela capacidade de armazenamento e pelo período de retenção de mensagens do tópico do Lite.
  • Final do backlog da mensagem: ignora todas as mensagens publicadas atuais para fazer a limpeza.
  • Carimbo de data/hora da publicação: procura pela primeira mensagem com data/hora de publicação (valor gerado pelo servidor) maior ou igual à data/hora especificada. Se nenhuma mensagem desse tipo for localizada, a busca será feita no final do backlog da mensagem. É garantido que as mensagens posteriores tenham um carimbo de data/hora de publicação maior ou igual ao especificado, exceto os carimbos que estiverem no futuro.
  • Carimbo de data/hora do evento: busca a primeira mensagem com um carimbo de data/hora de evento (valor especificado pelo usuário) maior ou igual ao especificado. Se nenhuma mensagem desse tipo for localizada, a busca será feita no final do backlog da mensagem. Como o carimbo de data/hora do evento é fornecido pelo usuário, as mensagens posteriores podem ter um carimbo de data/hora de evento anterior ao horário especificado e precisam ser filtradas pelo cliente se necessário. Se as mensagens não tiverem um carimbo de data/hora de evento definido, será usado o de publicação como fallback.

É possível iniciar uma busca em uma assinatura do Lite com a CLI do Google Cloud ou a API Pub/Sub Lite.

gcloud

Para buscar uma assinatura do Lite, use o comando gcloud pubsub lite-subscriptions seek:

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

Substitua:

  • SUBSCRIPTION_ID: o ID da assinatura Lite

  • LITE_LOCATION: o local da assinatura do Lite.

  • PUBLISH_TIME: o carimbo de data/hora de publicação que será buscado;

  • EVENT_TIME: o carimbo de data/hora de evento que será buscado;

  • STARTING_OFFSET: beginning ou end.

Consulte gcloud topic datetimes para ver informações sobre os formatos de tempo.

Se você especificar a sinalização --async e a solicitação for bem-sucedida, o comando exibe o ID da operação de busca:

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

Use o comando gcloud pubsub lite-operations describe para receber o status da operação.

REST

Para buscar uma assinatura do Lite, envie uma solicitação POST como esta:

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

Substitua:

  • REGION: a região em que a assinatura do Lite está

  • PROJECT_NUMBER: o número do projeto com a assinatura do Lite

  • LITE_LOCATION: o local da assinatura do Lite.

  • SUBSCRIPTION_ID: o ID da assinatura Lite

Para buscar no início ou final do backlog da mensagem, defina os seguintes campos no corpo da solicitação:

{
  "namedTarget": NAMED_TARGET
}

Substitua:

  • NAMED_TARGET: TAIL para o início ou HEAD para o final do backlog da mensagem.

Para buscar um carimbo de data/hora de publicação, defina os seguintes campos no corpo da solicitação:

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

Especifique "eventTime" para buscar um carimbo de data/hora de evento.

Substitua:

  • TIMESTAMP: um carimbo de data/hora no formato UTC RFC 3339, com resolução em nanossegundos e até nove dígitos fracionários. Exemplos: "2014-10-02T15:01:23Z" e "2014-10-02T15:01:23.045123456Z".

Se a solicitação for bem-sucedida, a resposta será uma operação de longa duração no formato JSON:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Go

Antes de tentar esse exemplo, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

Antes de executar este exemplo, siga as instruções de configuração do Java nas bibliotecas de cliente do Pub/Sub Lite.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

Antes de executar este exemplo, siga as instruções de configuração do Java nas bibliotecas de cliente do Pub/Sub Lite.

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

Se a solicitação de busca for bem-sucedida, a resposta será um ID de operação de longa duração. Veja abaixo as informações sobre como rastrear a propagação da busca se precisar saber quando foi a reação dos assinantes.

Clientes compatíveis

As operações de busca exigem assinantes que usem as seguintes bibliotecas do Pub/Sub Lite e versões mínimas:

As operações de busca não funcionam quando o Pub/Sub Lite é usado com o Apache Beam ou o Apache Spark porque esses sistemas realizam o próprio rastreamento de deslocamentos nas partições. A solução é drenar, buscar e reiniciar os fluxos de trabalho.

O serviço do Pub/Sub Lite consegue detectar um cliente do assinante que não é compatível com operações de busca, como uma versão antiga da biblioteca de cliente ou um framework não aceito. Depois, ele cancela a busca com um status de erro FAILED_PRECONDITION.

Como rastrear a propagação da busca

Se um ID de operação de longa duração for retornado na solicitação de busca inicial, isso significa que a busca foi registrada no serviço do Pub/Sub Lite. Ela será então propagada entre os assinantes se o cliente for compatível, como mostrado acima. A operação rastreia essa propagação e é concluída depois que os assinantes reagem à busca em todas as partições.

Se os assinantes estiverem on-line, eles poderão levar até 30 segundos para receber a notificação de busca. Essas notificações são enviadas para cada partição separadamente. Portanto, as partições podem não reagir à busca ao mesmo tempo. Se os assinantes estiverem off-line, a operação de busca será concluída quando eles ficarem on-line.

Se uma invocação de busca anterior ainda não tiver sido propagada entre os assinantes, ela será cancelada e substituída por uma nova operação de busca. Os metadados da operação de busca têm a validade de 30 dias, o que cancela todas as operações não concluídas.

Status da operação de busca

Para conferir o status de uma operação de busca, use a CLI do Google Cloud ou a API Pub/Sub Lite.

gcloud

Para ver detalhes sobre uma operação do Lite, use o comando gcloud pubsub lite-operations describe:

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

Substitua:

  • OPERATION_ID: o ID da operação do Lite;

  • LITE_LOCATION: o local da operação do Lite

Se a solicitação for bem-sucedida, a linha de comando exibirá metadados sobre a operação do Lite:

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

Para ver detalhes sobre as operações do Lite, envie uma solicitação GET como esta:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Substitua:

  • REGION: a região em que a operação do Lite está

  • PROJECT_NUMBER: o número do projeto com a operação do Lite;

  • LITE_LOCATION: o local da operação do Lite

  • OPERATION_ID: o ID da operação do Lite;

Se a solicitação for bem-sucedida, a resposta será uma operação de longa duração no formato JSON:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Como listar operações de busca

É possível listar as operações de busca concluídas e ativas usando a CLI do Google Cloud ou a API Pub/Sub Lite.

gcloud

Para listar as operações do Lite em um projeto, use o comando gcloud pubsub lite-operations list:

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

Substitua:

  • LITE_LOCATION: o local em que as operações do Lite estão

  • SUBSCRIPTION: operações de filtragem da assinatura do Lite;

  • DONE: use true para incluir apenas operações concluídas e false para operações ativas;

  • LIMIT: um número inteiro para limitar a quantidade de operações retornadas.

Se a solicitação for bem-sucedida, a linha de comando exibirá um resumo das operações do Lite:

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

Para listar as operações do Lite em um projeto, envie uma solicitação GET como esta:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

Substitua:

  • REGION: a região em que as operações do Lite estão

  • PROJECT_NUMBER: o número do projeto com as operações do Lite;

  • LITE_LOCATION: o local em que as operações do Lite estão;

Se a solicitação for bem-sucedida, a resposta será uma lista de operações do Lite no formato JSON:

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}