Dépannage d'ordre général

Découvrez les étapes de dépannage qui pourraient vous être utiles si vous rencontrez des problèmes avec Pub/Sub.

Impossible de créer un sujet

Vérifiez que vous disposez des autorisations nécessaires. Pour créer un sujet Pub/Sub, vous devez disposer du rôle Éditeur Pub/Sub (roles/pubsub.editor) Identity and Access Management au projet. Si vous ne disposez pas de ce rôle, contactez votre administrateur. Pour obtenir des informations de dépannage supplémentaires sur ces sujets, consultez les pages suivantes:

Impossible de créer un abonnement

Assurez-vous d'avoir effectué les opérations suivantes:

  • Vérifiez que vous disposez des autorisations nécessaires. Pour créer un abonnement Pub/Sub, vous devez disposer du rôle IAM Éditeur Pub/Sub (roles/pubsub.editor) sur le projet. Si vous ne disposez pas de ce rôle, contactez votre administrateur.

  • Vous avez indiqué un nom pour l'abonnement.

  • Vous avez spécifié le nom d'un sujet existant auquel vous souhaitez associer l'abonnement.

  • Si vous créez un abonnement push, vous avez spécifié https:// en minuscule (et non http:// ou HTTPS://) en tant que protocole pour votre URL de réception dans le champ pushEndpoint.

Pour obtenir des informations de dépannage supplémentaires sur les abonnements, consultez les pages suivantes:

Résoudre les problèmes d'autorisation

Les autorisations Pub/Sub contrôlent les utilisateurs et les comptes de service autorisés à effectuer des actions sur vos ressources Pub/Sub. Lorsque les autorisations sont mal configurées, cela peut entraîner des erreurs d'autorisation refusée et perturber le flux de messages. Les journaux d'audit fournissent un enregistrement détaillé de toutes les modifications d'autorisations, ce qui vous permet d'identifier la source de ces problèmes.

Pour résoudre les problèmes d'autorisation Pub/Sub avec les journaux d'audit:

  1. Obtenez les autorisations requises pour afficher l'explorateur de journaux.

    Pour plus d'informations, consultez la section Avant de commencer.

  2. Dans la console Google Cloud , accédez à la page Explorateur de journaux.

    Accéder à l'explorateur de journaux

  3. Sélectionnez un projet, un dossier ou une organisation Google Cloud existant.

  4. Voici une liste de filtres que vous pouvez utiliser pour trouver les journaux pertinents:

    • resource.type="pubsub_topic" OR resource.type="pubsub_subscription" : utilisez cette requête comme point de départ lorsque vous résolvez un problème pouvant impliquer des modifications de la configuration d'un sujet ou d'un abonnement, ou du contrôle des accès. Vous pouvez le combiner à d'autres filtres pour affiner davantage votre recherche.

    • protoPayload.methodName="google.iam.v1.SetIamPolicy": utilisez cette requête lorsque vous pensez qu'un problème est causé par des autorisations incorrectes ou manquantes. Il vous aide à identifier qui a modifié la stratégie IAM et quelles étaient ces modifications. Cela peut être utile pour résoudre des problèmes tels que des utilisateurs qui ne peuvent pas publier de sujets ni s'abonner à des abonnements, des applications auxquelles l'accès aux ressources Pub/Sub est refusé ou des modifications inattendues du contrôle des accès.

    • protoPayload.status.code=7: utilisez cette requête lorsque vous rencontrez des erreurs explicitement liées aux autorisations. Cela vous permet d'identifier les actions qui échouent et les utilisateurs qui les effectuent. Vous pouvez combiner cette requête avec les précédentes pour identifier la ressource spécifique et la modification de la stratégie IAM pouvant entraîner le refus d'autorisation.

  5. Analysez les journaux pour déterminer des facteurs tels que le code temporel de l'événement, l'entité principale à l'origine de la modification et le type de modifications apportées.

  6. Sur la base des informations recueillies dans les journaux d'audit, vous pouvez prendre des mesures correctives.

Abonnement supprimé

Les abonnements Pub/Sub peuvent être supprimés de deux manières principales:

  • Un utilisateur ou un compte de service disposant d'autorisations suffisantes supprime intentionnellement l'abonnement.

  • Un abonnement est automatiquement supprimé après une période d'inactivité, qui est de 31 jours par défaut. Pour en savoir plus sur le règlement sur l'expiration des abonnements, consultez la section Durée d'expiration.

Pour résoudre les problèmes liés à un abonnement supprimé, procédez comme suit:

  1. Dans la console Google Cloud , accédez à la page "Abonnements Pub/Sub" et vérifiez que l'abonnement n'est plus listé. Pour savoir comment lister des abonnements, consultez Lister un abonnement.

  2. Consultez les journaux d'audit. Accédez à l'explorateur de journaux. Utilisez le filtre protoPayload.methodName="google.pubsub.v1.Subscriber.DeleteSubscription" pour rechercher les abonnements supprimés. Examinez les journaux pour déterminer si quelqu'un a supprimé l'abonnement ou s'il a été supprimé en raison d'une inactivité. InternalExpireInactiveSubscription indique qu'un abonnement a été supprimé pour cause d'inactivité. Pour en savoir plus sur l'utilisation des journaux d'audit pour le dépannage, consultez la section Résoudre les problèmes Pub/Sub à l'aide des journaux d'audit.

403 (Forbidden) erreur

Si vous rencontrez cette erreur, procédez comme suit :

  • Assurez-vous d'avoir activé l'API Pub/Sub dans la consoleGoogle Cloud .
  • Vérifiez que le principal à l'origine de la requête dispose des autorisations requises sur les ressources de l'API Pub/Sub correspondantes, en particulier si vous vous servez de cette API pour communiquer entre plusieurs projets.

  • Si vous utilisez Dataflow, assurez-vous que {PROJECT_NUMBER}@cloudservices.gserviceaccount.com et le compte de service Compute Engine {PROJECT_NUMBER}-compute@developer.gserviceaccount.com disposent des autorisations requises sur la ressource de l'API Pub/Sub appropriée. Pour en savoir plus, consultez la section Sécurité et autorisations pour Dataflow.

  • Si vous utilisez App Engine, consultez la page des autorisations de votre projet pour savoir si un compte de service App Engine est répertorié en tant qu'éditeur Pub/Sub. Si ce n'est pas le cas, ajoutez votre compte de service App Engine en tant qu'éditeur Pub/Sub. Normalement, le compte de service App Engine est au format suivant : <project-id>@appspot.gserviceaccount.com.

Autres codes d'erreur courants

Pour obtenir la liste d'autres codes d'erreur courants liés à l'API Pub/Sub et leur description, consultez la section Codes d'erreur.

Utiliser des opérations d'administration excessives

Si vous constatez que vous utilisez une trop grande partie de votre quota pour des opérations d'administration, vous devrez peut-être refactoriser votre code. À titre d'illustration, intéressons-nous à ce pseudo-code. Ici, une opération d'administration (GET) est utilisée pour vérifier la présence d'un abonnement avant toute tentative d'utilisation de ses ressources. GET et CREATE sont des opérations d'administration:

if !GetSubscription my-sub {
  CreateSubscription my-sub
}
Consume from subscription my-sub

Un modèle plus efficace consiste à essayer de consulter des messages de l'abonnement (en supposant que vous soyez suffisamment sûr du nom de l'abonnement). Dans cette approche optimiste, vous n'accédez à l'abonnement ou ne le créez qu'en cas d'erreur. Considérez l'exemple suivant :

try {
  Consume from subscription my-sub
} catch NotFoundError {
  CreateSubscription my-sub
  Consume from subscription my-sub
}

Vous pouvez utiliser les exemples de code suivants pour implémenter ce modèle dans la langue de votre choix:

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Go.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// optimisticSubscribe shows the recommended pattern for optimistically
// assuming a subscription exists prior to receiving messages.
func optimisticSubscribe(w io.Writer, projectID, topicID, subID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Instead of checking if the subscription exists, optimistically try to
	// receive from the subscription.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got from existing subscription: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.NotFound {
				// Since the subscription does not exist, create the subscription.
				s, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
					Topic: client.Topic(topicID),
				})
				if err != nil {
					return err
				}
				fmt.Fprintf(w, "Created subscription: %q\n", subID)

				// Pull from the new subscription.
				err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
					fmt.Fprintf(w, "Got from new subscription: %q\n", string(msg.Data))
					msg.Ack()
				})
				if err != nil && !errors.Is(err, context.Canceled) {
					return err
				}
			}
		}
	}
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Java.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";

    optimisticSubscribeExample(projectId, subscriptionId, topicId);
  }

  public static void optimisticSubscribeExample(
      String projectId, String subscriptionId, String topicId) throws IOException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing
      // when the current subscriber encounters these errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (failure instanceof NotFoundException) {
                try (SubscriptionAdminClient subscriptionAdminClient =
                    SubscriptionAdminClient.create()) {
                  TopicName topicName = TopicName.of(projectId, topicId);
                  // Create a pull subscription with default acknowledgement deadline of 10 seconds.
                  // The client library will automatically extend acknowledgement deadlines.
                  Subscription subscription =
                      subscriptionAdminClient.createSubscription(
                          subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
                  System.out.println("Created pull subscription: " + subscription.getName());
                  optimisticSubscribeExample(projectId, subscriptionId, topicId);
                } catch (IOException err) {
                  System.out.println("Failed to create pull subscription: " + err.getMessage());
                }
              }
            }
          },
          MoreExecutors.directExecutor());

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // Prevent an exception from being thrown if it is the expected NotFoundException
      if (!(subscriber.failureCause() instanceof NotFoundException)) {
        throw e;
      }
    } catch (TimeoutException e) {
      subscriber.stopAsync();
    }
  }
}

Node.js

Avant d'essayer cet exemple, suivez les instructions de configuration pour Node.js du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async e => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub Python.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except NotFound:
        print(f"Subscription {subscription_path} not found, creating it.")

        try:
            # If the subscription does not exist, then create it.
            publisher = pubsub_v1.PublisherClient()
            topic_path = publisher.topic_path(project_id, topic_id)
            subscription = subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )

            if subscription:
                print(f"Subscription {subscription.name} created")
            else:
                raise ValueError("Subscription creation failed.")

            # Subscribe on the created subscription.
            try:
                streaming_pull_future = subscriber.subscribe(
                    subscription.name, callback=callback
                )
                streaming_pull_future.result(timeout=timeout)
            except TimeoutError:
                streaming_pull_future.cancel()  # Trigger the shutdown.
                streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print(
                f"Exception occurred when creating subscription and subscribing to it: {e}"
            )
    except Exception as e:
        print(f"Exception occurred when attempting optimistic subscribe: {e}")

C++

Avant d'essayer cet exemple, suivez les instructions de configuration pour C++ du guide de démarrage rapide de Pub/Sub : utiliser les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub C++.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
  if (response) {
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
    return gc::Status();
  }
  if (response.status().code() == gc::StatusCode::kUnavailable &&
      response.status().message() == "no messages returned") {
    std::cout << "No messages returned from Pull()\n";
    return gc::Status();
  }
  return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
    pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
    pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
    pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);

Node.js (TypeScript)

Avant d'essayer cet exemple, suivez les instructions de configuration de Node.js décrites dans le guide de démarrage rapide de Pub/Sub : utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence sur l'API Pub/Sub pour Node.js.

Pour vous authentifier auprès de Pub/Sub, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
  subscriptionNameOrId: string,
  topicNameOrId: string,
  timeout: number
) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async (e: StatusError) => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}