Dokumen ini memberikan ringkasan tentang langganan pull, alur kerjanya, dan properti terkait.

Dalam langganan pull, klien pelanggan meminta pesan dari server Pub/Sub.

Mode pull dapat menggunakan salah satu dari dua API layanan, Pull atau StreamingPull. Untuk menjalankan API yang dipilih, Anda dapat memilih library klien tingkat tinggi yang disediakan Google, atau library klien tingkat rendah yang dibuat secara otomatis. Anda juga dapat memilih antara pemrosesan pesan asinkron dan sinkron.

Sebelum membaca dokumen ini, pastikan Anda memahami hal-hal berikut:

  • Cara kerja Pub/Sub dan berbagai istilah Pub/Sub.

  • Berbagai jenis langganan yang didukung Pub/Sub dan alasan Anda mungkin ingin menggunakan langganan pull.

Alur kerja langganan pull

Untuk langganan pull, klien pelanggan Anda akan memulai permintaan ke server Pub/Sub untuk mengambil pesan. Klien pelanggan menggunakan salah satu API berikut:

Sebagian besar klien pelanggan tidak membuat permintaan ini secara langsung. Sebagai gantinya, klien mengandalkan library klien tingkat tinggi yang disediakan Google Cloudyang melakukan permintaan pull streaming secara internal dan mengirimkan pesan secara asinkron. Untuk klien pelanggan yang memerlukan kontrol lebih besar atas cara pesan diambil, Pub/Sub menggunakan library gRPC tingkat rendah dan yang dibuat secara otomatis. Library ini membuat permintaan pull atau streaming pull secara langsung. Permintaan ini dapat bersifat sinkron atau asinkron.

Dua gambar berikut menunjukkan alur kerja antara klien pelanggan dan langganan pull.

Gambar 1. Alur kerja untuk langganan pull

Gambar 2. Alur kerja untuk langganan pull streaming

Alur kerja pull

Alur kerja pull adalah sebagai berikut dan mereferensikan Gambar 1:

  1. Klien pelanggan secara eksplisit memanggil metode pull, yang meminta pesan untuk dikirim. Permintaan ini adalah PullRequest seperti yang ditunjukkan dalam gambar.
  2. Server Pub/Sub merespons dengan nol atau beberapa pesan dan ID konfirmasi. Respons dengan nol pesan atau dengan error tidak selalu menunjukkan bahwa tidak ada pesan yang tersedia untuk diterima. Respons ini adalah PullResponse seperti yang ditunjukkan pada gambar.

  3. Klien pelanggan secara eksplisit memanggil metode acknowledge. Klien menggunakan ID konfirmasi yang ditampilkan untuk mengonfirmasi bahwa pesan diproses dan tidak perlu dikirim lagi.

Untuk satu permintaan pull streaming, klien pelanggan dapat memiliki beberapa respons yang ditampilkan karena koneksi terbuka. Sebaliknya, hanya satu respons yang ditampilkan untuk setiap permintaan pull.

Properti langganan pull

Properti yang Anda konfigurasikan untuk langganan pull menentukan cara Anda menulis pesan ke langganan. Untuk mengetahui informasi selengkapnya, lihat properti langganan.

API layanan Pub/Sub

Langganan pull Pub/Sub dapat menggunakan salah satu dari dua API berikut untuk mengambil pesan:

  • Tarik
  • StreamingPull

Gunakan RPC Acknowledge dan ModifyAckDeadline unary saat Anda menerima pesan menggunakan API ini. Kedua Pub/Sub API dijelaskan di bagian berikut.

StreamingPull API

Jika memungkinkan, library klien Pub/Sub menggunakan StreamingPull untuk throughput maksimum dan latensi terendah. Meskipun Anda mungkin tidak pernah menggunakan StreamingPull API secara langsung, penting untuk mengetahui perbedaannya dengan Pull API.

StreamingPull API mengandalkan koneksi dua arah yang persisten untuk menerima beberapa pesan saat tersedia. Berikut adalah alur kerjanya:

  1. Klien mengirimkan permintaan ke server untuk membuat koneksi. Jika kuota koneksi terlampaui, server akan menampilkan error resource habis. Library klien akan otomatis mencoba kembali error kehabisan kuota.

  2. Jika tidak ada error atau kuota koneksi tersedia lagi, server akan terus mengirim pesan ke klien yang terhubung.

  3. Jika atau saat kuota throughput terlampaui, server akan berhenti mengirim pesan. Namun, koneksi tidak terputus. Setiap kali kuota throughput yang memadai tersedia lagi, streaming akan dilanjutkan.

  4. Klien atau server pada akhirnya akan menutup koneksi.

StreamingPull API mempertahankan koneksi yang terbuka. Server Pub/Sub secara berulang menutup koneksi setelah jangka waktu tertentu untuk menghindari koneksi melekat yang berjalan lama. Library klien akan otomatis membuka kembali koneksi StreamingPull.

Pesan akan dikirim ke koneksi saat tersedia. Dengan demikian, StreamingPull API meminimalkan latensi dan memaksimalkan throughput untuk pesan.

Baca selengkapnya tentang metode StreamingPull RPC: StreamingPullRequest dan StreamingPullResponse.

Pull API

API ini adalah RPC unary tradisional yang didasarkan pada model permintaan dan respons. Satu respons pull sesuai dengan satu permintaan pull. Berikut adalah alur kerjanya:

  1. Klien mengirimkan permintaan ke server untuk pesan. Jika kuota throughput terlampaui, server akan menampilkan error resource habis.

  2. Jika tidak ada error atau kuota throughput tersedia lagi, server akan membalas dengan nol atau lebih pesan dan ID konfirmasi.

Saat menggunakan Pull API unary, respons dengan nol pesan atau dengan error tidak selalu menunjukkan bahwa tidak ada pesan yang tersedia untuk diterima.

Penggunaan Pull API tidak menjamin latensi rendah dan throughput pesan yang tinggi. Untuk mencapai throughput tinggi dan latensi rendah dengan Pull API, Anda harus memiliki beberapa permintaan yang belum terselesaikan secara bersamaan. Permintaan baru dibuat saat permintaan lama menerima respons. Merancang solusi semacam itu rawan error dan sulit dikelola. Sebaiknya gunakan StreamingPull API untuk kasus penggunaan tersebut.

Gunakan Pull API, bukan StreamingPull API, hanya jika Anda memerlukan kontrol ketat atas hal berikut:

  • Jumlah pesan yang dapat diproses oleh klien pelanggan
  • Memori dan resource klien

Anda juga dapat menggunakan API ini jika subscriber Anda adalah proxy antara Pub/Sub dan layanan lain yang beroperasi dengan cara yang lebih berorientasi pull.

Baca selengkapnya tentang metode REST Pull: Metode: projects.subscriptions.pull.

Baca selengkapnya tentang metode Pull RPC: PullRequest dan PullResponse.

Jenis mode pemrosesan pesan

Pilih salah satu mode pull berikut untuk klien pelanggan Anda.

Mode pull asinkron

Mode pull asinkron memisahkan penerimaan pesan dari pemrosesan pesan di klien pelanggan. Mode ini adalah default untuk sebagian besar klien pelanggan. Mode pull asinkron dapat menggunakan StreamingPull API atau Pull API unary. Pull asinkron juga dapat menggunakan library klien tingkat tinggi atau library klien tingkat rendah yang dibuat otomatis.

Anda dapat mempelajari library klien lebih lanjut nanti dalam dokumen ini.

Mode pull sinkron

Dalam mode pull sinkron, penerimaan dan pemrosesan pesan terjadi secara berurutan dan tidak dipisahkan satu sama lain. Oleh karena itu, mirip dengan StreamingPull versus unary Pull API, pemrosesan asinkron menawarkan latensi yang lebih rendah dan throughput yang lebih tinggi daripada pemrosesan sinkron.

Gunakan mode pull sinkron hanya untuk aplikasi dengan latensi rendah dan throughput tinggi yang bukan merupakan faktor terpenting dibandingkan dengan beberapa persyaratan lainnya. Misalnya, aplikasi mungkin dibatasi untuk hanya menggunakan model pemrograman sinkron. Atau, aplikasi dengan batasan resource mungkin memerlukan kontrol yang lebih tepat atas memori, jaringan, atau CPU. Dalam kasus tersebut, gunakan mode sinkron dengan Pull API unary.

Library klien Pub/Sub

Pub/Sub menawarkan library klien tingkat tinggi dan tingkat rendah yang dihasilkan secara otomatis.

Library klien Pub/Sub tingkat tinggi

Library klien tingkat tinggi menyediakan opsi untuk mengontrol batas waktu konfirmasi dengan menggunakan pengelolaan sewa. Opsi ini lebih terperinci daripada saat Anda mengonfigurasi batas waktu konfirmasi dengan menggunakan konsol atau CLI di tingkat langganan. Library klien tingkat tinggi juga menerapkan dukungan untuk fitur seperti pengiriman yang diurutkan, pengiriman tepat sekali, dan kontrol alur.

Sebaiknya gunakan pull asinkron dan StreamingPull API dengan library klien tingkat tinggi. Tidak semua bahasa yang didukung untuk Google Cloud juga mendukung Pull API di library klien tingkat tinggi.

Untuk menggunakan library klien tingkat tinggi, lihat Library klien Pub/Sub.

Library klien Pub/Sub tingkat rendah yang dibuat secara otomatis

Library klien level rendah tersedia untuk kasus saat Anda harus menggunakan Pull API secara langsung. Anda dapat menggunakan pemrosesan sinkron atau asinkron dengan library klien tingkat rendah yang dibuat secara otomatis. Anda harus membuat kode fitur secara manual seperti pengiriman yang diurutkan, pengiriman tepat sekali, kontrol alur, dan pengelolaan sewa saat menggunakan library klien tingkat rendah yang dibuat otomatis.

Anda dapat menggunakan model pemrosesan sinkron saat menggunakan library klien tingkat rendah yang dibuat otomatis untuk semua bahasa yang didukung. Anda dapat menggunakan library klien tingkat rendah yang dibuat otomatis dan pull sinkron jika menggunakan Pull API secara langsung masuk akal. Misalnya, Anda mungkin memiliki logika aplikasi yang ada yang mengandalkan model ini.

Untuk menggunakan library klien tingkat rendah yang dibuat otomatis secara langsung, lihat ringkasan Pub/Sub API.

Contoh kode library klien

Contoh kode library klien StreamingPull dan tingkat tinggi


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C# di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C# Pub/Sub.

using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesAsyncSample
    public async Task<int> PullMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        int messageCount = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Go API.

import (


func pullMsgs(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		atomic.AddInt32(&received, 1)
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Java API Pub/Sub.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeAsyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

 * TODO(developer): Uncomment these variables before running the sample.
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId, timeout) {
  // References an existing subscription; if you are unsure if the
  // subscription will exist, try the optimisticSubscribe sample.
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${}:`);
    console.log(`\tData: ${}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

 * TODO(developer): Uncomment these variables before running the sample.
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId: string, timeout: number) {
  // References an existing subscription; if you are unsure if the
  // subscription will exist, try the optimisticSubscribe sample.
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${}:`);
    console.log(`\tData: ${}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Python API.

from concurrent.futures import TimeoutError
from import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Ruby di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Ruby API.

# subscription_id = "your-subscription-id"

pubsub =

subscription = pubsub.subscription subscription_id
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{}"

# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60

Mengambil atribut khusus menggunakan library klien tingkat tinggi

Contoh berikut menunjukkan cara menarik pesan secara asinkron dan mengambil atribut kustom dari metadata.


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message with attributes:\n";
        for (auto& kv : m.attributes()) {
          std::cout << "  " << kv.first << ": " << kv.second << "\n";


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C# di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C# Pub/Sub.

using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesWithCustomAttributesAsyncSample
    public async Task<List<PubsubMessage>> PullMessagesWithCustomAttributesAsync(string projectId, string subscriptionId, bool acknowledge)
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        var messages = new List<PubsubMessage>();
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            if (message.Attributes != null)
                foreach (var attribute in message.Attributes)
                    Console.WriteLine($"{attribute.Key} = {attribute.Value}");
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        // Run for 7 seconds.
        await Task.Delay(7000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messages;


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Go API.

import (


func pullMsgsCustomAttributes(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
		fmt.Fprintln(w, "Attributes:")
		for key, value := range msg.Attributes {
			fmt.Fprintf(w, "%s = %s\n", key, value)
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)

	return nil


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Java API Pub/Sub.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithCustomAttributesExample(projectId, subscriptionId);

  public static void subscribeWithCustomAttributesExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          // Print message attributes.
              .forEach((key, value) -> System.out.println(key + " = " + value));

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

 * TODO(developer): Uncomment these variables before running the sample.
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenWithCustomAttributes(subscriptionNameOrId, timeout) {
  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  const messageHandler = message => {
      `Received message: id ${}, data ${
      }, attributes: ${JSON.stringify(message.attributes)}`

    // "Ack" (acknowledge receipt of) the message

  // Wait a while for the subscription to run. (Part of the sample only.)
  subscription.on('message', messageHandler);
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
  }, timeout * 1000);


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Python API.

from concurrent.futures import TimeoutError
from import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {!r}.")
    if message.attributes:
        for key in message.attributes:
            value = message.attributes.get(key)
            print(f"{key}: {value}")

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Ruby di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Ruby API.

# subscription_id = "your-subscription-id"

pubsub =

subscription = pubsub.subscription subscription_id
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"

# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60

Menangani error menggunakan library klien tingkat tinggi

Contoh berikut menunjukkan cara menangani error yang muncul saat berlangganan pesan.


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub C++ API.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber
      .Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
      // Setup an error handler for the subscription session
      .then([](future<google::cloud::Status> f) {
        std::cout << "Subscription session result: " << f.get() << "\n";


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Go API.

import (


func pullMsgsError(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	defer client.Close()

	// If the service returns a non-retryable error, Receive returns that error after
	// all of the outstanding calls to the handler have returned.
	err = client.Subscription(subID).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
	if err != nil {
		return fmt.Errorf("Receive: %w", err)
	return nil


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Java API Pub/Sub.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithErrorListenerExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithErrorListenerExample(projectId, subscriptionId);

  public static void subscribeWithErrorListenerExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages.
      ExecutorProvider executorProvider =

      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)

      // Listen for unrecoverable failures. Rebuild a subscriber and restart subscribing
      // when the current subscriber encounters permanent errors.
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              if (!executorProvider.getExecutor().isShutdown()) {
                subscribeWithErrorListenerExample(projectId, subscriptionId);

      // Start the subscriber.
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

 * TODO(developer): Uncomment these variables before running the sample.
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForErrors(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  const messageHandler = message => {
    // Do something with the message
    console.log(`Message: ${message}`);

    // "Ack" (acknowledge receipt of) the message

  // Create an event handler to handle errors
  const errorHandler = error => {
    // Do something with the error
    console.error(`ERROR: ${error}`);
    throw error;

  // Listen for new messages/errors until timeout is hit
  subscription.on('message', messageHandler);
  subscription.on('error', errorHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    subscription.removeListener('error', errorHandler);
  }, timeout * 1000);


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Python API.

from import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    # When `timeout` is not set, result() will block indefinitely,
    # unless an exception is encountered first.
    except Exception as e:
            f"Listening for messages on {subscription_path} threw an exception: {e}."
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Go API.

# subscription_id = "your-subscription-id"

pubsub =

subscription = pubsub.subscription subscription_id
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{}"
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
rescue StandardError => e
  puts "Exception #{e.inspect}: #{e.message}"
  raise "Stopped listening for messages."

Contoh kode pull unary

Berikut adalah beberapa contoh kode untuk mengambil dan mengonfirmasi jumlah pesan tetap.


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub C++ API.

[](google::cloud::pubsub::Subscriber subscriber) {
  auto response = subscriber.Pull();
  if (!response) throw std::move(response).status();
  std::cout << "Received message " << response->message << "\n";


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C# di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C# Pub/Sub.

using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Linq;
using System.Threading;

public class PullMessagesSyncSample
    public int PullMessagesSync(string projectId, string subscriptionId, bool acknowledge)
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
        int messageCount = 0;
            // Pull messages from server,
            // allowing an immediate response if there are no messages.
            PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);
            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
                string text = msg.Message.Data.ToStringUtf8();
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
                Interlocked.Increment(ref messageCount);
            // If acknowledgement required, send to server.
            if (acknowledge && messageCount > 0)
                subscriberClient.Acknowledge(subscriptionName, response.ReceivedMessages.Select(msg => msg.AckId));
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
        return messageCount;


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Java API Pub/Sub.

import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    subscribeSyncExample(projectId, subscriptionId, numOfMessages);

  public static void subscribeSyncExample(
      String projectId, String subscriptionId, Integer numOfMessages) throws IOException {
    SubscriberStubSettings subscriberStubSettings =
                    .setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
      PullRequest pullRequest =

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

      // Stop the program if the pull response is empty to avoid acknowledging
      // an empty list of ack IDs.
      if (pullResponse.getReceivedMessagesList().isEmpty()) {
        System.out.println("No message was pulled. Exiting.");

      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        // Handle received message
        // ...

      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

 * TODO(developer): Uncomment these variables before running the sample.
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPull(projectId, subscriptionNameOrId) {
  // The low level API client requires a name only.
  const formattedSubscription =
    subscriptionNameOrId.indexOf('/') >= 0
      ? subscriptionNameOrId
      : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const request = {
    subscription: formattedSubscription,
    maxMessages: 10,

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Process the messages.
  const ackIds = [];
  for (const message of response.receivedMessages || []) {
    console.log(`Received message: ${}`);
    if (message.ackId) {

  if (ackIds.length !== 0) {
    // Acknowledge all of the messages. You could also acknowledge
    // these individually, but this is more efficient.
    const ackRequest = {
      subscription: formattedSubscription,
      ackIds: ackIds,

    await subClient.acknowledge(ackRequest);



Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Node.js Pub/Sub.

use Google\Cloud\PubSub\PubSubClient;

 * Pulls all Pub/Sub messages for a subscription.
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
function pull_messages($projectId, $subscriptionName)
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Ruby di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Ruby API.

# subscription_id = "your-subscription-id"

pubsub =

subscription = pubsub.subscription subscription_id
subscription.pull(immediate: false).each do |message|
  puts "Message pulled: #{}"




  "returnImmediately": "false",
  "maxMessages": "1"


200 OK

  "receivedMessages": [{
    "message": {
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
      "messageId": "19917247034"



  "ackIds": [


Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Panduan Memulai: Menggunakan Library Klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Pub/Sub Python API.

from google.api_core import retry
from import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)


# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    # The subscriber pulls a specific number of messages. The actual
    # number of messages pulled may be smaller than max_messages.
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},

    if len(response.received_messages) == 0:

    ack_ids = []
    for received_message in response.received_messages:
        print(f"Received: {}.")

    # Acknowledges the received messages so they will not be sent again.
        request={"subscription": subscription_path, "ack_ids": ack_ids}

        f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."

Pub/Sub mengirimkan daftar pesan. Jika daftar memiliki beberapa pesan, Pub/Sub akan mengurutkan pesan dengan kunci pengurutan yang sama. Berikut adalah beberapa pengecualian penting:

  • Menetapkan nilai untuk max_messages dalam permintaan tidak menjamin bahwa max_messages ditampilkan, meskipun ada banyak pesan dalam backlog. Pub/Sub Pull API mungkin menampilkan kurang dari max_messages untuk mengurangi latensi pengiriman pesan yang siap dikirim.

  • Respons pull yang berisi 0 pesan tidak boleh digunakan sebagai indikator bahwa tidak ada pesan dalam backlog. Anda bisa mendapatkan respons dengan 0 pesan dan memiliki permintaan berikutnya yang menampilkan pesan.

  • Untuk mencapai latensi pengiriman pesan yang rendah dengan mode pull unary, Anda harus memiliki banyak permintaan pull yang belum terselesaikan secara bersamaan. Seiring peningkatan throughput topik, lebih banyak permintaan pull diperlukan. Secara umum, mode StreamingPull lebih disukai untuk aplikasi yang sensitif terhadap latensi.

Kuota dan batas

Koneksi Pull dan StreamingPull tunduk pada kuota dan batas. Untuk mengetahui informasi selengkapnya, lihat Kuota dan batas Pub/Sub.

