Diffusion de type "exactement une fois"

Cette page explique comment recevoir et accuser réception des messages à l'aide de la méthode "exactement une fois" la sémantique. Seul le type d'abonnement pull est compatible avec la distribution "exactement une fois", y compris les abonnés qui utilisent API StreamingPull :

Transférer et exporter des abonnements ne prennent pas en charge la distribution de type "exactement une fois".

Diffusion de type "exactement une fois"

Pub/Sub permet la distribution "exactement une fois" dans une région cloud, en fonction d'une règle de pare-feu unique définie par Pub/Sub ID du message.

Lorsque cette fonctionnalité est activée, Pub/Sub fournit les éléments suivants:

  • Aucune nouvelle distribution n'est effectuée une fois le message confirmé.

  • Aucune nouvelle distribution n'est effectuée tant qu'un message est en attente. Un message est considéré en attente jusqu'à l'expiration du délai de confirmation ou jusqu'à ce que le message soit confirmé.

  • En cas de plusieurs envois valides, en raison du délai de confirmation ou accusé de réception négatif déclenché par le client, uniquement L'ID d'accusé de réception peut être utilisé pour accuser réception du message. Toute requête avec d'un ID d'accusé de réception précédent.

Nouvelle distribution ou doublon

Il est important de comprendre la différence entre les prévisions et les imprévus à nouveau.

  • Une nouvelle livraison peut se produire en cas de refus à l'initiative du client accusé de réception d'un message ou lorsque le client n'étend pas l'accusé de réception délai du message avant l'expiration du délai d'accusé de réception. Nouvelles livraisons sont considérés comme valides et fonctionnent comme prévu.

    Pour résoudre les problèmes liés aux renvois, consultez la section Gérer les doublons.

  • Un doublon se produit lorsqu'un message est renvoyé après un accusé de réception réussi ou avant l'expiration du délai d'accusé de réception.

  • Un message distribué conserve le même ID entre deux tentatives.

Les abonnements pour lesquels la distribution de type "exactement une fois" est activée ne reçoivent pas de doublons livraisons.

Prise en charge de la distribution de type "exactement une fois" dans les bibliothèques clientes

  • Les bibliothèques clientes compatibles disposent d'une interface de confirmation avec réponse (par exemple, Go). Vous pouvez utiliser cette interface pour vérifier si la demande de confirmation a abouti. Si la demande d'accusé de réception aboutit, les clients sont assurés de ne pas recevoir de nouvelle livraison. Si la demande de confirmation échoue, le les clients peuvent s'attendre à une nouvelle livraison.

  • Les clients peuvent également utiliser les bibliothèques clientes compatibles sans le d'accusé de réception. Cependant, dans de tels cas, les échecs de confirmation peuvent entraîner une nouvelle distribution silencieuse des messages.

  • Les bibliothèques clientes compatibles disposent d'interfaces permettant de définir la durée de prolongation de bail (exemple: OK). Vous devez définir une valeur élevée pour l'extension de location minimale. pour éviter toute expiration d'accusé de réception liée au réseau. La valeur maximale est définie sur 600 secondes.

Valeurs et plage par défaut des variables associées à la distribution de type "exactement une fois" et les noms des variables peuvent différer selon les bibliothèques clientes. Pour exemple, dans la bibliothèque cliente Java, les variables suivantes contrôlent une distribution de type "exactement une fois".

Variable Description Valeur
setEnableExactlyOnceDelivery Active ou désactive la distribution "exactement une fois". true ou false Default=false
minDurationPerAckExtension Durée minimale, en secondes, pour prolonger le délai d'accusé de réception des modifications. Plage=0 à 600 Par défaut=aucun
maxDurationPerAckExtension Durée maximale, en secondes, pour prolonger le délai d'accusé de réception des modifications. Plage=0 à 600 Par défaut=aucun

Dans le cas d'une distribution de type "exactement une fois", modifyAckDeadline ou acknowledgment La requête envoyée à Pub/Sub échoue lorsque l'ID d'accusé de réception a déjà expiré. Dans ces le service considère l'ID d'accusé de réception expiré comme non valide, une livraison plus récente est peut-être déjà en cours. C'est la conception pour une fois la livraison. Les requêtes acknowledgment et ModifyAckDeadline renvoient ensuite INVALID_ARGUMENT. Lorsque la distribution de type "exactement une fois" est désactivée, ces renvoient OK si les ID d'accusé de réception ont expiré.

Pour garantir que les requêtes acknowledgment et ModifyAckDeadline sont valides d'accusé de réception, pensez à définir la valeur minDurationPerAckExtension sur un nombre élevé.

Créer des abonnements avec distribution de type "exactement une fois"

Vous pouvez créer un abonnement avec distribution de type "exactement une fois" à l'aide de la console Google Cloud, de Google Cloud CLI, de la bibliothèque cliente ou de l'API Pub/Sub.

Abonnement pull

Console

Pour créer un abonnement pull avec distribution de type "exactement une fois", procédez comme suit:

  1. Dans la console Google Cloud, accédez à la page Abonnements.

    Accéder aux abonnements

  2. Cliquez sur Créer un abonnement.

  3. Saisissez l'ID de l'abonnement.

  4. Choisissez ou créez un sujet dans le menu déroulant.

    L'abonnement reçoit les messages du sujet.

  5. Dans la section Distribution de type "exactement une fois", sélectionnez Activer la distribution de type "exactement une fois".

  6. Cliquez sur Créer.

gcloud

Pour créer un abonnement pull avec distribution de type "exactement une fois", utilisez la méthode gcloud pubsub subscriptions create avec l'option --enable-exactly-once-delivery:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --enable-exactly-once-delivery

Remplacez les éléments suivants :

  • SUBSCRIPTION_ID : ID de l'abonnement à créer
  • TOPIC_ID : ID du sujet à associer à l'abonnement

REST

Pour créer un abonnement avec distribution de type "exactement une fois", utilisez la méthode projects.subscriptions.create .

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet dans lequel créer l'abonnement
  • SUBSCRIPTION_ID : ID de l'abonnement à créer

Pour créer un abonnement pull avec distribution de type "exactement une fois", spécifiez ce dans le corps de la requête:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "enableExactlyOnceDelivery": true,
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet contenant le sujet.
  • TOPIC_ID : ID du sujet à associer à l'abonnement

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_exactly_once_delivery(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithExactlyOnceDeliverySample
{
    public Subscription CreateSubscriptionWithExactlyOnceDelivery(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableExactlyOnceDelivery = true
        };

        Subscription subscription = null;

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createSubscriptionWithExactlyOnceDelivery(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                     topic,
		EnableExactlyOnceDelivery: true,
	})
	if err != nil {
		return err
	}
	fmt.Fprintf(w, "Created a subscription with exactly once delivery enabled: %v\n", sub)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithExactlyOnceDelivery {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithExactlyOnceDeliveryExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithExactlyOnceDeliveryExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Enable exactly once delivery in the subscription.
                  .setEnableExactlyOnceDelivery(true)
                  .build());

      System.out.println(
          "Created a subscription with exactly once delivery enabled: "
              + subscription.getAllFields());
    }
  }
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_exactly_once_delivery": True,
        }
    )
    print(
        f"Created subscription with exactly once delivery enabled: {subscription}"
    )

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId,
  subscriptionNameOrId
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().'
  );
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().'
  );
}

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

require "google/cloud/pubsub"

# Shows how to create a new subscription with exactly once delivery enabled
class PubsubCreateSubscriptionWithExactlyOnceDelivery
  def create_subscription_with_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, enable_exactly_once_delivery: true
    puts "Created subscription with exactly once delivery enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    pubsub_create_subscription_with_exactly_once_delivery = PubsubCreateSubscriptionWithExactlyOnceDelivery.new
    pubsub_create_subscription_with_exactly_once_delivery.create_subscription_with_exactly_once_delivery(
      project_id: project_id,
      topic_id: topic_id,
      subscription_id: subscription_id
    )
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithExactlyOnceDelivery.run
end

PHP

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage PHP qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function create_subscription_with_exactly_once_delivery(
    string $projectId,
    string $topicName,
    string $subscriptionName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $subscription->create([
        'enableExactlyOnceDelivery' => true
    ]);

    // Exactly Once Delivery status for the subscription
    $status = $subscription->info()['enableExactlyOnceDelivery'];

    printf('Subscription created with exactly once delivery status: %s' . PHP_EOL, $status ? 'true' : 'false');
}

S'abonner avec la distribution de messages "exactement une fois"

Voici quelques exemples de code pour l'abonnement avec distribution de type "exactement une fois" à l'aide de la bibliothèque cliente.

Abonnement pull

Go

Avant d'essayer cet exemple, suivez les instructions de configuration de Go dans le Guide de démarrage rapide de Pub/Sub bibliothèques clientes. Pour en savoir plus, consultez les API Go Pub/Sub documentation de référence.

Pour vous authentifier auprès de Pub/Sub, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// receiveMessagesWithExactlyOnceDeliveryEnabled instantiates a subscriber client.
// This differs from regular subscribing since you must call msg.AckWithResult()
// or msg.NackWithResult() instead of the regular Ack/Nack methods.
// When exactly once delivery is enabled on the subscription, the message is
// guaranteed to not be delivered again if the ack result succeeds.
func receiveMessagesWithExactlyOnceDeliveryEnabled(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Set MinExtensionPeriod high to avoid any unintentional
	// acknowledgment expirations (e.g. due to network events).
	// This can lead to high tail latency in case of client crashes.
	sub.ReceiveSettings.MinExtensionPeriod = 600 * time.Second

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		r := msg.AckWithResult()
		// Block until the result is returned and a pubsub.AcknowledgeStatus
		// is returned for the acked message.
		status, err := r.Get(ctx)
		if err != nil {
			fmt.Fprintf(w, "MessageID: %s failed when calling result.Get: %v", msg.ID, err)
		}

		switch status {
		case pubsub.AcknowledgeStatusSuccess:
			fmt.Fprintf(w, "Message successfully acked: %s", msg.ID)
		case pubsub.AcknowledgeStatusInvalidAckID:
			fmt.Fprintf(w, "Message failed to ack with response of Invalid. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusPermissionDenied:
			fmt.Fprintf(w, "Message failed to ack with response of Permission Denied. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusFailedPrecondition:
			fmt.Fprintf(w, "Message failed to ack with response of Failed Precondition. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusOther:
			fmt.Fprintf(w, "Message failed to ack with response of Other. ID: %s", msg.ID)
		default:
		}
	})
	if err != nil {
		return fmt.Errorf("got err from sub.Receive: %w", err)
	}
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration de Java dans le Guide de démarrage rapide de Pub/Sub bibliothèques clientes. Pour en savoir plus, consultez les API Java Pub/Sub documentation de référence.

Pour vous authentifier auprès de Pub/Sub, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithExactlyOnceConsumerWithResponseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithExactlyOnceConsumerWithResponseExample(projectId, subscriptionId);
  }

  public static void subscribeWithExactlyOnceConsumerWithResponseExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver using `AckReplyConsumerWithResponse`
    // instead of `AckReplyConsumer` to get a future that tracks the result of the ack call.
    // When exactly once delivery is enabled on the subscription, the message is guaranteed
    // to not be delivered again if the ack future succeeds.
    MessageReceiverWithAckResponse receiverWithResponse =
        (PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) -> {
          try {
            // Handle incoming message, then ack the message, and receive an ack response.
            System.out.println("Message received: " + message.getData().toStringUtf8());
            Future<AckResponse> ackResponseFuture = consumerWithResponse.ack();

            // Retrieve the completed future for the ack response from the server.
            AckResponse ackResponse = ackResponseFuture.get();

            switch (ackResponse) {
              case SUCCESSFUL:
                // Success code means that this MessageID will not be delivered again.
                System.out.println("Message successfully acked: " + message.getMessageId());
                break;
              case INVALID:
                System.out.println(
                    "Message failed to ack with a response of Invalid. Id: "
                        + message.getMessageId());
                break;
              case PERMISSION_DENIED:
                System.out.println(
                    "Message failed to ack with a response of Permission Denied. Id: "
                        + message.getMessageId());
                break;
              case FAILED_PRECONDITION:
                System.out.println(
                    "Message failed to ack with a response of Failed Precondition. Id: "
                        + message.getMessageId());
                break;
              case OTHER:
                System.out.println(
                    "Message failed to ack with a response of Other. Id: "
                        + message.getMessageId());
                break;
              default:
                break;
            }
          } catch (InterruptedException | ExecutionException e) {
            System.out.println(
                "MessageId: " + message.getMessageId() + " failed when retrieving future");
          } catch (Throwable t) {
            System.out.println("Throwable caught" + t.getMessage());
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiverWithResponse).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration de Node.js dans le Guide de démarrage rapide de Pub/Sub bibliothèques clientes. Pour en savoir plus, consultez les API Node.js Pub/Sub documentation de référence.

Pour vous authentifier auprès de Pub/Sub, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId,
  timeout
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

Avant d'essayer cet exemple, suivez les instructions de configuration de PHP dans le Guide de démarrage rapide de Pub/Sub bibliothèques clientes. Pour en savoir plus, consultez les API PHP Pub/Sub documentation de référence.

Pour vous authentifier auprès de Pub/Sub, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages from a subscription
 * with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_exactly_once_delivery(
    string $projectId,
    string $subscriptionId
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        // When exactly once delivery is enabled on the subscription,
        // the message is guaranteed to not be delivered again if the ack succeeds.
        // Passing the `returnFailures` flag retries any temporary failures received
        // while acking the msg and also returns any permanently failed msgs.
        // Passing this flag on a subscription with exactly once delivery disabled
        // will always return an empty array.
        $failedMsgs = $subscription->acknowledge($message, ['returnFailures' => true]);

        if (empty($failedMsgs)) {
            printf('Acknowledged message: %s' . PHP_EOL, $message->data());
        } else {
            // Either log or store the $failedMsgs to be retried later
        }
    }
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration de Python dans le Guide de démarrage rapide de Pub/Sub bibliothèques clientes. Pour en savoir plus, consultez les API Python Pub/Sub documentation de référence.

Pour vous authentifier auprès de Pub/Sub, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

    # Use `ack_with_response()` instead of `ack()` to get a future that tracks
    # the result of the acknowledge call. When exactly-once delivery is enabled
    # on the subscription, the message is guaranteed to not be delivered again
    # if the ack future succeeds.
    ack_future = message.ack_with_response()

    try:
        # Block on result of acknowledge call.
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        ack_future.result(timeout=timeout)
        print(f"Ack for message {message.message_id} successful.")
    except sub_exceptions.AcknowledgeError as e:
        print(
            f"Ack for message {message.message_id} failed with error: {e.error_code}"
        )

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Avant d'essayer cet exemple, suivez les instructions de configuration de Ruby dans le Guide de démarrage rapide de Pub/Sub bibliothèques clientes. Pour en savoir plus, consultez les API Ruby Pub/Sub documentation de référence.

Pour vous authentifier auprès de Pub/Sub, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

require "google/cloud/pubsub"

# Shows how to register callback to acknowledge method and access the result passed in
class PubsubSubscriberExactlyOnceDelivery
  def subscriber_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = pubsub.subscription subscription_id
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"

      # Pass in callback to access the acknowledge result.
      # For subscription with Exactly once delivery disabled the result will be success always.
      received_message.acknowledge! do |result|
        puts "Acknowledge result's status: #{result.status}"
      end
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription" # subscription with exactly once delivery enabled
    PubsubSubscriberExactlyOnceDelivery.new.subscriber_exactly_once_delivery project_id: project_id,
                                                                             topic_id: topic_id,
                                                                             subscription_id: subscription_id
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubSubscriberExactlyOnceDelivery.run
end

C++

Avant d'essayer cet exemple, suivez les instructions de configuration de C++ dans le Guide de démarrage rapide de Pub/Sub bibliothèques clientes. Pour en savoir plus, consultez les API C++ Pub/Sub documentation de référence.

Pour vous authentifier auprès de Pub/Sub, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack().then([id = m.message_id()](auto f) {
          auto status = f.get();
          std::cout << "Message id " << id
                    << " ack() completed with status=" << status << "\n";
        });
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

Avant d'essayer cet exemple, suivez les instructions de configuration de C# dans le Guide de démarrage rapide de Pub/Sub bibliothèques clientes. Pour en savoir plus, consultez les API C# Pub/Sub documentation de référence.

Pour vous authentifier auprès de Pub/Sub, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.PubSub.V1.SubscriberClient;

public class ExactlyOnceDeliverySubscriberAsyncSample
{
    public async Task<IEnumerable<string>> ExactlyOnceDeliverySubscriberAsync(string projectId, string subscriptionId)
    {
        // subscriptionId should be the ID of an exactly-once delivery subscription.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // To get the status of ACKnowledge (ACK) or Not ACKnowledge (NACK) request in exactly once delivery subscriptions,
        // create a subscription handler that inherits from Google.Cloud.PubSub.V1.SubscriptionHandler. 
        // For more information see Google.Cloud.PubSub.V1.SubscriptionHandler reference docs here:
        // https://cloud.google.com/dotnet/docs/reference/Google.Cloud.PubSub.V1/latest/Google.Cloud.PubSub.V1.SubscriptionHandler
        var subscriptionHandler = new SampleSubscriptionHandler();
        Task subscriptionTask = subscriber.StartAsync(subscriptionHandler);
        // The subscriber will be running until it is stopped.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await subscriptionTask;
        return subscriptionHandler.SuccessfulAckedIds;
    }

    // Sample handler to handle messages and ACK/NACK responses.
    public class SampleSubscriptionHandler : SubscriptionHandler
    {
        public ConcurrentBag<string> SuccessfulAckedIds { get; } = new ConcurrentBag<string>();

        /// <summary>
        /// The function that processes received messages. It should be thread-safe.
        /// Return <see cref="Reply.Ack"/> to ACKnowledge the message (meaning it won't be received again).
        /// Return <see cref="Reply.Nack"/> to Not ACKnowledge the message (meaning it will be received again).
        /// From the point of view of message acknowledgement, throwing an exception is equivalent to returning <see cref="Reply.Nack"/>.
        /// </summary>
        public override async Task<Reply> HandleMessage(PubsubMessage message, CancellationToken cancellationToken)
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            return await Task.FromResult(Reply.Ack);
        }

        /// <summary>
        /// This method will receive responses for all acknowledge requests.
        /// </summary>
        public override void HandleAckResponses(IReadOnlyList<AckNackResponse> responses)
        {
            foreach (var response in responses)
            {
                if (response.Status == AcknowledgementStatus.Success)
                {
                    SuccessfulAckedIds.Add(response.MessageId);
                }

                string result = response.Status switch
                {
                    AcknowledgementStatus.Success => $"MessageId {response.MessageId} successfully acknowledged.",
                    AcknowledgementStatus.PermissionDenied => $"MessageId {response.MessageId} failed to acknowledge due to a permission denied error.",
                    AcknowledgementStatus.FailedPrecondition => $"MessageId {response.MessageId} failed to acknowledge due to a failed precondition.",
                    AcknowledgementStatus.InvalidAckId => $"MessageId {response.MessageId} failed to acknowledge due an invalid or expired AckId.",
                    AcknowledgementStatus.Other => $"MessageId {response.MessageId} failed to acknowledge due to an unknown reason.",
                    _ => $"Unknown acknowledgement status for messageId {response.MessageId}."
                };

                Console.WriteLine(result);
            }
        }
    }
}

Node.js (TypeScript)

Avant d'essayer cet exemple, suivez les instructions de configuration de Node.js dans le Guide de démarrage rapide de Pub/Sub bibliothèques clientes. Pour en savoir plus, consultez les API Pub/Sub Node.js documentation de référence.

Pour vous authentifier auprès de Pub/Sub, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {Message, PubSub, AckError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId: string,
  timeout: number
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e as AckError;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Surveiller les abonnements à distribution de type "exactement une fois"

La subscription/exactly_once_warning_count enregistre le nombre d'événements peut entraîner d'éventuelles renvois (valides ou en double). Cette métrique comptabilise les de fois que Pub/Sub ne parvient pas à traiter les requêtes associées à ID d'accusé de réception (requête ModifyAckDeadline ou acknowledgment). Raisons l'origine de l'échec peuvent être liés au serveur ou au client. Par exemple, si la persistance utilisée pour gérer l'indisponibilité des informations de livraison de type "exactement une fois", serait un événement basé sur le serveur. Si le client tente d'accuser réception d'un message avec un ID d'accusé de réception non valide, il s'agit d'un événement basé sur le client.

Comprendre la métrique

subscription/exactly_once_warning_count capture des événements qui peuvent ou non de nouvelles livraisons et peut générer du bruit en fonction du comportement du client. Pour exemple: requêtes acknowledgment ou ModifyAckDeadline répétées avec des Les ID d'accusé de réception incrémentent la métrique de manière répétée.

Les métriques suivantes sont également utiles pour comprendre le comportement des clients:

  • subscription/expired_ack_deadlines_count indique le nombre d'expirations d'ID d'accusé de réception. Reconnaissance L'expiration des ID peut entraîner des échecs pour ModifyAckDeadline et Requêtes acknowledgment.

  • service.serviceruntime.googleapis.com/api/request_count peut être utilisée pour capturer les défaillances de ModifyAckDeadline ou acknowledgment lorsqu'elles parviennent à Google Cloud, mais pas atteindre Pub/Sub. Cette métrique ne peut pas être utilisée par exemple, lorsque les clients sont déconnectés de Google Cloud.

Dans la plupart des cas d'événements d'échec pouvant faire l'objet de nouvelles tentatives, les bibliothèques clientes compatibles relancer la requête automatiquement.

Quotas

Les abonnements à la distribution de type "exactement une fois" sont soumis à des quotas supplémentaires. exigences. Ces quotas s'appliquent aux éléments suivants:

  • Nombre de messages consommés par des abonnements avec distribution de type "exactement une fois" activées par région.
  • Nombre de messages confirmés ou dont le délai est prolongé lors de l'utilisation avec la distribution de type "exactement une fois" activée par région.

Pour en savoir plus sur ces quotas, consultez le tableau Quotas.

Distribution de type "exactement une fois" et abonnements commandés

Pub/Sub permet la distribution de type "exactement une fois" avec la distribution commandée.

Dans le cas d'une commande avec distribution de type "exactement une fois", Pub/Sub s'attend à ce que les accusés de réception doivent être dans l’ordre. Si les accusés de réception sont dans le désordre, les requêtes échouent avec des erreurs temporaires. Si le délai de confirmation expire avant qu'un accusé de réception de la livraison soit envoyé, le client que le message soit distribué à nouveau. C'est pourquoi, lorsque vous utilisez la commande distribution de type "exactement une fois", le débit du client est limité à des milliers messages par seconde.

Distribution de type "exactement une fois" et abonnements push

Pub/Sub n'accepte la distribution "exactement une fois" qu'avec les abonnements pull.

Les clients qui consomment les messages des abonnements push accusent réception des messages en répondant aux requêtes push avec une réponse positive. Toutefois, les clients ne savent pas si l'abonnement Pub/Sub a reçu la réponse l'a traité. Cela diffère des abonnements pull, où l'accusé de réception sont initiées par les clients et l'abonnement Pub/Sub répond si la requête a été correctement traitée. Pour cette raison, la sémantique de distribution de type "exactement une fois" ne concorde pas bien avec les abonnements push.

Bon à savoir

  • Si aucun délai d'accusé de réception n'est spécifié au moment de CreateSubscription, les abonnements avec distribution de type "exactement une fois" ont un accusé de réception par défaut de 60 secondes.

  • Des délais d'accusé de réception par défaut plus longs permettent d'éviter en raison d'événements réseau. Les bibliothèques clientes compatibles n'utilisent pas délai par défaut de confirmation de l'abonnement.

  • Les abonnements à la livraison de type exactement une fois ont considérablement augmenté de publication et d'abonnement par rapport aux abonnements standards.

  • Si vous avez besoin d'un débit élevé, vos clients de distribution de type "exactement une fois" doivent également utiliser le traitement pull en flux continu.

  • Un abonnement peut recevoir plusieurs copies du même message en raison de doubles du côté de la publication, même si la distribution "exactement une fois" est activée. Les doublons côté publication peuvent être dus à plusieurs tentatives de publication uniques ou le service Pub/Sub. Les publications uniques multiples par le client éditeur, lors des nouvelles tentatives, entraînent de nouvelles livraisons avec des ID de message différents. Les publications uniques multiples par le service Pub/Sub, pour répondre à une requête de publication client, entraînent de nouvelles livraisons avec les mêmes ID de message.

  • Vous pouvez relancer les échecs dans subscription/exactly_once_warning_count, et les bibliothèques clientes compatibles relancent ces automatiquement. Toutefois, les échecs liés à des ID d'accusé de réception non valides ne peuvent pas une nouvelle tentative.