Allgemeine Fehlerbehebung

Hier finden Sie nützliche Informationen über die schrittweise Fehlerbehebung in Verbindung mit Pub/Sub.

Thema kann nicht erstellt werden

Prüfen Sie, ob Sie die erforderlichen Berechtigungen haben. Um ein Pub/Sub-Thema zu erstellen, benötigen Sie die Rolle Pub/Sub Editor (roles/pubsub.editor) für Identity and Access Management im Projekt. Wenn Sie diese Rolle nicht haben, wenden Sie sich an Ihren Administrator. Weitere Informationen zur Fehlerbehebung zu den folgenden Themen finden Sie auf den folgenden Seiten:

Abo kann nicht erstellt werden

Prüfen Sie, ob Sie Folgendes getan haben:

  • Prüfen Sie, ob Sie die erforderlichen Berechtigungen haben. Um ein Pub/Sub-Abo zu erstellen, benötigen Sie die IAM-Rolle Pub/Sub Editor (roles/pubsub.editor) für das Projekt. Wenn Sie diese Rolle nicht haben, wenden Sie sich an Ihren Administrator.

  • Geben Sie einen Namen für das Abo an.

  • Der Name eines vorhandenen Themas, mit dem das Abo verknüpft werden soll.

  • Wenn Sie ein Push-Abo erstellen, muss der Wert https:// in Kleinbuchstaben (nicht http:// oder HTTPS://) als Protokoll für Ihre Empfänger-URL im Feld pushEndpoint angegeben werden.

Weitere Informationen zur Fehlerbehebung bei Abos findest du auf den folgenden Seiten:

Fehlerbehebung bei Berechtigungsproblemen

Mit Pub/Sub-Berechtigungen wird festgelegt, welche Nutzer und Dienstkonten Aktionen auf Ihren Pub/Sub-Ressourcen ausführen können. Wenn Berechtigungen falsch konfiguriert sind, kann dies zu Fehlern führen, dass Berechtigungen verweigert werden, und den Nachrichtenfluss stören. Audit-Logs enthalten einen detaillierten Überblick über alle Berechtigungsänderungen, sodass Sie die Ursache dieser Probleme ermitteln können.

So beheben Sie Probleme mit Pub/Sub-Berechtigungen mithilfe von Audit-Logs:

  1. Sie benötigen die erforderlichen Berechtigungen, um den Log-Explorer aufzurufen.

    Weitere Informationen finden Sie unter Vorbereitung.

  2. Rufen Sie in der Google Cloud -Konsole die Seite Log-Explorer auf.

    Zum Log-Explorer

  3. Wählen Sie ein vorhandenes Google Cloud -Projekt, einen Ordner oder eine Organisation aus.

  4. Hier finden Sie eine Liste der Filter, mit denen Sie relevante Protokolle finden können:

    • resource.type="pubsub_topic" OR resource.type="pubsub_subscription": Verwenden Sie diese Abfrage als Ausgangspunkt, wenn Sie Probleme beheben, die Änderungen an der Konfiguration von Themen oder Abos oder an der Zugriffssteuerung betreffen könnten. Sie können ihn mit anderen Filtern kombinieren, um die Suche weiter einzugrenzen.

    • protoPayload.methodName="google.iam.v1.SetIamPolicy": Verwenden Sie diese Abfrage, wenn Sie vermuten, dass ein Problem durch falsche oder fehlende Berechtigungen verursacht wird. So können Sie nachvollziehen, wer Änderungen an der IAM-Richtlinie vorgenommen hat und was diese Änderungen waren. Das kann bei der Behebung von Problemen hilfreich sein, z. B. wenn Nutzer keine Nachrichten in Themen veröffentlichen oder keine Abos abonnieren können, Anwendungen keinen Zugriff auf Pub/Sub-Ressourcen haben oder es unerwartete Änderungen bei der Zugriffssteuerung gibt.

    • protoPayload.status.code=7: Verwenden Sie diese Abfrage, wenn Fehler auftreten, die sich ausdrücklich auf Berechtigungen beziehen. So können Sie leichter feststellen, welche Aktionen fehlschlagen und wer sie ausführt. Sie können diese Abfrage mit den vorherigen kombinieren, um die Änderung der jeweiligen Ressource und IAM-Richtlinie zu ermitteln, die möglicherweise die Berechtigungsverweigerung verursacht.

  5. Analysieren Sie die Protokolle, um Faktoren wie den Zeitstempel des Ereignisses, die Person, die die Änderung vorgenommen hat, und die Art der Änderungen zu ermitteln.

  6. Anhand der Informationen aus den Prüfprotokollen können Sie Korrekturmaßnahmen ergreifen.

Abo wurde gelöscht

Pub/Sub-Abos können auf zwei Arten gelöscht werden:

  • Ein Nutzer oder ein Dienstkonto mit ausreichenden Berechtigungen löscht das Abo absichtlich.

  • Ein Abo wird nach einer bestimmten Zeit der Inaktivität automatisch gelöscht. Standardmäßig sind das 31 Tage. Weitere Informationen zur Ablaufrichtlinie für Abos finden Sie unter Ablaufzeitraum.

So beheben Sie Probleme mit einem gelöschten Abo:

  1. Rufen Sie in der Google Cloud -Console die Seite „Pub/Sub-Abos“ auf und prüfen Sie, ob das Abo nicht mehr aufgeführt ist. Weitere Informationen zum Auflisten von Abos finden Sie unter Abos auflisten.

  2. Prüfen Sie die Audit-Logs. Rufen Sie den Log-Explorer auf. Mit dem Filter protoPayload.methodName="google.pubsub.v1.Subscriber.DeleteSubscription" lassen sich gelöschte Abos finden. Prüfen Sie anhand der Protokolle, ob jemand das Abo gelöscht hat oder ob es aufgrund von Inaktivität gelöscht wurde. InternalExpireInactiveSubscription bedeutet, dass ein Abo aufgrund von Inaktivität gelöscht wurde. Weitere Informationen zur Verwendung von Audit-Logs zur Fehlerbehebung finden Sie unter Pub/Sub-Probleme mit Audit-Logs beheben.

403 (Forbidden) Fehler

Gehen Sie folgendermaßen vor, wenn dieser Fehler angezeigt wird:

  • Achten Sie darauf, dass Sie die Pub/Sub API in derGoogle Cloud -Konsole aktiviert haben.
  • Achten Sie darauf, dass der Teilnehmer, der die Anfrage stellt, die erforderlichen Berechtigungen für die relevanten Pub/Sub API-Ressourcen hat. Dies ist besonders wichtig, wenn Sie die Pub/Sub API für die projektübergreifende Kommunikation verwenden.

  • Wenn Sie Dataflow verwenden, achten Sie darauf, dass sowohl {PROJECT_NUMBER}@cloudservices.gserviceaccount.com als auch das Compute Engine-Dienstkonto {PROJECT_NUMBER}-compute@developer.gserviceaccount.com die erforderlichen Berechtigungen für die entsprechende Pub/Sub API-Ressource haben. Weitere Informationen finden Sie unter Sicherheit und Berechtigungen in Dataflow.

  • Wenn Sie App Engine verwenden, prüfen Sie auf der Seite „Berechtigungen“ Ihres Projekts, ob ein App Engine-Dienstkonto als Pub/Sub-Bearbeiter aufgeführt ist. Wenn dies nicht der Fall ist, fügen Sie Ihr App Engine-Dienstkonto als Pub/Sub-Bearbeiter hinzu. Normalerweise hat das App Engine-Dienstkonto das Format <project-id>@appspot.gserviceaccount.com.

Weitere häufige Fehlercodes

Eine Liste weiterer häufiger Fehlercodes im Zusammenhang mit der Pub/Sub API und ihre Beschreibungen finden Sie unter Fehlercodes.

Übermäßige Verwaltungsvorgänge verwenden

Sollten Sie feststellen, dass Sie zu viel von Ihrem Kontingent für Verwaltungsvorgänge verbrauchen, müssen Sie unter Umständen Ihren Code refaktorieren. Betrachten Sie diesen Pseudocode zur Veranschaulichung. In diesem Beispiel wird mit einem Verwaltungsvorgang (GET) geprüft, ob ein Abo vorhanden ist, bevor versucht wird, seine Ressourcen zu verbrauchen. Sowohl GET als auch CREATE sind Administratorvorgänge:

if !GetSubscription my-sub {
  CreateSubscription my-sub
}
Consume from subscription my-sub

Ein effizienteres Muster ist der Versuch, Nachrichten aus dem Abo zu verbrauchen (vorausgesetzt, Sie sind sich beim Namen des Abos einigermaßen sicher). Bei diesem optimistischen Ansatz erhalten oder erstellen Sie das Abo nur, wenn ein Fehler auftritt. Betrachten Sie dieses Beispiel:

try {
  Consume from subscription my-sub
} catch NotFoundError {
  CreateSubscription my-sub
  Consume from subscription my-sub
}

Mit den folgenden Codebeispielen können Sie dieses Muster in der Sprache Ihrer Wahl implementieren:

Go

Folgen Sie der Einrichtungsanleitung für Go in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Go API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// optimisticSubscribe shows the recommended pattern for optimistically
// assuming a subscription exists prior to receiving messages.
func optimisticSubscribe(w io.Writer, projectID, topicID, subID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Instead of checking if the subscription exists, optimistically try to
	// receive from the subscription.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got from existing subscription: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.NotFound {
				// Since the subscription does not exist, create the subscription.
				s, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
					Topic: client.Topic(topicID),
				})
				if err != nil {
					return err
				}
				fmt.Fprintf(w, "Created subscription: %q\n", subID)

				// Pull from the new subscription.
				err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
					fmt.Fprintf(w, "Got from new subscription: %q\n", string(msg.Data))
					msg.Ack()
				})
				if err != nil && !errors.Is(err, context.Canceled) {
					return err
				}
			}
		}
	}
	return nil
}

Java

Folgen Sie der Einrichtungsanleitung für Java in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Java API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";

    optimisticSubscribeExample(projectId, subscriptionId, topicId);
  }

  public static void optimisticSubscribeExample(
      String projectId, String subscriptionId, String topicId) throws IOException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing
      // when the current subscriber encounters these errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (failure instanceof NotFoundException) {
                try (SubscriptionAdminClient subscriptionAdminClient =
                    SubscriptionAdminClient.create()) {
                  TopicName topicName = TopicName.of(projectId, topicId);
                  // Create a pull subscription with default acknowledgement deadline of 10 seconds.
                  // The client library will automatically extend acknowledgement deadlines.
                  Subscription subscription =
                      subscriptionAdminClient.createSubscription(
                          subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
                  System.out.println("Created pull subscription: " + subscription.getName());
                  optimisticSubscribeExample(projectId, subscriptionId, topicId);
                } catch (IOException err) {
                  System.out.println("Failed to create pull subscription: " + err.getMessage());
                }
              }
            }
          },
          MoreExecutors.directExecutor());

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // Prevent an exception from being thrown if it is the expected NotFoundException
      if (!(subscriber.failureCause() instanceof NotFoundException)) {
        throw e;
      }
    } catch (TimeoutException e) {
      subscriber.stopAsync();
    }
  }
}

Node.js

Folgen Sie der Einrichtungsanleitung für Node.js in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async e => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

Folgen Sie der Einrichtungsanleitung für Python in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Python API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except NotFound:
        print(f"Subscription {subscription_path} not found, creating it.")

        try:
            # If the subscription does not exist, then create it.
            publisher = pubsub_v1.PublisherClient()
            topic_path = publisher.topic_path(project_id, topic_id)
            subscription = subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )

            if subscription:
                print(f"Subscription {subscription.name} created")
            else:
                raise ValueError("Subscription creation failed.")

            # Subscribe on the created subscription.
            try:
                streaming_pull_future = subscriber.subscribe(
                    subscription.name, callback=callback
                )
                streaming_pull_future.result(timeout=timeout)
            except TimeoutError:
                streaming_pull_future.cancel()  # Trigger the shutdown.
                streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print(
                f"Exception occurred when creating subscription and subscribing to it: {e}"
            )
    except Exception as e:
        print(f"Exception occurred when attempting optimistic subscribe: {e}")

C++

Folgen Sie der Einrichtungsanleitung für C++ in der Kurzanleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub C++ API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
  if (response) {
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
    return gc::Status();
  }
  if (response.status().code() == gc::StatusCode::kUnavailable &&
      response.status().message() == "no messages returned") {
    std::cout << "No messages returned from Pull()\n";
    return gc::Status();
  }
  return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
    pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
    pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
    pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);

Node.js (TypeScript)

Lesen Sie unter Pub/Sub-Schnellstart-Anleitung: Clientbibliotheken verwenden die Anleitung für die Einrichtung von Node.js, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Pub/Sub Node.js API.

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Pub/Sub zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
  subscriptionNameOrId: string,
  topicNameOrId: string,
  timeout: number
) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async (e: StatusError) => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}