Consegna "exactly-once"

Questa pagina spiega come ricevere e confermare i messaggi utilizzando la funzionalità exactly-once di Pub/Sub, che ti consente di monitorare e impedire l'elaborazione duplicata dei messaggi. Quando la funzionalità è attivata, Pub/Sub fornisce la seguente semantica:

  • Gli abbonati possono determinare se i messaggi di conferma sono andati a buon fine.

  • Non si verifica alcuna riconsegna dopo la conferma del messaggio.

  • Non si verifica alcuna riconsegna quando un messaggio è in sospeso. Un messaggio viene considerato in sospeso fino alla scadenza della conferma o fino al riconosciuto.

  • In caso di più consegne valide, a causa della scadenza di conferma scadenza o conferma negativa avviata dal client, solo l'ultima L'ID di conferma può essere utilizzato per confermare il messaggio. Eventuali richieste con un ID di conferma precedente non vanno a buon fine.

Se il criterio "exactly-once" è abilitato, i sottoscrittori possono assicurarsi che i messaggi vengano elaborati tempo seguendo queste linee guida:

  • Conferma i messaggi entro la scadenza dell'accettazione.

  • Mantenere le informazioni sull'avanzamento dell'elaborazione di un messaggio fino al suo con esito positivo.

  • Utilizza le informazioni sull'avanzamento dell'elaborazione di un messaggio per evitare di duplicare il lavoro quando un ack non va a buon fine.

Solo il tipo di sottoscrizione pull supporta l'invio esattamente una volta, inclusi gli abbonati che utilizzano l'API StreamingPull. Le sottoscrizioni push ed esportazione non supportano la consegna "exactly-once".

Pub/Sub supporta la consegna "exactly-once", all'interno di una regione cloud, basata su un ID messaggio unico definito da Pub/Sub.

Ricaricamento e duplicato

È importante comprendere la differenza tra previsto e imprevisto e nuovi caricamenti.

  • Un nuovo invio può verificarsi a causa di un conferma di ricezione negativa di un messaggio avviata dal client o quando il client non estende la scadenza della conferma di ricezione del messaggio prima della scadenza della conferma di ricezione. Nuovi caricamenti sono considerati validi e il sistema funziona come previsto.

    Per risolvere i problemi relativi ai nuovi caricamenti, consulta la sezione Gestione dei duplicati.

  • Si verifica un duplicato quando un messaggio viene inviato nuovamente dopo una conferma di ricezione o prima della scadenza della scadenza di conferma.

  • Un messaggio ricaricato conserva lo stesso ID messaggio tra un tentativo di riconsegna e l'altro.

Le sottoscrizioni con la consegna "exactly-once" abilitata non ricevono duplicati delle consegne.

Supporto della distribuzione "exactly-once" nelle librerie client

  • Le librerie client supportate hanno un'interfaccia per l'acknowledgement con risposta (ad esempio Go). Puoi utilizzare questa interfaccia per verificare se la richiesta di conferma è andata a buon fine. Se la richiesta di conferma ha esito positivo, ai clienti viene garantito non riceveranno una nuova consegna. Se la richiesta di conferma non va a buon fine, i clienti possono aspettarsi una nuova consegna.

  • I client possono anche utilizzare le librerie client supportate senza l'interfaccia di conferma. Tuttavia, in questi casi, gli errori di conferma possono comportare il reinvio silenzioso dei messaggi.

  • Le librerie client supportate includono interfacce per l'impostazione del numero minimo durata dell'estensione del lease (esempio: Vai). Devi impostare un numero elevato per l'estensione di leasing minima per evitare scadenze di conferma relative alla rete. Il valore massimo è impostato su 600 secondi.

I valori e l'intervallo predefiniti per le variabili correlate alla pubblicazione "exactly-once" e nomi delle variabili potrebbero essere diversi nelle librerie client. Ad esempio, nella libreria client Java, le seguenti variabili controllano la pubblicazione esattamente una volta.

Variabile Descrizione Valore
setEnableExactlyOnceDelivery Attiva o disattiva la consegna "exactly-once". true o false Valore predefinito=false
minDurationPerAckExtension Il tempo minimo in secondi da utilizzare per estendere la scadenza di conferma della modifica. Intervallo = da 0 a 600. Valore predefinito = nessuno
maxDurationPerAckExtension Il tempo massimo in secondi da utilizzare per estendere la scadenza di conferma della modifica. Intervallo=da 0 a 600 Predefinito=nessuno

Nel caso di invio esattamente una volta, la richiesta modifyAckDeadline o acknowledgment a Pub/Sub non va a buon fine quando l'ID di conferma è già scaduto. In questi casi, il servizio considera l'ID di conferma scaduto non valido, poiché una spedizione più recente potrebbe essere già in corso. Questa funzionalità è progettata per "exactly-once" la distribuzione dei contenuti. Vedrai quindi che le richieste acknowledgment e ModifyAckDeadline restituiscono una risposta INVALID_ARGUMENT. Quando la consegna esattamente una volta è disattivata, queste richieste restituiscono OK in caso di ID di conferma scaduti.

Per assicurarti che le richieste acknowledgment e ModifyAckDeadline abbiano ID di conferma validi, ti consigliamo di impostare il valore di minDurationPerAckExtension su un numero elevato.

Considerazioni regionali

La garanzia di consegna "exactly-once" si applica solo quando gli abbonati si connettono al servizio nella stessa regione. Se l'applicazione dell'abbonato è distribuita in più regioni, può portare alla consegna duplicata dei messaggi, anche quando la pubblicazione "exactly-once" è abilitata. I publisher possono inviare messaggi a qualsiasi regione e esattamente una volta mantenuta la garanzia.

Quando esegui la tua applicazione all'interno di Google Cloud, per impostazione predefinita si connette all'endpoint Pub/Sub nella stessa regione. Pertanto, l'esecuzione dell'applicazione in una singola regione in Google Cloud in genere garantisce che tu stia interagendo con una singola regione.

Quando esegui l'applicazione dell'abbonato al di fuori di Google Cloud o in più regioni, puoi assicurarti di connetterti a una singola regione utilizzando un endpoint di località durante la configurazione di Pub/Sub di alto profilo. Tutti gli endpoint della località per Pub/Sub puntano al singolo regioni. Per saperne di più sugli endpoint geografici, consulta Endpoint Pub/Sub. Per un elenco di tutti gli endpoint di località per Pub/Sub, consulta Elenco di endpoint di località.

Creare sottoscrizioni con consegna "exactly-once"

Puoi creare una sottoscrizione con invio esattamente una volta utilizzando la console Google Cloud, Google Cloud CLI, la libreria client o l'API Pub/Sub.

Sottoscrizione pull

Console

Per creare un abbonamento pull con invio esattamente una volta:

  1. Nella console Google Cloud, vai alla pagina Abbonamenti.

    Vai agli abbonamenti

  2. Fai clic su Crea sottoscrizione.

  3. Inserisci l'ID abbonamento.

  4. Scegli o crea un argomento dal menu a discesa.

    La sottoscrizione riceve i messaggi dall'argomento.

  5. Nella sezione Consegna "exactly-once", seleziona Abilita la consegna "exactly-once".

  6. Fai clic su Crea.

gcloud

Per creare una sottoscrizione pull con la modalità exactly-once, utilizza il comando gcloud pubsub subscriptions create con il flag --enable-exactly-once-delivery:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --enable-exactly-once-delivery

Sostituisci quanto segue:

  • SUBSCRIPTION_ID: l'ID dell'abbonamento da creare
  • TOPIC_ID: l'ID dell'argomento da collegare alla sottoscrizione

REST

Per creare una sottoscrizione con consegna "exactly-once", utilizza il metodo projects.subscriptions.create .

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto in cui creare l'abbonamento
  • SUBSCRIPTION_ID: l'ID dell'abbonamento da creare

Per creare una sottoscrizione pull con invio esattamente una volta, specifica quanto segue nel corpo della richiesta:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "enableExactlyOnceDelivery": true,
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto con l'argomento
  • TOPIC_ID: l'ID dell'argomento da collegare alla sottoscrizione

C++

Prima di provare questo esempio, segui le istruzioni di configurazione C++ riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_exactly_once_delivery(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Prima di provare questo esempio, segui le istruzioni di configurazione C# riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C#.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithExactlyOnceDeliverySample
{
    public Subscription CreateSubscriptionWithExactlyOnceDelivery(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableExactlyOnceDelivery = true
        };

        Subscription subscription = null;

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Vai

Prima di provare questo esempio, segui le istruzioni di configurazione di Go riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createSubscriptionWithExactlyOnceDelivery(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                     topic,
		EnableExactlyOnceDelivery: true,
	})
	if err != nil {
		return err
	}
	fmt.Fprintf(w, "Created a subscription with exactly once delivery enabled: %v\n", sub)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Pub/Sub.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithExactlyOnceDelivery {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithExactlyOnceDeliveryExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithExactlyOnceDeliveryExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Enable exactly once delivery in the subscription.
                  .setEnableExactlyOnceDelivery(true)
                  .build());

      System.out.println(
          "Created a subscription with exactly once delivery enabled: "
              + subscription.getAllFields());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Python Pub/Sub.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_exactly_once_delivery": True,
        }
    )
    print(
        f"Created subscription with exactly once delivery enabled: {subscription}"
    )

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub per Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId,
  subscriptionNameOrId
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().'
  );
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub per Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().'
  );
}

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Ruby Pub/Sub.

require "google/cloud/pubsub"

# Shows how to create a new subscription with exactly once delivery enabled
class PubsubCreateSubscriptionWithExactlyOnceDelivery
  def create_subscription_with_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, enable_exactly_once_delivery: true
    puts "Created subscription with exactly once delivery enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    pubsub_create_subscription_with_exactly_once_delivery = PubsubCreateSubscriptionWithExactlyOnceDelivery.new
    pubsub_create_subscription_with_exactly_once_delivery.create_subscription_with_exactly_once_delivery(
      project_id: project_id,
      topic_id: topic_id,
      subscription_id: subscription_id
    )
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithExactlyOnceDelivery.run
end

PHP

Prima di provare questo esempio, segui le istruzioni di configurazione di PHP riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API PHP Pub/Sub.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function create_subscription_with_exactly_once_delivery(
    string $projectId,
    string $topicName,
    string $subscriptionName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $subscription->create([
        'enableExactlyOnceDelivery' => true
    ]);

    // Exactly Once Delivery status for the subscription
    $status = $subscription->info()['enableExactlyOnceDelivery'];

    printf('Subscription created with exactly once delivery status: %s' . PHP_EOL, $status ? 'true' : 'false');
}

Abbonati con la consegna dei messaggi "exactly-once"

Di seguito sono riportati alcuni esempi di codice per l'iscrizione con invio esattamente una volta utilizzando la libreria client.

Sottoscrizione pull

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

Per eseguire l'autenticazione su Pub/Sub, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"google.golang.org/api/option"
)

// receiveMessagesWithExactlyOnceDeliveryEnabled instantiates a subscriber client.
// This differs from regular subscribing since you must call msg.AckWithResult()
// or msg.NackWithResult() instead of the regular Ack/Nack methods.
// When exactly once delivery is enabled on the subscription, the message is
// guaranteed to not be delivered again if the ack result succeeds.
func receiveMessagesWithExactlyOnceDeliveryEnabled(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()

	// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
	// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
	client, err := pubsub.NewClient(ctx, projectID, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Set MinExtensionPeriod high to avoid any unintentional
	// acknowledgment expirations (e.g. due to network events).
	// This can lead to high tail latency in case of client crashes.
	sub.ReceiveSettings.MinExtensionPeriod = 600 * time.Second

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		r := msg.AckWithResult()
		// Block until the result is returned and a pubsub.AcknowledgeStatus
		// is returned for the acked message.
		status, err := r.Get(ctx)
		if err != nil {
			fmt.Fprintf(w, "MessageID: %s failed when calling result.Get: %v", msg.ID, err)
		}

		switch status {
		case pubsub.AcknowledgeStatusSuccess:
			fmt.Fprintf(w, "Message successfully acked: %s", msg.ID)
		case pubsub.AcknowledgeStatusInvalidAckID:
			fmt.Fprintf(w, "Message failed to ack with response of Invalid. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusPermissionDenied:
			fmt.Fprintf(w, "Message failed to ack with response of Permission Denied. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusFailedPrecondition:
			fmt.Fprintf(w, "Message failed to ack with response of Failed Precondition. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusOther:
			fmt.Fprintf(w, "Message failed to ack with response of Other. ID: %s", msg.ID)
		default:
		}
	})
	if err != nil {
		return fmt.Errorf("got err from sub.Receive: %w", err)
	}
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni per la configurazione di Java nel Guida rapida di Pub/Sub con librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Java.

Per autenticarti a Pub/Sub, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.


import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithExactlyOnceConsumerWithResponseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithExactlyOnceConsumerWithResponseExample(projectId, subscriptionId);
  }

  public static void subscribeWithExactlyOnceConsumerWithResponseExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver using `AckReplyConsumerWithResponse`
    // instead of `AckReplyConsumer` to get a future that tracks the result of the ack call.
    // When exactly once delivery is enabled on the subscription, the message is guaranteed
    // to not be delivered again if the ack future succeeds.
    MessageReceiverWithAckResponse receiverWithResponse =
        (PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) -> {
          try {
            // Handle incoming message, then ack the message, and receive an ack response.
            System.out.println("Message received: " + message.getData().toStringUtf8());
            Future<AckResponse> ackResponseFuture = consumerWithResponse.ack();

            // Retrieve the completed future for the ack response from the server.
            AckResponse ackResponse = ackResponseFuture.get();

            switch (ackResponse) {
              case SUCCESSFUL:
                // Success code means that this MessageID will not be delivered again.
                System.out.println("Message successfully acked: " + message.getMessageId());
                break;
              case INVALID:
                System.out.println(
                    "Message failed to ack with a response of Invalid. Id: "
                        + message.getMessageId());
                break;
              case PERMISSION_DENIED:
                System.out.println(
                    "Message failed to ack with a response of Permission Denied. Id: "
                        + message.getMessageId());
                break;
              case FAILED_PRECONDITION:
                System.out.println(
                    "Message failed to ack with a response of Failed Precondition. Id: "
                        + message.getMessageId());
                break;
              case OTHER:
                System.out.println(
                    "Message failed to ack with a response of Other. Id: "
                        + message.getMessageId());
                break;
              default:
                break;
            }
          } catch (InterruptedException | ExecutionException e) {
            System.out.println(
                "MessageId: " + message.getMessageId() + " failed when retrieving future");
          } catch (Throwable t) {
            System.out.println("Throwable caught" + t.getMessage());
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiverWithResponse).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni per la configurazione di Node.js nel Guida rapida di Pub/Sub con librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Node.js.

Per eseguire l'autenticazione su Pub/Sub, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId,
  timeout
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

Prima di provare questo esempio, segui le istruzioni per la configurazione di PHP nel Guida rapida di Pub/Sub con librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub PHP.

Per eseguire l'autenticazione su Pub/Sub, configura le credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages from a subscription
 * with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_exactly_once_delivery(
    string $projectId,
    string $subscriptionId
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        // When exactly once delivery is enabled on the subscription,
        // the message is guaranteed to not be delivered again if the ack succeeds.
        // Passing the `returnFailures` flag retries any temporary failures received
        // while acking the msg and also returns any permanently failed msgs.
        // Passing this flag on a subscription with exactly once delivery disabled
        // will always return an empty array.
        $failedMsgs = $subscription->acknowledge($message, ['returnFailures' => true]);

        if (empty($failedMsgs)) {
            printf('Acknowledged message: %s' . PHP_EOL, $message->data());
        } else {
            // Either log or store the $failedMsgs to be retried later
        }
    }
}

Python

Prima di provare questo esempio, segui le istruzioni per la configurazione di Python nel Guida rapida di Pub/Sub con librerie client. Per ulteriori informazioni, consulta API Pub/Sub Python documentazione di riferimento.

Per eseguire l'autenticazione su Pub/Sub, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

    # Use `ack_with_response()` instead of `ack()` to get a future that tracks
    # the result of the acknowledge call. When exactly-once delivery is enabled
    # on the subscription, the message is guaranteed to not be delivered again
    # if the ack future succeeds.
    ack_future = message.ack_with_response()

    try:
        # Block on result of acknowledge call.
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        ack_future.result(timeout=timeout)
        print(f"Ack for message {message.message_id} successful.")
    except sub_exceptions.AcknowledgeError as e:
        print(
            f"Ack for message {message.message_id} failed with error: {e.error_code}"
        )

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Prima di provare questo esempio, segui le istruzioni per la configurazione di Ruby nel Guida rapida di Pub/Sub con librerie client. Per ulteriori informazioni, consulta API Pub/Sub Ruby documentazione di riferimento.

Per autenticarti a Pub/Sub, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.

require "google/cloud/pubsub"

# Shows how to register callback to acknowledge method and access the result passed in
class PubsubSubscriberExactlyOnceDelivery
  def subscriber_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = pubsub.subscription subscription_id
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"

      # Pass in callback to access the acknowledge result.
      # For subscription with Exactly once delivery disabled the result will be success always.
      received_message.acknowledge! do |result|
        puts "Acknowledge result's status: #{result.status}"
      end
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription" # subscription with exactly once delivery enabled
    PubsubSubscriberExactlyOnceDelivery.new.subscriber_exactly_once_delivery project_id: project_id,
                                                                             topic_id: topic_id,
                                                                             subscription_id: subscription_id
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubSubscriberExactlyOnceDelivery.run
end

C++

Prima di provare questo esempio, segui le istruzioni per la configurazione di C++ nel Guida rapida di Pub/Sub con librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

Per eseguire l'autenticazione su Pub/Sub, configura le credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack().then([id = m.message_id()](auto f) {
          auto status = f.get();
          std::cout << "Message id " << id
                    << " ack() completed with status=" << status << "\n";
        });
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

Prima di provare questo esempio, segui le istruzioni per la configurazione di C# nel Guida rapida di Pub/Sub con librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C#.

Per autenticarti a Pub/Sub, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, vedi Configura l'autenticazione per un ambiente di sviluppo locale.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.PubSub.V1.SubscriberClient;

public class ExactlyOnceDeliverySubscriberAsyncSample
{
    public async Task<IEnumerable<string>> ExactlyOnceDeliverySubscriberAsync(string projectId, string subscriptionId)
    {
        // subscriptionId should be the ID of an exactly-once delivery subscription.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // To get the status of ACKnowledge (ACK) or Not ACKnowledge (NACK) request in exactly once delivery subscriptions,
        // create a subscription handler that inherits from Google.Cloud.PubSub.V1.SubscriptionHandler. 
        // For more information see Google.Cloud.PubSub.V1.SubscriptionHandler reference docs here:
        // https://cloud.google.com/dotnet/docs/reference/Google.Cloud.PubSub.V1/latest/Google.Cloud.PubSub.V1.SubscriptionHandler
        var subscriptionHandler = new SampleSubscriptionHandler();
        Task subscriptionTask = subscriber.StartAsync(subscriptionHandler);
        // The subscriber will be running until it is stopped.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await subscriptionTask;
        return subscriptionHandler.SuccessfulAckedIds;
    }

    // Sample handler to handle messages and ACK/NACK responses.
    public class SampleSubscriptionHandler : SubscriptionHandler
    {
        public ConcurrentBag<string> SuccessfulAckedIds { get; } = new ConcurrentBag<string>();

        /// <summary>
        /// The function that processes received messages. It should be thread-safe.
        /// Return <see cref="Reply.Ack"/> to ACKnowledge the message (meaning it won't be received again).
        /// Return <see cref="Reply.Nack"/> to Not ACKnowledge the message (meaning it will be received again).
        /// From the point of view of message acknowledgement, throwing an exception is equivalent to returning <see cref="Reply.Nack"/>.
        /// </summary>
        public override async Task<Reply> HandleMessage(PubsubMessage message, CancellationToken cancellationToken)
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            return await Task.FromResult(Reply.Ack);
        }

        /// <summary>
        /// This method will receive responses for all acknowledge requests.
        /// </summary>
        public override void HandleAckResponses(IReadOnlyList<AckNackResponse> responses)
        {
            foreach (var response in responses)
            {
                if (response.Status == AcknowledgementStatus.Success)
                {
                    SuccessfulAckedIds.Add(response.MessageId);
                }

                string result = response.Status switch
                {
                    AcknowledgementStatus.Success => $"MessageId {response.MessageId} successfully acknowledged.",
                    AcknowledgementStatus.PermissionDenied => $"MessageId {response.MessageId} failed to acknowledge due to a permission denied error.",
                    AcknowledgementStatus.FailedPrecondition => $"MessageId {response.MessageId} failed to acknowledge due to a failed precondition.",
                    AcknowledgementStatus.InvalidAckId => $"MessageId {response.MessageId} failed to acknowledge due an invalid or expired AckId.",
                    AcknowledgementStatus.Other => $"MessageId {response.MessageId} failed to acknowledge due to an unknown reason.",
                    _ => $"Unknown acknowledgement status for messageId {response.MessageId}."
                };

                Console.WriteLine(result);
            }
        }
    }
}

Node.js (TypeScript)

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella guida rapida di Pub/Sub che utilizza le librerie client. Per ulteriori informazioni, consulta API Node.js Pub/Sub documentazione di riferimento.

Per eseguire l'autenticazione su Pub/Sub, configura le credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {Message, PubSub, AckError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId: string,
  timeout: number
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e as AckError;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Monitorare le iscrizioni alla consegna "exactly-once"

La subscription/exactly_once_warning_count registra il numero di eventi che possono comportare possibili pubblicazioni di nuovo (valide o duplicate). Questa metrica conteggia le volte in cui Pub/Sub non riesce a elaborare le richieste associate agli ID di conferma (richiesta ModifyAckDeadline o acknowledgment). I motivi dell'errore possono essere basati sul server o sul client. Ad esempio, se il livello di persistenza utilizzato per mantenere le informazioni di invio esattamente una volta non è disponibile, si tratta di un evento basato su server. Se il client prova a confermare la ricezione di un messaggio con ID di conferma non valido, si tratta di un evento basato su client.

Comprendere la metrica

subscription/exactly_once_warning_count acquisisce gli eventi che potrebbero o meno che generano effettivamente ripubblicazioni e può generare rumore in base al comportamento del cliente. Ad esempio, richieste acknowledgment o ModifyAckDeadline ripetute con ID di conferma non validi incrementano la metrica ripetutamente.

Anche le seguenti metriche sono utili per comprendere il comportamento del cliente:

  • subscription/expired_ack_deadlines_count La metrica mostra il numero di scadenze degli ID di conferma. Riconoscimento Le scadenze degli ID possono determinare errori sia per ModifyAckDeadline che per acknowledgment richieste.

  • La metrica service.serviceruntime.googleapis.com/api/request_count può essere utilizzata per rilevare gli errori delle richieste ModifyAckDeadline o acknowledgment nei casi in cui le richieste raggiungono Google Cloud, ma non raggiungono Pub/Sub. Si sono verificati errori che non consentono di acquisizione, ad esempio quando i client sono disconnessi da Google Cloud.

Nella maggior parte dei casi di eventi di errore che è possibile ritentare, le librerie client supportate riprova a eseguire la richiesta automaticamente.

Quote

Gli abbonamenti per la consegna "exactly-once" sono soggetti a quota aggiuntiva i tuoi requisiti. Queste quote vengono applicate a:

  • Numero di messaggi consumati dalle sottoscrizioni con consegna "exactly-once" abilitate per regione.
  • Numero di messaggi confermati o la cui scadenza è stata estesa quando si utilizzano i suddetti abbonamenti con la consegna "exactly-once" abilitata per regione.

Per ulteriori informazioni su queste quote, consulta la tabella Quote.

Consegna "exactly-once" e iscrizioni ordinate

Pub/Sub supporta la consegna "exactly-once" con consegna ordinata.

Quando utilizzi l'ordinamento con la consegna "exactly-once", Pub/Sub si aspetta che i riconoscimenti siano in ordine. Se le conferme non sono nell'ordine corretto, i non riesce le richieste con errori temporanei. Se la scadenza di conferma scadono prima di una conferma di consegna in ordine, il cliente ricevere una nuova consegna del messaggio. Per questo motivo, quando utilizzi l'ordinamento con la pubblicazione esattamente una volta, il throughput del client è limitato a un ordine di migliaia di messaggi al secondo.

Consegna "exactly-once" e iscrizioni push

Pub/Sub supporta la consegna "exactly-once" solo con le sottoscrizioni pull.

I client che utilizzano i messaggi delle sottoscrizioni push riconoscono i messaggi rispondendo alle richieste push con esito positivo. Tuttavia, i client non sanno se l'abbonamento Pub/Sub ha ricevuto la risposta e la ha elaborata. È un'operazione diversa rispetto alle sottoscrizioni pull, in cui l'accettazione vengono avviate dai client e dalla sottoscrizione Pub/Sub risponde se la richiesta è stata elaborata correttamente. Per questo motivo, la semantica di consegna "exactly-once" non è molto in linea con le iscrizioni push.

Aspetti da tenere presenti

  • Se la scadenza della conferma non è specificata al momento della creazione di una sottoscrizione, Gli abbonamenti abilitati per l'invio "exactly-once" avranno un riconoscimento predefinito una scadenza di 60 secondi.

  • Scadenza di conferma predefinite più lunghe sono utili per evitare la reimportazione causata da eventi di rete. Le librerie client supportate non utilizzano scadenza predefinita di conferma dell'abbonamento.

  • Gli abbonamenti per la consegna "exactly-once" hanno registrato un aumento la latenza da pubblicazione a sottoscrizione rispetto alle normali sottoscrizioni.

  • Se hai bisogno di una maggiore velocità in uscita, i client di importazione esattamente una volta devono utilizzare anche lo streaming pull.

  • Una sottoscrizione potrebbe ricevere più copie dello stesso messaggio a causa della presenza di duplicati sul lato della pubblicazione, anche con la consegna "exactly-once" attivata. I duplicati sul lato pubblicazione possono essere dovuti a più tentativi di pubblicazione univoci da parte del di pubblicazione o il servizio Pub/Sub. La presenza di più pubblicazioni univoche da parte del client di pubblicazione, a seguito di diversi nuovi tentativi, comporta ulteriori caricamenti con ID messaggio diversi. Più pubblicazioni univoche del servizio Pub/Sub, per rispondere a una richiesta di pubblicazione del client, comportano riconsegne con gli stessi ID messaggio.

  • Puoi riprovare a eseguire gli errori in subscription/exactly_once_warning_count e nelle librerie client supportate questi nuovi tentativi automaticamente. Tuttavia, gli errori relativi a ID di conferma non validi non possono da riprovare.