Gérer les échecs de message

Les abonnés peuvent ne pas être en mesure de gérer les messages pour diverses raisons. Par exemple, des problèmes temporaires peuvent se produire lors de la récupération des données nécessaires au traitement d'un message. Ou bien, un message peut se présenter dans un format que l'abonné n'attend pas.

Cette page explique comment gérer ces échecs de traitement à l'aide d'une stratégie de nouvelle tentative d'abonnement ou en transférant les messages non distribués vers un file d'attente de lettres mortes (également appelé file d'attente de lettres mortes).

Notez que ces fonctionnalités ne sont pas compatibles avec Dataflow. Pour plus d'informations, consultez la section Fonctionnalités Pub/Sub non compatibles de la documentation Dataflow.

Stratégie de nouvelle tentative d'abonnement

Si Pub/Sub tente de distribuer un message, mais que l'abonné ne peut pas l'accuser réception, Pub/Sub tente automatiquement de renvoyer le message. Cette tentative de renvoi est connue sous le nom de "politique de nouvelle tentative d'abonnement". Vous ne pouvez pas activer ni désactiver cette fonctionnalité, mais vous pouvez choisir le type de règle de nouvelle tentative à utiliser.

Lorsque vous créez et configurez votre abonnement pour la première fois, vous pouvez choisir d'utiliser l'une des stratégies de nouvelle tentative suivantes : nouvelle distribution immédiate ou intervalle exponentiel entre les tentatives. Par défaut, les abonnements sont renvoyés immédiatement.

Nouvelle livraison immédiate

Par défaut, Pub/Sub tente de renvoyer le message immédiatement (et éventuellement au même client abonné). Toutefois, si les conditions qui ont empêché l'accusé de réception du message n'ont pas changé, une nouvelle livraison immédiate peut entraîner des problèmes. Dans ce cas, il est possible que Pub/Sub renvoie plusieurs messages dont la réception n'est pas confirmée.

Pour résoudre les problèmes immédiats de renvoi, Pub/Sub vous permet de configurer une règle d'intervalle exponentiel entre les tentatives.

Intervalle exponentiel entre les tentatives

L'intervalle exponentiel entre les tentatives vous permet d'ajouter des délais progressivement plus longs entre les tentatives. Après le premier échec de distribution, Pub/Sub attend un intervalle minimal entre les tentatives avant de réessayer. Pour chaque échec consécutif d'un message, un délai supplémentaire est ajouté au délai, jusqu'à un délai maximal (0 et 600 secondes).

Les intervalles de retard maximal et minimal ne sont pas fixes et doivent être configurés en fonction de facteurs locaux de votre application.

Tenez compte des points suivants concernant l'intervalle exponentiel entre les tentatives:

  • L'intervalle exponentiel entre les tentatives se déclenche dans les actions suivantes :
    • Lorsqu'un accusé de réception négatif est reçu.
    • Lorsque le délai de confirmation d'un message expire.
  • L'intervalle exponentiel entre les tentatives n'est appliqué que par message, plutôt qu'à tous les messages d'un abonnement (global).
  • Lorsque vous utilisez l'intervalle exponentiel entre les tentatives, Pub/Sub continue de distribuer d'autres messages, même si les messages précédents ont reçu des accusés de réception négatifs (sauf si vous utilisez la distribution de messages ordonnés).

Configurer un intervalle exponentiel entre les tentatives

Console

Lorsque vous créez un abonnement, vous pouvez configurer une stratégie d'intervalle exponentiel entre les tentatives en procédant comme suit :

  1. Dans la console Google Cloud, accédez à la page Abonnements Pub/Sub.

    Accéder aux abonnements

  2. Cliquez sur Créer un abonnement.

  3. Dans le champ ID d'abonnement, saisissez un nom.

    Pour plus d'informations sur l'attribution d'un nom à un abonnement, consultez Consignes pour nommer un sujet ou un abonnement.

  4. Choisissez ou créez un sujet dans le menu déroulant.

    L'abonnement reçoit les messages du sujet.

  5. Sélectionnez un type de diffusion.

  6. Sous Règle de nouvelle tentative, sélectionnez Réessayer après un intervalle exponentiel entre les tentatives.

  7. Saisissez un intervalle minimal entre les tentatives et l'intervalle maximal entre les tentatives, compris entre 0 et 600 secondes.

    Les valeurs par défaut sont de 10 secondes pour l'intervalle minimal et de 600 secondes pour l'intervalle maximal.

  8. Cliquez sur Créer.

gcloud

Pour créer un abonnement avec une stratégie d'intervalle exponentiel entre les tentatives, exécutez la commande gcloud pubsub create avec les options indiquées ci-dessous :

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --min-retry-delay=MIN_RETRY_DELAY \
  --max-retry-delay=MAX_RETRY_DELAY

Sujet de lettres mortes

Si le service Pub/Sub tente de distribuer un message, mais que l'abonné ne peut pas l'accuser réception, Pub/Sub peut transférer le message impossible à distribuer dans un sujet de lettre morte.

Fonctionnement des sujets de lettres mortes avec Pub/Sub

Un file d'attente de lettres mortes est une propriété d'abonnement, et non une propriété de sujet. Cela signifie que vous définissez un file d'attente de lettres mortes lorsque vous créez un abonnement, et non lorsque vous créez un sujet.

Si vous créez un file d'attente de lettres mortes, vous pouvez définir les propriétés d'abonnement suivantes:

  • Nombre maximal de tentatives de distribution: valeur numérique indiquant le nombre de tentatives de distribution effectuées par Pub/Sub pour un message spécifique. Si le client abonné ne peut pas accuser réception du message dans le délai défini pour le nombre de tentatives de distribution, le message est transféré vers un file d'attente de lettres mortes.

    • Valeur par défaut = 5
    • Valeur maximale = 100
    • Valeur minimale = 5
  • Projet avec le sujet de lettres mortes : si le sujet de lettres mortes se trouve dans un projet différent de l'abonnement, vous devez spécifier le projet avec le sujet de lettres mortes. Définissez le file d'attente de lettres mortes sur un sujet différent de celui auquel l'abonnement est associé.

Comment le nombre maximal de tentatives de livraison est-il calculé ?

Pub/Sub ne compte les tentatives de distribution que lorsqu'un file d'attente de lettres mortes est correctement configuré et qu'il inclut les autorisations IAM appropriées.

Le nombre maximal de tentatives de distribution est approximatif, car Pub/Sub transfère au mieux les messages non distribuables.

Le nombre suivi de tentatives de distribution d'un message peut également être réinitialisé, en particulier pour un abonnement pull avec des abonnés inactifs. Par conséquent, les messages peuvent être distribués au client abonné plus de fois que le nombre maximal de tentatives de distribution configuré.

Configurer un file d'attente de lettres mortes

Pour que vous puissiez configurer un file d'attente de lettres mortes, le sujet source doit d'abord être associé à un abonnement. Vous pouvez spécifier un file d'attente de lettres mortes lors de la création de l'abonnement ou mettre à jour un abonnement existant pour en utiliser file d'attente de lettres mortes.

Voici le workflow permettant d'activer la gestion des lettres mortes sur un abonnement.

  1. Créez le file d'attente de lettres mortes. Ce sujet est distinct du sujet source.

  2. Définissez le file d'attente de lettres mortes sur l'abonnement du sujet source.

  3. Pour éviter de perdre des messages du file d'attente de lettres mortes, associez au moins un autre abonnement à ce sujet. L'abonnement secondaire reçoit les messages du file d'attente de lettres mortes.

  4. Attribuez les rôles d'éditeur et d'abonné au compte de service Pub/Sub. Pour en savoir plus, consultez Accorder des autorisations de transfert.

Définir un file d'attente de lettres mortes sur un nouvel abonnement

Vous pouvez créer un abonnement et définir un file d'attente de lettres mortes à l'aide de la console Google Cloud, de Google Cloud CLI, des bibliothèques clientes ou de l'API Pub/Sub.

Console

Pour créer un abonnement et définir un sujet de lettres mortes, procédez comme suit :

  1. Dans la console Google Cloud, accédez à la page Abonnements.

    Accéder aux abonnements

  2. Cliquez sur Créer un abonnement.

  3. Saisissez l'ID de l'abonnement.

  4. Choisissez ou créez un sujet dans le menu déroulant.

    L'abonnement reçoit les messages du sujet.

  5. Dans la section Sujet de lettres mortes, sélectionnez Activer la gestion des lettres mortes.

  6. Choisissez ou créez un sujet des lettres mortes dans le menu déroulant.

  7. Si le file d'attente de lettres mortes choisi n'est associé à aucun abonnement, le système vous invite à en créer un.

  8. Dans le champ Nombre maximal de tentatives de distribution, indiquez un entier compris entre 5 et 100.

  9. Cliquez sur Créer.

  10. Le panneau des détails affiche une liste des tâches possibles. Si l'un des éléments affiche une icône d'erreur , cliquez sur la tâche concernée pour résoudre le problème.

gcloud

Pour créer un abonnement et définir un sujet de lettres mortes, utilisez la commande gcloud pubsub subscriptions create :

gcloud pubsub subscriptions create subscription-id \
  --topic=topic-id \
  --dead-letter-topic=dead-letter-topic-name \
  [--max-delivery-attempts=max-delivery-attempts] \
  [--dead-letter-topic-project=dead-letter-topic-project]

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id,
   std::string const& dead_letter_topic_id,
   int dead_letter_delivery_attempts) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_dead_letter_policy()->set_dead_letter_topic(
      pubsub::Topic(project_id, dead_letter_topic_id).FullName());
  request.mutable_dead_letter_policy()->set_max_delivery_attempts(
      dead_letter_delivery_attempts);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";

  std::cout << "It will forward dead letter messages to: "
            << sub->dead_letter_policy().dead_letter_topic() << "\n";

  std::cout << "After " << sub->dead_letter_policy().max_delivery_attempts()
            << " delivery attempts.\n";
}

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.


using Google.Cloud.PubSub.V1;
using System;

public class CreateSubscriptionWithDeadLetterPolicySample
{
    public Subscription CreateSubscriptionWithDeadLetterPolicy(string projectId, string topicId, string subscriptionId, string deadLetterTopicId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        // This is the subscription you want to create with a dead letter policy.
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        // This is an existing topic that you want to attach the subscription with dead letter policy to.
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        // This is an existing topic that the subscription with dead letter policy forwards dead letter messages to.
        var deadLetterTopic = TopicName.FromProjectTopic(projectId, deadLetterTopicId).ToString();
        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            DeadLetterPolicy = new DeadLetterPolicy
            {
                DeadLetterTopic = deadLetterTopic,
                // The maximum number of times that the service attempts to deliver a
                // message before forwarding it to the dead letter topic. Must be [5-100].
                MaxDeliveryAttempts = 10
            },
            AckDeadlineSeconds = 30
        };

        var subscription = subscriber.CreateSubscription(subscriptionRequest);
        Console.WriteLine("Created subscription: " + subscription.SubscriptionName.SubscriptionId);
        Console.WriteLine($"It will forward dead letter messages to: {subscription.DeadLetterPolicy.DeadLetterTopic}");
        Console.WriteLine($"After {subscription.DeadLetterPolicy.MaxDeliveryAttempts} delivery attempts.");
        // Remember to attach a subscription to the dead letter topic because
        // messages published to a topic with no subscriptions are lost.
        return subscription;
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

// createSubWithDeadLetter creates a subscription with a dead letter policy.
func createSubWithDeadLetter(w io.Writer, projectID, subID string, topicID string, fullyQualifiedDeadLetterTopic string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// topicID := "my-topic"
	// fullyQualifiedDeadLetterTopic := "projects/my-project/topics/my-dead-letter-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	topic := client.Topic(topicID)

	subConfig := pubsub.SubscriptionConfig{
		Topic:       topic,
		AckDeadline: 20 * time.Second,
		DeadLetterPolicy: &pubsub.DeadLetterPolicy{
			DeadLetterTopic:     fullyQualifiedDeadLetterTopic,
			MaxDeliveryAttempts: 10,
		},
	}

	sub, err := client.CreateSubscription(ctx, subID, subConfig)
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription (%s) with dead letter topic (%s)\n", sub.String(), fullyQualifiedDeadLetterTopic)
	fmt.Fprintln(w, "To process dead letter messages, remember to add a subscription to your dead letter topic.")
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithDeadLetterPolicyExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is the subscription you want to create with a dead letter policy.
    String subscriptionId = "your-subscription-id";
    // This is an existing topic that you want to attach the subscription with dead letter policy
    // to.
    String topicId = "your-topic-id";
    // This is an existing topic that the subscription with dead letter policy forwards dead letter
    // messages to.
    String deadLetterTopicId = "your-dead-letter-topic-id";

    CreateSubscriptionWithDeadLetterPolicyExample.createSubscriptionWithDeadLetterPolicyExample(
        projectId, subscriptionId, topicId, deadLetterTopicId);
  }

  public static void createSubscriptionWithDeadLetterPolicyExample(
      String projectId, String subscriptionId, String topicId, String deadLetterTopicId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);
      ProjectTopicName deadLetterTopicName = ProjectTopicName.of(projectId, deadLetterTopicId);

      DeadLetterPolicy deadLetterPolicy =
          DeadLetterPolicy.newBuilder()
              .setDeadLetterTopic(deadLetterTopicName.toString())
              // The maximum number of times that the service attempts to deliver a
              // message before forwarding it to the dead letter topic. Must be [5-100].
              .setMaxDeliveryAttempts(10)
              .build();

      Subscription request =
          Subscription.newBuilder()
              .setName(subscriptionName.toString())
              .setTopic(topicName.toString())
              .setDeadLetterPolicy(deadLetterPolicy)
              .build();

      Subscription subscription = subscriptionAdminClient.createSubscription(request);

      System.out.println("Created subscription: " + subscription.getName());
      System.out.println(
          "It will forward dead letter messages to: "
              + subscription.getDeadLetterPolicy().getDeadLetterTopic());
      System.out.println(
          "After "
              + subscription.getDeadLetterPolicy().getMaxDeliveryAttempts()
              + " delivery attempts.");
      // Remember to attach a subscription to the dead letter topic because
      // messages published to a topic with no subscriptions are lost.
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const deadLetterTopicNameOrId = 'YOUR_DEAD_LETTER_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithDeadLetterPolicy(
  topicNameOrId,
  subscriptionNameOrId,
  deadLetterTopicNameOrId
) {
  // Creates a new subscription
  const options = {
    deadLetterPolicy: {
      deadLetterTopic: pubSubClient.topic(deadLetterTopicNameOrId).name,
      maxDeliveryAttempts: 10,
    },
  };
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(
    `Created subscription ${subscriptionNameOrId} with dead letter topic ${deadLetterTopicNameOrId}.`
  );
  console.log(
    'To process dead letter messages, remember to add a subscription to your dead letter topic.'
  );
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const deadLetterTopicNameOrId = 'YOUR_DEAD_LETTER_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithDeadLetterPolicy(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  deadLetterTopicNameOrId: string
) {
  // Creates a new subscription
  const options: CreateSubscriptionOptions = {
    deadLetterPolicy: {
      deadLetterTopic: pubSubClient.topic(deadLetterTopicNameOrId).name,
      maxDeliveryAttempts: 10,
    },
  };
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(
    `Created subscription ${subscriptionNameOrId} with dead letter topic ${deadLetterTopicNameOrId}.`
  );
  console.log(
    'To process dead letter messages, remember to add a subscription to your dead letter topic.'
  );
}

PHP

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage PHP qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with dead letter policy enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $deadLetterTopicName The Pub/Sub topic to use for dead letter policy.
 */
function dead_letter_create_subscription($projectId, $topicName, $subscriptionName, $deadLetterTopicName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $deadLetterTopic = $pubsub->topic($deadLetterTopicName);

    $subscription = $topic->subscribe($subscriptionName, [
        'deadLetterPolicy' => [
            'deadLetterTopic' => $deadLetterTopic
        ]
    ]);

    printf(
        'Subscription %s created with dead letter topic %s' . PHP_EOL,
        $subscription->name(),
        $deadLetterTopic->name()
    );
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import DeadLetterPolicy

# TODO(developer)
# project_id = "your-project-id"
# endpoint = "https://my-test-project.appspot.com/push"
# TODO(developer): This is an existing topic that the subscription
# with dead letter policy is attached to.
# topic_id = "your-topic-id"
# TODO(developer): This is an existing subscription with a dead letter policy.
# subscription_id = "your-subscription-id"
# TODO(developer): This is an existing dead letter topic that the subscription
# with dead letter policy will forward dead letter messages to.
# dead_letter_topic_id = "your-dead-letter-topic-id"
# TODO(developer): This is the maximum number of delivery attempts allowed
# for a message before it gets delivered to a dead letter topic.
# max_delivery_attempts = 5

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)
dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id)

dead_letter_policy = DeadLetterPolicy(
    dead_letter_topic=dead_letter_topic_path,
    max_delivery_attempts=max_delivery_attempts,
)

with subscriber:
    request = {
        "name": subscription_path,
        "topic": topic_path,
        "dead_letter_policy": dead_letter_policy,
    }
    subscription = subscriber.create_subscription(request)

print(f"Subscription created: {subscription.name}")
print(
    f"It will forward dead letter messages to: {subscription.dead_letter_policy.dead_letter_topic}."
)
print(
    f"After {subscription.dead_letter_policy.max_delivery_attempts} delivery attempts."
)

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

Ruby

Avant d'essayer cet exemple, suivez les instructions de configuration de Ruby décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub Ruby.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

# topic_id             = "your-topic-id"
# subscription_id      = "your-subscription-id"
# dead_letter_topic_id = "your-dead-letter-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic             = pubsub.topic topic_id
dead_letter_topic = pubsub.topic dead_letter_topic_id
subscription      = topic.subscribe subscription_id,
                                    dead_letter_topic:                 dead_letter_topic,
                                    dead_letter_max_delivery_attempts: 10

puts "Created subscription #{subscription_id} with dead letter topic #{dead_letter_topic_id}."
puts "To process dead letter messages, remember to add a subscription to your dead letter topic."

Définir un file d'attente de lettres mortes pour un abonnement existant

Vous pouvez mettre à jour un abonnement et définir un file d'attente de lettres mortes à l'aide de la console Google Cloud, de Google Cloud CLI, des bibliothèques clientes ou de l'API Pub/Sub.

Console

Pour mettre à jour un abonnement et définir une file d'attente de lettres mortes, procédez comme suit.

  1. Dans la console Google Cloud, accédez à la page Abonnements.

    Accéder aux abonnements

  2. À côté de l'abonnement à mettre à jour, cliquez sur Autres actions .

  3. Dans le menu contextuel, sélectionnez Modifier.

    Le menu contextuel avec l'option Modifier mise en évidence.

  4. Dans la section Sujet de lettres mortes, sélectionnez Activer la gestion des lettres mortes.

  5. Choisissez ou créez un sujet dans le menu déroulant.

  6. Si le sujet choisi n'est associé à aucun abonnement, le système vous invite à en créer un.

  7. Dans le champ Nombre maximal de tentatives de distribution, indiquez un entier compris entre 5 et 100.

  8. Cliquez sur Update (Mettre à jour).

  9. Le panneau des détails affiche une liste des tâches possibles. Si l'un des éléments affiche une icône d'erreur , cliquez sur la tâche concernée pour résoudre le problème.

gcloud

Pour mettre à jour un abonnement et définir un sujet de lettres mortes, exécutez la commande gcloud pubsub subscriptions update :

gcloud pubsub subscriptions update subscription-id \
  --dead-letter-topic=dead-letter-topic-name \
  [--max-delivery-attempts=max-delivery-attempts] \
  [--dead-letter-topic-project=dead-letter-topic-project]

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

namespace pubsub_admin = ::google::cloud::pubsub_admin;
namespace pubsub = ::google::cloud::pubsub;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& subscription_id,
   std::string const& dead_letter_topic_id,
   int dead_letter_delivery_attempts) {
  google::pubsub::v1::UpdateSubscriptionRequest request;
  request.mutable_subscription()->set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.mutable_subscription()
      ->mutable_dead_letter_policy()
      ->set_dead_letter_topic(
          pubsub::Topic(project_id, dead_letter_topic_id).FullName());
  request.mutable_subscription()
      ->mutable_dead_letter_policy()
      ->set_max_delivery_attempts(dead_letter_delivery_attempts);
  *request.mutable_update_mask()->add_paths() = "dead_letter_policy";
  auto sub = client.UpdateSubscription(request);
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription has been updated to: " << sub->DebugString()
            << "\n";

  std::cout << "It will forward dead letter messages to: "
            << sub->dead_letter_policy().dead_letter_topic() << "\n";

  std::cout << "After " << sub->dead_letter_policy().max_delivery_attempts()
            << " delivery attempts.\n";
}

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.


using Google.Cloud.PubSub.V1;
using Google.Protobuf.WellKnownTypes;

public class UpdateDeadLetterPolicySample
{
    public Subscription UpdateDeadLetterPolicy(string projectId, string topicId, string subscriptionId, string deadLetterTopicId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        // This is an existing topic that the subscription with dead letter policy is attached to.
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // This is an existing subscription with a dead letter policy.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        // This is an existing dead letter topic that the subscription with dead letter policy forwards
        // dead letter messages to.
        var deadLetterTopic = TopicName.FromProjectTopic(projectId, deadLetterTopicId).ToString();

        // Construct the subscription with the dead letter policy you expect to have after the update.
        // Here, values in the required fields (name, topic) help identify the subscription.
        var subscription = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            DeadLetterPolicy = new DeadLetterPolicy
            {
                DeadLetterTopic = deadLetterTopic,
                MaxDeliveryAttempts = 20,
            }
        };

        var request = new UpdateSubscriptionRequest
        {
            Subscription = subscription,
            // Construct a field mask to indicate which field to update in the subscription.
            UpdateMask = new FieldMask { Paths = { "dead_letter_policy" } }
        };
        var updatedSubscription = subscriber.UpdateSubscription(request);
        return updatedSubscription;
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// updateDeadLetter updates an existing subscription with a dead letter policy.
func updateDeadLetter(w io.Writer, projectID, subID string, fullyQualifiedDeadLetterTopic string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// fullyQualifiedDeadLetterTopic := "projects/my-project/topics/my-dead-letter-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateConfig := pubsub.SubscriptionConfigToUpdate{
		DeadLetterPolicy: &pubsub.DeadLetterPolicy{
			DeadLetterTopic:     fullyQualifiedDeadLetterTopic,
			MaxDeliveryAttempts: 20,
		},
	}

	subConfig, err := client.Subscription(subID).Update(ctx, updateConfig)
	if err != nil {
		return fmt.Errorf("Update: %w", err)
	}
	fmt.Fprintf(w, "Updated subscription config: %+v\n", subConfig)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.


import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateSubscriptionRequest;
import java.io.IOException;

public class UpdateDeadLetterPolicyExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing subscription with a dead letter policy.
    String subscriptionId = "your-subscription-id";
    // This is an existing topic that the subscription with dead letter policy is attached to.
    String topicId = "your-topic-id";
    // This is an existing dead letter topic that the subscription with dead letter policy forwards
    // dead letter messages to.
    String deadLetterTopicId = "your-dead-letter-topic-id";

    UpdateDeadLetterPolicyExample.updateDeadLetterPolicyExample(
        projectId, subscriptionId, topicId, deadLetterTopicId);
  }

  public static void updateDeadLetterPolicyExample(
      String projectId, String subscriptionId, String topicId, String deadLetterTopicId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);

      System.out.println(
          "Before: " + subscriptionAdminClient.getSubscription(subscriptionName).getAllFields());

      TopicName topicName = TopicName.of(projectId, topicId);
      TopicName deadLetterTopicName = TopicName.of(projectId, deadLetterTopicId);

      // Construct the dead letter policy you expect to have after the update.
      DeadLetterPolicy deadLetterPolicy =
          DeadLetterPolicy.newBuilder()
              .setDeadLetterTopic(deadLetterTopicName.toString())
              .setMaxDeliveryAttempts(20)
              .build();

      // Construct the subscription with the dead letter policy you expect to have
      // after the update. Here, values in the required fields (name, topic) help
      // identify the subscription.
      Subscription subscription =
          Subscription.newBuilder()
              .setName(subscriptionName.toString())
              .setTopic(topicName.toString())
              .setDeadLetterPolicy(deadLetterPolicy)
              .build();

      // Construct a field mask to indicate which field to update in the subscription.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("dead_letter_policy.max_delivery_attempts").build();

      UpdateSubscriptionRequest request =
          UpdateSubscriptionRequest.newBuilder()
              .setSubscription(subscription)
              .setUpdateMask(updateMask)
              .build();

      Subscription response = subscriptionAdminClient.updateSubscription(request);

      System.out.println("After: " + response.getAllFields());
      System.out.println(
          "Max delivery attempts is now "
              + response.getDeadLetterPolicy().getMaxDeliveryAttempts());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateDeadLetterPolicy(topicNameOrId, subscriptionNameOrId) {
  const metadata = {
    deadLetterPolicy: {
      deadLetterTopic: pubSubClient.topic(topicNameOrId).name,
      maxDeliveryAttempts: 15,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .subscription(subscriptionNameOrId)
    .setMetadata(metadata);

  console.log('Max delivery attempts updated successfully.');
}

PHP

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage PHP qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Set the dead letter policy on an existing subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $deadLetterTopicName The Pub/Sub topic to use for dead letter policy.
 */
function dead_letter_update_subscription(
    string $projectId,
    string $topicName,
    string $subscriptionName,
    string $deadLetterTopicName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $deadLetterTopic = $pubsub->topic($deadLetterTopicName);

    $subscription = $topic->subscription($subscriptionName);
    $subscription->update([
        'deadLetterPolicy' => [
            'deadLetterTopic' => $deadLetterTopic
        ]
    ]);

    printf(
        'Subscription %s updated with dead letter topic %s' . PHP_EOL,
        $subscription->name(),
        $deadLetterTopic->name()
    );
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import DeadLetterPolicy, FieldMask

# TODO(developer)
# project_id = "your-project-id"
# TODO(developer): This is an existing topic that the subscription
# with dead letter policy is attached to.
# topic_id = "your-topic-id"
# TODO(developer): This is an existing subscription with a dead letter policy.
# subscription_id = "your-subscription-id"
# TODO(developer): This is an existing dead letter topic that the subscription
# with dead letter policy will forward dead letter messages to.
# dead_letter_topic_id = "your-dead-letter-topic-id"
# TODO(developer): This is the maximum number of delivery attempts allowed
# for a message before it gets delivered to a dead letter topic.
# max_delivery_attempts = 5

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)
dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id)

subscription_before_update = subscriber.get_subscription(
    request={"subscription": subscription_path}
)
print(f"Before the update: {subscription_before_update}.")

# Indicates which fields in the provided subscription to update.
update_mask = FieldMask(paths=["dead_letter_policy"])

# Construct a dead letter policy you expect to have after the update.
dead_letter_policy = DeadLetterPolicy(
    dead_letter_topic=dead_letter_topic_path,
    max_delivery_attempts=max_delivery_attempts,
)

# Construct the subscription with the dead letter policy you expect to have
# after the update. Here, values in the required fields (name, topic) help
# identify the subscription.
subscription = pubsub_v1.types.Subscription(
    name=subscription_path,
    topic=topic_path,
    dead_letter_policy=dead_letter_policy,
)

with subscriber:
    subscription_after_update = subscriber.update_subscription(
        request={"subscription": subscription, "update_mask": update_mask}
    )

print(f"After the update: {subscription_after_update}.")

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

Ruby

Avant d'essayer cet exemple, suivez les instructions de configuration de Ruby décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub Ruby.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

# subscription_id       = "your-subscription-id"
# role                  = "roles/pubsub.publisher"
# service_account_email = "serviceAccount:account_name@project_name.iam.gserviceaccount.com"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
subscription.dead_letter_max_delivery_attempts = 20
puts "Max delivery attempts is now #{subscription.dead_letter_max_delivery_attempts}."

Accorder des rôles IAM pour utiliser des sujets de lettres mortes

Pour transférer des messages non distribuables vers un sujet de lettres mortes, Pub/Sub doit être autorisé à effectuer les opérations suivantes :

  • Publier des messages sur le sujet
  • Confirmez les messages, ce qui les supprime de l'abonnement.

Pub/Sub crée et gère un compte de service pour chaque projet : service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com. Vous pouvez accorder des autorisations de transfert en attribuant des rôles d'éditeur et d'abonné à ce compte de service.

Console

Pour autoriser Pub/Sub à publier des messages dans un file d'attente de lettres mortes, procédez comme suit:

  1. Dans la console Google Cloud, accédez à la page Abonnements.

    Accéder aux abonnements

  2. Cliquez sur le nom de l'abonnement contenant le file d'attente de lettres mortes.

  3. Cliquez sur l'onglet Lettres mortes.

  4. Pour attribuer un rôle d'éditeur, cliquez sur Attribuer le rôle d'éditeur. Si le rôle d'éditeur a bien été attribué, une coche bleue s'affiche.

  5. Pour attribuer un rôle d'abonné, cliquez sur Attribuer le rôle d'abonné. Si le rôle d'éditeur a bien été attribué, une coche bleue s'affiche.

gcloud

Pour autoriser Pub/Sub à publier des messages dans le sujet de lettres mortes, exécutez la commande suivante :

PUBSUB_SERVICE_ACCOUNT="service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com"

gcloud pubsub topics add-iam-policy-binding dead-letter-topic-name \
  --member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT"\
  --role="roles/pubsub.publisher"

Pour autoriser Pub/Sub à accuser réception des messages non distribués, exécutez la commande suivante :

PUBSUB_SERVICE_ACCOUNT="service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com"

gcloud pubsub subscriptions add-iam-policy-binding subscription-id \
  --member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT"\
  --role="roles/pubsub.subscriber"

Suivre les tentatives de livraison

Une fois que vous avez activé un file d'attente de lettres mortes pour un abonnement, chaque message de cet abonnement comporte un champ qui spécifie le nombre de tentatives de distribution:

  • Les messages reçus à partir d'un abonnement pull incluent le champ delivery_attempt.

  • Les messages reçus à partir d'un abonnement push incluent le champ deliveryAttempt.

Les exemples suivants montrent comment obtenir le nombre de tentatives de livraison :

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::cout << "Delivery attempt: " << h.delivery_attempt() << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

Avant d'essayer cet exemple, suivez les instructions de configuration de C# décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub C#.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


using Google.Cloud.PubSub.V1;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesAsyncWithDeliveryAttemptsSample
{
    public async Task<int> PullMessagesAsyncWithDeliveryAttempts(string projectId, string subscriptionId, bool acknowledge)
    {
        // This is an existing subscription with a dead letter policy.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);

        int deliveryAttempt = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = message.Data.ToStringUtf8();
            System.Console.WriteLine($"Delivery Attempt: {message.GetDeliveryAttempt()}");
            if (message.GetDeliveryAttempt() != null)
            {
                deliveryAttempt = message.GetDeliveryAttempt().Value;
            }
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 7 seconds.
        await Task.Delay(7000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return deliveryAttempt;
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions de configuration de Go décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub Go.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgsDeadLetterDeliveryAttempt(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	sub := client.Subscription(subID)
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		// When dead lettering is enabled, the delivery attempt field is a pointer to the
		// the number of times the service has attempted to delivery a message.
		// Otherwise, the field is nil.
		if msg.DeliveryAttempt != nil {
			fmt.Fprintf(w, "message: %s, delivery attempts: %d", msg.Data, *msg.DeliveryAttempt)
		}
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("got error in Receive: %w", err)
	}
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration de Java décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub Java.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReceiveMessagesWithDeliveryAttemptsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing subscription with a dead letter policy.
    String subscriptionId = "your-subscription-id";

    ReceiveMessagesWithDeliveryAttemptsExample.receiveMessagesWithDeliveryAttemptsExample(
        projectId, subscriptionId);
  }

  public static void receiveMessagesWithDeliveryAttemptsExample(
      String projectId, String subscriptionId) {

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        new MessageReceiver() {
          @Override
          public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            // Handle incoming message, then ack the received message.
            System.out.println("Id: " + message.getMessageId());
            System.out.println("Data: " + message.getData().toStringUtf8());
            System.out.println("Delivery Attempt: " + Subscriber.getDeliveryAttempt(message));
            consumer.ack();
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration de Node.js décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPullWithDeliveryAttempts(
  projectId,
  subscriptionNameOrId
) {
  // The low level API client requires a name only.
  const formattedSubscription =
    subscriptionNameOrId.indexOf('/') >= 0
      ? subscriptionNameOrId
      : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const request = {
    subscription: formattedSubscription,
    maxMessages: 10,
  };

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Process the messages.
  const ackIds = [];
  for (const message of response.receivedMessages || []) {
    console.log(`Received message: ${message.message.data}`);
    console.log(`Delivery Attempt: ${message.deliveryAttempt}`);
    if (message.ackId) {
      ackIds.push(message.ackId);
    }
  }

  // Acknowledge all of the messages. You could also acknowledge
  // these individually, but this is more efficient.
  const ackRequest = {
    subscription: formattedSubscription,
    ackIds: ackIds,
  };
  await subClient.acknowledge(ackRequest);

  console.log('Done.');
}

PHP

Avant d'essayer cet exemple, suivez les instructions de configuration de PHP décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub PHP.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

use Google\Cloud\PubSub\Message;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Get the delivery attempt from a pulled message.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $message The contents of a pubsub message data field.
 */
function dead_letter_delivery_attempt($projectId, $topicName, $subscriptionName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);

    // publish test message
    $topic->publish(new Message([
        'data' => $message
    ]));

    $subscription = $topic->subscription($subscriptionName);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        printf('Received message %s' . PHP_EOL, $message->data());
        printf('Delivery attempt %d' . PHP_EOL, $message->deliveryAttempt());
    }
    print('Done' . PHP_EOL);
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration de Python décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub Python.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    print(f"With delivery attempts: {message.delivery_attempt}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    # When `timeout` is not set, result() will block indefinitely,
    # unless an exception is encountered first.
    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Avant d'essayer cet exemple, suivez les instructions de configuration de Ruby décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub Ruby.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
subscription.pull(immediate: false).each do |message|
  puts "Received message: #{message.data}"
  puts "Delivery Attempt: #{message.delivery_attempt}"
  message.acknowledge!
end

Lorsque Pub/Sub transfère un message impossible à distribuer à un file d'attente de lettres mortes, il ajoute les attributs suivants au message:

  • CloudPubSubDeadLetterSourceDeliveryCount: nombre de tentatives d'envoi à l'abonnement source.
  • CloudPubSubDeadLetterSourceSubscription: nom de l'abonnement source.
  • CloudPubSubDeadLetterSourceSubscriptionProject: nom du projet qui contient l'abonnement source.
  • CloudPubSubDeadLetterSourceTopicPublishTime: code temporel de la publication initiale du message
  • CloudPubSubDeadLetterSourceDeliveryErrorMessage: raison pour laquelle le message n'a pas pu être distribué à sa destination d'origine. L'attribut n'existe que pour les abonnements liés à l'exportation.

Surveiller les messages transférés

Après avoir transféré un message non distribuable, le service Pub/Sub supprime le message de l'abonnement. Vous pouvez surveiller les messages transférés avec Cloud Monitoring.

Si vous associez un abonnement au sujet de lettres mortes, les messages utilisent la règle d'expiration de l'abonnement associé plutôt que la période d'expiration de l'abonnement avec la propriété de sujet de lettres mortes.

La métrique subscription/dead_letter_message_count enregistre le nombre de messages non distribuables que Pub/Sub transfère depuis un abonnement.

Pour en savoir plus, consultez la section Surveiller les messages non distribuables transférés.

Supprimer un file d'attente de lettres mortes

Pour arrêter le transfert de messages non distribuables, supprimez le sujet de lettres mortes de l'abonnement.

Vous pouvez supprimer un file d'attente de lettres mortes d'un abonnement à l'aide de la console Google Cloud, de Google Cloud CLI ou de l'API Pub/Sub.

Console

Pour supprimer un sujet de lettres mortes d'un abonnement, procédez comme suit :

  1. Dans la console Google Cloud, accédez à la page Abonnements.

    Accéder aux abonnements

  2. Dans la liste des abonnements, cliquez sur à côté de l'abonnement à mettre à jour.

  3. Dans le menu contextuel, sélectionnez Modifier.

    Le menu contextuel avec l'option Modifier mise en évidence.

  4. Dans la section Sujet de lettres mortes, désélectionnez Activer la gestion des lettres mortes.

  5. Cliquez sur Mettre à jour.

gcloud

Pour supprimer un sujet de lettres mortes d'un abonnement, exécutez la commande gcloud pubsub subscriptions update et l'option --clear-dead-letter-policy :

gcloud pubsub subscriptions update subscription-id \
  --clear-dead-letter-policy

C++

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C++ qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C++.

namespace pubsub_admin = ::google::cloud::pubsub_admin;
namespace pubsub = ::google::cloud::pubsub;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& subscription_id) {
  google::pubsub::v1::UpdateSubscriptionRequest request;
  request.mutable_subscription()->set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.mutable_subscription()->clear_dead_letter_policy();
  *request.mutable_update_mask()->add_paths() = "dead_letter_policy";
  auto sub = client.UpdateSubscription(request);
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription has been updated to: " << sub->DebugString()
            << "\n";
}

C#

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage C# qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour C#.


using Google.Cloud.PubSub.V1;
using Google.Protobuf.WellKnownTypes;

public class RemoveDeadLetterPolicySample
{
    public Subscription RemoveDeadLetterPolicy(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        // This is an existing topic that the subscription with dead letter policy is attached to.
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // This is an existing subscription with dead letter policy.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscription = new Subscription()
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            DeadLetterPolicy = null
        };

        var request = new UpdateSubscriptionRequest
        {
            Subscription = subscription,
            UpdateMask = new FieldMask { Paths = { "dead_letter_policy" } }
        };
        var updatedSubscription = subscriber.UpdateSubscription(request);
        return updatedSubscription;
    }
}

Go

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Go qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

// removeDeadLetterTopic removes the dead letter policy from a subscription.
func removeDeadLetterTopic(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	subConfig, err := client.Subscription(subID).Update(ctx, pubsub.SubscriptionConfigToUpdate{
		DeadLetterPolicy: &pubsub.DeadLetterPolicy{},
	})
	if err != nil {
		return fmt.Errorf("Update: %w", err)
	}
	fmt.Fprintf(w, "Updated subscription config: %+v\n", subConfig)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Java qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Java.


import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateSubscriptionRequest;

public class RemoveDeadLetterPolicyExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing subscription with dead letter policy.
    String subscriptionId = "your-subscription-id";
    // This is an existing topic that the subscription with dead letter policy is attached to.
    String topicId = "your-topic-id";

    RemoveDeadLetterPolicyExample.removeDeadLetterPolicyExample(projectId, subscriptionId, topicId);
  }

  public static void removeDeadLetterPolicyExample(
      String projectId, String subscriptionId, String topicId) throws Exception {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);
      TopicName topicName = TopicName.of(projectId, topicId);

      // Construct the subscription you expect to have after the request. Here,
      // values in the required fields (name, topic) help identify the subscription.
      // No dead letter policy is supplied.
      Subscription expectedSubscription =
          Subscription.newBuilder()
              .setName(subscriptionName.toString())
              .setTopic(topicName.toString())
              .build();

      // Construct a field mask to indicate which field to update in the subscription.
      FieldMask updateMask = FieldMask.newBuilder().addPaths("dead_letter_policy").build();

      UpdateSubscriptionRequest request =
          UpdateSubscriptionRequest.newBuilder()
              .setSubscription(expectedSubscription)
              .setUpdateMask(updateMask)
              .build();

      Subscription response = subscriptionAdminClient.updateSubscription(request);

      // You should see an empty dead letter topic field inside the dead letter policy.
      System.out.println("After: " + response.getAllFields());
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Node.js qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function removeDeadLetterPolicy(topicNameOrId, subscriptionNameOrId) {
  const metadata = {
    deadLetterPolicy: null,
  };

  await pubSubClient
    .topic(topicNameOrId)
    .subscription(subscriptionNameOrId)
    .setMetadata(metadata);

  console.log(
    `Removed dead letter topic from ${subscriptionNameOrId} subscription.`
  );
}

PHP

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage PHP qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Remove dead letter policy from an existing subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function dead_letter_remove($projectId, $topicName, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);

    $subscription = $topic->subscription($subscriptionName);

    // Provide deadLetterPolicy in the update mask, but omit from update fields to unset.
    $subscription->update([], [
        'updateMask' => [
            'deadLetterPolicy'
        ]
    ]);

    printf(
        'Removed dead letter topic from subscription %s' . PHP_EOL,
        $subscription->name()
    );
}

Python

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Python qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Python.

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import FieldMask

# TODO(developer)
# project_id = "your-project-id"
# TODO(developer): This is an existing topic that the subscription
# with dead letter policy is attached to.
# topic_id = "your-topic-id"
# TODO(developer): This is an existing subscription with a dead letter policy.
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

subscription_before_update = subscriber.get_subscription(
    request={"subscription": subscription_path}
)
print(f"Before removing the policy: {subscription_before_update}.")

# Indicates which fields in the provided subscription to update.
update_mask = FieldMask(paths=["dead_letter_policy"])

# Construct the subscription (without any dead letter policy) that you
# expect to have after the update.
subscription = pubsub_v1.types.Subscription(
    name=subscription_path, topic=topic_path
)

with subscriber:
    subscription_after_update = subscriber.update_subscription(
        request={"subscription": subscription, "update_mask": update_mask}
    )

print(f"After removing the policy: {subscription_after_update}.")

Ruby

Avant d'essayer cet exemple, suivez les instructions d'installation dans le langage Ruby qui se trouvent sur la page Démarrage rapide : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Ruby.

Ruby

Avant d'essayer cet exemple, suivez les instructions de configuration de Ruby décrites dans le guide de démarrage rapide de Pub/Sub à l'aide des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Pub/Sub Ruby.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
subscription.remove_dead_letter_policy
puts "Removed dead letter topic from #{subscription_id} subscription."

Tarification

Lorsque le service Pub/Sub transfère les messages non distribuables, les frais suivants s'appliquent :

  • Frais de publication facturés sur le compte de facturation associé au projet contenant le sujet de lettres mortes.
  • Frais d'abonnement pour les messages sortants, facturés sur le compte de facturation associé au projet contenant l'abonnement avec la propriété de file d'attente de lettres mortes

Si vous définissez la propriété du file d'attente de lettres mortes d'un abonnement, mais que la règle d'emplacement de stockage des messages associée à ce sujet n'autorise pas la région contenant l'abonnement, des frais de publication pour les messages sortants s'appliquent également.

Les frais de publication des messages sortants sont facturés au projet contenant le file d'attente de lettres mortes. Pour en savoir plus, reportez-vous à la page Tarifs.

Étapes suivantes