Memutar ulang dan menghapus pesan Pub/Sub Lite

Halaman ini menunjukkan cara memulai dan melacak operasi pencarian untuk langganan Lite.

Fitur pencarian Pub/Sub Lite memungkinkan Anda memutar ulang dan menghapus pesan secara permanen. Ini memiliki kasus penggunaan yang sama dengan pencarian Pub/Sub. Tidak seperti Pub/Sub, Anda tidak perlu mengonfigurasi topik atau langganan Lite untuk menggunakan pencarian dan tidak ada biaya tambahan.

Penyebaran pencarian ke pelanggan dapat dilacak menggunakan operasi yang berjalan lama. Ini adalah pola API yang digunakan oleh produk Google Cloud untuk melacak progres tugas yang berjalan lama.

Memulai pencarian

Operasi pencarian Pub/Sub Lite dimulai secara out-of-band (yaitu, dari Google Cloud CLI atau Pub/Sub Lite API terpisah) dan disebarkan ke pelanggan. Pelanggan online akan diberi tahu tentang pencarian dan reaksi saat mereka sedang live. Pelanggan offline akan bereaksi terhadap pencarian tersebut setelah online.

Anda harus menentukan lokasi target untuk pencarian, yang mungkin merupakan salah satu hal berikut:

  • Awal backlog pesan: Memutar ulang semua pesan yang disimpan. Perhatikan bahwa jumlah backlog yang tersedia ditentukan oleh periode retensi pesan dan kapasitas penyimpanan topik Lite.
  • Akhir dari backlog pesan: Menghapus pesan dengan melewati semua pesan yang saat ini dipublikasikan.
  • Stempel waktu publikasi: Mencari pesan pertama dengan stempel waktu publikasi (yang dihasilkan server) lebih besar dari atau sama dengan stempel waktu yang ditentukan. Jika pesan tersebut tidak dapat ditemukan, cari ke akhir backlog pesan. Pesan berikutnya dijamin memiliki stempel waktu publikasi yang lebih besar dari atau sama dengan stempel waktu yang ditentukan, dengan pengecualian stempel waktu yang ditentukan yang ada di masa mendatang.
  • Stempel waktu peristiwa: Mencari pesan pertama dengan stempel waktu peristiwa (ditentukan oleh pengguna) yang lebih besar dari atau sama dengan stempel waktu yang ditentukan. Jika pesan tersebut tidak dapat ditemukan, cari ke akhir backlog pesan. Karena stempel waktu peristiwa diberikan oleh pengguna, pesan berikutnya mungkin memiliki stempel waktu peristiwa yang lebih singkat dari waktu peristiwa yang ditentukan dan harus difilter oleh klien, jika perlu. Jika pesan tidak memiliki stempel waktu peristiwa yang ditetapkan, stempel waktu publikasinya akan digunakan sebagai penggantian.

Anda dapat memulai pencarian langganan Lite dengan Google Cloud CLI atau Pub/Sub Lite API.

gcloud

Untuk mencari langganan Lite, gunakan perintah gcloud pubsub lite-subscriptions seek:

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

Ganti kode berikut:

  • SUBSCRIPTION_ID: ID langganan Lite

  • LITE_LOCATION: lokasi langganan Lite

  • PUBLISH_TIME: stempel waktu publikasi yang akan dicari

  • EVENT_TIME: stempel waktu peristiwa yang akan dicari

  • STARTING_OFFSET: beginning atau end

Lihat gcloud topic datetimes untuk mengetahui informasi format waktu.

Jika Anda menentukan flag --async dan permintaan berhasil, command line akan menampilkan ID operasi pencarian:

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

Gunakan perintah gcloud pubsub lite-operations describe untuk mendapatkan status operasi.

REST

Untuk mencari langganan Lite, kirim permintaan POST seperti berikut:

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

Ganti kode berikut:

  • REGION: region tempat langganan Lite

  • PROJECT_NUMBER: nomor project project dengan langganan Lite

  • LITE_LOCATION: lokasi langganan Lite

  • SUBSCRIPTION_ID: ID langganan Lite

Untuk mencari ke awal atau akhir backlog pesan, tetapkan kolom berikut dalam isi permintaan:

{
  "namedTarget": NAMED_TARGET
}

Ganti kode berikut:

  • NAMED_TARGET: TAIL untuk awal atau HEAD untuk akhir backlog pesan.

Untuk mencari stempel waktu publikasi, tetapkan kolom berikut dalam isi permintaan:

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

Tentukan "eventTime" untuk mencari stempel waktu peristiwa.

Ganti kode berikut:

  • TIMESTAMP: Stempel waktu dalam format RFC 3339 UTC, dengan resolusi nanodetik dan maksimal sembilan digit pecahan. Contoh: "2014-10-02T15:01:23Z" dan "2014-10-02T15:01:23.045123456Z".

Jika permintaan berhasil, responsnya adalah operasi yang berjalan lama dalam format JSON:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Go API.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

Java

Sebelum menjalankan contoh ini, ikuti petunjuk penyiapan Java di Library Klien Pub/Sub Lite.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

Sebelum menjalankan contoh ini, ikuti petunjuk penyiapan Python di Library Klien Pub/Sub Lite.

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

Jika permintaan pencarian berhasil, responsnya adalah ID operasi yang berjalan lama. Lihat informasi tentang melacak propagasi pencari di bawah jika Anda perlu mengetahui kapan pelanggan bereaksi terhadap pencarian.

Klien yang didukung

Operasi pencarian memerlukan pelanggan yang menggunakan library klien Pub/Sub Lite dan versi minimum berikut:

Operasi pencarian tidak berfungsi jika Pub/Sub Lite digunakan dengan Apache Beam atau Apache Spark karena sistem ini melakukan pelacakan offsetnya sendiri dalam partisi. Solusinya adalah menguras, mencari, dan memulai ulang alur kerja.

Layanan Pub/Sub Lite dapat mendeteksi klien pelanggan yang tidak mendukung operasi pencarian (misalnya, versi library klien lama atau framework yang tidak didukung) dan akan membatalkan pencarian dengan status error FAILED_PRECONDITION.

Melacak propagasi pencari

Jika ID operasi yang berjalan lama ditampilkan untuk permintaan pencarian awal, ini berarti pencarian berhasil didaftarkan di layanan Pub/Sub Lite dan pada akhirnya akan disebarkan ke pelanggan (jika klien didukung, seperti di atas). Operasi tersebut melacak propagasi ini dan selesai setelah pelanggan bereaksi terhadap pencarian, untuk semua partisi.

Jika pelanggan online, mungkin perlu waktu hingga 30 detik bagi mereka untuk menerima notifikasi pencari. Notifikasi pencari dikirim secara independen untuk setiap partisi, sehingga partisi mungkin tidak bereaksi terhadap pencarian pada saat yang sama. Jika pelanggan offline, operasi pencarian akan selesai setelah mereka online.

Jika pemanggilan pencari sebelumnya belum selesai diterapkan ke pelanggan, pemanggilan pencari akan dibatalkan dan digantikan oleh operasi pencarian baru. Metadata operasi pencarian akan berakhir setelah 30 hari, sehingga akan membatalkan semua operasi pencarian yang belum selesai.

Status operasi pencarian

Anda bisa mendapatkan status operasi pencarian menggunakan Google Cloud CLI, atau Pub/Sub Lite API.

gcloud

Untuk mendapatkan detail tentang operasi Lite, gunakan perintah gcloud pubsub lite-operations describe:

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

Ganti kode berikut:

  • OPERATION_ID: ID operasi Lite

  • LITE_LOCATION: lokasi operasi Lite

Jika permintaan berhasil, command line akan menampilkan metadata tentang operasi Lite:

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

Untuk mendapatkan detail tentang operasi Lite, kirim permintaan GET seperti berikut:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Ganti kode berikut:

  • REGION: region tempat operasi Lite berada

  • PROJECT_NUMBER: nomor project project dengan operasi Lite

  • LITE_LOCATION: lokasi operasi Lite

  • OPERATION_ID: ID operasi Lite

Jika permintaan berhasil, responsnya adalah operasi yang berjalan lama dalam format JSON:

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Operasi pencarian listingan

Operasi pencarian yang sudah selesai dan aktif dapat dicantumkan menggunakan Google Cloud CLI, atau Pub/Sub Lite API.

gcloud

Untuk mencantumkan operasi Lite dalam sebuah project, gunakan perintah gcloud pubsub lite-operations list:

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

Ganti kode berikut:

  • LITE_LOCATION: lokasi tempat operasi Lite berada

  • SUBSCRIPTION: operasi filter menurut langganan Lite

  • DONE: true untuk hanya menyertakan operasi lengkap, false untuk hanya menyertakan operasi aktif

  • LIMIT: bilangan bulat untuk membatasi jumlah operasi yang ditampilkan

Jika permintaan berhasil, command line akan menampilkan ringkasan operasi Lite:

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

Untuk mencantumkan operasi Lite dalam sebuah project, kirim permintaan GET seperti berikut:

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

Ganti kode berikut:

  • REGION: region tempat operasi Lite berada

  • PROJECT_NUMBER: nomor project project dengan operasi Lite

  • LITE_LOCATION: lokasi tempat operasi Lite berada

Jika permintaan berhasil, responsnya adalah daftar operasi Lite dalam format JSON:

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}