Solucionar problemas generales

Descubre soluciones que pueden resultarte útiles si tienes algún problema mientras usas Pub/Sub.

No se puede crear un tema

Verifica que tienes los permisos necesarios. Para crear un tema de Pub/Sub, necesitas el rol Editor de Pub/Sub (roles/pubsub.editor) de gestión de identidades y accesos en el proyecto. Si no tienes este rol, ponte en contacto con tu administrador. Para obtener más información sobre cómo solucionar problemas relacionados con los temas, consulta las siguientes páginas:

No se puede crear una suscripción

Comprueba que has hecho lo siguiente:

  • Verifica que tienes los permisos necesarios. Para crear una suscripción de Pub/Sub, necesitas el rol de gestión de identidades y accesos Editor de Pub/Sub (roles/pubsub.editor) en el proyecto. Si no tienes este rol, ponte en contacto con tu administrador.

  • Especifica un nombre para la suscripción.

  • Se ha especificado el nombre de un tema al que quieres adjuntar la suscripción.

  • Si creas una suscripción push, especifica https:// en minúsculas (no http:// ni HTTPS://) como protocolo de la URL de recepción en el campo pushEndpoint.

Para obtener más información sobre cómo solucionar problemas relacionados con las suscripciones, consulta las siguientes páginas:

Solucionar problemas de permisos

Los permisos de Pub/Sub controlan qué usuarios y cuentas de servicio pueden realizar acciones en tus recursos de Pub/Sub. Si los permisos no están configurados correctamente, pueden producirse errores de denegación de permisos e interrumpir el flujo de mensajes. Los registros de auditoría proporcionan un registro detallado de todos los cambios de permisos, lo que te permite identificar el origen de estos problemas.

Para solucionar problemas de permisos de Pub/Sub con registros de auditoría, sigue estos pasos:

  1. Obtén los permisos necesarios para ver el explorador de registros.

    Para obtener más información, consulta la sección Antes de empezar.

  2. En la Google Cloud consola, ve a la página Explorador de registros.

    Ir a Explorador de registros

  3. Selecciona un proyecto, una carpeta o una organización Google Cloud .

  4. A continuación, se muestra una lista de filtros que puede usar para encontrar los registros pertinentes:

    • resource.type="pubsub_topic" OR resource.type="pubsub_subscription": Usa esta consulta como punto de partida cuando tengas que solucionar algún problema que pueda implicar cambios en las configuraciones de temas o suscripciones, o en el control de acceso. Puedes combinarlo con otros filtros para acotar aún más la búsqueda.

    • protoPayload.methodName="google.iam.v1.SetIamPolicy": usa esta consulta cuando sospeches que un problema se debe a permisos incorrectos o que faltan. Te ayuda a hacer un seguimiento de quién ha hecho cambios en la política de gestión de identidades y accesos y cuáles han sido esos cambios. Esto puede ser útil para solucionar problemas, como cuando los usuarios no pueden publicar en temas o suscribirse a suscripciones, cuando se deniega el acceso a recursos de Pub/Sub a las aplicaciones o cuando se producen cambios inesperados en el control de acceso.

    • protoPayload.status.code=7: usa esta consulta cuando se produzcan errores relacionados explícitamente con los permisos. De esta forma, podrás identificar qué acciones fallan y quién intenta llevarlas a cabo. Puedes combinar esta consulta con las anteriores para identificar el recurso específico y el cambio en la política de gestión de identidades y accesos que puede estar provocando la denegación de permisos.

  5. Analiza los registros para determinar factores como la marca de tiempo del evento, la entidad que ha realizado el cambio y el tipo de cambios que se han realizado.

  6. En función de la información recogida en los registros de auditoría, puedes tomar medidas correctivas.

Solucionar problemas de permisos de Terraform

Cuando uses Pub/Sub con Terraform, asigna explícitamente los roles necesarios en tu código de Terraform. Por ejemplo, para publicar, la cuenta de servicio de tu aplicación necesita el rol roles/pubsub.publisher. Si este rol no se define explícitamente en tu código de Terraform, una futura terraform apply podría eliminarlo. Esto suele ocurrir durante actualizaciones no relacionadas, lo que provoca que una aplicación fiable falle de repente con errores PERMISSION_DENIED. Si defines explícitamente el rol en el código, evitarás estas regresiones accidentales.

Se ha eliminado la suscripción

Las suscripciones de Pub/Sub se pueden eliminar de dos formas principales:

  • Un usuario o una cuenta de servicio con los permisos suficientes elimina la suscripción de forma intencionada.

  • Las suscripciones se eliminan automáticamente tras un periodo de inactividad, que es de 31 días de forma predeterminada. Para obtener más información sobre la política de vencimiento de las suscripciones, consulta el artículo Periodo de vencimiento.

Para solucionar problemas con una suscripción eliminada, sigue estos pasos:

  1. En la Google Cloud consola, vaya a la página de suscripciones de Pub/Sub y compruebe que la suscripción ya no aparece. Para obtener más información sobre cómo mostrar suscripciones, consulta Mostrar una suscripción.

  2. Consulta los registros de auditoría. Ve a Explorador de registros. Usa el filtro protoPayload.methodName="google.pubsub.v1.Subscriber.DeleteSubscription" para encontrar las suscripciones eliminadas. Examina los registros para determinar si alguien ha eliminado la suscripción o si se ha eliminado por inactividad. InternalExpireInactiveSubscription indica que se ha eliminado una suscripción por inactividad. Para obtener más información sobre cómo usar los registros de auditoría para solucionar problemas, consulte Solucionar problemas de Pub/Sub con registros de auditoría.

Error 403 (Forbidden)

Un error 403 suele significar que no tienes los permisos correctos para realizar una acción. Por ejemplo, puede que recibas un error 403 User not authorized al intentar publicar en un tema o extraer datos de una suscripción.

Si aparece este error, haz lo siguiente:

  • Asegúrate de haber habilitado la API Pub/Sub en laGoogle Cloud consola.
  • Asegúrate de que la entidad de seguridad que hace la solicitud tenga los permisos necesarios en los recursos de la API Pub/Sub pertinentes, sobre todo si usas la API Pub/Sub para la comunicación entre proyectos.

  • Si usas Dataflow, asegúrate de que tanto {PROJECT_NUMBER}@cloudservices.gserviceaccount.com como la cuenta de servicio de Compute Engine {PROJECT_NUMBER}-compute@developer.gserviceaccount.com tengan los permisos necesarios en el recurso de la API Pub/Sub correspondiente. Para obtener más información, consulta Seguridad y permisos de los flujos de datos.

  • Si usas App Engine, consulta la página Permisos de tu proyecto para ver si aparece una cuenta de servicio de App Engine como editor de Pub/Sub. Si no es así, añade tu cuenta de servicio de App Engine como editor de Pub/Sub. Normalmente, la cuenta de servicio de App Engine tiene el formato <project-id>@appspot.gserviceaccount.com.

  • Puedes usar los registros de auditoría para solucionar problemas de permisos.

Otros códigos de error habituales

Para ver una lista de otros códigos de error habituales relacionados con la API Pub/Sub y sus descripciones, consulta Códigos de error.

Usar operaciones administrativas excesivas

Si detectas que estás usando demasiado tu cuota de operaciones administrativas, puede que tengas que refactorizar tu código. Para ilustrarlo, veamos este seudocódigo. En este ejemplo, se usa una operación administrativa (GET) para comprobar si hay una suscripción antes de intentar consumir sus recursos. Tanto GET como CREATE son operaciones de administrador:

if !GetSubscription my-sub {
  CreateSubscription my-sub
}
Consume from subscription my-sub

Un patrón más eficiente es intentar consumir mensajes de la suscripción (siempre que puedas estar razonablemente seguro del nombre de la suscripción). Con este enfoque optimista, solo obtienes o creas la suscripción si hay un error. Observa el siguiente ejemplo:

try {
  Consume from subscription my-sub
} catch NotFoundError {
  CreateSubscription my-sub
  Consume from subscription my-sub
}

Puedes usar los siguientes ejemplos de código para implementar este patrón en el lenguaje que quieras:

Go

En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub/v2"
	"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// optimisticSubscribe shows the recommended pattern for optimistically
// assuming a subscription exists prior to receiving messages.
func optimisticSubscribe(w io.Writer, projectID, topic, subscriptionName string) error {
	// projectID := "my-project-id"
	// topic := "projects/my-project-id/topics/my-topic"
	// subscription := "projects/my-project/subscriptions/my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
	// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
	// If a subscription ID is provided, the project ID from the client is used.
	sub := client.Subscriber(subscriptionName)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Instead of checking if the subscription exists, optimistically try to
	// receive from the subscription assuming it exists.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got from existing subscription: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.NotFound {
				// If the subscription does not exist, then create the subscription.
				subscription, err := client.SubscriptionAdminClient.CreateSubscription(ctx, &pubsubpb.Subscription{
					Name:  subscriptionName,
					Topic: topic,
				})
				if err != nil {
					return err
				}
				fmt.Fprintf(w, "Created subscription: %q\n", subscriptionName)

				// client.Subscriber can be passed a subscription ID (e.g. "my-sub") or
				// a fully qualified name (e.g. "projects/my-project/subscriptions/my-sub").
				// If a subscription ID is provided, the project ID from the client is used.
				sub = client.Subscriber(subscription.GetName())
				err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
					fmt.Fprintf(w, "Got from new subscription: %q\n", string(msg.Data))
					msg.Ack()
				})
				if err != nil && !errors.Is(err, context.Canceled) {
					return err
				}
			}
		}
	}
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";

    optimisticSubscribeExample(projectId, subscriptionId, topicId);
  }

  public static void optimisticSubscribeExample(
      String projectId, String subscriptionId, String topicId) throws IOException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing
      // when the current subscriber encounters these errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (failure instanceof NotFoundException) {
                try (SubscriptionAdminClient subscriptionAdminClient =
                    SubscriptionAdminClient.create()) {
                  TopicName topicName = TopicName.of(projectId, topicId);
                  // Create a pull subscription with default acknowledgement deadline of 10 seconds.
                  // The client library will automatically extend acknowledgement deadlines.
                  Subscription subscription =
                      subscriptionAdminClient.createSubscription(
                          subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
                  System.out.println("Created pull subscription: " + subscription.getName());
                  optimisticSubscribeExample(projectId, subscriptionId, topicId);
                } catch (IOException err) {
                  System.out.println("Failed to create pull subscription: " + err.getMessage());
                }
              }
            }
          },
          MoreExecutors.directExecutor());

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // Prevent an exception from being thrown if it is the expected NotFoundException
      if (!(subscriber.failureCause() instanceof NotFoundException)) {
        throw e;
      }
    } catch (TimeoutException e) {
      subscriber.stopAsync();
    }
  }
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async e => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId,
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Node.ts

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
  subscriptionNameOrId: string,
  topicNameOrId: string,
  timeout: number,
) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async (e: StatusError) => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId,
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except NotFound:
        print(f"Subscription {subscription_path} not found, creating it.")

        try:
            # If the subscription does not exist, then create it.
            publisher = pubsub_v1.PublisherClient()
            topic_path = publisher.topic_path(project_id, topic_id)
            subscription = subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )

            if subscription:
                print(f"Subscription {subscription.name} created")
            else:
                raise ValueError("Subscription creation failed.")

            # Subscribe on the created subscription.
            try:
                streaming_pull_future = subscriber.subscribe(
                    subscription.name, callback=callback
                )
                streaming_pull_future.result(timeout=timeout)
            except TimeoutError:
                streaming_pull_future.cancel()  # Trigger the shutdown.
                streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print(
                f"Exception occurred when creating subscription and subscribing to it: {e}"
            )
    except Exception as e:
        print(f"Exception occurred when attempting optimistic subscribe: {e}")

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
  if (response) {
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
    return gc::Status();
  }
  if (response.status().code() == gc::StatusCode::kUnavailable &&
      response.status().message() == "no messages returned") {
    std::cout << "No messages returned from Pull()\n";
    return gc::Status();
  }
  return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
    pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
    pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
    pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);