Diffusion de type "exactement une fois"

Cette page explique comment recevoir et confirmer des messages à l'aide de la fonctionnalité de traitement de type "exactement une fois" de Pub/Sub, qui vous permet de suivre et d'éviter le traitement en double des messages. Lorsque la fonctionnalité est activée, Pub/Sub fournit la sémantique suivante:

  • Les abonnés peuvent déterminer si les accusés de réception des messages ont réussi.

  • Aucune nouvelle diffusion n'est effectuée une fois le message confirmé.

  • Aucune nouvelle diffusion n'est effectuée tant qu'un message est en attente. Un message est considéré comme en attente jusqu'à l'expiration du délai d'accusé de réception ou jusqu'à ce que le message soit accusé de réception.

  • En cas de diffusions valides multiples, en raison de l'expiration du délai d'accusé de réception ou d'un accusé de réception négatif déclenché par le client, seul le dernier ID d'accusé de réception peut être utilisé pour accuser réception du message. Toutes les requêtes avec un ID de confirmation précédent échouent.

Lorsque le traitement "exactement une fois" est activé, les abonnés peuvent s'assurer que les messages sont traités une seule fois en suivant ces consignes:

  • Confirmez les messages dans le délai de confirmation.

  • Conservez les informations sur l'avancement du traitement d'un message jusqu'à ce qu'il soit confirmé.

  • Utilisez les informations sur la progression du traitement d'un message pour éviter les doublons lorsque l'acquittement échoue.

Seul le type d'abonnement pull est compatible avec la distribution "exactement une fois", y compris les abonnés qui utilisent l'API StreamingPull. Les abonnements push et d'exportation ne sont pas compatibles avec la distribution de type "exactement une fois".

Pub/Sub prend en charge la distribution de type "exactement une fois", dans une région cloud, sur la base d'un ID de message unique défini par Pub/Sub.

Nouvel envoi et doublon

Il est important de bien comprendre la différence entre les nouvelles diffusions attendues et inattendues.

  • Une nouvelle distribution peut se produire en raison d'un accusé de réception négatif d'un message initié par le client ou lorsque le client ne prolonge pas le délai d'accusé de réception du message avant son expiration. Les nouvelles livraisons sont considérées comme valides et le système fonctionne comme prévu.

    Pour résoudre les problèmes de nouvelle diffusion, consultez Gérer les doublons.

  • Un message est considéré comme un doublon lorsqu'il est renvoyé après un accusé de réception réussi ou avant l'expiration du délai d'expiration de l'accusé de réception.

  • Un message réexpédié conserve le même ID de message entre les tentatives de réexpédition.

Les abonnements pour lesquels la distribution de type "exactement une fois" est activée ne reçoivent pas de diffusions en double.

Compatibilité avec la distribution de type "exactement une fois" dans les bibliothèques clientes

  • Les bibliothèques clientes compatibles disposent d'une interface pour l'acquittement avec réponse (par exemple, Go). Vous pouvez utiliser cette interface pour vérifier si la requête d'acquittement a abouti. Si la requête d'acquittement aboutit, les clients ne reçoivent pas de nouvelle diffusion. Si la demande d'acquittement échoue, les clients peuvent s'attendre à une nouvelle livraison.

  • Les clients peuvent également utiliser les bibliothèques clientes compatibles sans l'interface d'acquittement. Toutefois, dans ce cas, les échecs de confirmation peuvent entraîner des nouvelles diffusions silencieuses des messages.

  • Les bibliothèques clientes compatibles disposent d'interfaces permettant de définir la durée minimale de prolongation du bail (par exemple, Go). Vous devez définir la valeur de l'extension de bail minimum sur un nombre élevé pour éviter toute expiration de l'acquittement liée au réseau. La valeur maximale est définie sur 600 secondes.

Les valeurs et la plage par défaut des variables liées à la diffusion exactement une fois, ainsi que les noms des variables, peuvent varier d'une bibliothèque cliente à l'autre. Par exemple, dans la bibliothèque cliente Java, les variables suivantes contrôlent la diffusion exactement une fois.

Variable Description Valeur
setEnableExactlyOnceDelivery Active ou désactive la distribution de type "exactement une fois". "true" ou "false" (par défaut : "false")
minDurationPerAckExtension Durée minimale, en secondes, à utiliser pour prolonger le délai de confirmation de la modification. Plage : 0 à 600 ; valeur par défaut : aucune
maxDurationPerAckExtension Durée maximale (en secondes) à utiliser pour prolonger le délai de confirmation de la modification. Plage : 0 à 600 ; valeur par défaut : aucune

En cas de distribution de type "exactement une fois", la requête modifyAckDeadline ou acknowledgment envoyée à Pub/Sub échoue lorsque l'ID d'acquittement a déjà expiré. Dans ce cas, le service considère l'ID de confirmation expiré comme non valide, car une diffusion plus récente est peut-être déjà en cours. Cela est volontaire pour la diffusion "exactement une fois". Vous voyez ensuite que les requêtes acknowledgment et ModifyAckDeadline renvoient une réponse INVALID_ARGUMENT. Lorsque la diffusion de type "exactement une fois" est désactivée, ces requêtes renvoient OK en cas d'ID de confirmation expirés.

Pour vous assurer que les requêtes acknowledgment et ModifyAckDeadline disposent d'ID de confirmation valides, envisagez de définir la valeur de minDurationPerAckExtension sur un nombre élevé.

Points à prendre en compte concernant les régions

La garantie de distribution de type "exactement une fois" ne s'applique que lorsque les abonnés se connectent au service dans la même région. Si votre application d'abonné est répartie sur plusieurs régions, cela peut entraîner la distribution de messages en double, même lorsque la distribution de type "exactement une fois" est activée. Les éditeurs peuvent envoyer des messages à n'importe quelle région, et la garantie d'envoi exact est toujours maintenue.

Lorsque vous exécutez votre application dans Google Cloud, elle se connecte par défaut au point de terminaison Pub/Sub de la même région. Par conséquent, exécuter votre application dans une seule région de Google Cloudvous garantit généralement d'interagir avec une seule région.

Lorsque vous exécutez votre application d'abonné en dehors de Google Cloudou dans plusieurs régions, vous pouvez vous assurer de vous connecter à une seule région à l'aide d'un point de terminaison géographique lorsque vous configurez votre client Pub/Sub. Tous les points de terminaison d'emplacement pour Pub/Sub pointent vers des régions uniques. Pour en savoir plus sur les points de terminaison géographiques, consultez la section Points de terminaison Pub/Sub. Pour obtenir la liste de tous les points de terminaison localisés pour Pub/Sub, consultez la section Liste des points de terminaison localisés.

Créer des abonnements avec la distribution de type "exactement une fois"

Vous pouvez créer un abonnement avec une diffusion exactement une fois à l'aide de la console Google Cloud , de la Google Cloud CLI, de la bibliothèque cliente ou de l'API Pub/Sub.

Abonnement pull

Console

Pour créer un abonnement pull avec envoi exactement une fois, procédez comme suit:

  1. Dans la console Google Cloud , accédez à la page Abonnements.

    Accéder aux abonnements

  2. Cliquez sur Créer un abonnement.

  3. Saisissez l'ID de l'abonnement.

  4. Choisissez ou créez un sujet dans le menu déroulant.

    L'abonnement reçoit les messages du sujet.

  5. Dans la section Distribution de type "exactement une fois", sélectionnez Activer la distribution de type "exactement une fois".

  6. Cliquez sur Créer.

gcloud

Pour créer un abonnement pull avec une diffusion exactement une fois, exécutez la commande gcloud pubsub subscriptions create avec l'option --enable-exactly-once-delivery:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --enable-exactly-once-delivery

Remplacez les éléments suivants :

  • SUBSCRIPTION_ID : ID de l'abonnement à créer
  • TOPIC_ID : ID du sujet à associer à l'abonnement

REST

Pour créer un abonnement avec une distribution "exactly-once", utilisez la méthode projects.subscriptions.create.

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet dans lequel créer l'abonnement
  • SUBSCRIPTION_ID : ID de l'abonnement à créer

Pour créer un abonnement pull avec une distribution "exactly-once", spécifiez-le dans le corps de la requête:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "enableExactlyOnceDelivery": true,
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet contenant le sujet.
  • TOPIC_ID : ID du sujet à associer à l'abonnement

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_exactly_once_delivery(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithExactlyOnceDeliverySample
{
    public Subscription CreateSubscriptionWithExactlyOnceDelivery(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableExactlyOnceDelivery = true
        };

        Subscription subscription = null;

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createSubscriptionWithExactlyOnceDelivery(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                     topic,
		EnableExactlyOnceDelivery: true,
	})
	if err != nil {
		return err
	}
	fmt.Fprintf(w, "Created a subscription with exactly once delivery enabled: %v\n", sub)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithExactlyOnceDelivery {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithExactlyOnceDeliveryExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithExactlyOnceDeliveryExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Enable exactly once delivery in the subscription.
                  .setEnableExactlyOnceDelivery(true)
                  .build());

      System.out.println(
          "Created a subscription with exactly once delivery enabled: "
              + subscription.getAllFields());
    }
  }
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_exactly_once_delivery": True,
        }
    )
    print(
        f"Created subscription with exactly once delivery enabled: {subscription}"
    )

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId,
  subscriptionNameOrId
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().'
  );
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().'
  );
}

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

require "google/cloud/pubsub"

# Shows how to create a new subscription with exactly once delivery enabled
class PubsubCreateSubscriptionWithExactlyOnceDelivery
  def create_subscription_with_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, enable_exactly_once_delivery: true
    puts "Created subscription with exactly once delivery enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    pubsub_create_subscription_with_exactly_once_delivery = PubsubCreateSubscriptionWithExactlyOnceDelivery.new
    pubsub_create_subscription_with_exactly_once_delivery.create_subscription_with_exactly_once_delivery(
      project_id: project_id,
      topic_id: topic_id,
      subscription_id: subscription_id
    )
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithExactlyOnceDelivery.run
end

PHP

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage PHP qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function create_subscription_with_exactly_once_delivery(
    string $projectId,
    string $topicName,
    string $subscriptionName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $subscription->create([
        'enableExactlyOnceDelivery' => true
    ]);

    // Exactly Once Delivery status for the subscription
    $status = $subscription->info()['enableExactlyOnceDelivery'];

    printf('Subscription created with exactly once delivery status: %s' . PHP_EOL, $status ? 'true' : 'false');
}

S'abonner avec la distribution de messages de type "exactement une fois"

Vous trouverez ci-dessous quelques exemples de code pour vous abonner avec une diffusion exactement une fois à l'aide de la bibliothèque cliente.

Abonnement pull

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Go.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

// receiveMessagesWithExactlyOnceDeliveryEnabled instantiates a subscriber client.
// This differs from regular subscribing since you must call msg.AckWithResult()
// or msg.NackWithResult() instead of the regular Ack/Nack methods.
// When exactly once delivery is enabled on the subscription, the message is
// guaranteed to not be delivered again if the ack result succeeds.
func receiveMessagesWithExactlyOnceDeliveryEnabled(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()

	// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
	// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
	client, err := pubsub.NewClient(ctx, projectID, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Set MinExtensionPeriod high to avoid any unintentional
	// acknowledgment expirations (e.g. due to network events).
	// This can lead to high tail latency in case of client crashes.
	sub.ReceiveSettings.MinExtensionPeriod = 600 * time.Second

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		r := msg.AckWithResult()
		// Block until the result is returned and a pubsub.AcknowledgeStatus
		// is returned for the acked message.
		status, err := r.Get(ctx)
		if err != nil {
			fmt.Fprintf(w, "MessageID: %s failed when calling result.Get: %v", msg.ID, err)
		}

		switch status {
		case pubsub.AcknowledgeStatusSuccess:
			fmt.Fprintf(w, "Message successfully acked: %s", msg.ID)
		case pubsub.AcknowledgeStatusInvalidAckID:
			fmt.Fprintf(w, "Message failed to ack with response of Invalid. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusPermissionDenied:
			fmt.Fprintf(w, "Message failed to ack with response of Permission Denied. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusFailedPrecondition:
			fmt.Fprintf(w, "Message failed to ack with response of Failed Precondition. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusOther:
			fmt.Fprintf(w, "Message failed to ack with response of Other. ID: %s", msg.ID)
		default:
		}
	})
	if err != nil {
		return fmt.Errorf("got err from sub.Receive: %w", err)
	}
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Java.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithExactlyOnceConsumerWithResponseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithExactlyOnceConsumerWithResponseExample(projectId, subscriptionId);
  }

  public static void subscribeWithExactlyOnceConsumerWithResponseExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver using `AckReplyConsumerWithResponse`
    // instead of `AckReplyConsumer` to get a future that tracks the result of the ack call.
    // When exactly once delivery is enabled on the subscription, the message is guaranteed
    // to not be delivered again if the ack future succeeds.
    MessageReceiverWithAckResponse receiverWithResponse =
        (PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) -> {
          try {
            // Handle incoming message, then ack the message, and receive an ack response.
            System.out.println("Message received: " + message.getData().toStringUtf8());
            Future<AckResponse> ackResponseFuture = consumerWithResponse.ack();

            // Retrieve the completed future for the ack response from the server.
            AckResponse ackResponse = ackResponseFuture.get();

            switch (ackResponse) {
              case SUCCESSFUL:
                // Success code means that this MessageID will not be delivered again.
                System.out.println("Message successfully acked: " + message.getMessageId());
                break;
              case INVALID:
                System.out.println(
                    "Message failed to ack with a response of Invalid. Id: "
                        + message.getMessageId());
                break;
              case PERMISSION_DENIED:
                System.out.println(
                    "Message failed to ack with a response of Permission Denied. Id: "
                        + message.getMessageId());
                break;
              case FAILED_PRECONDITION:
                System.out.println(
                    "Message failed to ack with a response of Failed Precondition. Id: "
                        + message.getMessageId());
                break;
              case OTHER:
                System.out.println(
                    "Message failed to ack with a response of Other. Id: "
                        + message.getMessageId());
                break;
              default:
                break;
            }
          } catch (InterruptedException | ExecutionException e) {
            System.out.println(
                "MessageId: " + message.getMessageId() + " failed when retrieving future");
          } catch (Throwable t) {
            System.out.println("Throwable caught" + t.getMessage());
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiverWithResponse).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration pour Node.js du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
const pubSubClient = new PubSub({
  apiEndpoint: 'us-west1-pubsub.googleapis.com:443',
});

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId,
  timeout
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

Avant d'essayer cet exemple, suivez les instructions de configuration pour PHP du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub PHP.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages from a subscription
 * with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_exactly_once_delivery(
    string $projectId,
    string $subscriptionId
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        // When exactly once delivery is enabled on the subscription,
        // the message is guaranteed to not be delivered again if the ack succeeds.
        // Passing the `returnFailures` flag retries any temporary failures received
        // while acking the msg and also returns any permanently failed msgs.
        // Passing this flag on a subscription with exactly once delivery disabled
        // will always return an empty array.
        $failedMsgs = $subscription->acknowledge($message, ['returnFailures' => true]);

        if (empty($failedMsgs)) {
            printf('Acknowledged message: %s' . PHP_EOL, $message->data());
        } else {
            // Either log or store the $failedMsgs to be retried later
        }
    }
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Python.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

    # Use `ack_with_response()` instead of `ack()` to get a future that tracks
    # the result of the acknowledge call. When exactly-once delivery is enabled
    # on the subscription, the message is guaranteed to not be delivered again
    # if the ack future succeeds.
    ack_future = message.ack_with_response()

    try:
        # Block on result of acknowledge call.
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        ack_future.result(timeout=timeout)
        print(f"Ack for message {message.message_id} successful.")
    except sub_exceptions.AcknowledgeError as e:
        print(
            f"Ack for message {message.message_id} failed with error: {e.error_code}"
        )

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Avant d'essayer cet exemple, suivez les instructions de configuration pour Ruby du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Ruby.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

require "google/cloud/pubsub"

# Shows how to register callback to acknowledge method and access the result passed in
class PubsubSubscriberExactlyOnceDelivery
  def subscriber_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = pubsub.subscription subscription_id
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"

      # Pass in callback to access the acknowledge result.
      # For subscription with Exactly once delivery disabled the result will be success always.
      received_message.acknowledge! do |result|
        puts "Acknowledge result's status: #{result.status}"
      end
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription" # subscription with exactly once delivery enabled
    PubsubSubscriberExactlyOnceDelivery.new.subscriber_exactly_once_delivery project_id: project_id,
                                                                             topic_id: topic_id,
                                                                             subscription_id: subscription_id
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubSubscriberExactlyOnceDelivery.run
end

C++

Avant d'essayer cet exemple, suivez les instructions de configuration pour C++ du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub C++.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack().then([id = m.message_id()](auto f) {
          auto status = f.get();
          std::cout << "Message id " << id
                    << " ack() completed with status=" << status << "\n";
        });
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

Avant d'essayer cet exemple, suivez les instructions de configuration pour C# du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub C#.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.PubSub.V1.SubscriberClient;

public class ExactlyOnceDeliverySubscriberAsyncSample
{
    public async Task<IEnumerable<string>> ExactlyOnceDeliverySubscriberAsync(string projectId, string subscriptionId)
    {
        // subscriptionId should be the ID of an exactly-once delivery subscription.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // To get the status of ACKnowledge (ACK) or Not ACKnowledge (NACK) request in exactly once delivery subscriptions,
        // create a subscription handler that inherits from Google.Cloud.PubSub.V1.SubscriptionHandler. 
        // For more information see Google.Cloud.PubSub.V1.SubscriptionHandler reference docs here:
        // https://cloud.google.com/dotnet/docs/reference/Google.Cloud.PubSub.V1/latest/Google.Cloud.PubSub.V1.SubscriptionHandler
        var subscriptionHandler = new SampleSubscriptionHandler();
        Task subscriptionTask = subscriber.StartAsync(subscriptionHandler);
        // The subscriber will be running until it is stopped.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await subscriptionTask;
        return subscriptionHandler.SuccessfulAckedIds;
    }

    // Sample handler to handle messages and ACK/NACK responses.
    public class SampleSubscriptionHandler : SubscriptionHandler
    {
        public ConcurrentBag<string> SuccessfulAckedIds { get; } = new ConcurrentBag<string>();

        /// <summary>
        /// The function that processes received messages. It should be thread-safe.
        /// Return <see cref="Reply.Ack"/> to ACKnowledge the message (meaning it won't be received again).
        /// Return <see cref="Reply.Nack"/> to Not ACKnowledge the message (meaning it will be received again).
        /// From the point of view of message acknowledgement, throwing an exception is equivalent to returning <see cref="Reply.Nack"/>.
        /// </summary>
        public override async Task<Reply> HandleMessage(PubsubMessage message, CancellationToken cancellationToken)
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            return await Task.FromResult(Reply.Ack);
        }

        /// <summary>
        /// This method will receive responses for all acknowledge requests.
        /// </summary>
        public override void HandleAckResponses(IReadOnlyList<AckNackResponse> responses)
        {
            foreach (var response in responses)
            {
                if (response.Status == AcknowledgementStatus.Success)
                {
                    SuccessfulAckedIds.Add(response.MessageId);
                }

                string result = response.Status switch
                {
                    AcknowledgementStatus.Success => $"MessageId {response.MessageId} successfully acknowledged.",
                    AcknowledgementStatus.PermissionDenied => $"MessageId {response.MessageId} failed to acknowledge due to a permission denied error.",
                    AcknowledgementStatus.FailedPrecondition => $"MessageId {response.MessageId} failed to acknowledge due to a failed precondition.",
                    AcknowledgementStatus.InvalidAckId => $"MessageId {response.MessageId} failed to acknowledge due an invalid or expired AckId.",
                    AcknowledgementStatus.Other => $"MessageId {response.MessageId} failed to acknowledge due to an unknown reason.",
                    _ => $"Unknown acknowledgement status for messageId {response.MessageId}."
                };

                Console.WriteLine(result);
            }
        }
    }
}

Node.js (TypeScript)

Avant d'essayer cet exemple, suivez les instructions de configuration de Node.js décrites dans le guide de démarrage rapide de Pub/Sub : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {Message, PubSub, AckError} from '@google-cloud/pubsub';

// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
const pubSubClient = new PubSub({
  apiEndpoint: 'us-west1-pubsub.googleapis.com:443',
});

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId: string,
  timeout: number
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e as AckError;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Surveiller les abonnements de diffusion de type "exactement une fois"

La métrique subscription/exactly_once_warning_count enregistre le nombre d'événements pouvant entraîner des nouvelles diffusions (valides ou en double). Cette métrique compte les échecs de traitement des requêtes associées à des ID de confirmation (requête ModifyAckDeadline ou acknowledgment). Les raisons de l'échec peuvent être liées au serveur ou au client. Par exemple, si la couche de persistance utilisée pour gérer les informations de diffusion "exactement une fois" n'est pas disponible, il s'agit d'un événement basé sur le serveur. Si le client tente de confirmer un message avec un ID de confirmation non valide, il s'agit d'un événement basé sur le client.

Comprendre la métrique

subscription/exactly_once_warning_count capture les événements qui peuvent ou non entraîner des nouvelles diffusions et peut être bruyant en fonction du comportement du client. Par exemple, les requêtes acknowledgment ou ModifyAckDeadline répétées avec des ID de confirmation non valides incrémentent la métrique à plusieurs reprises.

Les métriques suivantes sont également utiles pour comprendre le comportement des clients:

  • La métrique subscription/expired_ack_deadlines_count indique le nombre d'expirations d'ID de confirmation. L'expiration des ID de confirmation peut entraîner des échecs pour les requêtes ModifyAckDeadline et acknowledgment.

  • La métrique service.serviceruntime.googleapis.com/api/request_count peut être utilisée pour capturer les échecs des requêtes ModifyAckDeadline ou acknowledgment lorsque les requêtes atteignent Google Cloud , mais n'atteignent pas Pub/Sub. Il existe des échecs que cette métrique ne capturera pas, par exemple lorsque les clients sont déconnectés de Google Cloud.

Dans la plupart des cas d'événements d'échec pouvant être réessayés, les bibliothèques clientes compatibles relancent automatiquement la requête.

Quotas

Les abonnements de distribution de type "exactement une fois" sont soumis à des exigences de quota supplémentaires. Ces quotas s'appliquent aux éléments suivants:

  • Nombre de messages consommés à partir d'abonnements avec la distribution de type "exactement une fois" activée par région.
  • Nombre de messages confirmés ou dont la date limite est prolongée lorsque vous utilisez des abonnements avec distribution "exactement une fois" activée par région.

Pour en savoir plus sur ces quotas, consultez le tableau de la section Quotas.

Diffusion de type "exactement une fois" et abonnements ordonnés

Pub/Sub est compatible avec la distribution de type "exactement une fois" avec la distribution ordonnée.

Lorsque vous utilisez la distribution de type "exactement une fois", Pub/Sub s'attend à ce que les accusés de réception soient dans l'ordre. Si les accusés de réception sont dans le désordre, le service échoue les requêtes avec des erreurs temporaires. Si le délai de confirmation expire avant qu'un accusé de réception dans l'ordre de livraison ne soit reçu, le client recevra une nouvelle diffusion du message. Par conséquent, lorsque vous utilisez la commande avec une distribution exactement une fois, le débit client est limité à un ordre de mille messages par seconde.

Diffusion de type "exactement une fois" et abonnements push

Pub/Sub n'est compatible avec la distribution de type "exactement une fois" que pour les abonnements pull.

Les clients qui consomment des messages provenant des abonnements push confirment les messages en répondant aux requêtes push par une réponse de réussite. Toutefois, les clients ne savent pas si l'abonnement Pub/Sub a reçu la réponse et l'a traitée. Cela diffère des abonnements pull, où les requêtes d'acquittement sont lancées par les clients et que l'abonnement Pub/Sub répond si la requête a bien été traitée. Par conséquent, la sémantique de distribution de type "exactement une fois" ne s'accorde pas bien avec les abonnements push.

Bon à savoir

  • Si le délai d'expiration de l'accusé de réception n'est pas spécifié au moment de la création de l'abonnement, le délai d'expiration de l'accusé de réception par défaut est de 60 secondes pour les abonnements pour lesquels la diffusion de type "exactement une fois" est activée.

  • Les délais de confirmation par défaut plus longs sont utiles pour éviter la nouvelle diffusion causée par des événements réseau. Les bibliothèques clientes compatibles n'utilisent pas le délai de confirmation d'abonnement par défaut.

  • Les abonnements avec distribution de type "exactement une fois" ont une latence de publication vers l'abonnement nettement plus élevée que les abonnements standards.

  • Si vous avez besoin d'un débit élevé, vos clients de diffusion "exactement une fois" doivent également utiliser le flux pull.

  • Un abonnement peut recevoir plusieurs copies du même message en raison de doublons côté publication, même si la diffusion de type "exactement une fois" est activée. Les doublons côté publication peuvent être dus à plusieurs tentatives de publication uniques par le client de publication ou le service Pub/Sub. Plusieurs publications uniques par le client de publication, lors des nouvelles tentatives, entraînent des nouvelles diffusions avec différents ID de message. La publication unique multiple du service Pub/Sub pour répondre à une requête de publication client entraîne des nouvelles diffusions avec les mêmes ID de message.

  • Vous pouvez réessayer les échecs dans subscription/exactly_once_warning_count, et les bibliothèques clientes compatibles les réessaient automatiquement. Toutefois, les échecs liés à des ID de confirmation non valides ne peuvent pas être réessayés.