常规问题排查

如果您在使用 Pub/Sub 时遇到问题,可查阅以下实用的问题排查步骤。

无法创建主题

验证您是否拥有必要的权限。如需创建 Pub/Sub 主题,您需要拥有项目的 Pub/Sub Editor (roles/pubsub.editor) Identity and Access Management 角色。如果您没有此角色,请与您的管理员联系。 如需详细了解与主题相关的问题排查信息,请参阅以下页面:

无法创建订阅

检查您是否已完成以下操作:

  • 验证您是否拥有必要的权限。如需创建 Pub/Sub 订阅,您需要拥有项目的 Pub/Sub Editor (roles/pubsub.editor) IAM 角色。如果您没有此角色,请与您的管理员联系。

  • 为订阅指定了名称。

  • 指定要将订阅附加到的现有主题的名称。

  • 如果创建推送订阅,请在 pushEndpoint 字段中以小写字母指定 https://(而不是 http://HTTPS://)作为接收网址的协议。

如需详细了解订阅问题排查,请参阅以下页面:

排查权限问题

Pub/Sub 权限用于控制哪些用户和服务账号可以对您的 Pub/Sub 资源执行操作。如果权限配置错误,可能会导致权限被拒错误并中断消息流。审核日志会详细记录所有权限更改,以便您找出这些问题的根源。

如需使用审核日志排查 Pub/Sub 权限问题,请执行以下操作:

  1. 获取查看日志浏览器所需的权限。

    如需了解更多信息,请参阅准备工作

  2. 在 Google Cloud 控制台中,前往 Logs Explorer 页面。

    转到日志浏览器

  3. 选择一个现有的 Google Cloud 项目、文件夹或组织。

  4. 下面列出了可用于查找相关日志的过滤条件:

    • resource.type="pubsub_topic" OR resource.type="pubsub_subscription":在排查可能涉及更改主题或订阅配置或访问权限控制的任何问题时,请将此查询作为起点。您可以将其与其他过滤条件结合使用,以进一步缩小搜索范围。

    • protoPayload.methodName="google.iam.v1.SetIamPolicy":如果您怀疑问题是由权限不正确或缺失所致,请使用此查询。它可帮助您跟踪谁更改了 IAM 政策以及更改内容。这对于排查问题非常有用,例如用户无法发布到主题或订阅订阅、应用被拒绝访问 Pub/Sub 资源,或访问权限控制发生意外变化。

    • protoPayload.status.code=7:如果您遇到与权限明确相关的错误,请使用此查询。这有助于您找出哪些操作失败以及是哪些人尝试执行这些操作。您可以将此查询与之前的查询结合使用,以确定可能导致权限被拒的具体资源和 IAM 政策更改。

  5. 分析日志,以确定事件的时间戳、进行更改的正文以及所进行的更改类型等因素。

  6. 根据从审核日志中收集的信息,您可以采取纠正措施。

订阅已被删除

您可以通过以下两种主要方式删除 Pub/Sub 订阅:

  • 具有足够权限的用户或服务账号故意删除订阅。

  • 订阅会在一段闲置时间后自动删除,默认时间为 31 天。如需详细了解订阅到期政策,请参阅到期期限

如需对已删除的订阅进行问题排查,请执行以下步骤:

  1. 在 Google Cloud 控制台中,前往 Pub/Sub“订阅”页面,并验证该订阅是否已不再列出。如需详细了解如何列出订阅,请参阅列出订阅

  2. 查看审核日志。前往 Logs Explorer。 使用过滤条件 protoPayload.methodName="google.pubsub.v1.Subscriber.DeleteSubscription" 查找已删除的订阅。检查日志,确定是有人删除了订阅,还是订阅因长期闲置而被删除。InternalExpireInactiveSubscription 表示订阅因长期无活动而被删除。如需详细了解如何使用审核日志进行问题排查,请参阅使用审核日志排查 Pub/Sub 问题

403 (Forbidden) 个错误

如果遇到此错误,请执行以下操作:

  • 确保您已在Google Cloud 控制台中启用了 Pub/Sub API。
  • 确保发出请求的主账号对相关 Pub/Sub API 资源具有所需权限,尤其是在您使用 Pub/Sub API 进行跨项目通信时。

  • 如果您使用的是 Dataflow,请确保 {PROJECT_NUMBER}@cloudservices.gserviceaccount.com 和 Compute Engine 服务账号 {PROJECT_NUMBER}-compute@developer.gserviceaccount.com 对相关 Pub/Sub API 资源都具有所需权限。如需了解详情,请参阅 Dataflow 安全和权限

  • 如果您使用的是 App Engine,请检查项目的权限页面,查看 App Engine 服务账号是否被列为 Pub/Sub Editor。如果不是,请将您的 App Engine 服务账号添加为 Pub/Sub Editor。通常情况下,App Engine 服务账号的格式为 <project-id>@appspot.gserviceaccount.com

其他常见错误代码

如需查看与 Pub/Sub API 相关的其他常见错误代码及其说明的列表,请参阅错误代码

使用过多的管理操作

如果您发现占用了过多管理操作配额,可能需要重构代码。例如,请参考下面的伪代码。在此示例中,管理操作 (GET) 用于在尝试使用其资源之前检查是否存在订阅。GETCREATE 都是管理员操作:

if !GetSubscription my-sub {
  CreateSubscription my-sub
}
Consume from subscription my-sub

更有效的模式是尝试使用订阅中的消息(假设您可以合理地确定订阅名称)。在这种乐观方式中,您只能在发生错误时获取或创建订阅。请参考下面的示例:

try {
  Consume from subscription my-sub
} catch NotFoundError {
  CreateSubscription my-sub
  Consume from subscription my-sub
}

您可以使用以下代码示例以所选语言实现此模式:

Go

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Go 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Go API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// optimisticSubscribe shows the recommended pattern for optimistically
// assuming a subscription exists prior to receiving messages.
func optimisticSubscribe(w io.Writer, projectID, topicID, subID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Instead of checking if the subscription exists, optimistically try to
	// receive from the subscription.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got from existing subscription: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.NotFound {
				// Since the subscription does not exist, create the subscription.
				s, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
					Topic: client.Topic(topicID),
				})
				if err != nil {
					return err
				}
				fmt.Fprintf(w, "Created subscription: %q\n", subID)

				// Pull from the new subscription.
				err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
					fmt.Fprintf(w, "Got from new subscription: %q\n", string(msg.Data))
					msg.Ack()
				})
				if err != nil && !errors.Is(err, context.Canceled) {
					return err
				}
			}
		}
	}
	return nil
}

Java

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Java API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";

    optimisticSubscribeExample(projectId, subscriptionId, topicId);
  }

  public static void optimisticSubscribeExample(
      String projectId, String subscriptionId, String topicId) throws IOException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing
      // when the current subscriber encounters these errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (failure instanceof NotFoundException) {
                try (SubscriptionAdminClient subscriptionAdminClient =
                    SubscriptionAdminClient.create()) {
                  TopicName topicName = TopicName.of(projectId, topicId);
                  // Create a pull subscription with default acknowledgement deadline of 10 seconds.
                  // The client library will automatically extend acknowledgement deadlines.
                  Subscription subscription =
                      subscriptionAdminClient.createSubscription(
                          subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
                  System.out.println("Created pull subscription: " + subscription.getName());
                  optimisticSubscribeExample(projectId, subscriptionId, topicId);
                } catch (IOException err) {
                  System.out.println("Failed to create pull subscription: " + err.getMessage());
                }
              }
            }
          },
          MoreExecutors.directExecutor());

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // Prevent an exception from being thrown if it is the expected NotFoundException
      if (!(subscriber.failureCause() instanceof NotFoundException)) {
        throw e;
      }
    } catch (TimeoutException e) {
      subscriber.stopAsync();
    }
  }
}

Node.js

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Node.js 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async e => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub Python API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except NotFound:
        print(f"Subscription {subscription_path} not found, creating it.")

        try:
            # If the subscription does not exist, then create it.
            publisher = pubsub_v1.PublisherClient()
            topic_path = publisher.topic_path(project_id, topic_id)
            subscription = subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )

            if subscription:
                print(f"Subscription {subscription.name} created")
            else:
                raise ValueError("Subscription creation failed.")

            # Subscribe on the created subscription.
            try:
                streaming_pull_future = subscriber.subscribe(
                    subscription.name, callback=callback
                )
                streaming_pull_future.result(timeout=timeout)
            except TimeoutError:
                streaming_pull_future.cancel()  # Trigger the shutdown.
                streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print(
                f"Exception occurred when creating subscription and subscribing to it: {e}"
            )
    except Exception as e:
        print(f"Exception occurred when attempting optimistic subscribe: {e}")

C++

在尝试此示例之前,请按照 Pub/Sub 快速入门:使用客户端库中的 C++ 设置说明进行操作。 如需了解详情,请参阅 Pub/Sub C++ API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
  if (response) {
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
    return gc::Status();
  }
  if (response.status().code() == gc::StatusCode::kUnavailable &&
      response.status().message() == "no messages returned") {
    std::cout << "No messages returned from Pull()\n";
    return gc::Status();
  }
  return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
    pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
    pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
    pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);

Node.js (TypeScript)

在尝试此示例之前,请按照《Pub/Sub 快速入门:使用客户端库》中的 Node.js 设置说明执行操作。 如需了解详情,请参阅 Pub/Sub Node.js API 参考文档

如需向 Pub/Sub 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
  subscriptionNameOrId: string,
  topicNameOrId: string,
  timeout: number
) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async (e: StatusError) => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}