Entrega "exatamente uma vez"

Nesta página, explicamos como receber e confirmar mensagens usando o tipo "exatamente uma vez" semântica. Só o tipo de assinatura pull aceita a entrega exatamente uma vez, incluindo assinantes que usam API StreamingPull.

Enviar e exportar inscrições não são compatíveis com a entrega exatamente uma vez.

Entrega "exatamente uma vez"

O Pub/Sub dá suporte à entrega exatamente uma vez, em uma região da nuvem, com base em um serviço exclusivo definido pelo Pub/Sub ID da mensagem.

Quando o recurso está ativado, o Pub/Sub fornece o seguinte:

  • Nenhum reenvio ocorre após a confirmação da mensagem.

  • Nenhum reenvio ocorre enquanto uma mensagem está pendente. Uma mensagem é considerada pendente até que o prazo de confirmação expire ou a mensagem seja confirmado.

  • No caso de várias entregas válidas, devido ao prazo de confirmação ou confirmação negativa iniciada pelo cliente, somente o o ID de confirmação pode ser usado para confirmar a mensagem. Todas as solicitações com um ID de confirmação anterior.

Reenvio versus cópia

É importante entender a diferença entre esperado e inesperado novos envios.

  • Um reenvio pode ocorrer devido a ações negativas iniciadas pelo cliente. confirmação de uma mensagem ou quando o cliente não estende a confirmação prazo final da mensagem antes que o prazo de confirmação expire. Reenvios são considerados válidos e o sistema funciona como pretendido.

    Para resolver problemas com resgates, consulte Como lidar com cópias.

  • Uma cópia é quando uma mensagem é reenviada após uma confirmação bem-sucedida. ou antes do vencimento do prazo de confirmação.

  • Uma mensagem reenviada mantém o mesmo ID entre as tentativas de reenvio.

As assinaturas com a opção "Exatamente uma vez" ativada não recebem cópias entregas.

Suporte de entrega única em bibliotecas de cliente

  • As bibliotecas de cliente compatíveis têm uma interface de confirmação com resposta Por exemplo: Go. Use essa interface para verificar se a solicitação de confirmação foi bem-sucedida. Se a solicitação de confirmação for bem-sucedida, os clientes têm a garantia de não receberão uma nova entrega. Se a solicitação de confirmação falhar, o os clientes podem esperar uma re-entrega.

  • Os clientes também podem usar as bibliotecas de cliente compatíveis sem as de confirmação de identidade virtual. No entanto, nesses casos, as falhas de confirmação podem levar a novas entregas silenciosas.

  • As bibliotecas de cliente suportadas têm interfaces para configurar o mínimo tempo de extensão do lease (exemplo: Go). Defina o valor da extensão de locação mínima como um número alto para evitar expirações de confirmação relacionadas à rede. O valor máximo é definido como 600 segundos.

Os valores e o intervalo padrão para as variáveis relacionadas à entrega "exatamente uma vez" e os nomes das variáveis podem diferir entre as bibliotecas de cliente. Para exemplo, na biblioteca de cliente Java, as seguintes variáveis controlam "exatamente uma vez".

Variável Descrição Valor
setEnableExactlyOnceDelivery Ativa ou desativa a entrega "exatamente uma vez". verdadeiro ou falso Padrão=false
minDurationPerAckExtension O tempo mínimo em segundos a ser usado para estender o prazo de confirmação de modificação. Intervalo=0 a 600 Padrão=nenhum
maxDurationPerAckExtension O tempo máximo em segundos a ser usado para estender o prazo de confirmação de modificação. Intervalo=0 a 600 Padrão=nenhum

No caso de uma única entrega, a modifyAckDeadline ou a acknowledgment solicitação ao Pub/Sub falha quando o ID de confirmação já expirou. Dessa forma, casos, o serviço considera o ID de confirmação expirado como inválido, já que um uma nova entrega pode já estar em andamento. Isso ocorre por padrão para o tipo "exatamente uma vez" entrega. Você verá que as solicitações acknowledgment e ModifyAckDeadline retornam um Resposta INVALID_ARGUMENT. Quando a entrega exatamente uma vez está desativada, esses As solicitações retornam OK nos casos de IDs de confirmação expirados.

Para garantir que as solicitações acknowledgment e ModifyAckDeadline tenham de confirmação de identidade, defina o valor para minDurationPerAckExtension como um número alto.

Criar assinaturas com entrega única

É possível criar uma assinatura com entrega única usando o console do Google Cloud, a Google Cloud CLI, a biblioteca de cliente ou a API Pub/Sub.

Assinatura de pull

Console

Para criar uma assinatura de pull com entrega única, siga estas etapas:

  1. No console do Google Cloud, acesse a página Assinaturas.

    Acessar "Assinaturas"

  2. Clique em Criar assinatura.

  3. Insira o ID da assinatura.

  4. Escolha ou crie um tópico no menu suspenso.

    A assinatura recebe mensagens do tópico.

  5. Na seção Entrega exatamente uma vez, selecione Ativar a entrega exatamente uma vez.

  6. Clique em Criar.

gcloud

Para criar uma assinatura de pull com entrega única, use o gcloud pubsub subscriptions create com a sinalização --enable-exactly-once-delivery:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --enable-exactly-once-delivery

Substitua:

  • SUBSCRIPTION_ID: o ID da assinatura a ser criada
  • TOPIC_ID: o ID do tópico a ser anexado à assinatura

REST

Para criar uma assinatura com entrega única, use o método projects.subscriptions.create .

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Substitua:

  • PROJECT_ID: o ID do projeto para criar a assinatura
  • SUBSCRIPTION_ID: o ID da assinatura a ser criada

Para criar uma assinatura de pull com entrega única, especifique este no corpo da solicitação:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "enableExactlyOnceDelivery": true,
}

Substitua:

  • PROJECT_ID: o ID do projeto com o tópico
  • TOPIC_ID: o ID do tópico a ser anexado à assinatura

C++

Antes de tentar esse exemplo, siga as instruções de configuração do C++ em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_exactly_once_delivery(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Antes de tentar esse exemplo, siga as instruções de configuração do C# em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithExactlyOnceDeliverySample
{
    public Subscription CreateSubscriptionWithExactlyOnceDelivery(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableExactlyOnceDelivery = true
        };

        Subscription subscription = null;

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

Antes de tentar esse exemplo, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createSubscriptionWithExactlyOnceDelivery(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                     topic,
		EnableExactlyOnceDelivery: true,
	})
	if err != nil {
		return err
	}
	fmt.Fprintf(w, "Created a subscription with exactly once delivery enabled: %v\n", sub)
	return nil
}

Java

Antes de tentar essa amostra, siga as instruções de configuração do Java em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithExactlyOnceDelivery {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithExactlyOnceDeliveryExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithExactlyOnceDeliveryExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Enable exactly once delivery in the subscription.
                  .setEnableExactlyOnceDelivery(true)
                  .build());

      System.out.println(
          "Created a subscription with exactly once delivery enabled: "
              + subscription.getAllFields());
    }
  }
}

Python

Antes de tentar esse exemplo, siga as instruções de configuração do Python em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_exactly_once_delivery": True,
        }
    )
    print(
        f"Created subscription with exactly once delivery enabled: {subscription}"
    )

Node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId,
  subscriptionNameOrId
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().'
  );
}

Node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().'
  );
}

Ruby

Antes de tentar esse exemplo, siga as instruções de configuração do Ruby em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Ruby.

require "google/cloud/pubsub"

# Shows how to create a new subscription with exactly once delivery enabled
class PubsubCreateSubscriptionWithExactlyOnceDelivery
  def create_subscription_with_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, enable_exactly_once_delivery: true
    puts "Created subscription with exactly once delivery enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    pubsub_create_subscription_with_exactly_once_delivery = PubsubCreateSubscriptionWithExactlyOnceDelivery.new
    pubsub_create_subscription_with_exactly_once_delivery.create_subscription_with_exactly_once_delivery(
      project_id: project_id,
      topic_id: topic_id,
      subscription_id: subscription_id
    )
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithExactlyOnceDelivery.run
end

PHP

Antes de tentar esse exemplo, siga as instruções de configuração do PHP em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function create_subscription_with_exactly_once_delivery(
    string $projectId,
    string $topicName,
    string $subscriptionName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $subscription->create([
        'enableExactlyOnceDelivery' => true
    ]);

    // Exactly Once Delivery status for the subscription
    $status = $subscription->info()['enableExactlyOnceDelivery'];

    printf('Subscription created with exactly once delivery status: %s' . PHP_EOL, $status ? 'true' : 'false');
}

Assine com entrega de mensagens exatamente uma vez

Veja a seguir alguns exemplos de código para assinatura com entrega única usando a biblioteca de cliente.

Assinatura de pull

Go

Antes de testar este exemplo, siga as instruções de configuração do Go na Guia de início rápido do Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a API Go do Pub/Sub documentação de referência.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// receiveMessagesWithExactlyOnceDeliveryEnabled instantiates a subscriber client.
// This differs from regular subscribing since you must call msg.AckWithResult()
// or msg.NackWithResult() instead of the regular Ack/Nack methods.
// When exactly once delivery is enabled on the subscription, the message is
// guaranteed to not be delivered again if the ack result succeeds.
func receiveMessagesWithExactlyOnceDeliveryEnabled(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Set MinExtensionPeriod high to avoid any unintentional
	// acknowledgment expirations (e.g. due to network events).
	// This can lead to high tail latency in case of client crashes.
	sub.ReceiveSettings.MinExtensionPeriod = 600 * time.Second

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		r := msg.AckWithResult()
		// Block until the result is returned and a pubsub.AcknowledgeStatus
		// is returned for the acked message.
		status, err := r.Get(ctx)
		if err != nil {
			fmt.Fprintf(w, "MessageID: %s failed when calling result.Get: %v", msg.ID, err)
		}

		switch status {
		case pubsub.AcknowledgeStatusSuccess:
			fmt.Fprintf(w, "Message successfully acked: %s", msg.ID)
		case pubsub.AcknowledgeStatusInvalidAckID:
			fmt.Fprintf(w, "Message failed to ack with response of Invalid. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusPermissionDenied:
			fmt.Fprintf(w, "Message failed to ack with response of Permission Denied. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusFailedPrecondition:
			fmt.Fprintf(w, "Message failed to ack with response of Failed Precondition. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusOther:
			fmt.Fprintf(w, "Message failed to ack with response of Other. ID: %s", msg.ID)
		default:
		}
	})
	if err != nil {
		return fmt.Errorf("got err from sub.Receive: %w", err)
	}
	return nil
}

Java

Antes de testar este exemplo, siga as instruções de configuração do Java na Guia de início rápido do Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a API Java do Pub/Sub documentação de referência.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithExactlyOnceConsumerWithResponseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithExactlyOnceConsumerWithResponseExample(projectId, subscriptionId);
  }

  public static void subscribeWithExactlyOnceConsumerWithResponseExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver using `AckReplyConsumerWithResponse`
    // instead of `AckReplyConsumer` to get a future that tracks the result of the ack call.
    // When exactly once delivery is enabled on the subscription, the message is guaranteed
    // to not be delivered again if the ack future succeeds.
    MessageReceiverWithAckResponse receiverWithResponse =
        (PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) -> {
          try {
            // Handle incoming message, then ack the message, and receive an ack response.
            System.out.println("Message received: " + message.getData().toStringUtf8());
            Future<AckResponse> ackResponseFuture = consumerWithResponse.ack();

            // Retrieve the completed future for the ack response from the server.
            AckResponse ackResponse = ackResponseFuture.get();

            switch (ackResponse) {
              case SUCCESSFUL:
                // Success code means that this MessageID will not be delivered again.
                System.out.println("Message successfully acked: " + message.getMessageId());
                break;
              case INVALID:
                System.out.println(
                    "Message failed to ack with a response of Invalid. Id: "
                        + message.getMessageId());
                break;
              case PERMISSION_DENIED:
                System.out.println(
                    "Message failed to ack with a response of Permission Denied. Id: "
                        + message.getMessageId());
                break;
              case FAILED_PRECONDITION:
                System.out.println(
                    "Message failed to ack with a response of Failed Precondition. Id: "
                        + message.getMessageId());
                break;
              case OTHER:
                System.out.println(
                    "Message failed to ack with a response of Other. Id: "
                        + message.getMessageId());
                break;
              default:
                break;
            }
          } catch (InterruptedException | ExecutionException e) {
            System.out.println(
                "MessageId: " + message.getMessageId() + " failed when retrieving future");
          } catch (Throwable t) {
            System.out.println("Throwable caught" + t.getMessage());
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiverWithResponse).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Antes de testar este exemplo, siga as instruções de configuração do Node.js na Guia de início rápido do Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a API Node.js do Pub/Sub documentação de referência.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId,
  timeout
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

Antes de testar este exemplo, siga as instruções de configuração do PHP na Guia de início rápido do Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a API PHP do Pub/Sub documentação de referência.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages from a subscription
 * with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_exactly_once_delivery(
    string $projectId,
    string $subscriptionId
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        // When exactly once delivery is enabled on the subscription,
        // the message is guaranteed to not be delivered again if the ack succeeds.
        // Passing the `returnFailures` flag retries any temporary failures received
        // while acking the msg and also returns any permanently failed msgs.
        // Passing this flag on a subscription with exactly once delivery disabled
        // will always return an empty array.
        $failedMsgs = $subscription->acknowledge($message, ['returnFailures' => true]);

        if (empty($failedMsgs)) {
            printf('Acknowledged message: %s' . PHP_EOL, $message->data());
        } else {
            // Either log or store the $failedMsgs to be retried later
        }
    }
}

Python

Antes de testar este exemplo, siga as instruções de configuração do Python na Guia de início rápido do Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a API Python do Pub/Sub documentação de referência.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

    # Use `ack_with_response()` instead of `ack()` to get a future that tracks
    # the result of the acknowledge call. When exactly-once delivery is enabled
    # on the subscription, the message is guaranteed to not be delivered again
    # if the ack future succeeds.
    ack_future = message.ack_with_response()

    try:
        # Block on result of acknowledge call.
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        ack_future.result(timeout=timeout)
        print(f"Ack for message {message.message_id} successful.")
    except sub_exceptions.AcknowledgeError as e:
        print(
            f"Ack for message {message.message_id} failed with error: {e.error_code}"
        )

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Antes de testar este exemplo, siga as instruções de configuração do Ruby na Guia de início rápido do Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a API Ruby do Pub/Sub documentação de referência.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

require "google/cloud/pubsub"

# Shows how to register callback to acknowledge method and access the result passed in
class PubsubSubscriberExactlyOnceDelivery
  def subscriber_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = pubsub.subscription subscription_id
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"

      # Pass in callback to access the acknowledge result.
      # For subscription with Exactly once delivery disabled the result will be success always.
      received_message.acknowledge! do |result|
        puts "Acknowledge result's status: #{result.status}"
      end
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription" # subscription with exactly once delivery enabled
    PubsubSubscriberExactlyOnceDelivery.new.subscriber_exactly_once_delivery project_id: project_id,
                                                                             topic_id: topic_id,
                                                                             subscription_id: subscription_id
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubSubscriberExactlyOnceDelivery.run
end

C++

Antes de testar este exemplo, siga as instruções de configuração do C++ na Guia de início rápido do Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a API C++ do Pub/Sub documentação de referência.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack().then([id = m.message_id()](auto f) {
          auto status = f.get();
          std::cout << "Message id " << id
                    << " ack() completed with status=" << status << "\n";
        });
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

Antes de testar este exemplo, siga as instruções de configuração do C# na Guia de início rápido do Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a API C# do Pub/Sub documentação de referência.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.PubSub.V1.SubscriberClient;

public class ExactlyOnceDeliverySubscriberAsyncSample
{
    public async Task<IEnumerable<string>> ExactlyOnceDeliverySubscriberAsync(string projectId, string subscriptionId)
    {
        // subscriptionId should be the ID of an exactly-once delivery subscription.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // To get the status of ACKnowledge (ACK) or Not ACKnowledge (NACK) request in exactly once delivery subscriptions,
        // create a subscription handler that inherits from Google.Cloud.PubSub.V1.SubscriptionHandler. 
        // For more information see Google.Cloud.PubSub.V1.SubscriptionHandler reference docs here:
        // https://cloud.google.com/dotnet/docs/reference/Google.Cloud.PubSub.V1/latest/Google.Cloud.PubSub.V1.SubscriptionHandler
        var subscriptionHandler = new SampleSubscriptionHandler();
        Task subscriptionTask = subscriber.StartAsync(subscriptionHandler);
        // The subscriber will be running until it is stopped.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await subscriptionTask;
        return subscriptionHandler.SuccessfulAckedIds;
    }

    // Sample handler to handle messages and ACK/NACK responses.
    public class SampleSubscriptionHandler : SubscriptionHandler
    {
        public ConcurrentBag<string> SuccessfulAckedIds { get; } = new ConcurrentBag<string>();

        /// <summary>
        /// The function that processes received messages. It should be thread-safe.
        /// Return <see cref="Reply.Ack"/> to ACKnowledge the message (meaning it won't be received again).
        /// Return <see cref="Reply.Nack"/> to Not ACKnowledge the message (meaning it will be received again).
        /// From the point of view of message acknowledgement, throwing an exception is equivalent to returning <see cref="Reply.Nack"/>.
        /// </summary>
        public override async Task<Reply> HandleMessage(PubsubMessage message, CancellationToken cancellationToken)
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            return await Task.FromResult(Reply.Ack);
        }

        /// <summary>
        /// This method will receive responses for all acknowledge requests.
        /// </summary>
        public override void HandleAckResponses(IReadOnlyList<AckNackResponse> responses)
        {
            foreach (var response in responses)
            {
                if (response.Status == AcknowledgementStatus.Success)
                {
                    SuccessfulAckedIds.Add(response.MessageId);
                }

                string result = response.Status switch
                {
                    AcknowledgementStatus.Success => $"MessageId {response.MessageId} successfully acknowledged.",
                    AcknowledgementStatus.PermissionDenied => $"MessageId {response.MessageId} failed to acknowledge due to a permission denied error.",
                    AcknowledgementStatus.FailedPrecondition => $"MessageId {response.MessageId} failed to acknowledge due to a failed precondition.",
                    AcknowledgementStatus.InvalidAckId => $"MessageId {response.MessageId} failed to acknowledge due an invalid or expired AckId.",
                    AcknowledgementStatus.Other => $"MessageId {response.MessageId} failed to acknowledge due to an unknown reason.",
                    _ => $"Unknown acknowledgement status for messageId {response.MessageId}."
                };

                Console.WriteLine(result);
            }
        }
    }
}

Node.js (TypeScript)

Antes de testar esta amostra, siga as instruções de configuração do Node.js na Guia de início rápido do Pub/Sub usando bibliotecas de cliente. Para mais informações, consulte a API Pub/Sub Node.js documentação de referência.

Para autenticar no Pub/Sub, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {Message, PubSub, AckError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId: string,
  timeout: number
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e as AckError;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Monitorar assinaturas de entrega única

O subscription/exactly_once_warning_count registra o número de eventos pode resultar em novos resgates (válidos ou duplicados). Essa métrica contabiliza vezes que o Pub/Sub não processa solicitações associadas de confirmação (solicitação ModifyAckDeadline ou acknowledgment). Os motivos a falha pode ser baseada no servidor ou no cliente. Por exemplo, se a persistência usada para manter a informação de entrega "exatamente uma vez" indisponível, ela é um evento baseado em servidor. Se o cliente tentar confirmar uma mensagem com um ID de confirmação inválido, seria um evento baseado em cliente.

Entender a métrica

subscription/exactly_once_warning_count captura eventos que podem ou não levam a reenvios reais e podem gerar ruído com base no comportamento do cliente. Para exemplo: solicitações acknowledgment ou ModifyAckDeadline repetidas com erros de confirmação incrementam a métrica repetidamente.

As métricas a seguir também são úteis para entender o comportamento do cliente:

  • subscription/expired_ack_deadlines_count mostra o número de expirações do ID de confirmação. Reconhecimento As expirações dos ID podem levar a falhas para ModifyAckDeadline e acknowledgment solicitações.

  • service.serviceruntime.googleapis.com/api/request_count pode ser usada para capturar falhas de ModifyAckDeadline ou acknowledgment quando elas chegam ao Google Cloud, mas não alcançar o Pub/Sub. Há falhas nas quais esta métrica não capture, por exemplo, quando os clientes são desconectados do Google Cloud.

Na maioria dos casos de falha que podem ser repetidas, as bibliotecas de cliente com suporte repetir a solicitação automaticamente.

Cotas

As assinaturas de entrega única estão sujeitas a cota adicional e cumprimento de requisitos regulatórios. Essas cotas são aplicadas a:

  • Número de mensagens consumidas de assinaturas com entrega exatamente uma vez ativados por região.
  • Número de mensagens confirmadas ou que têm o prazo estendido ao usar o assinaturas com a entrega única ativada por região.

Para obter mais informações sobre essas cotas, consulte a tabela na Cotas.

Entrega exatamente uma vez e assinaturas solicitadas

O Pub/Sub aceita entregas únicas com entrega ordenada.

Ao usar pedidos com entrega única, o Pub/Sub espera que as confirmações estejam em ordem. Se as confirmações estiverem fora de ordem, falha nas solicitações com erros temporários. Se o prazo de confirmação expire antes de uma confirmação em ordem para a entrega, o cliente receber uma nova entrega da mensagem. Por isso, quando você usa pedidos com uma única entrega, a capacidade de processamento do cliente é limitada a um pedido de mil mensagens por segundo.

Assinaturas de entrega e push exatamente uma vez

O Pub/Sub aceita entrega única apenas com assinaturas de pull.

Os clientes que consomem mensagens de assinaturas de push confirmam as mensagens respondendo às solicitações de push com uma resposta bem-sucedida. No entanto, os clientes não saber se a assinatura do Pub/Sub recebeu a resposta o processou. Isso é diferente das assinaturas de pull, em que a confirmação as solicitações são iniciadas pelos clientes e pela assinatura do Pub/Sub responde se a solicitação foi processada com sucesso. Por isso, semântica de entrega "exatamente uma vez" não se alinha bem com assinaturas de push.

Informações úteis

  • Se o prazo de confirmação não for especificado no momento de CreateSubscription, as assinaturas com a entrega ativada exatamente uma vez terão uma confirmação padrão de 60 segundos.

  • Prazos de confirmação padrão mais longos são benéficos para evitar reenvio causado por eventos de rede. As bibliotecas de cliente compatíveis não usam e o prazo padrão de confirmação da assinatura.

  • As assinaturas de entrega única têm um aumento significativa latência de publicação para inscrição em comparação com as assinaturas regulares.

  • Se você precisar de alta capacidade, seus clientes de entrega única também precisarão usar o pull de streaming.

  • Uma assinatura pode receber várias cópias da mesma mensagem devido a cópias laterais de publicação, mesmo com a entrega "exatamente uma vez" ativada. As duplicatas da publicação podem ocorrer devido a várias tentativas únicas de publicação pelo cliente de publicação ou o serviço do Pub/Sub. Várias publicações únicas pelo cliente editor, em novas tentativas, levam a reenvios com IDs de mensagem diferentes. Várias publicações únicas feitas pelo serviço Pub/Sub para responder a uma solicitação de publicação do cliente leva a reenvios com os mesmos IDs de mensagem.

  • É possível repetir falhas em subscription/exactly_once_warning_count, e as bibliotecas de cliente compatíveis repetirão essas tentativas automaticamente. No entanto, as falhas relacionadas a IDs de confirmação inválidos não podem ser repetida.