일반적인 문제해결

Pub/Sub 사용 중에 문제가 발생할 경우 유용하게 활용할 수 있는 문제 해결 단계를 알아봅니다.

주제를 만들 수 없음

필요한 권한이 있는지 확인합니다. Pub/Sub 주제를 만들려면 프로젝트에 대해 Pub/Sub 편집자 (roles/pubsub.editor) Identity and Access Management 역할이 필요합니다. 이 역할이 없으면 관리자에게 문의하세요. 주제 관련 문제 해결에 대한 자세한 내용은 다음 페이지를 참고하세요.

구독 생성 불가

다음을 완료했는지 확인합니다.

  • 필요한 권한이 있는지 확인합니다. Pub/Sub 구독을 만들려면 프로젝트에 대해 Pub/Sub 편집자(roles/pubsub.editor) IAM 역할이 필요합니다. 이 역할이 없으면 관리자에게 문의하세요.

  • 구독 이름을 지정했습니다.

  • 구독을 연결하려는 기존 주제 이름을 지정했습니다.

  • push 구독을 만드는 경우 pushEndpoint 필드에 수신 URL의 프로토콜로 https://를 소문자 (http:// 또는 HTTPS:// 아님)로 지정했습니다.

구독 문제 해결에 관한 자세한 내용은 다음 페이지를 참고하세요.

권한 문제 해결

Pub/Sub 권한은 Pub/Sub 리소스에 대한 작업을 실행할 수 있는 사용자 및 서비스 계정을 제어합니다. 권한이 잘못 구성되면 권한 거부 오류가 발생하고 메시지 흐름이 중단될 수 있습니다. 감사 로그는 모든 권한 변경사항에 관한 자세한 기록을 제공하므로 이러한 문제의 근원을 파악할 수 있습니다.

감사 로그의 Pub/Sub 권한 문제를 해결하려면 다음 단계를 따르세요.

  1. 로그 탐색기를 보려면 필요한 권한을 가져옵니다.

    자세한 내용은 시작하기 전에를 참조하세요.

  2. Google Cloud 콘솔에서 로그 탐색기 페이지로 이동합니다.

    로그 탐색기로 이동

  3. 기존 Google Cloud 프로젝트, 폴더 또는 조직을 선택합니다.

  4. 다음은 관련 로그를 찾는 데 사용할 수 있는 필터 목록입니다.

    • resource.type="pubsub_topic" OR resource.type="pubsub_subscription": 주제 또는 구독 구성 또는 액세스 제어 변경과 관련된 문제를 해결할 때 이 쿼리를 시작점으로 사용하세요. 이 필터를 다른 필터와 결합하여 검색 범위를 더욱 좁힐 수 있습니다.

    • protoPayload.methodName="google.iam.v1.SetIamPolicy": 잘못된 권한 또는 누락된 권한으로 인해 문제가 발생한 것으로 의심되는 경우 이 쿼리를 사용하세요. 이를 통해 IAM 정책을 변경한 사용자와 변경사항을 추적할 수 있습니다. 이는 사용자가 주제에 게시하거나 구독을 구독할 수 없거나, 애플리케이션에 Pub/Sub 리소스에 대한 액세스가 거부되었거나, 액세스 제어에 예상치 못한 변경사항이 있는 등의 문제를 해결하는 데 유용할 수 있습니다.

    • protoPayload.status.code=7: 권한과 명시적으로 관련된 오류가 발생할 때 이 쿼리를 사용합니다. 이를 통해 실패하는 작업과 해당 작업을 시도하는 사용자를 파악할 수 있습니다. 이 쿼리를 이전 쿼리와 결합하여 권한 거부에 원인이 될 수 있는 특정 리소스 및 IAM 정책 변경사항을 식별할 수 있습니다.

  5. 로그를 분석하여 이벤트 타임스탬프, 변경사항을 적용한 사용자, 변경사항 유형과 같은 요소를 확인합니다.

  6. 감사 로그에서 수집한 정보를 바탕으로 시정 조치를 취할 수 있습니다.

구독이 삭제됨

Pub/Sub 구독은 다음 두 가지 기본 방법으로 삭제할 수 있습니다.

  • 충분한 권한이 있는 사용자 또는 서비스 계정이 의도적으로 정기 결제를 삭제합니다.

  • 구독은 일정 기간 활동이 없으면 자동으로 삭제되며 기본적으로 이 기간은 31일입니다. 정기 결제 만료 정책에 대한 자세한 내용은 만료 기간을 참고하세요.

삭제된 구독 문제를 해결하려면 다음 단계를 따르세요.

  1. Google Cloud 콘솔에서 Pub/Sub 구독 페이지로 이동하여 구독이 더 이상 표시되지 않는지 확인합니다. 정기 결제를 표시하는 방법에 관한 자세한 내용은 정기 결제 표시를 참고하세요.

  2. 감사 로그를 확인합니다. 로그 탐색기로 이동합니다. protoPayload.methodName="google.pubsub.v1.Subscriber.DeleteSubscription" 필터를 사용하여 삭제된 구독을 찾습니다. 로그를 검토하여 누군가 구독을 삭제했는지 또는 비활성 상태로 인해 삭제되었는지 확인합니다. InternalExpireInactiveSubscription는 비활성 상태로 인해 구독이 삭제되었음을 나타냅니다. 문제 해결을 위해 감사 로그를 사용하는 방법에 관한 자세한 내용은 감사 로그를 사용하여 Pub/Sub 문제 해결을 참고하세요.

오류 403 (Forbidden)

이 오류가 발생한다면 다음을 수행하세요.

  • Google Cloud 콘솔에서 Pub/Sub API를 사용 설정했는지 확인합니다.
  • 특히 프로젝트 간 통신에 Pub/Sub API를 사용하는 경우, 요청을 수행하는 주 구성원이 관련 Pub/Sub API 리소스에 필요한 권한을 갖고 있는지 확인합니다.

  • Dataflow를 사용하는 경우 {PROJECT_NUMBER}@cloudservices.gserviceaccount.com 및 Compute Engine 서비스 계정 {PROJECT_NUMBER}-compute@developer.gserviceaccount.com에 모두 관련 Pub/Sub API 리소스에 대한 필수 권한이 있는지 확인합니다. 자세한 내용은 Dataflow 보안 및 권한을 참조하세요.

  • App Engine을 사용한다면 프로젝트의 권한 페이지를 확인해 App Engine 서비스 계정이 Pub/Sub 편집자로 표시되어 있는지 살펴봅니다. 그렇지 않다면 App Engine 서비스 계정을 Pub/Sub 편집자로 추가해야 합니다. 일반적으로 App Engine 서비스 계정은 <project-id>@appspot.gserviceaccount.com 형식입니다.

기타 일반적인 오류 코드

Pub/Sub API와 관련된 다른 일반적인 오류 코드 목록 및 설명은 오류 코드를 참고하세요.

과도한 관리 작업 사용

관리 작업에 할당량을 너무 많이 사용하고 있는 경우 코드를 리팩터링해야 할 수도 있습니다. 다음 유사 코드를 예로 들어 보겠습니다. 이 예에서는 리소스를 소모하기 전에 관리 작업 (GET)을 사용하여 구독이 있는지 확인합니다. GETCREATE 모두 관리 작업입니다.

if !GetSubscription my-sub {
  CreateSubscription my-sub
}
Consume from subscription my-sub

보다 효율적인 패턴은 구독에서 메시지를 소비하려고 시도하는 것입니다(구독 이름을 합리적으로 확신할 수 있다고 가정). 이 낙관적 접근 방식에서는 오류가 있는 경우에만 구독을 가져오거나 생성합니다. 다음 예를 참조하세요.

try {
  Consume from subscription my-sub
} catch NotFoundError {
  CreateSubscription my-sub
  Consume from subscription my-sub
}

다음 코드 샘플을 사용하여 원하는 언어로 이 패턴을 구현할 수 있습니다.

Go

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// optimisticSubscribe shows the recommended pattern for optimistically
// assuming a subscription exists prior to receiving messages.
func optimisticSubscribe(w io.Writer, projectID, topicID, subID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Instead of checking if the subscription exists, optimistically try to
	// receive from the subscription.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got from existing subscription: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.NotFound {
				// Since the subscription does not exist, create the subscription.
				s, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
					Topic: client.Topic(topicID),
				})
				if err != nil {
					return err
				}
				fmt.Fprintf(w, "Created subscription: %q\n", subID)

				// Pull from the new subscription.
				err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
					fmt.Fprintf(w, "Got from new subscription: %q\n", string(msg.Data))
					msg.Ack()
				})
				if err != nil && !errors.Is(err, context.Canceled) {
					return err
				}
			}
		}
	}
	return nil
}

Java

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";

    optimisticSubscribeExample(projectId, subscriptionId, topicId);
  }

  public static void optimisticSubscribeExample(
      String projectId, String subscriptionId, String topicId) throws IOException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing
      // when the current subscriber encounters these errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (failure instanceof NotFoundException) {
                try (SubscriptionAdminClient subscriptionAdminClient =
                    SubscriptionAdminClient.create()) {
                  TopicName topicName = TopicName.of(projectId, topicId);
                  // Create a pull subscription with default acknowledgement deadline of 10 seconds.
                  // The client library will automatically extend acknowledgement deadlines.
                  Subscription subscription =
                      subscriptionAdminClient.createSubscription(
                          subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
                  System.out.println("Created pull subscription: " + subscription.getName());
                  optimisticSubscribeExample(projectId, subscriptionId, topicId);
                } catch (IOException err) {
                  System.out.println("Failed to create pull subscription: " + err.getMessage());
                }
              }
            }
          },
          MoreExecutors.directExecutor());

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // Prevent an exception from being thrown if it is the expected NotFoundException
      if (!(subscriber.failureCause() instanceof NotFoundException)) {
        throw e;
      }
    } catch (TimeoutException e) {
      subscriber.stopAsync();
    }
  }
}

Node.js

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async e => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except NotFound:
        print(f"Subscription {subscription_path} not found, creating it.")

        try:
            # If the subscription does not exist, then create it.
            publisher = pubsub_v1.PublisherClient()
            topic_path = publisher.topic_path(project_id, topic_id)
            subscription = subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )

            if subscription:
                print(f"Subscription {subscription.name} created")
            else:
                raise ValueError("Subscription creation failed.")

            # Subscribe on the created subscription.
            try:
                streaming_pull_future = subscriber.subscribe(
                    subscription.name, callback=callback
                )
                streaming_pull_future.result(timeout=timeout)
            except TimeoutError:
                streaming_pull_future.cancel()  # Trigger the shutdown.
                streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print(
                f"Exception occurred when creating subscription and subscribing to it: {e}"
            )
    except Exception as e:
        print(f"Exception occurred when attempting optimistic subscribe: {e}")

C++

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
  if (response) {
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
    return gc::Status();
  }
  if (response.status().code() == gc::StatusCode::kUnavailable &&
      response.status().message() == "no messages returned") {
    std::cout << "No messages returned from Pull()\n";
    return gc::Status();
  }
  return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
    pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
    pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
    pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);

Node.js (TypeScript)

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
  subscriptionNameOrId: string,
  topicNameOrId: string,
  timeout: number
) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async (e: StatusError) => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}