Pub/Sub Lite 메시지 재생 및 삭제

이 페이지에서는 Lite 구독의 탐색 작업을 시작하고 추적하는 방법을 설명합니다.

Pub/Sub Lite 탐색 기능을 사용하면 메시지를 재생하고 삭제할 수 있습니다. Pub/Sub 탐색과 동일한 사용 사례가 있습니다. Pub/Sub와 달리, 탐색을 사용하도록 Lite 주제 또는 구독을 구성할 필요가 없으며 추가 비용이 없습니다.

구독자에 대한 탐색 전파는 장기 실행 작업을 사용하여 추적할 수 있습니다. 이는 Google Cloud 제품에서 장기 실행 작업의 진행 상황을 추적하는 데 사용되는 API 패턴입니다.

탐색 시작

Pub/Sub 라이트 탐색 작업은 대역 외부(즉, Google Cloud CLI 또는 별도의 Pub/Sub Lite API)에서 시작되고 구독자로 전파됩니다. 온라인 구독자는 서비스를 사용하는 동안 탐색 및 반응 알림을 받습니다. 오프라인 구독자는 온라인 상태가 되면 탐색에 반응합니다.

탐색의 타겟 위치를 지정해야 하며, 위치는 다음 중 하나일 수 있습니다.

  • 메시지 백로그 시작: 모든 보관 메시지를 재생합니다. 사용 가능한 백로그의 양은 Lite 주제의 메시지 보관 기간 및 저장용량에 따라 결정됩니다.
  • 메시지 백로그 끝: 현재 게시된 모든 메시지를 건너뛰어 메시지를 삭제합니다.
  • 게시 타임스탬프: 서버에서 생성된 게시 타임스탬프가 지정된 타임스탬프보다 크거나 같은 첫 번째 메시지를 탐색합니다. 이러한 메시지를 찾을 수 없다면 메시지 백로그의 끝을 찾습니다. 후속 메시지의 게시 타임스탬프는 지정된 타임스탬프보다 크거나 같지만 이후 시간으로 지정된 타임스탬프는 예외입니다.
  • 이벤트 타임스탬프: 사용자가 지정한 이벤트 타임스탬프가 지정된 타임스탬프보다 크거나 같은 첫 번째 메시지를 탐색합니다. 이러한 메시지를 찾을 수 없다면 메시지 백로그의 끝을 찾습니다. 이벤트 타임스탬프는 사용자가 제공하므로 후속 메시지의 이벤트 타임스탬프가 지정된 이벤트 시간보다 짧을 수 있으며 필요한 경우 클라이언트에서 필터링해야 합니다. 메시지에 이벤트 타임스탬프가 설정되어 있지 않으면 게시 타임스탬프가 대체용으로 사용됩니다.

Google Cloud CLI 또는 Pub/Sub Lite API를 사용하여 라이트 구독 탐색을 시작할 수 있습니다.

gcloud

Lite 구독을 탐색하려면 gcloud pubsub lite-subscriptions seek 명령어를 사용합니다.

gcloud pubsub lite-subscriptions seek SUBSCRIPTION_ID \
  --location=LITE_LOCATION \
  (--publish-time=PUBLISH_TIME | --event-time=EVENT_TIME | \
       --starting-offset=STARTING_OFFSET) \
  [--async]

다음을 바꿉니다.

  • SUBSCRIPTION_ID: 라이트 구독의 ID

  • LITE_LOCATION: 라이트 구독의 위치

  • PUBLISH_TIME: 탐색할 게시 타임스탬프

  • EVENT_TIME: 탐색할 이벤트 타임스탬프

  • STARTING_OFFSET: beginning 또는 end

시간 형식에 대한 자세한 내용은 gcloud topic datetimes를 참조하세요.

--async 플래그를 지정하고 요청이 성공하면 명령줄에 탐색 작업의 ID가 표시됩니다.

Check operation [projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID] for status.

gcloud pubsub lite-operations describe 명령어를 사용하여 작업 상태를 가져옵니다.

REST

Lite 구독을 탐색하려면 다음과 같이 POST 요청을 보냅니다.

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID:seek
Authorization: Bearer $(gcloud auth print-access-token)

다음을 바꿉니다.

  • REGION: 라이트 구독이 있는 리전

  • PROJECT_NUMBER: 라이트 구독이 있는 프로젝트의 프로젝트 번호

  • LITE_LOCATION: 라이트 구독의 위치

  • SUBSCRIPTION_ID: 라이트 구독의 ID

메시지 백로그의 시작 또는 끝을 탐색하려면 요청 본문에 다음 필드를 설정합니다.

{
  "namedTarget": NAMED_TARGET
}

다음을 바꿉니다.

  • NAMED_TARGET: 메시지 백로그의 시작인 경우 TAIL, 끝인 경우 HEAD입니다.

게시 타임스탬프를 탐색하려면 요청 본문에 다음 필드를 설정합니다.

{
  "timeTarget": {
    "publishTime": TIMESTAMP
  }
}

이벤트 타임스탬프를 탐색하려면 "eventTime"을 지정합니다.

다음을 바꿉니다.

  • TIMESTAMP: RFC 3339 UTC 형식의 타임스탬프로, 나노초 단위이며 최대 9자리 소수까지 가능합니다. 예를 들면 "2014-10-02T15:01:23Z""2014-10-02T15:01:23.045123456Z"입니다.

요청이 성공하면 응답은 JSON 형식의 장기 실행 작업입니다.

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

// seekSubscription initiates a seek operation for a subscription.
func seekSubscription(w io.Writer, projectID, region, zone, subID string, seekTarget pubsublite.SeekTarget, waitForOperation bool) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// subID := "my-subscription"
	// seekTarget := pubsublite.Beginning
	// waitForOperation := false

	// Possible values for seekTarget:
	// - pubsublite.Beginning: replays from the beginning of all retained
	//   messages.
	// - pubsublite.End: skips past all current published messages.
	// - pubsublite.PublishTime(<time>): delivers messages with publish time
	//   greater than or equal to the specified timestamp.
	// - pubsublite.EventTime(<time>): seeks to the first message with event
	//   time greater than or equal to the specified timestamp.

	// Waiting for the seek operation to complete is optional. It indicates when
	// subscribers for all partitions are receiving messages from the seek
	// target. If subscribers are offline, the operation will complete once they
	// are online.

	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %w", err)
	}
	defer client.Close()

	// Initiate an out-of-band seek for a subscription to the specified target.
	// If an operation is returned, the seek has been successfully registered
	// and will eventually propagate to subscribers.
	subPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", projectID, zone, subID)
	seekOp, err := client.SeekSubscription(ctx, subPath, seekTarget)
	if err != nil {
		return fmt.Errorf("client.SeekSubscription got err: %w", err)
	}
	fmt.Fprintf(w, "Seek operation initiated: %s\n", seekOp.Name())

	if waitForOperation {
		_, err = seekOp.Wait(ctx)
		if err != nil {
			return fmt.Errorf("seekOp.Wait got err: %w", err)
		}
		metadata, err := seekOp.Metadata()
		if err != nil {
			return fmt.Errorf("seekOp.Metadata got err: %w", err)
		}
		fmt.Fprintf(w, "Seek operation completed with metadata: %v\n", metadata)
	}
	return nil
}

자바

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 자바 설정 안내를 따르세요.

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SeekTarget;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.OperationMetadata;
import com.google.cloud.pubsublite.proto.SeekSubscriptionResponse;

public class SeekSubscriptionExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    // Choose a target location within the message backlog to seek a subscription to.
    // Possible values for SeekTarget:
    // - SeekTarget.of(BacklogLocation.BEGINNING): replays from the beginning of all retained
    //   messages.
    // - SeekTarget.of(BacklogLocation.END): skips past all current published messages.
    // - SeekTarget.ofPublishTime(<timestamp>): delivers messages with publish time greater than
    //   or equal to the specified timestamp.
    // - SeekTarget.ofEventTime(<timestamp>): seeks to the first message with event time greater
    //   than or equal to the specified timestamp.
    SeekTarget target = SeekTarget.of(BacklogLocation.BEGINNING);

    // Optional: Wait for the seek operation to complete, which indicates when subscribers for all
    // partitions are receiving messages from the seek target. If subscribers are offline, the
    // operation will complete once they are online.
    boolean waitForOperation = false;

    seekSubscriptionExample(
        cloudRegion, zoneId, projectNumber, subscriptionId, target, waitForOperation, regional);
  }

  public static void seekSubscriptionExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String subscriptionId,
      SeekTarget target,
      boolean waitForOperation,
      boolean regional)
      throws Exception {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      // Initiate an out-of-band seek for a subscription to the specified target. If an operation
      // is returned, the seek has been successfully registered and will eventually propagate to
      // subscribers.
      OperationFuture<SeekSubscriptionResponse, OperationMetadata> seekFuture =
          adminClient.seekSubscription(subscriptionPath, target);
      System.out.println("Seek operation " + seekFuture.getName() + " initiated successfully.");

      if (waitForOperation) {
        System.out.println("Waiting for operation to complete...");
        seekFuture.get();
        System.out.println("Operation completed. Metadata:\n" + seekFuture.getMetadata().get());
      }
    }
  }
}

Python

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 Python 설정 안내를 따르세요.

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, SubscriptionPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# seek_target = BacklogLocation.BEGINNING
# wait_for_operation = False
# regional = True

# Possible values for seek_target:
# - BacklogLocation.BEGINNING: replays from the beginning of all retained
#   messages.
# - BacklogLocation.END: skips past all current published messages.
# - PublishTime(<datetime>): delivers messages with publish time greater
#   than or equal to the specified timestamp.
# - EventTime(<datetime>): seeks to the first message with event time
#   greater than or equal to the specified timestamp.

# Waiting for the seek operation to complete is optional. It indicates when
# subscribers for all partitions are receiving messages from the seek
# target. If subscribers are offline, the operation will complete once they
# are online.

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)

client = AdminClient(cloud_region)
try:
    # Initiate an out-of-band seek for a subscription to the specified
    # target. If an operation is returned, the seek has been successfully
    # registered and will eventually propagate to subscribers.
    seek_operation = client.seek_subscription(subscription_path, seek_target)
    print(f"Seek operation: {seek_operation.operation.name}")
except NotFound:
    print(f"{subscription_path} not found.")
    return

if wait_for_operation:
    print("Waiting for operation to complete...")
    seek_operation.result()
    print(f"Operation completed. Metadata:\n{seek_operation.metadata}")

탐색 요청이 성공하면 응답은 장기 실행 작업 ID입니다. 구독자가 탐색에 반응한 시점을 알고 싶다면 아래 탐색 전파 추적에 대한 정보를 참조하세요.

지원되는 클라이언트

탐색 작업에는 다음과 같은 Pub/Sub Lite 클라이언트 라이브러리와 최소 버전을 사용하는 구독자가 필요합니다.

Apache Beam 또는 Apache Spark와 함께 Pub/Sub Lite를 사용할 경우 탐색 작업이 작동하지 않습니다. 이러한 시스템이 파티션 내에서 자체적으로 오프셋을 추적하기 때문입니다. 해결 방법은 워크플로를 드레이닝하고 탐색한 후 다시 시작하는 것입니다.

Pub/Sub Lite 서비스는 탐색 작업을 지원하지 않는 구독자 클라이언트(예: 이전 클라이언트 라이브러리 버전 또는 지원되지 않는 프레임워크)를 감지할 수 있으며 FAILED_PRECONDITION 오류 상태와 함께 탐색을 취소합니다.

탐색 전파 추적

초기 탐색 요청에 대해 장기 실행 작업 ID가 반환되면 이는 Pub/Sub Lite 서비스에 탐색이 성공적으로 등록되었으며, 이후에 구독자에게 전파됨을 의미합니다(위와 같이 클라이언트가 지원되는 경우). 작업은 이 전파를 추적하고 구독자가 모든 파티션에 대해 탐색에 반응하면 완료됩니다.

온라인 상태인 구독자는 탐색 알림을 수신하는 데 최대 30초가 걸릴 수 있습니다. 탐색 알림은 각 파티션에 대해 독립적으로 전송되므로 파티션이 동시에 탐색에 반응하지 않을 수 있습니다. 구독자가 오프라인 상태인 경우 온라인 상태가 되면 탐색 작업이 완료됩니다.

이전 탐색 호출이 구독자에게 전파되지 않은 경우 취소되고 새 탐색 작업으로 대체됩니다. 탐색 작업 메타데이터는 30일 후에 만료되므로 불완전한 탐색 작업을 효과적으로 취소합니다.

작업 상태 탐색

Google Cloud CLI 또는 Pub/Sub Lite API를 사용하여 탐색 작업 상태를 가져올 수 있습니다.

gcloud

Lite 작업에 대한 세부정보를 가져오려면 gcloud pubsub lite-operations describe 명령어를 사용합니다.

gcloud pubsub lite-operations describe OPERATION_ID \
  --location=LITE_LOCATION

다음을 바꿉니다.

  • OPERATION_ID: Lite 작업의 ID

  • LITE_LOCATION: 라이트 작업 위치

요청이 성공하면 명령줄에 Lite 작업에 대한 메타데이터가 표시됩니다.

metadata:
  '@type': type.googleapis.com/google.cloud.pubsublite.v1.OperationMetadata
  createTime: '2021-01-02T03:04:05Z'
  target: projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID
  verb: seek
name: projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID

REST

Lite 작업에 대한 세부정보를 가져오려면 다음과 같이 GET 요청을 보냅니다.

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID
Authorization: Bearer $(gcloud auth print-access-token)

다음을 바꿉니다.

  • REGION: 라이트 작업이 있는 리전

  • PROJECT_NUMBER: Lite 작업이 있는 프로젝트의 프로젝트 번호

  • LITE_LOCATION: 라이트 작업 위치

  • OPERATION_ID: Lite 작업의 ID

요청이 성공하면 응답은 JSON 형식의 장기 실행 작업입니다.

{
  "name": projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID,
  ...
}

탐색 작업 나열

완료 및 활성 탐색 작업은 Google Cloud CLI 또는 Pub/Sub Lite API를 사용하여 나열할 수 있습니다.

gcloud

프로젝트의 Lite 작업을 나열하려면 gcloud pubsub lite-operations list 명령어를 사용합니다.

gcloud pubsub lite-operations list \
    --location=LITE_LOCATION \
    [--subscription=SUBSCRIPTION] \
    [--done=DONE] \
    [--limit=LIMIT]

다음을 바꿉니다.

  • LITE_LOCATION: 라이트 작업이 있는 위치

  • SUBSCRIPTION: Lite 구독으로 작업 필터링

  • DONE: 완료 작업만 포함하려면 true, 활성 작업만 포함하려면 false입니다.

  • LIMIT: 반환되는 작업 수를 제한하는 정수

요청이 성공하면 명령줄에 Lite 작업의 요약이 표시됩니다.

OPERATION_ID  TARGET                                                                         CREATE_TIME           DONE   ERROR_CODE  MESSAGE
operation2    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-05-06T07:08:00Z  True
operation1    projects/PROJECT_NUMBER/locations/LITE_LOCATION/subscriptions/SUBSCRIPTION_ID  2021-01-02T03:04:00Z  True

REST

프로젝트의 Lite 작업을 나열하려면 다음과 같이 GET 요청을 보냅니다.

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations
Authorization: Bearer $(gcloud auth print-access-token)

다음을 바꿉니다.

  • REGION: 라이트 작업이 있는 리전

  • PROJECT_NUMBER: Lite 구독이 있는 프로젝트의 프로젝트 번호

  • LITE_LOCATION: 라이트 작업이 있는 위치

요청이 성공하면 응답은 JSON 형식의 Lite 작업 목록입니다.

{
  "operations": [
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/LITE_LOCATION/operations/OPERATION_ID",
          ...
      }
  ]
}