Memutar ulang dan menghapus permanen pesan Pub/Sub Lite

Halaman ini menunjukkan cara memulai dan melacak operasi pencarian untuk langganan Lite.

Fitur pencarian Pub/Sub Lite memungkinkan Anda memutar ulang dan menghapus pesan. Fungsi ini memiliki kasus penggunaan yang sama dengan penelusuran Pub/Sub. Tidak seperti Pub/Sub, Anda tidak perlu mengonfigurasi topik atau langganan Lite untuk menggunakan pencarian dan tidak ada biaya tambahan.

Penyebaran pencarian ke pelanggan dapat dilacak menggunakan operasi yang berjalan lama. Ini adalah pola API yang digunakan oleh produk Google Cloud untuk melacak progres tugas yang berjalan lama.

Memulai pencarian

Operasi penelusuran Pub/Sub Lite dimulai di luar band (yaitu, dari Google Cloud CLI atau Pub/Sub Lite API terpisah) dan disebarkan ke pelanggan. Subscriber online akan diberi tahu tentang pencarian dan reaksi saat live. Pelanggan offline akan bereaksi terhadap pencarian setelah mereka online.

Anda harus menentukan lokasi target untuk pencarian, yang dapat berupa salah satu hal berikut:

  • Awal antrean pesan: Memutar ulang semua pesan yang dipertahankan. Perhatikan bahwa jumlah backlog yang tersedia ditentukan oleh periode retensi pesan dan kapasitas penyimpanan topik Lite.
  • End of message backlog: Menghapus pesan dengan melewati semua pesan yang saat ini dipublikasikan.
  • Stempel waktu publikasi: Mencari pesan pertama dengan stempel waktu publikasi (yang dibuat server) yang lebih besar dari atau sama dengan stempel waktu yang ditentukan. Jika pesan tersebut tidak dapat ditemukan, cari ke akhir backlog pesan. Pesan berikutnya dijamin memiliki stempel waktu publikasi yang lebih besar dari atau sama dengan stempel waktu yang ditentukan, dengan pengecualian stempel waktu yang ditentukan yang akan datang.
  • Stempel waktu peristiwa: Mencari pesan pertama dengan stempel waktu peristiwa (yang ditentukan pengguna) yang lebih besar dari atau sama dengan stempel waktu yang ditentukan. Jika pesan tersebut tidak dapat ditemukan, cari ke akhir backlog pesan. Karena stempel waktu peristiwa disediakan oleh pengguna, pesan berikutnya mungkin memiliki stempel waktu peristiwa yang lebih rendah dari waktu peristiwa yang ditentukan dan harus difilter oleh klien, jika perlu. Jika pesan tidak memiliki stempel waktu peristiwa yang ditetapkan, stempel waktu publikasinya akan digunakan sebagai penggantian.

Anda dapat memulai penelusuran untuk langganan Lite dengan Google Cloud CLI atau Pub/Sub Lite API.

gcloud

Untuk mencari langganan Lite, gunakan perintah gcloud pubsub lite-subscriptions seek:

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

Ganti kode berikut:

  • SUBSCRIPTION_ID: ID langganan Lite

  • LITE_LOCATION: lokasi langganan Lite

  • PUBLISH_TIME: stempel waktu publikasi yang akan dicari

  • EVENT_TIME: stempel waktu peristiwa yang akan dicari

  • STARTING_OFFSET: beginning atau end

Lihat gcloud topic datetimes untuk mengetahui informasi tentang format waktu.

Jika Anda menentukan flag --async dan permintaan berhasil, command line akan menampilkan ID operasi pencarian:

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

Gunakan perintah gcloud pubsub lite-operations describe untuk mendapatkan status operasi.

REST

Untuk mencari langganan Lite, kirim permintaan POST seperti berikut:

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

Ganti kode berikut:

  • REGION: region tempat langganan Lite berada

  • PROJECT_NUMBER: nomor project dari project dengan langganan Lite

  • LITE_LOCATION: lokasi langganan Lite

  • SUBSCRIPTION_ID: ID langganan Lite

Untuk mencari ke awal atau akhir backlog pesan, tetapkan kolom berikut dalam isi permintaan:

{
  "namedTarget": NAMED_TARGET
}

Ganti kode berikut:

  • NAMED_TARGET: TAIL untuk awal atau HEAD untuk akhir backlog pesan.

Untuk mencari stempel waktu publikasi, tetapkan kolom berikut dalam isi permintaan:

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

Tentukan "eventTime" untuk mencari stempel waktu peristiwa.

Ganti kode berikut:

  • TIMESTAMP: Stempel waktu dalam format UTC RFC 3339, dengan resolusi nanodetik dan hingga sembilan digit pecahan. Contoh: "2014-10-02T15:01:23Z" dan "2014-10-02T15:01:23.045123456Z".

Jika permintaan berhasil, responsnya adalah operasi yang berjalan lama dalam format JSON:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

Sebelum menjalankan contoh ini, ikuti petunjuk penyiapan Java di Library Klien Pub/Sub Lite.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

Sebelum menjalankan contoh ini, ikuti petunjuk penyiapan Python di Library Klien Pub/Sub Lite.

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

Jika permintaan pencarian berhasil, responsnya adalah ID operasi yang berjalan lama. Lihat informasi tentang melacak penyebaran pencarian di bawah jika Anda perlu mengetahui kapan subscriber bereaksi terhadap pencarian.

Klien yang didukung

Operasi pencarian memerlukan pelanggan yang menggunakan library klien Pub/Sub Lite dan versi minimum berikut:

Operasi pencarian tidak berfungsi saat Pub/Sub Lite digunakan dengan Apache Beam atau Apache Spark karena sistem ini melakukan pelacakan offsetnya sendiri dalam partisi. Solusi adalah menghabiskan, mencari, dan memulai ulang alur kerja.

Layanan Pub/Sub Lite dapat mendeteksi klien pelanggan yang tidak mendukung operasi pencarian (misalnya, versi library klien lama atau framework yang tidak didukung) dan akan membatalkan pencarian dengan status error FAILED_PRECONDITION.

Melacak penyebaran pencarian

Jika ID operasi yang berjalan lama ditampilkan untuk permintaan pencarian awal, ini berarti pencarian berhasil didaftarkan di layanan Pub/Sub Lite dan pada akhirnya akan diterapkan ke pelanggan (jika klien didukung, seperti di atas). Operasi melacak penyebaran ini dan selesai setelah pelanggan bereaksi terhadap pencarian, untuk semua partisi.

Jika subscriber sedang online, mungkin perlu waktu hingga 30 detik untuk menerima notifikasi penelusuran. Notifikasi pencarian dikirim secara terpisah untuk setiap partisi, sehingga partisi mungkin tidak bereaksi terhadap pencarian pada saat yang sama. Jika pelanggan offline, operasi pencarian akan selesai setelah mereka online.

Jika pemanggilan pencarian sebelumnya belum selesai disebarkan ke subscriber, pemanggilan tersebut akan dibatalkan dan diganti dengan operasi pencarian baru. Metadata operasi pencarian berakhir masa berlakunya setelah 30 hari, yang secara efektif membatalkan operasi pencarian yang tidak lengkap.

Status operasi pencarian

Anda bisa mendapatkan status operasi pencarian menggunakan Google Cloud CLI, atau Pub/Sub Lite API.

gcloud

Untuk mendapatkan detail tentang operasi Lite, gunakan perintah gcloud pubsub lite-operations describe:

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

Ganti kode berikut:

  • OPERATION_ID: ID operasi Lite

  • LITE_LOCATION: lokasi operasi Lite

Jika permintaan berhasil, command line akan menampilkan metadata tentang operasi Lite:

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

Untuk mendapatkan detail tentang operasi Lite, kirim permintaan GET seperti berikut:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Ganti kode berikut:

  • REGION: region tempat operasi Lite berada

  • PROJECT_NUMBER: nomor project dari project dengan operasi Lite

  • LITE_LOCATION: lokasi operasi Lite

  • OPERATION_ID: ID operasi Lite

Jika permintaan berhasil, responsnya adalah operasi yang berjalan lama dalam format JSON:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Mencantumkan operasi pencarian

Operasi pencarian yang telah selesai dan aktif dapat dicantumkan menggunakan Google Cloud CLI, atau Pub/Sub Lite API.

gcloud

Untuk mencantumkan operasi Lite dalam project, gunakan perintah gcloud pubsub lite-operations list:

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

Ganti kode berikut:

  • LITE_LOCATION: lokasi tempat operasi Lite berada

  • SUBSCRIPTION: memfilter operasi menurut langganan Lite

  • DONE: true untuk hanya menyertakan operasi yang lengkap, false untuk hanya menyertakan operasi aktif

  • LIMIT: bilangan bulat untuk membatasi jumlah operasi yang ditampilkan

Jika permintaan berhasil, command line akan menampilkan ringkasan operasi Lite:

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

Untuk mencantumkan operasi Lite dalam project, kirim permintaan GET seperti berikut:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

Ganti kode berikut:

  • REGION: region tempat operasi Lite berada

  • PROJECT_NUMBER: nomor project dari project dengan operasi Lite

  • LITE_LOCATION: lokasi tempat operasi Lite berada

Jika permintaan berhasil, responsnya adalah daftar operasi Lite dalam format JSON:

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}