1회만 전송

이 페이지에서는 1회만 시맨틱스를 사용하여 메시지를 수신하고 확인하는 방법을 설명합니다. 풀 구독 유형만이 StreamingPull API를 사용하는 구독자를 포함한 1회만 전송을 지원합니다.

푸시 및 내보내기 구독은 1회만 전송을 지원하지 않습니다.

1회만 전송

Pub/Sub는 Pub/Sub에서 정의한 고유 메시지 ID를 기반으로 클라우드 리전 내에서 1회만 전송을 지원합니다.

기능이 사용 설정되면 Pub/Sub에서 다음을 제공합니다.

  • 메시지가 성공적으로 확인되면 다시 전송되지 않습니다.

  • 메시지가 대기 중일 때는 다시 전송되지 않습니다. 확인 기한이 만료되거나 메시지가 확인될 때까지 메시지가 대기 중으로 고려됩니다.

  • 전송이 여러 번 유효하게 진행된 경우 확인 기한 만료 또는 클라이언트에서 시작된 부정 확인으로 인해 최신 확인 ID만 사용해서 메시지를 확인할 수 있습니다. 이전 확인 ID가 있는 요청은 실패합니다.

다시 전송과 중복 비교

예상한 재전송과 예기치 않은 재전송의 차이를 이해하는 것이 중요합니다.

  • 클라이언트에서 시작된 메시지에 대한 부정 확인으로 인해 또는 확인 기한 만료 전 클라이언트가 메시지에 대해 확인 기한을 연장하지 않은 경우에 다시 전송이 수행될 수 있습니다. 다시 전송은 유효한 것으로 고려되고 시스템이 의도된 대로 작동합니다.

    재전송 문제를 해결하려면 중복 처리를 참조하세요.

  • 중복은 확인이 성공적으로 완료된 후 또는 확인 기한이 만료되기 전에 메시지가 다시 전송될 경우입니다.

  • 재전송된 메일은 재전송 시도 간에 동일한 메시지 ID를 유지합니다.

1회만 전송이 사용 설정된 구독은 중복 전송을 수신하지 않습니다.

클라이언트 라이브러리의 1회만 전송 지원

  • 지원되는 클라이언트 라이브러리에는 응답 확인을 위한 인터페이스가 있습니다(예: Go). 이 인터페이스를 사용하여 확인 요청이 성공했는지 확인할 수 있습니다. 확인 요청이 성공하면 클라이언트에 재전송이 수신되지 않습니다. 확인 요청이 실패하면 클라이언트가 재전송을 예상할 수 있습니다.

  • 클라이언트가 확인 인터페이스 없이 지원되는 클라이언트 라이브러리를 사용할 수도 있습니다. 그러나 이러한 경우에는 확인 실패로 인해 메시지가 자동으로 재전송될 수 있습니다.

  • 지원되는 클라이언트 라이브러리에는 최소 임대 확장 시간을 설정하기 위한 인터페이스가 포함됩니다(예: Go). 네트워크 관련 확인 만료를 방지하기 위해서는 최소 임대 확장 값을 높은 숫자로 설정해야 합니다. 최댓값은 600초로 설정됩니다.

1회만 전송과 관련된 변수의 기본값과 범위, 변수 이름은 클라이언트 라이브러리마다 다를 수 있습니다. 예를 들어 자바 클라이언트 라이브러리에서는 다음 변수가 1회만 전송을 제어합니다.

변수 설명
setEnableExactlyOnceDelivery 1회만 전송을 사용 설정 또는 중지합니다. true 또는 false 기본값=false
minDurationPerAckExtension 확인 기한 수정을 연장하는 데 사용할 최소 시간(초)입니다. 범위=0~600 기본값=없음
maxDurationPerAckExtension 확인 기한 수정을 연장하는 데 사용할 최대 시간(초)입니다. 범위=0~600 기본값=없음

1회만 전송에서는 확인 ID가 이미 만료된 경우 Pub/Sub에 대한 modifyAckDeadline 또는 acknowledgment 요청이 실패합니다. 이 경우 새 전송이 이미 진행 중일 수 있으므로 서비스에서 만료된 확인 ID를 잘못된 것으로 간주합니다. 이는 1회만 전송의 기본 설정입니다. acknowledgmentModifyAckDeadline 요청이 INVALID_ARGUMENT 응답을 반환합니다. 1회만 전송이 중지되면 확인 ID가 만료된 경우 이러한 요청이 OK를 반환합니다.

acknowledgmentModifyAckDeadline 요청에 유효한 확인 ID가 있는지 확인하려면 minDurationPerAckExtension의 값을 높게 설정합니다.

리전별 고려사항

1회만 전송 보장은 구독자가 동일한 리전의 서비스에 연결할 때만 적용됩니다. 구독자 애플리케이션이 여러 리전에 분산되어 있으면 1회만 전송이 사용 설정되어 있더라도 메시지 전송이 중복될 수 있습니다. 게시자가 모든 리전에 메시지를 전송할 수 있고 1회만 보장이 계속 유지됩니다.

Google Cloud 내에서 애플리케이션을 실행할 때는 기본적으로 동일 리전의 Pub/Sub 엔드포인트에 연결됩니다. 따라서 Google Cloud 내의 단일 리전에서 애플리케이션을 실행하면 일반적으로 단일 리전과 상호작용하도록 보장됩니다.

Google Cloud 외부 또는 여러 리전에서 구독자 애플리케이션을 실행하는 경우 Pub/Sub 클라이언트를 구성할 때 위치별 엔드포인트를 사용하여 단일 리전에 연결하도록 보장할 수 있습니다. Pub/Sub의 모든 위치 엔드포인트는 단일 리전에 연결됩니다. 위치별 엔드포인트에 대한 자세한 내용은 Pub/Sub 엔드포인트를 참조하세요. Pub/Sub의 모든 위치별 엔드포인트 목록은 위치별 엔드포인트 목록을 참조하세요.

1회만 전송 구독 만들기

Google Cloud 콘솔, Google Cloud CLI, 클라이언트 라이브러리 또는 Pub/Sub API를 사용하여 1회만 전송 구독을 만들 수 있습니다.

풀 구독

콘솔

1회만 전송 pull 구독을 만들려면 다음 단계를 따르세요.

  1. Google Cloud 콘솔에서 구독 페이지로 이동합니다.

    구독 페이지로 이동

  2. 구독 만들기를 클릭합니다.

  3. 구독 ID를 입력합니다.

  4. 드롭다운 메뉴에서 하나의 주제를 선택하거나 만듭니다.

    구독은 주제에서 메시지를 수신합니다.

  5. 1회만 전송 섹션에서 1회만 전송 사용 설정을 선택합니다.

  6. 만들기를 클릭합니다.

gcloud

1회만 전송 pull 구독을 만들려면 gcloud pubsub subscriptions create 명령어를 --enable-exactly-once-delivery 플래그와 함께 실행합니다.

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --enable-exactly-once-delivery

다음을 바꿉니다.

  • SUBSCRIPTION_ID: 생성할 구독의 ID
  • TOPIC_ID: 구독에 연결할 주제의 ID

REST

1회만 전송 구독을 만들려면 projects.subscriptions.create 메서드를 사용합니다.

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

다음을 바꿉니다.

  • PROJECT_ID: 구독을 만들려는 프로젝트의 프로젝트 ID
  • SUBSCRIPTION_ID: 생성할 구독의 ID

1회만 전송 pull 구독을 만들려면 요청 본문에 다음을 지정합니다.

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "enableExactlyOnceDelivery": true,
}

다음을 바꿉니다.

  • PROJECT_ID: 주제가 있는 프로젝트의 프로젝트 ID
  • TOPIC_ID: 구독에 연결할 주제의 ID

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_exactly_once_delivery(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참고 문서를 확인하세요.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithExactlyOnceDeliverySample
{
    public Subscription CreateSubscriptionWithExactlyOnceDelivery(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableExactlyOnceDelivery = true
        };

        Subscription subscription = null;

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createSubscriptionWithExactlyOnceDelivery(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                     topic,
		EnableExactlyOnceDelivery: true,
	})
	if err != nil {
		return err
	}
	fmt.Fprintf(w, "Created a subscription with exactly once delivery enabled: %v\n", sub)
	return nil
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 자바 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 참조하세요.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithExactlyOnceDelivery {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithExactlyOnceDeliveryExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithExactlyOnceDeliveryExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Enable exactly once delivery in the subscription.
                  .setEnableExactlyOnceDelivery(true)
                  .build());

      System.out.println(
          "Created a subscription with exactly once delivery enabled: "
              + subscription.getAllFields());
    }
  }
}

Python

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 참조하세요.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_exactly_once_delivery": True,
        }
    )
    print(
        f"Created subscription with exactly once delivery enabled: {subscription}"
    )

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId,
  subscriptionNameOrId
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().'
  );
}

Node.js

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().'
  );
}

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.

require "google/cloud/pubsub"

# Shows how to create a new subscription with exactly once delivery enabled
class PubsubCreateSubscriptionWithExactlyOnceDelivery
  def create_subscription_with_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, enable_exactly_once_delivery: true
    puts "Created subscription with exactly once delivery enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    pubsub_create_subscription_with_exactly_once_delivery = PubsubCreateSubscriptionWithExactlyOnceDelivery.new
    pubsub_create_subscription_with_exactly_once_delivery.create_subscription_with_exactly_once_delivery(
      project_id: project_id,
      topic_id: topic_id,
      subscription_id: subscription_id
    )
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithExactlyOnceDelivery.run
end

PHP

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 PHP 설정 안내를 따르세요. 자세한 내용은 Pub/Sub PHP API 참고 문서를 참조하세요.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function create_subscription_with_exactly_once_delivery(
    string $projectId,
    string $topicName,
    string $subscriptionName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $subscription->create([
        'enableExactlyOnceDelivery' => true
    ]);

    // Exactly Once Delivery status for the subscription
    $status = $subscription->info()['enableExactlyOnceDelivery'];

    printf('Subscription created with exactly once delivery status: %s' . PHP_EOL, $status ? 'true' : 'false');
}

1회만 메시지 전송 구독

다음은 클라이언트 라이브러리를 사용하여 1회만 전송 구독을 위한 몇 가지 코드 샘플입니다.

풀 구독

Go

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

// receiveMessagesWithExactlyOnceDeliveryEnabled instantiates a subscriber client.
// This differs from regular subscribing since you must call msg.AckWithResult()
// or msg.NackWithResult() instead of the regular Ack/Nack methods.
// When exactly once delivery is enabled on the subscription, the message is
// guaranteed to not be delivered again if the ack result succeeds.
func receiveMessagesWithExactlyOnceDeliveryEnabled(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()

	// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
	// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
	client, err := pubsub.NewClient(ctx, projectID, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Set MinExtensionPeriod high to avoid any unintentional
	// acknowledgment expirations (e.g. due to network events).
	// This can lead to high tail latency in case of client crashes.
	sub.ReceiveSettings.MinExtensionPeriod = 600 * time.Second

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		r := msg.AckWithResult()
		// Block until the result is returned and a pubsub.AcknowledgeStatus
		// is returned for the acked message.
		status, err := r.Get(ctx)
		if err != nil {
			fmt.Fprintf(w, "MessageID: %s failed when calling result.Get: %v", msg.ID, err)
		}

		switch status {
		case pubsub.AcknowledgeStatusSuccess:
			fmt.Fprintf(w, "Message successfully acked: %s", msg.ID)
		case pubsub.AcknowledgeStatusInvalidAckID:
			fmt.Fprintf(w, "Message failed to ack with response of Invalid. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusPermissionDenied:
			fmt.Fprintf(w, "Message failed to ack with response of Permission Denied. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusFailedPrecondition:
			fmt.Fprintf(w, "Message failed to ack with response of Failed Precondition. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusOther:
			fmt.Fprintf(w, "Message failed to ack with response of Other. ID: %s", msg.ID)
		default:
		}
	})
	if err != nil {
		return fmt.Errorf("got err from sub.Receive: %w", err)
	}
	return nil
}

Java

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithExactlyOnceConsumerWithResponseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithExactlyOnceConsumerWithResponseExample(projectId, subscriptionId);
  }

  public static void subscribeWithExactlyOnceConsumerWithResponseExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver using `AckReplyConsumerWithResponse`
    // instead of `AckReplyConsumer` to get a future that tracks the result of the ack call.
    // When exactly once delivery is enabled on the subscription, the message is guaranteed
    // to not be delivered again if the ack future succeeds.
    MessageReceiverWithAckResponse receiverWithResponse =
        (PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) -> {
          try {
            // Handle incoming message, then ack the message, and receive an ack response.
            System.out.println("Message received: " + message.getData().toStringUtf8());
            Future<AckResponse> ackResponseFuture = consumerWithResponse.ack();

            // Retrieve the completed future for the ack response from the server.
            AckResponse ackResponse = ackResponseFuture.get();

            switch (ackResponse) {
              case SUCCESSFUL:
                // Success code means that this MessageID will not be delivered again.
                System.out.println("Message successfully acked: " + message.getMessageId());
                break;
              case INVALID:
                System.out.println(
                    "Message failed to ack with a response of Invalid. Id: "
                        + message.getMessageId());
                break;
              case PERMISSION_DENIED:
                System.out.println(
                    "Message failed to ack with a response of Permission Denied. Id: "
                        + message.getMessageId());
                break;
              case FAILED_PRECONDITION:
                System.out.println(
                    "Message failed to ack with a response of Failed Precondition. Id: "
                        + message.getMessageId());
                break;
              case OTHER:
                System.out.println(
                    "Message failed to ack with a response of Other. Id: "
                        + message.getMessageId());
                break;
              default:
                break;
            }
          } catch (InterruptedException | ExecutionException e) {
            System.out.println(
                "MessageId: " + message.getMessageId() + " failed when retrieving future");
          } catch (Throwable t) {
            System.out.println("Throwable caught" + t.getMessage());
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiverWithResponse).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId,
  timeout
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용PHP 설정 안내를 따르세요. 자세한 내용은 Pub/Sub PHP API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages from a subscription
 * with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_exactly_once_delivery(
    string $projectId,
    string $subscriptionId
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        // When exactly once delivery is enabled on the subscription,
        // the message is guaranteed to not be delivered again if the ack succeeds.
        // Passing the `returnFailures` flag retries any temporary failures received
        // while acking the msg and also returns any permanently failed msgs.
        // Passing this flag on a subscription with exactly once delivery disabled
        // will always return an empty array.
        $failedMsgs = $subscription->acknowledge($message, ['returnFailures' => true]);

        if (empty($failedMsgs)) {
            printf('Acknowledged message: %s' . PHP_EOL, $message->data());
        } else {
            // Either log or store the $failedMsgs to be retried later
        }
    }
}

Python

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

    # Use `ack_with_response()` instead of `ack()` to get a future that tracks
    # the result of the acknowledge call. When exactly-once delivery is enabled
    # on the subscription, the message is guaranteed to not be delivered again
    # if the ack future succeeds.
    ack_future = message.ack_with_response()

    try:
        # Block on result of acknowledge call.
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        ack_future.result(timeout=timeout)
        print(f"Ack for message {message.message_id} successful.")
    except sub_exceptions.AcknowledgeError as e:
        print(
            f"Ack for message {message.message_id} failed with error: {e.error_code}"
        )

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

require "google/cloud/pubsub"

# Shows how to register callback to acknowledge method and access the result passed in
class PubsubSubscriberExactlyOnceDelivery
  def subscriber_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = pubsub.subscription subscription_id
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"

      # Pass in callback to access the acknowledge result.
      # For subscription with Exactly once delivery disabled the result will be success always.
      received_message.acknowledge! do |result|
        puts "Acknowledge result's status: #{result.status}"
      end
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription" # subscription with exactly once delivery enabled
    PubsubSubscriberExactlyOnceDelivery.new.subscriber_exactly_once_delivery project_id: project_id,
                                                                             topic_id: topic_id,
                                                                             subscription_id: subscription_id
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubSubscriberExactlyOnceDelivery.run
end

C++

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack().then([id = m.message_id()](auto f) {
          auto status = f.get();
          std::cout << "Message id " << id
                    << " ack() completed with status=" << status << "\n";
        });
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용C# 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C# API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.PubSub.V1.SubscriberClient;

public class ExactlyOnceDeliverySubscriberAsyncSample
{
    public async Task<IEnumerable<string>> ExactlyOnceDeliverySubscriberAsync(string projectId, string subscriptionId)
    {
        // subscriptionId should be the ID of an exactly-once delivery subscription.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // To get the status of ACKnowledge (ACK) or Not ACKnowledge (NACK) request in exactly once delivery subscriptions,
        // create a subscription handler that inherits from Google.Cloud.PubSub.V1.SubscriptionHandler. 
        // For more information see Google.Cloud.PubSub.V1.SubscriptionHandler reference docs here:
        // https://cloud.google.com/dotnet/docs/reference/Google.Cloud.PubSub.V1/latest/Google.Cloud.PubSub.V1.SubscriptionHandler
        var subscriptionHandler = new SampleSubscriptionHandler();
        Task subscriptionTask = subscriber.StartAsync(subscriptionHandler);
        // The subscriber will be running until it is stopped.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await subscriptionTask;
        return subscriptionHandler.SuccessfulAckedIds;
    }

    // Sample handler to handle messages and ACK/NACK responses.
    public class SampleSubscriptionHandler : SubscriptionHandler
    {
        public ConcurrentBag<string> SuccessfulAckedIds { get; } = new ConcurrentBag<string>();

        /// <summary>
        /// The function that processes received messages. It should be thread-safe.
        /// Return <see cref="Reply.Ack"/> to ACKnowledge the message (meaning it won't be received again).
        /// Return <see cref="Reply.Nack"/> to Not ACKnowledge the message (meaning it will be received again).
        /// From the point of view of message acknowledgement, throwing an exception is equivalent to returning <see cref="Reply.Nack"/>.
        /// </summary>
        public override async Task<Reply> HandleMessage(PubsubMessage message, CancellationToken cancellationToken)
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            return await Task.FromResult(Reply.Ack);
        }

        /// <summary>
        /// This method will receive responses for all acknowledge requests.
        /// </summary>
        public override void HandleAckResponses(IReadOnlyList<AckNackResponse> responses)
        {
            foreach (var response in responses)
            {
                if (response.Status == AcknowledgementStatus.Success)
                {
                    SuccessfulAckedIds.Add(response.MessageId);
                }

                string result = response.Status switch
                {
                    AcknowledgementStatus.Success => $"MessageId {response.MessageId} successfully acknowledged.",
                    AcknowledgementStatus.PermissionDenied => $"MessageId {response.MessageId} failed to acknowledge due to a permission denied error.",
                    AcknowledgementStatus.FailedPrecondition => $"MessageId {response.MessageId} failed to acknowledge due to a failed precondition.",
                    AcknowledgementStatus.InvalidAckId => $"MessageId {response.MessageId} failed to acknowledge due an invalid or expired AckId.",
                    AcknowledgementStatus.Other => $"MessageId {response.MessageId} failed to acknowledge due to an unknown reason.",
                    _ => $"Unknown acknowledgement status for messageId {response.MessageId}."
                };

                Console.WriteLine(result);
            }
        }
    }
}

Node.js (TypeScript)

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {Message, PubSub, AckError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId: string,
  timeout: number
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e as AckError;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

1회만 전송 구독 모니터링

subscription/exactly_once_warning_count 측정항목은 다시 전송으로 이어질 수 있는 이벤트 수를 기록합니다(유효 또는 중복). 이 측정항목은 Pub/Sub에서 확인 ID와 연결된 요청 처리가 실패한 횟수를 계산합니다(ModifyAckDeadline 또는 acknowledgment 요청). 실패 이유는 서버 또는 클라이언트 때문일 수 있습니다. 예를 들어 1회만 전송 정보를 유지하는 데 사용되는 영구적 계층을 사용할 수 없으면 서버 기반 이벤트입니다. 클라이언트가 잘못된 확인 ID를 사용하여 메시지를 확인하려고 시도할 경우에는 클라이언트 기반 이벤트입니다.

측정항목 이해

subscription/exactly_once_warning_count는 실제 다시 전송으로 이어질 수 있거나 없고 클라이언트 동작을 기준으로 노이즈가 있을 수 있는 이벤트를 캡처합니다. 예를 들어 잘못된 확인 ID가 있는 반복되는 acknowledgment 또는 ModifyAckDeadline 요청은 측정항목을 반복해서 증가시킵니다.

다음 측정항목은 클라이언트 동작을 이해하는 데에도 유용합니다.

  • subscription/expired_ack_deadlines_count 측정항목은 확인 ID 만료 수를 보여줍니다. 확인 ID가 만료되면 ModifyAckDeadline 요청과 acknowledgment 요청이 모두 실패할 수 있습니다.

  • 요청이 Google Cloud에 연결되지만 Pub/Sub에 연결되지 않는 경우 service.serviceruntime.googleapis.com/api/request_count 측정항목을 사용하여 ModifyAckDeadline 또는 acknowledgment 요청의 실패를 캡처할 수 있습니다. 예를 들어 클라이언트가 Google Cloud에서 연결 해제될 때는 이 측정항목으로 캡처되지 않는 실패가 있습니다.

재시도 가능한 실패 이벤트의 대부분은 지원되는 클라이언트 라이브러리가 요청을 자동으로 재시도합니다.

할당량

1회만 전송 구독에는 추가 할당량 요구사항이 적용됩니다. 이 할당량은 다음에 적용됩니다.

  • 리전별로 1회만 전송이 사용 설정된 구독에서 소비되는 메시지 수
  • 리전별로 1회만 전송이 사용 설정된 구독을 사용할 때 확인된 메시지 또는 기한이 연장된 메시지 수

이러한 할당량에 대한 자세한 내용은 할당량 주제의 표를 참조하세요.

1회만 전송 및 정렬된 구독

Pub/Sub는 정렬된 전송을 사용하여 1회만 전송을 지원합니다.

1회만 전송으로 정렬을 사용할 경우 Pub/Sub는 확인을 순서대로 진행합니다. 확인 순서가 맞지 않으면 일시적인 오류와 함께 요청이 실패합니다. 전송의 순서에 따른 확인 전에 확인 기한이 만료되면 클라이언트에서 메시지 재전송을 받습니다. 따라서 1회만 전송으로 순서 지정을 사용하면 클라이언트 처리량이 초당 1,000개로 제한됩니다.

1회만 전송 및 푸시 구독

Pub/Sub는 풀 구독에서만 1회만 전송을 지원합니다.

푸시 구독의 메시지를 사용하는 클라이언트는 성공적인 응답으로 푸시 요청에 응답하여 메시지를 확인합니다. 하지만 클라이언트는 Pub/Sub 구독이 응답을 받아서 처리했는지 여부를 알 수 없습니다. 이 방식은 요청이 성공적으로 처리된 경우 클라이언트에서 확인 요청을 보내고 Pub/Sub 구독에서 응답하는 풀 구독과 다릅니다. 따라서 1회만 전송의 시맨틱스는 푸시 구독에 잘 부합하지 않습니다.

알아두어야 할 사항

  • CreateSubscription 시간에 확인 기한이 지정되지 않은 경우 1회만 전송이 사용 설정된 구독에서 기본 확인 기한은 60초입니다.

  • 기본 확인 기한이 길면 네트워크 이벤트로 인해 발생하는 재전송을 방지하는 데 효과적입니다. 지원되는 클라이언트 라이브러리는 기본 구독 확인 기한을 사용하지 않습니다.

  • 1회만 전송 구독은 게시 후 구독 지연 시간이 일반 구독보다 현저히 높습니다.

  • 높은 처리량이 필요한 경우 정확히 한 번의 전송 클라이언트가 스트리밍 가져오기도 사용해야 합니다.

  • 1회만 전송이 사용 설정되더라도 게시 측 중복으로 인해 구독에서 동일한 메시지의 사본을 여러 개 수신할 수 있습니다. 게시 측 중복은 게시 클라이언트 또는 Pub/Sub 서비스가 고유한 게시를 여러 번 재시도할 때 발생할 수 있습니다. 재시도에 게시 클라이언트의 고유한 게시가 여러 개 있으면 여러 메시지 ID를 사용하여 재전송이 이루어집니다. 클라이언트 게시 요청에 응답하기 위한 Pub/Sub 서비스의 여러 고유한 게시는 동일한 메시지 ID를 사용해 재전송됩니다.

  • 실패는 subscription/exactly_once_warning_count에서 재시도할 수 있으며 지원되는 클라이언트 라이브러리가 자동으로 다시 시도합니다. 그러나 잘못된 확인 ID와 관련된 실패는 재시도할 수 없습니다.