Gestionar fallos de mensajes

Es posible que los suscriptores no puedan gestionar los mensajes por varios motivos. Por ejemplo, puede haber problemas transitorios al recuperar los datos necesarios para procesar un mensaje. También puede que el mensaje tenga un formato que el suscriptor no espera.

En esta página se explica cómo gestionar estos errores de procesamiento mediante una política de reintento de suscripción o reenviando los mensajes no entregados a un tema de mensajes fallidos (también conocido como cola de mensajes fallidos).

Ten en cuenta que Dataflow no admite estas funciones. Para obtener más información, consulta la sección Funciones de Pub/Sub no admitidas de la documentación de Dataflow.

Política de reintentos de suscripciones

Si Pub/Sub intenta enviar un mensaje, pero el suscriptor no puede confirmarlo, Pub/Sub intenta volver a enviar el mensaje automáticamente. Este intento de reenvío se conoce como política de reintentos de suscripción. No es una función que puedas activar o desactivar. Sin embargo, puedes elegir el tipo de política de reintentos que quieras usar.

Cuando crees y configures tu suscripción por primera vez, puedes elegir una de las siguientes políticas de reintento: reenvío inmediato o retroceso exponencial. De forma predeterminada, las suscripciones usan la retransmisión inmediata.

Reenvío inmediato

De forma predeterminada, Pub/Sub intenta reenviar el mensaje inmediatamente (y, posiblemente, al mismo cliente suscriptor). Sin embargo, si no han cambiado las condiciones que han impedido que se confirme la recepción del mensaje, la retransmisión inmediata puede provocar problemas. En este caso, es posible que Pub/Sub vuelva a enviar varios mensajes que no se puedan confirmar.

Para solucionar los problemas de reenvío inmediato, Pub/Sub te permite configurar una política de retirada exponencial.

Tiempo de espera exponencial

El tiempo de espera exponencial te permite añadir retrasos cada vez más largos entre los intentos. Después del primer error de entrega, Pub/Sub espera un tiempo de espera mínimo antes de volver a intentarlo. Por cada fallo consecutivo de un mensaje, se añade más tiempo al retraso, hasta un retraso máximo (0 y 600 segundos).

Los intervalos de retraso máximo y mínimo no son fijos y deben configurarse en función de los factores locales de tu aplicación.

Ten en cuenta las siguientes consideraciones sobre el tiempo de espera exponencial:

  • El tiempo de espera exponencial se activa en las siguientes acciones:
    • Cuando se recibe una confirmación negativa.
    • Cuando vence el plazo de confirmación de un mensaje.
  • El retroceso exponencial solo se aplica por mensaje, en lugar de a todos los mensajes de una suscripción (global).
  • Mientras se usa la retirada exponencial, Pub/Sub sigue enviando otros mensajes, aunque los mensajes anteriores hayan recibido confirmaciones negativas (a menos que uses el envío de mensajes ordenado).

Usa la política de reintentos para retrasar el envío y el procesamiento de un subconjunto de mensajes y así hacer frente a la imposibilidad transitoria de procesar algunos mensajes al enviarse. La función se aplica de la mejor forma posible y cada mensaje se evalúa por separado según la política de reintentos.

No recomendamos usar esta función para introducir retrasos intencionados en la entrega de mensajes. Si envías una confirmación negativa (nack) de un gran número de mensajes en una suscripción configurada con una política de reintentos, es posible que algunos de esos mensajes se entreguen con un tiempo de espera menor o sin él. Pub/Sub también puede ralentizar la entrega de todos los mensajes si envías un gran número de respuestas negativas.

Si necesitas programar entregas, te recomendamos que uses Cloud Tasks.

Configurar el tiempo de espera exponencial

Consola

Cuando crees una suscripción, puedes configurar una política de reintentos con retroceso exponencial siguiendo estos pasos:

  1. En la Google Cloud consola, ve a la página Suscripciones de Pub/Sub.

    Ir a suscripciones

  2. Haz clic en Crear suscripción.

  3. En el campo ID de suscripción, introduce un nombre.

    Para obtener información sobre cómo asignar un nombre a una suscripción, consulta las directrices para asignar un nombre a un tema o a una suscripción.

  4. Elige o crea un tema en el menú desplegable.

    La suscripción recibe mensajes del tema.

  5. Seleccione un Tipo de publicación.

  6. En Política de reintentos, seleccione Reintentar después de un tiempo de espera exponencial.

  7. Introduce un Tiempo de espera mínimo y un Tiempo de espera máximo de entre 0 y 600 segundos.

    Los valores predeterminados son 10 segundos para el tiempo de espera mínimo y 600 segundos para el tiempo de espera máximo.

  8. Haz clic en Crear.

gcloud

Para crear una suscripción con una política de reintentos con retroceso exponencial, ejecuta el comando gcloud pubsub create con las marcas que se muestran a continuación:

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
  --topic=TOPIC_ID \
  --min-retry-delay=MIN_RETRY_DELAY \
  --max-retry-delay=MAX_RETRY_DELAY

Tema de mensajes fallidos

Si el servicio Pub/Sub intenta entregar un mensaje, pero el suscriptor no puede confirmar la recepción, Pub/Sub puede reenviar el mensaje que no se ha podido entregar a un tema de mensajes fallidos.

Cómo funcionan los temas de mensajes fallidos con Pub/Sub

Un tema de mensajes fallidos es una propiedad de suscripción, no una propiedad de tema. Esto significa que debes definir un tema de mensajes fallidos cuando crees una suscripción, no cuando crees un tema.

Si creas un tema de mensajes fallidos, puedes definir las siguientes propiedades de suscripción:

  • Número máximo de intentos de entrega: valor numérico que indica el número de intentos de entrega que Pub/Sub realiza para un mensaje específico. Si el cliente de la suscripción no puede confirmar la recepción del mensaje en el número de intentos de entrega configurado, el mensaje se reenvía a un tema de mensajes fallidos.

    • Valor predeterminado = 5
    • Valor máximo = 100
    • Valor mínimo = 5
  • Proyecto con el tema de mensajes fallidos: si el tema de mensajes fallidos está en un proyecto distinto de la suscripción, debes especificar el proyecto con el tema de mensajes fallidos. Asigna al tema de mensajes fallidos un tema distinto del tema al que está asociada la suscripción.

Cómo se calcula el número máximo de intentos de entrega

Pub/Sub solo contabiliza los intentos de entrega cuando un tema de mensajes fallidos está configurado correctamente e incluye los permisos de gestión de identidades y accesos correctos.

El número máximo de intentos de entrega es aproximado porque Pub/Sub reenvía los mensajes que no se pueden entregar en la medida de lo posible.

El número de intentos de entrega registrados de un mensaje también puede restablecerse a cero, sobre todo en el caso de una suscripción de extracción con suscriptores inactivos. Por lo tanto, es posible que los mensajes se entreguen al cliente suscriptor más veces que el número máximo de intentos de entrega configurado.

Configurar un tema de mensajes fallidos

Para configurar un tema de mensajes fallidos, el tema de origen debe tener primero una suscripción. Puedes especificar un tema de mensajes fallidos al crear la suscripción o actualizar una suscripción que ya tengas para que tenga un tema de mensajes fallidos.

A continuación, se muestra el flujo de trabajo para habilitar el envío de mensajes fallidos en una suscripción.

  1. Crea el tema de mensajes fallidos. Este tema es independiente del tema de origen.

  2. Define el tema de mensajes fallidos en la suscripción del tema de origen.

  3. Para no perder los mensajes del tema de mensajes fallidos, vincula al menos otra suscripción al tema de mensajes fallidos. La suscripción secundaria recibe mensajes del tema de mensajes fallidos.

  4. Asigna los roles de editor y suscriptor a la cuenta de servicio de Pub/Sub. Para obtener más información, consulta Conceder permisos de reenvío.

Definir un tema de mensajes fallidos en una suscripción nueva

Puedes crear una suscripción y definir un tema de mensajes fallidos mediante la consolaGoogle Cloud , la CLI de Google Cloud, las bibliotecas de cliente o la API Pub/Sub.

Consola

Para crear una suscripción y definir un tema de mensajes fallidos, sigue estos pasos:

  1. En la Google Cloud consola, ve a la página Suscripciones.

    Ir a Suscripciones

  2. Haz clic en Crear suscripción.

  3. Introduce el ID de suscripción.

  4. Elige o crea un tema en el menú desplegable.

    La suscripción recibe mensajes del tema.

  5. En la sección Mensajes fallidos, selecciona Habilitar mensajes fallidos.

  6. Elige o crea un tema de mensajes fallidos en el menú desplegable.

  7. Si el tema de mensajes fallidos elegido no tiene ninguna suscripción, el sistema te pedirá que crees una.

  8. En el campo Número máximo de intentos de entrega, especifica un número entero entre 5 y 100.

  9. Haz clic en Crear.

  10. En el panel de detalles se muestra una lista de posibles tareas. Si alguno de los elementos muestra un icono de error , haz clic en el elemento de acción para resolver el problema.

gcloud

Para crear una suscripción y definir un tema de mensajes fallidos, usa el comando gcloud pubsub subscriptions create

gcloud pubsub subscriptions create subscription-id \
  --topic=topic-id \
  --dead-letter-topic=dead-letter-topic-name \
  [--max-delivery-attempts=max-delivery-attempts] \
  [--dead-letter-topic-project=dead-letter-topic-project]

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& topic_id,
   std::string const& subscription_id,
   std::string const& dead_letter_topic_id,
   int dead_letter_delivery_attempts) {
  google::pubsub::v1::Subscription request;
  request.set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.set_topic(pubsub::Topic(project_id, topic_id).FullName());
  request.mutable_dead_letter_policy()->set_dead_letter_topic(
      pubsub::Topic(project_id, dead_letter_topic_id).FullName());
  request.mutable_dead_letter_policy()->set_max_delivery_attempts(
      dead_letter_delivery_attempts);
  auto sub = client.CreateSubscription(request);
  if (sub.status().code() == google::cloud::StatusCode::kAlreadyExists) {
    std::cout << "The subscription already exists\n";
    return;
  }
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription was successfully created: "
            << sub->DebugString() << "\n";

  std::cout << "It will forward dead letter messages to: "
            << sub->dead_letter_policy().dead_letter_topic() << "\n";

  std::cout << "After " << sub->dead_letter_policy().max_delivery_attempts()
            << " delivery attempts.\n";
}

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración de C# que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C# de Pub/Sub.


using Google.Cloud.PubSub.V1;
using System;

public class CreateSubscriptionWithDeadLetterPolicySample
{
    public Subscription CreateSubscriptionWithDeadLetterPolicy(string projectId, string topicId, string subscriptionId, string deadLetterTopicId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        // This is the subscription you want to create with a dead letter policy.
        var subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        // This is an existing topic that you want to attach the subscription with dead letter policy to.
        var topicName = TopicName.FromProjectTopic(projectId, topicId);
        // This is an existing topic that the subscription with dead letter policy forwards dead letter messages to.
        var deadLetterTopic = TopicName.FromProjectTopic(projectId, deadLetterTopicId).ToString();
        var subscriptionRequest = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            DeadLetterPolicy = new DeadLetterPolicy
            {
                DeadLetterTopic = deadLetterTopic,
                // The maximum number of times that the service attempts to deliver a
                // message before forwarding it to the dead letter topic. Must be [5-100].
                MaxDeliveryAttempts = 10
            },
            AckDeadlineSeconds = 30
        };

        var subscription = subscriber.CreateSubscription(subscriptionRequest);
        Console.WriteLine("Created subscription: " + subscription.SubscriptionName.SubscriptionId);
        Console.WriteLine($"It will forward dead letter messages to: {subscription.DeadLetterPolicy.DeadLetterTopic}");
        Console.WriteLine($"After {subscription.DeadLetterPolicy.MaxDeliveryAttempts} delivery attempts.");
        // Remember to attach a subscription to the dead letter topic because
        // messages published to a topic with no subscriptions are lost.
        return subscription;
    }
}

Go

En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
)

// createSubWithDeadLetter creates a subscription with a dead letter policy.
func createSubWithDeadLetter(w io.Writer, projectID, topic, subscription, fullyQualifiedDeadLetterTopic string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project-id/subscriptions/my-sub"
	// fullyQualifiedDeadLetterTopic := "projects/my-project-id/topics/my-dead-letter-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	_, err = client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
		Name:  subscription,
		Topic: topic,
		DeadLetterPolicy: &pubsubpb.DeadLetterPolicy{
			DeadLetterTopic:     fullyQualifiedDeadLetterTopic,
			MaxDeliveryAttempts: 10,
		},
	})
	if err != nil {
		return fmt.Errorf("CreateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Created subscription with dead letter topic: (%s)\n", fullyQualifiedDeadLetterTopic)
	fmt.Fprintln(w, "To process dead letter messages, remember to add a subscription to your dead letter topic.")
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.

import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.Subscription;
import java.io.IOException;

public class CreateSubscriptionWithDeadLetterPolicyExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is the subscription you want to create with a dead letter policy.
    String subscriptionId = "your-subscription-id";
    // This is an existing topic that you want to attach the subscription with dead letter policy
    // to.
    String topicId = "your-topic-id";
    // This is an existing topic that the subscription with dead letter policy forwards dead letter
    // messages to.
    String deadLetterTopicId = "your-dead-letter-topic-id";

    CreateSubscriptionWithDeadLetterPolicyExample.createSubscriptionWithDeadLetterPolicyExample(
        projectId, subscriptionId, topicId, deadLetterTopicId);
  }

  public static void createSubscriptionWithDeadLetterPolicyExample(
      String projectId, String subscriptionId, String topicId, String deadLetterTopicId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);
      ProjectTopicName deadLetterTopicName = ProjectTopicName.of(projectId, deadLetterTopicId);

      DeadLetterPolicy deadLetterPolicy =
          DeadLetterPolicy.newBuilder()
              .setDeadLetterTopic(deadLetterTopicName.toString())
              // The maximum number of times that the service attempts to deliver a
              // message before forwarding it to the dead letter topic. Must be [5-100].
              .setMaxDeliveryAttempts(10)
              .build();

      Subscription request =
          Subscription.newBuilder()
              .setName(subscriptionName.toString())
              .setTopic(topicName.toString())
              .setDeadLetterPolicy(deadLetterPolicy)
              .build();

      Subscription subscription = subscriptionAdminClient.createSubscription(request);

      System.out.println("Created subscription: " + subscription.getName());
      System.out.println(
          "It will forward dead letter messages to: "
              + subscription.getDeadLetterPolicy().getDeadLetterTopic());
      System.out.println(
          "After "
              + subscription.getDeadLetterPolicy().getMaxDeliveryAttempts()
              + " delivery attempts.");
      // Remember to attach a subscription to the dead letter topic because
      // messages published to a topic with no subscriptions are lost.
    }
  }
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const deadLetterTopicNameOrId = 'YOUR_DEAD_LETTER_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithDeadLetterPolicy(
  topicNameOrId,
  subscriptionNameOrId,
  deadLetterTopicNameOrId,
) {
  // Creates a new subscription
  const options = {
    deadLetterPolicy: {
      deadLetterTopic: pubSubClient.topic(deadLetterTopicNameOrId).name,
      maxDeliveryAttempts: 10,
    },
  };
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(
    `Created subscription ${subscriptionNameOrId} with dead letter topic ${deadLetterTopicNameOrId}.`,
  );
  console.log(
    'To process dead letter messages, remember to add a subscription to your dead letter topic.',
  );
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const deadLetterTopicNameOrId = 'YOUR_DEAD_LETTER_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, CreateSubscriptionOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function createSubscriptionWithDeadLetterPolicy(
  topicNameOrId: string,
  subscriptionNameOrId: string,
  deadLetterTopicNameOrId: string,
) {
  // Creates a new subscription
  const options: CreateSubscriptionOptions = {
    deadLetterPolicy: {
      deadLetterTopic: pubSubClient.topic(deadLetterTopicNameOrId).name,
      maxDeliveryAttempts: 10,
    },
  };
  await pubSubClient
    .topic(topicNameOrId)
    .createSubscription(subscriptionNameOrId, options);
  console.log(
    `Created subscription ${subscriptionNameOrId} with dead letter topic ${deadLetterTopicNameOrId}.`,
  );
  console.log(
    'To process dead letter messages, remember to add a subscription to your dead letter topic.',
  );
}

PHP

Antes de probar este ejemplo, sigue las instrucciones de configuración de PHP que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Pub/Sub para PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Creates a Pub/Sub subscription with dead letter policy enabled.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $deadLetterTopicName The Pub/Sub topic to use for dead letter policy.
 */
function dead_letter_create_subscription($projectId, $topicName, $subscriptionName, $deadLetterTopicName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $deadLetterTopic = $pubsub->topic($deadLetterTopicName);

    $subscription = $topic->subscribe($subscriptionName, [
        'deadLetterPolicy' => [
            'deadLetterTopic' => $deadLetterTopic
        ]
    ]);

    printf(
        'Subscription %s created with dead letter topic %s' . PHP_EOL,
        $subscription->name(),
        $deadLetterTopic->name()
    );
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import DeadLetterPolicy

# TODO(developer)
# project_id = "your-project-id"
# endpoint = "https://my-test-project.appspot.com/push"
# TODO(developer): This is an existing topic that the subscription
# with dead letter policy is attached to.
# topic_id = "your-topic-id"
# TODO(developer): This is an existing subscription with a dead letter policy.
# subscription_id = "your-subscription-id"
# TODO(developer): This is an existing dead letter topic that the subscription
# with dead letter policy will forward dead letter messages to.
# dead_letter_topic_id = "your-dead-letter-topic-id"
# TODO(developer): This is the maximum number of delivery attempts allowed
# for a message before it gets delivered to a dead letter topic.
# max_delivery_attempts = 5

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)
dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id)

dead_letter_policy = DeadLetterPolicy(
    dead_letter_topic=dead_letter_topic_path,
    max_delivery_attempts=max_delivery_attempts,
)

with subscriber:
    request = {
        "name": subscription_path,
        "topic": topic_path,
        "dead_letter_policy": dead_letter_policy,
    }
    subscription = subscriber.create_subscription(request)

print(f"Subscription created: {subscription.name}")
print(
    f"It will forward dead letter messages to: {subscription.dead_letter_policy.dead_letter_topic}."
)
print(
    f"After {subscription.dead_letter_policy.max_delivery_attempts} delivery attempts."
)

Ruby

En el siguiente ejemplo se usa la biblioteca de cliente de Ruby Pub/Sub v3. Si sigues usando la biblioteca v2, consulta la guía de migración a la versión 3. Para ver una lista de ejemplos de código de Ruby v2, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Ruby de Pub/Sub.

Ruby

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Ruby Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

# topic_id             = "your-topic-id"
# subscription_id      = "your-subscription-id"
# dead_letter_topic_id = "your-dead-letter-topic-id"

pubsub = Google::Cloud::PubSub.new
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.create_subscription \
  name: pubsub.subscription_path(subscription_id),
  topic: pubsub.topic_path(topic_id),
  dead_letter_policy: {
    dead_letter_topic: pubsub.topic_path(dead_letter_topic_id),
    max_delivery_attempts: 10
  }

puts "Created subscription #{subscription_id} with dead letter topic " \
     "#{dead_letter_topic_id}."
puts "To process dead letter messages, remember to add a subscription to " \
     "your dead letter topic."

Definir un tema de mensajes fallidos para una suscripción

Puedes actualizar una suscripción y definir un tema de mensajes fallidos mediante laGoogle Cloud consola, la CLI de Google Cloud, las bibliotecas de cliente o la API de Pub/Sub.

Consola

Para actualizar una suscripción y definir un tema de mensajes fallidos, sigue estos pasos.

  1. En la Google Cloud consola, ve a la página Suscripciones.

    Ir a Suscripciones

  2. Junto a la suscripción que quieras actualizar, haz clic en Más acciones.

  3. En el menú contextual, selecciona Editar.

    El menú contextual con la opción Editar resaltada.

  4. En la sección Mensajes fallidos, selecciona Habilitar mensajes fallidos.

  5. Elige o crea un tema en el menú desplegable.

  6. Si el tema elegido no tiene ninguna suscripción, el sistema te pedirá que crees una.

  7. En el campo Número máximo de intentos de entrega, especifica un número entero entre 5 y 100.

  8. Haz clic en Actualizar.

  9. En el panel de detalles se muestra una lista de posibles tareas. Si alguno de los elementos muestra un icono de error , haz clic en el elemento de acción para resolver el problema.

gcloud

Para actualizar una suscripción y definir un tema de mensajes fallidos, usa el comando gcloud pubsub subscriptions update:

gcloud pubsub subscriptions update subscription-id \
  --dead-letter-topic=dead-letter-topic-name \
  [--max-delivery-attempts=max-delivery-attempts] \
  [--dead-letter-topic-project=dead-letter-topic-project]

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

namespace pubsub_admin = ::google::cloud::pubsub_admin;
namespace pubsub = ::google::cloud::pubsub;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& subscription_id,
   std::string const& dead_letter_topic_id,
   int dead_letter_delivery_attempts) {
  google::pubsub::v1::UpdateSubscriptionRequest request;
  request.mutable_subscription()->set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.mutable_subscription()
      ->mutable_dead_letter_policy()
      ->set_dead_letter_topic(
          pubsub::Topic(project_id, dead_letter_topic_id).FullName());
  request.mutable_subscription()
      ->mutable_dead_letter_policy()
      ->set_max_delivery_attempts(dead_letter_delivery_attempts);
  *request.mutable_update_mask()->add_paths() = "dead_letter_policy";
  auto sub = client.UpdateSubscription(request);
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription has been updated to: " << sub->DebugString()
            << "\n";

  std::cout << "It will forward dead letter messages to: "
            << sub->dead_letter_policy().dead_letter_topic() << "\n";

  std::cout << "After " << sub->dead_letter_policy().max_delivery_attempts()
            << " delivery attempts.\n";
}

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración de C# que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C# de Pub/Sub.


using Google.Cloud.PubSub.V1;
using Google.Protobuf.WellKnownTypes;

public class UpdateDeadLetterPolicySample
{
    public Subscription UpdateDeadLetterPolicy(string projectId, string topicId, string subscriptionId, string deadLetterTopicId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        // This is an existing topic that the subscription with dead letter policy is attached to.
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // This is an existing subscription with a dead letter policy.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        // This is an existing dead letter topic that the subscription with dead letter policy forwards
        // dead letter messages to.
        var deadLetterTopic = TopicName.FromProjectTopic(projectId, deadLetterTopicId).ToString();


        // Construct the subscription with the dead letter policy you expect to have after the update.
        // Here, values in the required fields (name, topic) help identify the subscription.
        var subscription = new Subscription
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            DeadLetterPolicy = new DeadLetterPolicy
            {
                DeadLetterTopic = deadLetterTopic,
                MaxDeliveryAttempts = 20,
            }
        };

        var request = new UpdateSubscriptionRequest
        {
            Subscription = subscription,
            // Construct a field mask to indicate which field to update in the subscription.
            UpdateMask = new FieldMask { Paths = { "dead_letter_policy" } }
        };
        var updatedSubscription = subscriber.UpdateSubscription(request);
        return updatedSubscription;
    }
}

Go

En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

// updateDeadLetter updates an existing subscription with a dead letter policy.
func updateDeadLetter(w io.Writer, projectID, subscription, deadLetterTopic string) error {
	// projectID := "my-project-id"
	// subID := "projects/my-project-id/subscriptions/my-sub"
	// deadLetterTopic := "projects/my-project-id/topics/my-dead-letter-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.SubscriptionAdminClient.UpdateSubscription(ctx, &pubsubpb.UpdateSubscriptionRequest{
		Subscription: &pubsubpb.Subscription{
			Name: subscription,
			DeadLetterPolicy: &pubsubpb.DeadLetterPolicy{
				MaxDeliveryAttempts: 20,
				DeadLetterTopic:     deadLetterTopic,
			},
		},
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"dead_letter_policy"},
		},
	})
	if err != nil {
		return fmt.Errorf("UpdateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Updated subscription: %+v\n", sub)
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.


import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.DeadLetterPolicy;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateSubscriptionRequest;
import java.io.IOException;

public class UpdateDeadLetterPolicyExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing subscription with a dead letter policy.
    String subscriptionId = "your-subscription-id";
    // This is an existing topic that the subscription with dead letter policy is attached to.
    String topicId = "your-topic-id";
    // This is an existing dead letter topic that the subscription with dead letter policy forwards
    // dead letter messages to.
    String deadLetterTopicId = "your-dead-letter-topic-id";

    UpdateDeadLetterPolicyExample.updateDeadLetterPolicyExample(
        projectId, subscriptionId, topicId, deadLetterTopicId);
  }

  public static void updateDeadLetterPolicyExample(
      String projectId, String subscriptionId, String topicId, String deadLetterTopicId)
      throws IOException {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);

      System.out.println(
          "Before: " + subscriptionAdminClient.getSubscription(subscriptionName).getAllFields());

      TopicName topicName = TopicName.of(projectId, topicId);
      TopicName deadLetterTopicName = TopicName.of(projectId, deadLetterTopicId);

      // Construct the dead letter policy you expect to have after the update.
      DeadLetterPolicy deadLetterPolicy =
          DeadLetterPolicy.newBuilder()
              .setDeadLetterTopic(deadLetterTopicName.toString())
              .setMaxDeliveryAttempts(20)
              .build();

      // Construct the subscription with the dead letter policy you expect to have
      // after the update. Here, values in the required fields (name, topic) help
      // identify the subscription.
      Subscription subscription =
          Subscription.newBuilder()
              .setName(subscriptionName.toString())
              .setTopic(topicName.toString())
              .setDeadLetterPolicy(deadLetterPolicy)
              .build();

      // Construct a field mask to indicate which field to update in the subscription.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("dead_letter_policy.max_delivery_attempts").build();

      UpdateSubscriptionRequest request =
          UpdateSubscriptionRequest.newBuilder()
              .setSubscription(subscription)
              .setUpdateMask(updateMask)
              .build();

      Subscription response = subscriptionAdminClient.updateSubscription(request);

      System.out.println("After: " + response.getAllFields());
      System.out.println(
          "Max delivery attempts is now "
              + response.getDeadLetterPolicy().getMaxDeliveryAttempts());
    }
  }
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateDeadLetterPolicy(topicNameOrId, subscriptionNameOrId) {
  const metadata = {
    deadLetterPolicy: {
      deadLetterTopic: pubSubClient.topic(topicNameOrId).name,
      maxDeliveryAttempts: 15,
    },
  };

  await pubSubClient
    .topic(topicNameOrId)
    .subscription(subscriptionNameOrId)
    .setMetadata(metadata);

  console.log('Max delivery attempts updated successfully.');
}

PHP

Antes de probar este ejemplo, sigue las instrucciones de configuración de PHP que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Pub/Sub para PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Set the dead letter policy on an existing subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $deadLetterTopicName The Pub/Sub topic to use for dead letter policy.
 */
function dead_letter_update_subscription(
    string $projectId,
    string $topicName,
    string $subscriptionName,
    string $deadLetterTopicName
): void {
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $deadLetterTopic = $pubsub->topic($deadLetterTopicName);

    $subscription = $topic->subscription($subscriptionName);
    $subscription->update([
        'deadLetterPolicy' => [
            'deadLetterTopic' => $deadLetterTopic
        ]
    ]);

    printf(
        'Subscription %s updated with dead letter topic %s' . PHP_EOL,
        $subscription->name(),
        $deadLetterTopic->name()
    );
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import DeadLetterPolicy, FieldMask

# TODO(developer)
# project_id = "your-project-id"
# TODO(developer): This is an existing topic that the subscription
# with dead letter policy is attached to.
# topic_id = "your-topic-id"
# TODO(developer): This is an existing subscription with a dead letter policy.
# subscription_id = "your-subscription-id"
# TODO(developer): This is an existing dead letter topic that the subscription
# with dead letter policy will forward dead letter messages to.
# dead_letter_topic_id = "your-dead-letter-topic-id"
# TODO(developer): This is the maximum number of delivery attempts allowed
# for a message before it gets delivered to a dead letter topic.
# max_delivery_attempts = 5

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)
dead_letter_topic_path = publisher.topic_path(project_id, dead_letter_topic_id)

subscription_before_update = subscriber.get_subscription(
    request={"subscription": subscription_path}
)
print(f"Before the update: {subscription_before_update}.")

# Indicates which fields in the provided subscription to update.
update_mask = FieldMask(paths=["dead_letter_policy"])

# Construct a dead letter policy you expect to have after the update.
dead_letter_policy = DeadLetterPolicy(
    dead_letter_topic=dead_letter_topic_path,
    max_delivery_attempts=max_delivery_attempts,
)

# Construct the subscription with the dead letter policy you expect to have
# after the update. Here, values in the required fields (name, topic) help
# identify the subscription.
subscription = pubsub_v1.types.Subscription(
    name=subscription_path,
    topic=topic_path,
    dead_letter_policy=dead_letter_policy,
)

with subscriber:
    subscription_after_update: gapic_types.Subscription = (
        subscriber.update_subscription(
            request={"subscription": subscription, "update_mask": update_mask}
        )
    )

print(f"After the update: {subscription_after_update}.")

Ruby

En el siguiente ejemplo se usa la biblioteca de cliente de Ruby Pub/Sub v3. Si sigues usando la biblioteca v2, consulta la guía de migración a la versión 3. Para ver una lista de ejemplos de código de Ruby v2, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Ruby de Pub/Sub.

Ruby

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Ruby Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

# subscription_id       = "your-subscription-id"
# role                  = "roles/pubsub.publisher"
# service_account_email =
# "serviceAccount:account_name@project_name.iam.gserviceaccount.com"

pubsub = Google::Cloud::PubSub.new
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.get_subscription \
  subscription: pubsub.subscription_path(subscription_id)
subscription.dead_letter_policy.max_delivery_attempts = 20

subscription_admin.update_subscription subscription: subscription,
                                       update_mask: {
                                         paths: ["dead_letter_policy"]
                                       }

puts "Max delivery attempts is now " \
     "#{subscription.dead_letter_policy.max_delivery_attempts}."

Conceder roles de gestión de identidades y accesos para usar temas de mensajes fallidos

Para reenviar mensajes que no se pueden entregar a un tema de mensajes fallidos, Pub/Sub debe tener permiso para hacer lo siguiente:

  • Publica mensajes en el tema.
  • Confirma los mensajes para quitarlos de la suscripción.

Pub/Sub crea y mantiene una cuenta de servicio para cada proyecto: service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com. Puedes conceder permisos de reenvío asignando los roles de editor y suscriptor a esta cuenta de servicio.

Consola

Para conceder permiso a Pub/Sub para publicar mensajes en un tema de mensajes fallidos, sigue estos pasos:

  1. En la Google Cloud consola, ve a la página Suscripciones.

    Ir a Suscripciones

  2. Haz clic en el nombre de la suscripción que tenga el tema de mensajes fallidos.

  3. Haz clic en la pestaña Buzón de correos.

  4. Para asignar un rol de editor, haz clic en Conceder rol de editor. Si el rol de editor se asigna correctamente, verás una marca de verificación azul .

  5. Para asignar un rol de suscriptor, haz clic en Conceder rol de suscriptor. Si el rol de editor se asigna correctamente, verás una marca de verificación azul .

gcloud

Para conceder permiso a Pub/Sub para publicar mensajes en un tema de mensajes fallidos, ejecuta el siguiente comando:

PUBSUB_SERVICE_ACCOUNT="service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com"

gcloud pubsub topics add-iam-policy-binding dead-letter-topic-name \
  --member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT"\
  --role="roles/pubsub.publisher"

Para conceder permiso a Pub/Sub para confirmar los mensajes reenviados que no se han podido entregar, ejecuta el siguiente comando:

PUBSUB_SERVICE_ACCOUNT="service-project-number@gcp-sa-pubsub.iam.gserviceaccount.com"

gcloud pubsub subscriptions add-iam-policy-binding subscription-id \
  --member="serviceAccount:$PUBSUB_SERVICE_ACCOUNT"\
  --role="roles/pubsub.subscriber"

Hacer un seguimiento de los intentos de entrega

Después de habilitar un tema de mensajes fallidos para una suscripción, cada mensaje de esa suscripción tiene un campo que especifica el número de intentos de entrega:

  • Los mensajes recibidos de una suscripción de extracción incluyen el campo delivery_attempt.

  • Los mensajes recibidos de una suscripción push incluyen el campo deliveryAttempt.

En los siguientes ejemplos se muestra cómo obtener el número de intentos de entrega:

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::cout << "Delivery attempt: " << h.delivery_attempt() << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración de C# que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C# de Pub/Sub.


using Google.Cloud.PubSub.V1;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesAsyncWithDeliveryAttemptsSample
{
    public async Task<int> PullMessagesAsyncWithDeliveryAttempts(string projectId, string subscriptionId, bool acknowledge)
    {
        // This is an existing subscription with a dead letter policy.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);

        int deliveryAttempt = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = message.Data.ToStringUtf8();
            System.Console.WriteLine($"Delivery Attempt: {message.GetDeliveryAttempt()}");
            if (message.GetDeliveryAttempt() != null)
            {
                deliveryAttempt = message.GetDeliveryAttempt().Value;
            }
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 7 seconds.
        await Task.Delay(7000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return deliveryAttempt;
    }
}

Go

En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
)

func pullMsgsDeadLetterDeliveryAttempt(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subID)
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		// When dead lettering is enabled, the delivery attempt field is a pointer to the
		// the number of times the service has attempted to delivery a message.
		// Otherwise, the field is nil.
		if msg.DeliveryAttempt != nil {
			fmt.Fprintf(w, "message: %s, delivery attempts: %d", msg.Data, *msg.DeliveryAttempt)
		}
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("got error in Receive: %w", err)
	}
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReceiveMessagesWithDeliveryAttemptsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing subscription with a dead letter policy.
    String subscriptionId = "your-subscription-id";

    ReceiveMessagesWithDeliveryAttemptsExample.receiveMessagesWithDeliveryAttemptsExample(
        projectId, subscriptionId);
  }

  public static void receiveMessagesWithDeliveryAttemptsExample(
      String projectId, String subscriptionId) {

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        new MessageReceiver() {
          @Override
          public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            // Handle incoming message, then ack the received message.
            System.out.println("Id: " + message.getMessageId());
            System.out.println("Data: " + message.getData().toStringUtf8());
            System.out.println("Delivery Attempt: " + Subscriber.getDeliveryAttempt(message));
            consumer.ack();
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPullWithDeliveryAttempts(
  projectId,
  subscriptionNameOrId,
) {
  // The low level API client requires a name only.
  const formattedSubscription =
    subscriptionNameOrId.indexOf('/') >= 0
      ? subscriptionNameOrId
      : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const request = {
    subscription: formattedSubscription,
    maxMessages: 10,
  };

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Process the messages.
  const ackIds = [];
  for (const message of response.receivedMessages || []) {
    console.log(`Received message: ${message.message.data}`);
    console.log(`Delivery Attempt: ${message.deliveryAttempt}`);
    if (message.ackId) {
      ackIds.push(message.ackId);
    }
  }

  // Acknowledge all of the messages. You could also acknowledge
  // these individually, but this is more efficient.
  const ackRequest = {
    subscription: formattedSubscription,
    ackIds: ackIds,
  };
  await subClient.acknowledge(ackRequest);

  console.log('Done.');
}

PHP

Antes de probar este ejemplo, sigue las instrucciones de configuración de PHP que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Pub/Sub para PHP.

use Google\Cloud\PubSub\Message;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Get the delivery attempt from a pulled message.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 * @param string $message The contents of a pubsub message data field.
 */
function dead_letter_delivery_attempt($projectId, $topicName, $subscriptionName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);

    // publish test message
    $topic->publish(new Message([
        'data' => $message
    ]));

    $subscription = $topic->subscription($subscriptionName);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        printf('Received message %s' . PHP_EOL, $message->data());
        printf('Delivery attempt %d' . PHP_EOL, $message->deliveryAttempt());
    }
    print('Done' . PHP_EOL);
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    print(f"With delivery attempts: {message.delivery_attempt}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    # When `timeout` is not set, result() will block indefinitely,
    # unless an exception is encountered first.
    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

En el siguiente ejemplo se usa la biblioteca de cliente de Ruby Pub/Sub v3. Si sigues usando la biblioteca v2, consulta la guía de migración a la versión 3. Para ver una lista de ejemplos de código de Ruby v2, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Ruby de Pub/Sub.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscriber = pubsub.subscriber subscription_id

subscriber.pull(immediate: false).each do |message|
  puts "Received message: #{message.data}"
  puts "Delivery Attempt: #{message.delivery_attempt}"
  message.acknowledge!
end

Cuando Pub/Sub reenvía un mensaje que no se puede entregar a un tema de mensajes fallidos, añade los siguientes atributos al mensaje:

  • CloudPubSubDeadLetterSourceDeliveryCount: número de intentos de entrega a la suscripción de origen.
  • CloudPubSubDeadLetterSourceSubscription: nombre de la suscripción de origen.
  • CloudPubSubDeadLetterSourceSubscriptionProject: nombre del proyecto que contiene la suscripción de origen.
  • CloudPubSubDeadLetterSourceTopicPublishTime: marca de tiempo en la que se publicó originalmente el mensaje.
  • CloudPubSubDeadLetterSourceDeliveryErrorMessage: el motivo por el que no se ha podido entregar el mensaje al destino original. El atributo solo se utiliza para exportar suscripciones.

Monitorizar mensajes reenviados

Después de reenviar un mensaje que no se puede entregar, el servicio Pub/Sub elimina el mensaje de la suscripción. Puedes monitorizar los mensajes reenviados con Cloud Monitoring.

Si asocias una suscripción al tema de mensajes fallidos, los mensajes usarán la política de vencimiento de la suscripción asociada en lugar del periodo de vencimiento de la suscripción con la propiedad del tema de mensajes fallidos.

La subscription/dead_letter_message_count métrica registra el número de mensajes que no se pueden entregar y que Pub/Sub reenvía desde una suscripción.

Para obtener más información, consulta el artículo Monitorizar mensajes no entregados reenviados.

Eliminar un tema de mensajes fallidos

Para dejar de reenviar mensajes que no se pueden entregar, elimina el tema de mensajes fallidos de la suscripción.

Puedes quitar un tema de mensajes fallidos de una suscripción mediante laGoogle Cloud consola, la CLI de Google Cloud o la API Pub/Sub.

Consola

Para quitar un tema de mensajes fallidos de una suscripción, sigue estos pasos:

  1. En la Google Cloud consola, ve a la página Suscripciones.

    Ir a Suscripciones

  2. En la lista de suscripciones, haz clic en junto a la suscripción que quieras actualizar.

  3. En el menú contextual, selecciona Editar.

    El menú contextual con la opción Editar resaltada.

  4. En la sección Mensajes fallidos, desmarca Habilitar mensajes fallidos.

  5. Haz clic en Actualizar.

gcloud

Para quitar un tema de mensajes fallidos de una suscripción, usa el comando gcloud pubsub subscriptions update y la marca --clear-dead-letter-policy:

gcloud pubsub subscriptions update subscription-id \
  --clear-dead-letter-policy

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

namespace pubsub_admin = ::google::cloud::pubsub_admin;
namespace pubsub = ::google::cloud::pubsub;
[](pubsub_admin::SubscriptionAdminClient client,
   std::string const& project_id, std::string const& subscription_id) {
  google::pubsub::v1::UpdateSubscriptionRequest request;
  request.mutable_subscription()->set_name(
      pubsub::Subscription(project_id, subscription_id).FullName());
  request.mutable_subscription()->clear_dead_letter_policy();
  *request.mutable_update_mask()->add_paths() = "dead_letter_policy";
  auto sub = client.UpdateSubscription(request);
  if (!sub) throw std::move(sub).status();

  std::cout << "The subscription has been updated to: " << sub->DebugString()
            << "\n";
}

C#

Antes de probar este ejemplo, sigue las instrucciones de configuración de C# que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C# de Pub/Sub.


using Google.Cloud.PubSub.V1;
using Google.Protobuf.WellKnownTypes;

public class RemoveDeadLetterPolicySample
{
    public Subscription RemoveDeadLetterPolicy(string projectId, string topicId, string subscriptionId)
    {
        SubscriberServiceApiClient subscriber = SubscriberServiceApiClient.Create();
        // This is an existing topic that the subscription with dead letter policy is attached to.
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        // This is an existing subscription with dead letter policy.
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        var subscription = new Subscription()
        {
            SubscriptionName = subscriptionName,
            TopicAsTopicName = topicName,
            DeadLetterPolicy = null
        };

        var request = new UpdateSubscriptionRequest
        {
            Subscription = subscription,
            UpdateMask = new FieldMask { Paths = { "dead_letter_policy" } }
        };
        var updatedSubscription = subscriber.UpdateSubscription(request);
        return updatedSubscription;
    }
}

Go

En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/protobuf/types/known/fieldmaskpb"
)

// removeDeadLetterTopic removes the dead letter policy from a subscription.
func removeDeadLetterTopic(w io.Writer, projectID, subscriptionName string) error {
	// projectID := "my-project-id"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub, err := client.SubscriptionAdminClient.UpdateSubscription(ctx, &pubsubpb.UpdateSubscriptionRequest{
		Subscription: &pubsubpb.Subscription{
			Name:             subscriptionName,
			DeadLetterPolicy: nil, // alternatively, you can omit this line entirely.
		},
		UpdateMask: &fieldmaskpb.FieldMask{
			Paths: []string{"dead_letter_policy"},
		},
	})
	if err != nil {
		return fmt.Errorf("UpdateSubscription: %w", err)
	}
	fmt.Fprintf(w, "Updated subscription: %+v\n", sub)
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.


import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateSubscriptionRequest;

public class RemoveDeadLetterPolicyExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // This is an existing subscription with dead letter policy.
    String subscriptionId = "your-subscription-id";
    // This is an existing topic that the subscription with dead letter policy is attached to.
    String topicId = "your-topic-id";

    RemoveDeadLetterPolicyExample.removeDeadLetterPolicyExample(projectId, subscriptionId, topicId);
  }

  public static void removeDeadLetterPolicyExample(
      String projectId, String subscriptionId, String topicId) throws Exception {
    try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {

      ProjectSubscriptionName subscriptionName =
          ProjectSubscriptionName.of(projectId, subscriptionId);
      TopicName topicName = TopicName.of(projectId, topicId);

      // Construct the subscription you expect to have after the request. Here,
      // values in the required fields (name, topic) help identify the subscription.
      // No dead letter policy is supplied.
      Subscription expectedSubscription =
          Subscription.newBuilder()
              .setName(subscriptionName.toString())
              .setTopic(topicName.toString())
              .build();

      // Construct a field mask to indicate which field to update in the subscription.
      FieldMask updateMask = FieldMask.newBuilder().addPaths("dead_letter_policy").build();

      UpdateSubscriptionRequest request =
          UpdateSubscriptionRequest.newBuilder()
              .setSubscription(expectedSubscription)
              .setUpdateMask(updateMask)
              .build();

      Subscription response = subscriptionAdminClient.updateSubscription(request);

      // You should see an empty dead letter topic field inside the dead letter policy.
      System.out.println("After: " + response.getAllFields());
    }
  }
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function removeDeadLetterPolicy(topicNameOrId, subscriptionNameOrId) {
  const metadata = {
    deadLetterPolicy: null,
  };

  await pubSubClient
    .topic(topicNameOrId)
    .subscription(subscriptionNameOrId)
    .setMetadata(metadata);

  console.log(
    `Removed dead letter topic from ${subscriptionNameOrId} subscription.`,
  );
}

PHP

Antes de probar este ejemplo, sigue las instrucciones de configuración de PHP que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Pub/Sub para PHP.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Remove dead letter policy from an existing subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function dead_letter_remove($projectId, $topicName, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);

    $subscription = $topic->subscription($subscriptionName);

    // Provide deadLetterPolicy in the update mask, but omit from update fields to unset.
    $subscription->update([], [
        'updateMask' => [
            'deadLetterPolicy'
        ]
    ]);

    printf(
        'Removed dead letter topic from subscription %s' . PHP_EOL,
        $subscription->name()
    );
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import FieldMask

# TODO(developer)
# project_id = "your-project-id"
# TODO(developer): This is an existing topic that the subscription
# with dead letter policy is attached to.
# topic_id = "your-topic-id"
# TODO(developer): This is an existing subscription with a dead letter policy.
# subscription_id = "your-subscription-id"

publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
topic_path = publisher.topic_path(project_id, topic_id)
subscription_path = subscriber.subscription_path(project_id, subscription_id)

subscription_before_update = subscriber.get_subscription(
    request={"subscription": subscription_path}
)
print(f"Before removing the policy: {subscription_before_update}.")

# Indicates which fields in the provided subscription to update.
update_mask = FieldMask(paths=["dead_letter_policy"])

# Construct the subscription (without any dead letter policy) that you
# expect to have after the update.
subscription = pubsub_v1.types.Subscription(
    name=subscription_path, topic=topic_path
)

with subscriber:
    subscription_after_update: gapic_types.Subscription = (
        subscriber.update_subscription(
            request={"subscription": subscription, "update_mask": update_mask}
        )
    )

print(f"After removing the policy: {subscription_after_update}.")

Ruby

En el siguiente ejemplo se usa la biblioteca de cliente de Ruby Pub/Sub v3. Si sigues usando la biblioteca v2, consulta la guía de migración a la versión 3. Para ver una lista de ejemplos de código de Ruby v2, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Ruby de Pub/Sub.

Ruby

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido de Pub/Sub con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Ruby Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::PubSub.new
subscription_admin = pubsub.subscription_admin

subscription = subscription_admin.get_subscription \
  subscription: pubsub.subscription_path(subscription_id)
subscription.dead_letter_policy = nil

subscription_admin.update_subscription subscription: subscription,
                                       update_mask: {
                                         paths: ["dead_letter_policy"]
                                       }

puts "Removed dead letter topic from #{subscription_id} subscription."

Precios

Cuando el servicio Pub/Sub reenvía mensajes que no se pueden entregar, se aplican las siguientes tarifas:

  • Las tarifas de publicación se facturan a la cuenta de facturación asociada al proyecto que contiene el tema de mensajes fallidos.
  • Las tarifas de suscripción de los mensajes salientes se facturan a la cuenta de facturación asociada al proyecto que contiene la suscripción con la propiedad de tema de mensajes fallidos.

Si defines la propiedad del tema de mensajes fallidos de una suscripción, pero la política de ubicación de almacenamiento de mensajes del tema de mensajes fallidos no permite la región que contiene la suscripción, también se aplicarán tarifas de publicación a los mensajes salientes.

Las tarifas de publicación de los mensajes salientes se facturan al proyecto que contiene el tema de mensajes fallidos. Para obtener más información, consulta los precios.

Siguientes pasos