Pemecahan masalah umum

Pelajari langkah-langkah pemecahan masalah yang mungkin berguna jika Anda mengalami masalah saat menggunakan Pub/Sub.

Tidak dapat membuat topik

Pastikan Anda memiliki izin yang diperlukan. Untuk membuat topik Pub/Sub, Anda memerlukan peran Identity and Access Management Pub/Sub Editor (roles/pubsub.editor) di project. Jika Anda tidak memiliki peran ini, hubungi administrator. Untuk informasi pemecahan masalah selengkapnya terkait topik, lihat halaman berikut:

Tidak dapat membuat langganan

Pastikan Anda telah melakukan hal berikut:

  • Pastikan Anda memiliki izin yang diperlukan. Untuk membuat langganan Pub/Sub, Anda memerlukan peran IAM Pub/Sub Editor (roles/pubsub.editor) di project. Jika Anda tidak memiliki peran ini, hubungi administrator.

  • Menentukan nama untuk langganan.

  • Menentukan nama topik yang sudah ada yang ingin Anda lampirkan langganannya.

  • Jika membuat langganan push, tentukan https:// dalam huruf kecil (bukan http:// atau HTTPS://) sebagai protokol untuk URL penerima Anda di kolom pushEndpoint.

Untuk informasi pemecahan masalah selengkapnya tentang langganan, lihat halaman berikut:

Memecahkan masalah izin

Izin Pub/Sub mengontrol pengguna dan akun layanan mana yang dapat melakukan tindakan pada resource Pub/Sub Anda. Jika izin salah dikonfigurasi, hal ini dapat menyebabkan error izin ditolak dan mengganggu alur pesan. Log audit memberikan catatan mendetail tentang semua perubahan izin, sehingga Anda dapat mengidentifikasi sumber masalah ini.

Untuk memecahkan masalah izin Pub/Sub dengan log audit:

  1. Dapatkan izin yang diperlukan untuk melihat Logs Explorer.

    Untuk mengetahui informasi selengkapnya, lihat Sebelum memulai.

  2. Di konsol Google Cloud , buka halaman Logs Explorer.

    Buka Logs Explorer

  3. Pilih project, folder, atau organisasi Google Cloud yang ada.

  4. Berikut adalah daftar filter yang dapat Anda gunakan untuk menemukan log yang relevan:

    • resource.type="pubsub_topic" OR resource.type="pubsub_subscription": Gunakan kueri ini sebagai titik awal saat Anda memecahkan masalah yang mungkin melibatkan perubahan pada konfigurasi topik atau langganan, atau kontrol akses. Anda dapat menggabungkannya dengan filter lain untuk menyaring penelusuran lebih lanjut.

    • protoPayload.methodName="google.iam.v1.SetIamPolicy": Gunakan kueri ini saat Anda mencurigai bahwa masalah disebabkan oleh izin yang salah atau tidak ada. Fitur ini membantu Anda melacak siapa yang membuat perubahan pada kebijakan IAM dan perubahan apa saja yang dilakukan. Hal ini dapat berguna untuk memecahkan masalah seperti pengguna yang tidak dapat memublikasikan ke topik atau berlangganan langganan, aplikasi yang ditolak aksesnya ke resource Pub/Sub, atau perubahan tak terduga pada kontrol akses.

    • protoPayload.status.code=7: Gunakan kueri ini saat Anda mengalami error yang secara eksplisit terkait dengan izin. Hal ini membantu Anda menentukan tindakan mana yang gagal dan siapa yang mencobanya. Anda dapat menggabungkan kueri ini dengan kueri sebelumnya untuk mengidentifikasi resource tertentu dan perubahan kebijakan IAM yang mungkin menyebabkan penolakan izin.

  5. Analisis log untuk menentukan faktor-faktor seperti stempel waktu peristiwa, prinsipal yang melakukan perubahan, dan jenis perubahan yang dilakukan.

  6. Berdasarkan informasi yang dikumpulkan dari log audit, Anda dapat mengambil tindakan perbaikan.

Langganan dihapus

Langganan Pub/Sub dapat dihapus dengan dua cara utama:

  • Pengguna atau akun layanan dengan izin yang memadai sengaja menghapus langganan.

  • Langganan akan otomatis dihapus setelah periode tidak aktif, yaitu 31 hari secara default. Untuk informasi selengkapnya tentang kebijakan masa berlaku langganan, lihat Periode habis masa berlaku.

Untuk memecahkan masalah langganan yang dihapus, lakukan langkah-langkah berikut:

  1. Di konsol Google Cloud , buka halaman langganan Pub/Sub dan pastikan langganan tersebut tidak lagi tercantum. Untuk mengetahui informasi selengkapnya tentang cara mencantumkan langganan, lihat Mencantumkan langganan.

  2. Periksa log audit. Buka Logs Explorer. Gunakan filter protoPayload.methodName="google.pubsub.v1.Subscriber.DeleteSubscription" untuk menemukan subscription yang dihapus. Periksa log untuk menentukan apakah seseorang menghapus langganan atau langganan dihapus karena tidak ada aktivitas. InternalExpireInactiveSubscription menunjukkan bahwa langganan telah dihapus karena tidak aktif. Untuk informasi selengkapnya tentang cara menggunakan log audit untuk pemecahan masalah, lihat Memecahkan masalah Pub/Sub dengan log audit.

403 (Forbidden) error

Jika Anda mendapatkan error ini, lakukan hal berikut:

  • Pastikan Anda telah mengaktifkan Pub/Sub API di konsol Google Cloud .
  • Pastikan akun utama yang membuat permintaan memiliki izin yang diperlukan pada resource Pub/Sub API yang relevan, terutama jika Anda menggunakan Pub/Sub API untuk komunikasi lintas project.

  • Jika Anda menggunakan Dataflow, pastikan {PROJECT_NUMBER}@cloudservices.gserviceaccount.com dan akun Layanan Compute Engine {PROJECT_NUMBER}-compute@developer.gserviceaccount.com memiliki izin yang diperlukan pada resource Pub/Sub API yang relevan. Untuk mengetahui informasi selengkapnya, lihat Keamanan dan Izin Dataflow.

  • Jika Anda menggunakan App Engine, periksa halaman Izin project untuk melihat apakah Akun Layanan App Engine tercantum sebagai Editor Pub/Sub. Jika belum, tambahkan Akun Layanan App Engine Anda sebagai Editor Pub/Sub. Biasanya, Akun Layanan App Engine memiliki format <project-id>@appspot.gserviceaccount.com.

Kode error umum lainnya

Untuk mengetahui daftar kode error umum lainnya yang terkait dengan Pub/Sub API dan deskripsinya, lihat Kode Error.

Menggunakan operasi administratif yang berlebihan

Jika Anda mendapati bahwa Anda menggunakan terlalu banyak kuota untuk operasi administratif, Anda mungkin perlu memfaktorkan ulang kode. Sebagai ilustrasi, pertimbangkan kode pseudo ini. Dalam contoh ini, operasi administratif (GET) digunakan untuk memeriksa keberadaan langganan sebelum mencoba menggunakan resource-nya. GET dan CREATE adalah operasi administrator:

if !GetSubscription my-sub {
  CreateSubscription my-sub
}
Consume from subscription my-sub

Pola yang lebih efisien adalah mencoba menggunakan pesan dari langganan (dengan asumsi bahwa Anda dapat yakin dengan nama langganan). Dalam pendekatan optimis ini, Anda hanya mendapatkan atau membuat langganan jika terjadi error. Perhatikan contoh berikut:

try {
  Consume from subscription my-sub
} catch NotFoundError {
  CreateSubscription my-sub
  Consume from subscription my-sub
}

Anda dapat menggunakan contoh kode berikut untuk menerapkan pola ini dalam bahasa pilihan Anda:

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Go Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// optimisticSubscribe shows the recommended pattern for optimistically
// assuming a subscription exists prior to receiving messages.
func optimisticSubscribe(w io.Writer, projectID, topicID, subID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Instead of checking if the subscription exists, optimistically try to
	// receive from the subscription.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got from existing subscription: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		if st, ok := status.FromError(err); ok {
			if st.Code() == codes.NotFound {
				// Since the subscription does not exist, create the subscription.
				s, err := client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
					Topic: client.Topic(topicID),
				})
				if err != nil {
					return err
				}
				fmt.Fprintf(w, "Created subscription: %q\n", subID)

				// Pull from the new subscription.
				err = s.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
					fmt.Fprintf(w, "Got from new subscription: %q\n", string(msg.Data))
					msg.Ack()
				})
				if err != nil && !errors.Is(err, context.Canceled) {
					return err
				}
			}
		}
	}
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Java Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OptimisticSubscribeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    String topicId = "your-topic-id";

    optimisticSubscribeExample(projectId, subscriptionId, topicId);
  }

  public static void optimisticSubscribeExample(
      String projectId, String subscriptionId, String topicId) throws IOException {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

      // Listen for resource NOT_FOUND errors and rebuild the  subscriber and restart subscribing
      // when the current subscriber encounters these errors.
      subscriber.addListener(
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              System.out.println(failure.getStackTrace());
              if (failure instanceof NotFoundException) {
                try (SubscriptionAdminClient subscriptionAdminClient =
                    SubscriptionAdminClient.create()) {
                  TopicName topicName = TopicName.of(projectId, topicId);
                  // Create a pull subscription with default acknowledgement deadline of 10 seconds.
                  // The client library will automatically extend acknowledgement deadlines.
                  Subscription subscription =
                      subscriptionAdminClient.createSubscription(
                          subscriptionName, topicName, PushConfig.getDefaultInstance(), 10);
                  System.out.println("Created pull subscription: " + subscription.getName());
                  optimisticSubscribeExample(projectId, subscriptionId, topicId);
                } catch (IOException err) {
                  System.out.println("Failed to create pull subscription: " + err.getMessage());
                }
              }
            }
          },
          MoreExecutors.directExecutor());

      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (IllegalStateException e) {
      // Prevent an exception from being thrown if it is the expected NotFoundException
      if (!(subscriber.failureCause() instanceof NotFoundException)) {
        throw e;
      }
    } catch (TimeoutException e) {
      subscriber.stopAsync();
    }
  }
}

Node.js

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(subscriptionNameOrId, topicNameOrId, timeout) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async e => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Python Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0
# topic_id = "your-topic-id"

# Create a subscriber client.
subscriber = pubsub_v1.SubscriberClient()

# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Define callback to be called when a message is received.
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Ack message after processing it.
    message.ack()

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # Optimistically subscribe to messages on the subscription.
        streaming_pull_future = subscriber.subscribe(
            subscription_path, callback=callback
        )
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        print("Successfully subscribed until the timeout passed.")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
    except NotFound:
        print(f"Subscription {subscription_path} not found, creating it.")

        try:
            # If the subscription does not exist, then create it.
            publisher = pubsub_v1.PublisherClient()
            topic_path = publisher.topic_path(project_id, topic_id)
            subscription = subscriber.create_subscription(
                request={"name": subscription_path, "topic": topic_path}
            )

            if subscription:
                print(f"Subscription {subscription.name} created")
            else:
                raise ValueError("Subscription creation failed.")

            # Subscribe on the created subscription.
            try:
                streaming_pull_future = subscriber.subscribe(
                    subscription.name, callback=callback
                )
                streaming_pull_future.result(timeout=timeout)
            except TimeoutError:
                streaming_pull_future.cancel()  # Trigger the shutdown.
                streaming_pull_future.result()  # Block until the shutdown is complete.
        except Exception as e:
            print(
                f"Exception occurred when creating subscription and subscribing to it: {e}"
            )
    except Exception as e:
        print(f"Exception occurred when attempting optimistic subscribe: {e}")

C++

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C++ Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

auto process_response = [](gc::StatusOr<pubsub::PullResponse> response) {
  if (response) {
    std::cout << "Received message " << response->message << "\n";
    std::move(response->handler).ack();
    return gc::Status();
  }
  if (response.status().code() == gc::StatusCode::kUnavailable &&
      response.status().message() == "no messages returned") {
    std::cout << "No messages returned from Pull()\n";
    return gc::Status();
  }
  return response.status();
};

// Instead of checking if the subscription exists, optimistically try to
// consume from the subscription.
auto status = process_response(subscriber.Pull());
if (status.ok()) return;
if (status.code() != gc::StatusCode::kNotFound) throw std::move(status);

// Since the subscription does not exist, create the subscription.
pubsub_admin::SubscriptionAdminClient subscription_admin_client(
    pubsub_admin::MakeSubscriptionAdminConnection());
google::pubsub::v1::Subscription request;
request.set_name(
    pubsub::Subscription(project_id, subscription_id).FullName());
request.set_topic(
    pubsub::Topic(project_id, std::move(topic_id)).FullName());
auto sub = subscription_admin_client.CreateSubscription(request);
if (!sub) throw std::move(sub).status();

// Consume from the new subscription.
status = process_response(subscriber.Pull());
if (!status.ok()) throw std::move(status);

Node.js (TypeScript)

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message, StatusError} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function optimisticSubscribe(
  subscriptionNameOrId: string,
  topicNameOrId: string,
  timeout: number
) {
  // Try using an existing subscription
  let subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Set an error handler so that we're notified if the subscription doesn't
  // already exist.
  subscription.on('error', async (e: StatusError) => {
    // Resource Not Found
    if (e.code === 5) {
      console.log('Subscription not found, creating it');
      await pubSubClient.createSubscription(
        topicNameOrId,
        subscriptionNameOrId
      );

      // Refresh our subscriber object and re-attach the message handler.
      subscription = pubSubClient.subscription(subscriptionNameOrId);
      subscription.on('message', messageHandler);
    }
  });

  // Listen for new messages until timeout is hit; this will attempt to
  // open the actual subscriber streams. If it fails, the error handler
  // above will be called.
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}