Consegna "exactly-once"

Questa pagina spiega come ricevere e confermare i messaggi utilizzando la semantica "exactly-once". Solo il tipo di sottoscrizione pull supporta la distribuzione "exactly-once", inclusi gli abbonati che utilizzano l'API StreamingPull.

Le sottoscrizioni di push ed esportazione non supportano la consegna "exactly-once".

Consegna "exactly-once"

Pub/Sub supporta la consegna "exactly-once", all'interno di una regione cloud, in base a un ID messaggio univoco definito da Pub/Sub.

Quando la funzionalità è abilitata, Pub/Sub fornisce quanto segue:

  • Una volta che il messaggio è stato confermato, non si verifica alcun nuovo recapito.

  • Quando un messaggio è in sospeso, non viene eseguito alcun nuovo recapito. Un messaggio viene considerato in sospeso fino alla scadenza della scadenza della conferma o fino alla sua conferma.

  • In caso di più consegne valide, a causa della scadenza della scadenza della conferma o della conferma negativa avviata dal client, è possibile utilizzare solo l'ID di conferma più recente per confermare il messaggio. Eventuali richieste con un ID di conferma precedente non andranno a buon fine.

Ripubblicazione e duplicazione

È importante capire la differenza tra caricamenti previsti e ricaricati inaspettati.

  • Una nuova consegna può verificarsi a causa della conferma negativa di un messaggio avviata dal client o quando il client non estende la scadenza per la conferma del messaggio prima della scadenza della conferma. I ricaricamenti sono considerati validi e il sistema funziona come previsto.

    Per risolvere i problemi relativi ai nuovi caricamenti, consulta Gestire i duplicati e forzare i nuovi tentativi.

  • Si verifica un duplicato quando un messaggio viene inviato nuovamente dopo una conferma di accettazione o prima della scadenza della scadenza della conferma.

  • Un messaggio ricaricato conserva lo stesso ID messaggio tra un tentativo di nuova consegna e l'altro.

Gli abbonamenti con la consegna "exactly-once" abilitata non ricevono caricamenti duplicati.

Supporto per la distribuzione "exactly-once" nelle librerie client

  • Le librerie client supportate hanno un'interfaccia per il riconoscimento con risposta (esempio: Go). Puoi utilizzare questa interfaccia per verificare se la richiesta di conferma è andata a buon fine. Se la richiesta di conferma ha esito positivo, i client hanno la certezza che non riceveranno una nuova consegna. Se la richiesta di conferma non va a buon fine, i clienti possono aspettarsi una nuova pubblicazione.

  • I client possono utilizzare anche le librerie client supportate senza l'interfaccia di conferma. Tuttavia, in questi casi, gli errori di conferma possono comportare il reinvio silenzioso dei messaggi.

  • Le librerie client supportate hanno interfacce per l'impostazione del tempo minimo di estensione per il lease (ad esempio: Go). Devi impostare un numero elevato per l'estensione di lease minima per evitare scadenze di conferma correlate alla rete. Il valore massimo è impostato su 600 secondi.

I valori predefiniti e l'intervallo per le variabili relative alla pubblicazione "exactly-once" e i nomi delle variabili potrebbero variare a seconda delle librerie client. Ad esempio, nella libreria client Java, le seguenti variabili controllano la distribuzione "exactly-once".

Variabile Descrizione Valore
setEnableExactlyOnceDelivery Attiva o disattiva la consegna "exactly-once". true o false Predefinito=falso
minDurationPerAckExtension Il tempo minimo in secondi da utilizzare per estendere la scadenza per la conferma della modifica. Intervallo=da 0 a 600 Valore predefinito=nessuno
maxDurationPerAckExtension Il tempo massimo in secondi da utilizzare per estendere la scadenza per la conferma della modifica. Intervallo=da 0 a 600 Valore predefinito=nessuno

Nel caso della consegna "exactly-once", la richiesta modifyAckDeadline o acknowledgment a Pub/Sub non va a buon fine quando l'ID di conferma è già scaduto. In questi casi, il servizio considera l'ID di conferma scaduto come non valido, poiché una distribuzione più recente potrebbe essere già in corso. Questo è progettato per la pubblicazione "exactly-once". Quindi vedrai le richieste acknowledgment e ModifyAckDeadline che restituiscono una risposta INVALID_ARGUMENT. Quando la pubblicazione "exactly-once" viene disattivata, queste richieste restituiscono OK nel caso di ID di conferma scaduti.

Per assicurarti che le richieste acknowledgment e ModifyAckDeadline abbiano ID di conferma validi, valuta la possibilità di impostare il valore per minDurationPerAckExtension su un numero elevato.

Creare sottoscrizioni con consegna "exactly-once"

Puoi creare un abbonamento con la consegna "exactly-once" utilizzando la console Google Cloud, Google Cloud CLI, la libreria client o l'API Pub/Sub.

Esegui il pull della sottoscrizione

Console

Per creare una sottoscrizione pull con consegna "exactly-once", segui questi passaggi:

  1. Nella console Google Cloud, vai alla pagina Abbonamenti.

    Vai ad Abbonamenti

  2. Fai clic su Crea sottoscrizione.

  3. Inserisci l'ID abbonamento.

  4. Scegli o crea un argomento dal menu a discesa.

    La sottoscrizione riceve i messaggi dall'argomento.

  5. Nella sezione Pubblicazione esatta una volta, seleziona Attiva la consegna "esattamente una volta".

  6. Fai clic su Crea.

gcloud

Per creare una sottoscrizione pull con la consegna "exactly-once", utilizza il comando gcloud pubsub subscriptions create con il flag --enable-exactly-once-delivery:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --enable-exactly-once-delivery

Sostituisci quanto segue:

  • SUBSCRIPTION_ID: l'ID della sottoscrizione da creare
  • TOPIC_ID: l'ID dell'argomento da collegare alla sottoscrizione

REST

Per creare un abbonamento con consegna "exactly-once", utilizza il metodo projects.subscriptions.create.

PUT https://pubsub.googleapis.com/v1/projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
Authorization: Bearer $(gcloud auth print-access-token)

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto in cui creare la sottoscrizione
  • SUBSCRIPTION_ID: l'ID della sottoscrizione da creare

Per creare una sottoscrizione pull con consegna "exactly-once", specifica quanto segue nel corpo della richiesta:

{
  "topic": "projects/PROJECT_ID/topics/TOPIC_ID",
  "enableExactlyOnceDelivery": true,
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto con l'argomento
  • TOPIC_ID: l'ID dell'argomento da collegare alla sottoscrizione

C++

Prima di provare questo esempio, segui le istruzioni di configurazione di C++ riportate nella Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API C++ Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.set_enable_exactly_once_delivery(true);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";
}

C#

Prima di provare questo esempio, segui le istruzioni di configurazione di C# nella Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API C# di Pub/Sub.


using Google.Cloud.PubSub.V1;
using Grpc.Core;

public class CreateSubscriptionWithExactlyOnceDeliverySample
{
    public Subscription CreateSubscriptionWithExactlyOnceDelivery(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            EnableExactlyOnceDelivery = true
        };

        Subscription subscription = null;

        try
        {
            subscription = subscriber.CreateSubscription(subscriptionRequest);
        }
        catch (RpcException e) when (e.Status.StatusCode == StatusCode.AlreadyExists)
        {
            // Already exists.  That's fine.
        }
        return subscription;
    }
}

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Go Pub/Sub.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func createSubscriptionWithExactlyOnceDelivery(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
		Topic:                     topic,
		EnableExactlyOnceDelivery: true,
	})
	if err != nil {
		return err
	}
	fmt.Fprintf(w, "Created a subscription with exactly once delivery enabled: %v\n", sub)
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione Java in Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Pub/Sub.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithExactlyOnceDelivery {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";

    createSubscriptionWithExactlyOnceDeliveryExample(projectId, topicId, subscriptionId);
  }

  public static void createSubscriptionWithExactlyOnceDeliveryExample(
      String projectId, String topicId, String subscriptionId) throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);

      Subscription subscription =
          subscriptionAdminClient.createSubscription(
              Subscription.newBuilder()
                  .setName(subscriptionName.toString())
                  .setTopic(topicName.toString())
                  // Enable exactly once delivery in the subscription.
                  .setEnableExactlyOnceDelivery(true)
                  .build());

      System.out.println(
          "Created a subscription with exactly once delivery enabled: "
              + subscription.getAllFields());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Python Pub/Sub.

from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
    subscription = subscriber.create_subscription(
        request={
            "name": subscription_path,
            "topic": topic_path,
            "enable_exactly_once_delivery": True,
        }
    )
    print(
        f"Created subscription with exactly once delivery enabled: {subscription}"
    )

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js in Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Node.js Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId,
  subscriptionNameOrId
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().'
  );
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js in Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Node.js Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithExactlyOnceDelivery(
  topicNameOrId: string,
  subscriptionNameOrId: string
) {
  // Creates a new subscription
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, {
      enableExactlyOnceDelivery: true,
    });
  console.log(
    `Created subscription ${subscriptionNameOrId} with exactly-once delivery.`
  );
  console.log(
    'To process messages, remember to check the return value of ackWithResponse().'
  );
}

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby riportate in Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Ruby Pub/Sub.

require "google/cloud/pubsub"

# Shows how to create a new subscription with exactly once delivery enabled
class PubsubCreateSubscriptionWithExactlyOnceDelivery
  def create_subscription_with_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = topic.subscribe subscription_id, enable_exactly_once_delivery: true
    puts "Created subscription with exactly once delivery enabled: #{subscription_id}"
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription"
    pubsub_create_subscription_with_exactly_once_delivery = PubsubCreateSubscriptionWithExactlyOnceDelivery.new
    pubsub_create_subscription_with_exactly_once_delivery.create_subscription_with_exactly_once_delivery(
      project_id: project_id,
      topic_id: topic_id,
      subscription_id: subscription_id
    )
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubCreateSubscriptionWithExactlyOnceDelivery.run
end

PHP

Prima di provare questo esempio, segui le istruzioni di configurazione PHP nella Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API PHP Pub/Sub.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function create_subscription_with_exactly_once_delivery(
    string $projectId,
    string $topicName,
    string $subscriptionName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $subscription = $topic->subscription($subscriptionName);
    $subscription->create([
        'enableExactlyOnceDelivery' => true
    ]);

    // Exactly Once Delivery status for the subscription
    $status = $subscription->info()['enableExactlyOnceDelivery'];

    printf('Subscription created with exactly once delivery status: %s' . PHP_EOL, $status ? 'true' : 'false');
}

Abbonati con la consegna del messaggio "exactly-once"

Di seguito sono riportati alcuni esempi di codice per l'iscrizione con la consegna "exactly-once" utilizzando la libreria client.

Esegui il pull della sottoscrizione

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API Go di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// receiveMessagesWithExactlyOnceDeliveryEnabled instantiates a subscriber client.
// This differs from regular subscribing since you must call msg.AckWithResult()
// or msg.NackWithResult() instead of the regular Ack/Nack methods.
// When exactly once delivery is enabled on the subscription, the message is
// guaranteed to not be delivered again if the ack result succeeds.
func receiveMessagesWithExactlyOnceDeliveryEnabled(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// Set MinExtensionPeriod high to avoid any unintentional
	// acknowledgment expirations (e.g. due to network events).
	// This can lead to high tail latency in case of client crashes.
	sub.ReceiveSettings.MinExtensionPeriod = 600 * time.Second

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		r := msg.AckWithResult()
		// Block until the result is returned and a pubsub.AcknowledgeStatus
		// is returned for the acked message.
		status, err := r.Get(ctx)
		if err != nil {
			fmt.Fprintf(w, "MessageID: %s failed when calling result.Get: %v", msg.ID, err)
		}

		switch status {
		case pubsub.AcknowledgeStatusSuccess:
			fmt.Fprintf(w, "Message successfully acked: %s", msg.ID)
		case pubsub.AcknowledgeStatusInvalidAckID:
			fmt.Fprintf(w, "Message failed to ack with response of Invalid. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusPermissionDenied:
			fmt.Fprintf(w, "Message failed to ack with response of Permission Denied. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusFailedPrecondition:
			fmt.Fprintf(w, "Message failed to ack with response of Failed Precondition. ID: %s", msg.ID)
		case pubsub.AcknowledgeStatusOther:
			fmt.Fprintf(w, "Message failed to ack with response of Other. ID: %s", msg.ID)
		default:
		}
	})
	if err != nil {
		return fmt.Errorf("got err from sub.Receive: %w", err)
	}
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API Java di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.


import com.google.cloud.pubsub.v1.AckReplyConsumerWithResponse;
import com.google.cloud.pubsub.v1.AckResponse;
import com.google.cloud.pubsub.v1.MessageReceiverWithAckResponse;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithExactlyOnceConsumerWithResponseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithExactlyOnceConsumerWithResponseExample(projectId, subscriptionId);
  }

  public static void subscribeWithExactlyOnceConsumerWithResponseExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver using `AckReplyConsumerWithResponse`
    // instead of `AckReplyConsumer` to get a future that tracks the result of the ack call.
    // When exactly once delivery is enabled on the subscription, the message is guaranteed
    // to not be delivered again if the ack future succeeds.
    MessageReceiverWithAckResponse receiverWithResponse =
        (PubsubMessage message, AckReplyConsumerWithResponse consumerWithResponse) -> {
          try {
            // Handle incoming message, then ack the message, and receive an ack response.
            System.out.println("Message received: " + message.getData().toStringUtf8());
            Future<AckResponse> ackResponseFuture = consumerWithResponse.ack();

            // Retrieve the completed future for the ack response from the server.
            AckResponse ackResponse = ackResponseFuture.get();

            switch (ackResponse) {
              case SUCCESSFUL:
                // Success code means that this MessageID will not be delivered again.
                System.out.println("Message successfully acked: " + message.getMessageId());
                break;
              case INVALID:
                System.out.println(
                    "Message failed to ack with a response of Invalid. Id: "
                        + message.getMessageId());
                break;
              case PERMISSION_DENIED:
                System.out.println(
                    "Message failed to ack with a response of Permission Denied. Id: "
                        + message.getMessageId());
                break;
              case FAILED_PRECONDITION:
                System.out.println(
                    "Message failed to ack with a response of Failed Precondition. Id: "
                        + message.getMessageId());
                break;
              case OTHER:
                System.out.println(
                    "Message failed to ack with a response of Other. Id: "
                        + message.getMessageId());
                break;
              default:
                break;
            }
          } catch (InterruptedException | ExecutionException e) {
            System.out.println(
                "MessageId: " + message.getMessageId() + " failed when retrieving future");
          } catch (Throwable t) {
            System.out.println("Throwable caught" + t.getMessage());
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiverWithResponse).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API Node.js di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId,
  timeout
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

Prima di provare questo esempio, segui le istruzioni di configurazione di PHP nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API PHP di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages from a subscription
 * with `Exactly Once Delivery` enabled.
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_exactly_once_delivery(
    string $projectId,
    string $subscriptionId
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        // When exactly once delivery is enabled on the subscription,
        // the message is guaranteed to not be delivered again if the ack succeeds.
        // Passing the `returnFailures` flag retries any temporary failures received
        // while acking the msg and also returns any permanently failed msgs.
        // Passing this flag on a subscription with exactly once delivery disabled
        // will always return an empty array.
        $failedMsgs = $subscription->acknowledge($message, ['returnFailures' => true]);

        if (empty($failedMsgs)) {
            printf('Acknowledged message: %s' . PHP_EOL, $message->data());
        } else {
            // Either log or store the $failedMsgs to be retried later
        }
    }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API Python di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber import exceptions as sub_exceptions

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

    # Use `ack_with_response()` instead of `ack()` to get a future that tracks
    # the result of the acknowledge call. When exactly-once delivery is enabled
    # on the subscription, the message is guaranteed to not be delivered again
    # if the ack future succeeds.
    ack_future = message.ack_with_response()

    try:
        # Block on result of acknowledge call.
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        ack_future.result(timeout=timeout)
        print(f"Ack for message {message.message_id} successful.")
    except sub_exceptions.AcknowledgeError as e:
        print(
            f"Ack for message {message.message_id} failed with error: {e.error_code}"
        )

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API Ruby di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

require "google/cloud/pubsub"

# Shows how to register callback to acknowledge method and access the result passed in
class PubsubSubscriberExactlyOnceDelivery
  def subscriber_exactly_once_delivery project_id:, topic_id:, subscription_id:
    pubsub = Google::Cloud::Pubsub.new project_id: project_id
    topic = pubsub.topic topic_id
    subscription = pubsub.subscription subscription_id
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"

      # Pass in callback to access the acknowledge result.
      # For subscription with Exactly once delivery disabled the result will be success always.
      received_message.acknowledge! do |result|
        puts "Acknowledge result's status: #{result.status}"
      end
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!
  end

  def self.run
    # TODO(developer): Replace these variables before running the sample.
    project_id = "your-project-id"
    topic_id = "your-topic-id"
    subscription_id = "id-for-new-subcription" # subscription with exactly once delivery enabled
    PubsubSubscriberExactlyOnceDelivery.new.subscriber_exactly_once_delivery project_id: project_id,
                                                                             topic_id: topic_id,
                                                                             subscription_id: subscription_id
  end
end

if $PROGRAM_NAME == __FILE__
  PubsubSubscriberExactlyOnceDelivery.run
end

C++

Prima di provare questo esempio, segui le istruzioni di configurazione di C++ nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API C++ di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack().then([id = m.message_id()](auto f) {
          auto status = f.get();
          std::cout << "Message id " << id
                    << " ack() completed with status=" << status << "\n";
        });
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

Prima di provare questo esempio, segui le istruzioni di configurazione di C# nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API C# di Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using static Google.Cloud.PubSub.V1.SubscriberClient;

public class ExactlyOnceDeliverySubscriberAsyncSample
{
    public async Task<IEnumerable<string>> ExactlyOnceDeliverySubscriberAsync(string projectId, string subscriptionId)
    {
        // subscriptionId should be the ID of an exactly-once delivery subscription.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // To get the status of ACKnowledge (ACK) or Not ACKnowledge (NACK) request in exactly once delivery subscriptions,
        // create a subscription handler that inherits from Google.Cloud.PubSub.V1.SubscriptionHandler.
        // For more information see Google.Cloud.PubSub.V1.SubscriptionHandler reference docs here:
        // https://cloud.google.com/dotnet/docs/reference/Google.Cloud.PubSub.V1/latest/Google.Cloud.PubSub.V1.SubscriptionHandler
        var subscriptionHandler = new SampleSubscriptionHandler();
        Task subscriptionTask = subscriber.StartAsync(subscriptionHandler);
        // The subscriber will be running until it is stopped.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Let's make sure that the start task finished successfully after the call to stop.
        await subscriptionTask;
        return subscriptionHandler.SuccessfulAckedIds;
    }

    // Sample handler to handle messages and ACK/NACK responses.
    public class SampleSubscriptionHandler : SubscriptionHandler
    {
        public ConcurrentBag<string> SuccessfulAckedIds { get; } = new ConcurrentBag<string>();

        /// <summary>
        /// The function that processes received messages. It should be thread-safe.
        /// Return <see cref="Reply.Ack"/> to ACKnowledge the message (meaning it won't be received again).
        /// Return <see cref="Reply.Nack"/> to Not ACKnowledge the message (meaning it will be received again).
        /// From the point of view of message acknowledgement, throwing an exception is equivalent to returning <see cref="Reply.Nack"/>.
        /// </summary>
        public override async Task<Reply> HandleMessage(PubsubMessage message, CancellationToken cancellationToken)
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            return await Task.FromResult(Reply.Ack);
        }

        /// <summary>
        /// This method will receive responses for all acknowledge requests.
        /// </summary>
        public override void HandleAckResponses(IReadOnlyList<AckNackResponse> responses)
        {
            foreach (var response in responses)
            {
                if (response.Status == AcknowledgementStatus.Success)
                {
                    SuccessfulAckedIds.Add(response.MessageId);
                }

                string result = response.Status switch
                {
                    AcknowledgementStatus.Success => $"MessageId {response.MessageId} successfully acknowledged.",
                    AcknowledgementStatus.PermissionDenied => $"MessageId {response.MessageId} failed to acknowledge due to a permission denied error.",
                    AcknowledgementStatus.FailedPrecondition => $"MessageId {response.MessageId} failed to acknowledge due to a failed precondition.",
                    AcknowledgementStatus.InvalidAckId => $"MessageId {response.MessageId} failed to acknowledge due an invalid or expired AckId.",
                    AcknowledgementStatus.Other => $"MessageId {response.MessageId} failed to acknowledge due to an unknown reason.",
                    _ => $"Unknown acknowledgement status for messageId {response.MessageId}."
                };

                Console.WriteLine(result);
            }
        }
    }
}

Node.js (TypeScript)

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella guida rapida di Pub/Sub sull'utilizzo delle librerie client. Per maggiori informazioni, consulta la documentazione di riferimento dell'API Node.js Pub/Sub.

Per eseguire l'autenticazione in Pub/Sub, configura le Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
import {Message, PubSub, AckError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForMessagesWithExactlyOnceDelivery(
  subscriptionNameOrId: string,
  timeout: number
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount++;

    // Use `ackWithResponse()` instead of `ack()` to get a Promise that tracks
    // the result of the acknowledge call. When exactly-once delivery is enabled
    // on the subscription, the message is guaranteed not to be delivered again
    // if the ack Promise resolves.
    try {
      // When the Promise resolves, the value is always AckResponses.Success,
      // signaling that the ack was accepted. Note that you may call this
      // method on a subscription without exactly-once delivery, but it will
      // always return AckResponses.Success.
      await message.ackWithResponse();
      console.log(`Ack for message ${message.id} successful.`);
    } catch (e) {
      // In all other cases, the error passed on reject will explain why. This
      // is only for permanent failures; transient errors are retried automatically.
      const ackError = e as AckError;
      console.log(
        `Ack for message ${message.id} failed with error: ${ackError.errorCode}`
      );
    }
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Monitorare le sottoscrizioni con pubblicazione "exactly-once"

La metrica subscription/exactly_once_warning_count registra il numero di eventi che possono portare a possibili ricaricamenti (validi o duplicati). Questa metrica conteggia il numero di volte in cui Pub/Sub non riesce a elaborare le richieste associate agli ID di conferma (richiesta ModifyAckDeadline o acknowledgment). I motivi dell'errore potrebbero essere basati sul server o sul client. Ad esempio, se il livello di persistenza utilizzato per mantenere le informazioni di consegna "exactly-once" non è disponibile, si tratta di un evento basato su server. Se il client cerca di confermare un messaggio con un ID di conferma non valido, si tratta di un evento basato su client.

Comprendere la metrica

subscription/exactly_once_warning_count acquisisce gli eventi che potrebbero determinare o meno ricaricamenti effettivi e, in base al comportamento del cliente, possono presentare problemi. Ad esempio, le richieste acknowledgment o ModifyAckDeadline ripetute con ID di conferma non validi incrementano la metrica ripetutamente.

Le seguenti metriche sono utili anche per comprendere il comportamento del cliente:

  • La metrica subscription/expired_ack_deadlines_count mostra il numero di scadenze dell'ID di conferma. La scadenza dell'ID di conferma può causare errori per le richieste ModifyAckDeadline e acknowledgment.

  • La metrica service.serviceruntime.googleapis.com/api/request_count può essere utilizzata per acquisire gli errori delle richieste ModifyAckDeadline o acknowledgment nei casi in cui queste raggiungano Google Cloud, ma non raggiungino Pub/Sub. Si sono verificati errori che questa metrica non acquisirà, ad esempio quando i client sono disconnessi da Google Cloud.

Nella maggior parte dei casi di eventi di errore che è possibile riprovare, le librerie client supportate riprovano automaticamente la richiesta.

Quote

Le sottoscrizioni con pubblicazione "exactly-once" sono soggette a requisiti di quota aggiuntivi. Questa quota viene applicata a:

  • Numero di messaggi utilizzati dalle sottoscrizioni con la consegna "exactly-once" abilitata per regione.
  • Numero di messaggi confermati o la cui scadenza è estesa se si utilizzano abbonamenti con la consegna "exactly-once" abilitata per regione.

Per ulteriori informazioni su queste quote, consulta la tabella nell'argomento Quote.

Consegna "exactly-once" e abbonamenti ordinati

Pub/Sub supporta la consegna "exactly-once" con la consegna ordinata.

Quando utilizzi l'ordinamento con la consegna "exactly-once", Pub/Sub si aspetta che le conferme siano in ordine. Se le conferme sono nell'ordine sbagliato, il servizio non supera le richieste con errori temporanei. Se la scadenza per la conferma scade prima della conferma dell'ordine per la consegna, il client riceverà un nuovo recapito del messaggio. Per questo motivo, quando utilizzi l'ordinamento con la consegna "exactly-once", la velocità effettiva del client è limitata a un ordine di migliaia di messaggi al secondo.

Consegna "exactly-once" e abbonamenti push

Pub/Sub supporta la distribuzione "exactly-once" solo con le sottoscrizioni pull.

I client che utilizzano i messaggi delle sottoscrizioni push confermano i messaggi rispondendo alle richieste push con una risposta corretta. Tuttavia, i client non sanno se la sottoscrizione Pub/Sub ha ricevuto la risposta e l'ha elaborata. È diverso dalle sottoscrizioni pull, in cui le richieste di conferma vengono avviate dai client e la sottoscrizione Pub/Sub risponde se la richiesta è stata elaborata correttamente. Per questo motivo, la semantica della consegna "exactly-once" non è in linea con le sottoscrizioni push.

Aspetti da tenere presenti

  • Se la scadenza per la conferma non è specificata al momento di CreateSubscription, le sottoscrizioni abilitate per la pubblicazione "exactly-once" avranno una scadenza predefinita di conferma di 60 secondi.

  • Scadenze di conferma più lunghe sono utili per evitare la ripubblicazione causata da eventi di rete. Le librerie client supportate non utilizzano la scadenza predefinita di conferma degli abbonamenti.

  • Gli abbonamenti con pubblicazione "exactly-once" hanno una latenza di pubblicazione-sottoscrizione significativamente maggiore rispetto agli abbonamenti normali.

  • Se richiedi una velocità effettiva elevata, i client di distribuzione "exactly-once" devono anche utilizzare il streaming pull.

  • Una sottoscrizione potrebbe ricevere più copie dello stesso messaggio a causa di duplicati sul lato di pubblicazione, anche con la consegna "exactly-once" abilitata. I duplicati lato pubblicazione possono essere dovuti a più nuovi tentativi di pubblicazione univoci da parte del client di pubblicazione o del servizio Pub/Sub. Più pubblicazioni univoche da parte del cliente editore, tra diversi tentativi, portano a nuovi caricamenti con ID messaggio diversi. Più pubblicazioni univoche da parte del servizio Pub/Sub, per rispondere a una richiesta di pubblicazione del cliente, comportano il ricaricamento con gli stessi ID messaggio.

  • Puoi riprovare a risolvere gli errori in subscription/exactly_once_warning_count. Le librerie client supportate li ripeteranno automaticamente. Tuttavia, gli errori relativi a ID di conferma non validi non possono essere riprovati.