Sottoscrizioni pull

Questo documento fornisce una panoramica di un abbonamento pull, del relativo flusso di lavoro e delle proprietà associate.

In una sottoscrizione pull, il client sottoscrittore richiede i messaggi il server Pub/Sub.

La modalità pull può utilizzare una delle due API di servizio Pull o StreamingPull. Per eseguire l'API scelta, puoi selezionare un client di alto livello fornito da Google libreria client o una libreria client di basso livello generata automaticamente. Puoi anche scegliere tra l'elaborazione asincrona e quella sincrona dei messaggi.

Prima di iniziare

Prima di leggere questo documento, assicurati di conoscere quanto segue:

  • Come funziona Pub/Sub e i diversi termini di Pub/Sub.

  • I diversi tipi di abbonamenti supportato da Pub/Sub e perché potrebbe essere utile utilizzare una query pull abbonamento.

Flusso di lavoro di sottoscrizione pull

Per una sottoscrizione pull, il client sottoscrittore avvia richieste a un il server Pub/Sub per recuperare i messaggi. Il client abbonato utilizza una delle seguenti API:

La maggior parte dei client sottoscrittori non effettua queste richieste direttamente. Invece, si affidano alla libreria client di alto livello fornita da Google Cloud, esegue internamente le richieste di pull in modalità flusso e recapita i messaggi in modo asincrono. Per un client sottoscrittore che ha bisogno di un maggiore controllo sul modo in cui vengono estratti i messaggi, Pub/Sub utilizza una libreria gRPC di basso livello e generata automaticamente. Questa libreria effettua richieste di pull o in modalità flusso . Queste richieste possono essere sincrone o asincrone.

Le due immagini seguenti mostrano il flusso di lavoro tra un client sottoscrittore e un abbonamento pull.

Flusso di messaggi per una sottoscrizione pull
Figura 1. Flusso di lavoro per una sottoscrizione pull

Flusso di messaggi per un
Sottoscrizione streamingPull
Figura 2. Flusso di lavoro per un abbonamento pull per lo streaming

Flusso di lavoro pull

Il flusso di lavoro pull è il seguente e fa riferimento alla Figura 1:

  1. Il client sottoscrittore chiama esplicitamente il metodo pull, che richiede i messaggi da consegnare. Questa richiesta è PullRequest come mostrato nell'immagine.
  2. Il server Pub/Sub risponde con zero o più messaggi e ID conferma. Una risposta con zero messaggi o con un errore non indicano necessariamente che non ci sono messaggi disponibili da ricevere. Questo la risposta è PullResponse, come mostrato nell'immagine.

  3. Il client del sottoscrittore chiama esplicitamente il metodo acknowledge. Il cliente utilizza l'ID di conferma restituito per confermare che il messaggio sono elaborati e non devono essere recapitati di nuovo.

Per una singola richiesta di pull in streaming, un client sottoscrittore può ricevere più risposte a causa della connessione aperta. Al contrario, solo una risposta restituiti per ogni richiesta di pull.

Proprietà di una sottoscrizione pull

Le proprietà che configuri per una sottoscrizione pull determinano la modalità di scrittura dei messaggi nella sottoscrizione. Per ulteriori informazioni, vedi proprietà abbonamento.

API di servizio Pub/Sub

La sottoscrizione al pull Pub/Sub può utilizzare uno dei le due API seguenti per il recupero dei messaggi:

  • Pull
  • StreamingPull

Utilizza le RPC Acknowledge e ModifyAckDeadline univoche quando ricevi messaggi utilizzando queste API. Le due API Pub/Sub sono descritte nel le schede seguenti.

API StreamingPull

Ove possibile, le librerie client Pub/Sub utilizzano StreamingPull per una velocità in uscita massima e una latenza minima. Anche se potresti non usare mai direttamente l'API StreamingPull, è importante conoscerne le differenze dall'API Pull.

L'API StreamingPull si basa su una connessione bidirezionale persistente per ricevere più messaggi man mano che diventano disponibili. Di seguito sono riportate le flusso di lavoro personalizzato:

  1. Il client invia una richiesta al server per stabilire una connessione. Se la quota di connessioni viene superata, il server restituisce un errore di risorsa esaurita. La libreria client riprova automaticamente gli errori di superamento della quota.

  2. Se non si verificano errori o se la quota di connessione è di nuovo disponibile, il server invia continuamente messaggi al client connesso.

  3. Se o quando viene superata la quota di velocità effettiva, il server interrompe l'invio messaggi. Tuttavia, la connessione non è interrotta. Ogni volta che è di nuovo disponibile una quota di throughput sufficiente, lo stream riprende.

  4. Il client o il server alla fine chiude la connessione.

L'API StreamingPull mantiene una connessione aperta. Pub/Sub i server chiudono ripetutamente la connessione dopo un determinato periodo di tempo per evitare con una connessione costante a lunga durata. La libreria client si riapre automaticamente una connessione StreamingPull.

I messaggi vengono inviati alla connessione quando sono disponibili. StreamingPull L'API riduce al minimo la latenza e massimizza la velocità effettiva per i messaggi.

Scopri di più sui metodi RPC StreamingPull: StreamingPullRequest e StreamingPullResponse.

API pull

Questa API è una RPC unaria tradizionale basata su una richiesta e una risposta un modello di machine learning. Una singola risposta pull corrisponde a una singola richiesta di pull. Di seguito è riportato il flusso di lavoro:

  1. Il client invia una richiesta di messaggi al server. Se la quota di throughput viene superata, il server restituisce un errore di esaurimento delle risorse.

  2. Se non viene rilevato alcun errore o se la quota di throughput è di nuovo disponibile, il server risponde con zero o più messaggi e ID di conferma.

Quando si utilizza l'API Pull unary, una risposta con zero messaggi o con un l'errore non indica necessariamente che non ci sono messaggi disponibili da ricevere.

L'utilizzo dell'API Pull non garantisce una bassa latenza e un'elevata velocità effettiva di messaggi. Per ottenere una velocità effettiva elevata e una bassa latenza con l'API Pull, devono avere più richieste in sospeso simultanee. Le nuove richieste vengono create quando le richieste precedenti ricevono una risposta. La progettazione di una soluzione di questo tipo è soggetta a errori e difficile da gestire. Ti consigliamo di utilizzare StreamingPull per questi casi d'uso.

Utilizza l'API Pull anziché l'API StreamingPull solo se hai bisogno di un controllo rigoroso su quanto segue:

  • Il numero di messaggi che il client sottoscrittore può elaborare
  • La memoria e le risorse del client

Puoi utilizzare questa API anche quando l'abbonato è un proxy tra Pub/Sub e un altro servizio che opera in modo più pull.

Scopri di più sui metodi REST pull: Metodo: projects.subscriptions.pull.

Scopri di più sui metodi RPC pull: PullRequest e PullResponse.

Tipi di modalità di elaborazione dei messaggi

Scegli una delle seguenti modalità pull per i client sottoscrittori.

Modalità pull asincrona

La modalità pull asincrona disaccoppia la ricezione dei messaggi dall'elaborazione di messaggi in un client sottoscrittore. Questa modalità è predefinita per la maggior parte dei client abbonati. La modalità pull asincrona può utilizzare l'API StreamingPull o l'API Pull unaria. Il pull asincrono può utilizzare anche la libreria client di alto livello libreria client generata automaticamente o di basso livello.

Puoi scoprire di più sulle librerie client più avanti in questo documento.

Modalità pull sincrona

In modalità pull sincrona, la ricezione e l'elaborazione dei messaggi avvengono in sequenza e non sono disaccoppiate l'una dall'altra. Pertanto, in modo simile StreamingPull e API unary Pull, l'elaborazione asincrona offre minore latenza e velocità effettiva superiore rispetto all'elaborazione sincrona.

Utilizza la modalità pull sincrona solo per le applicazioni in cui bassa latenza e alta throughput non sono i fattori più importanti rispetto ad altri requisiti. Ad esempio, un'applicazione potrebbe essere limitata all'utilizzo solo del modello di programmazione sincrona. Oppure, un'applicazione con risorse potrebbero richiedere un controllo più esatto su memoria, rete per la CPU. In questi casi, utilizza la modalità sincrona con l'API Pull unary.

Librerie client Pub/Sub

Pub/Sub offre un'architettura di alto livello e una di basso livello libreria client.

Libreria client Pub/Sub di alto livello

La libreria client di alto livello fornisce opzioni per controllare le scadenze di conferma utilizzando la gestione del leasing. Queste opzioni sono più granulare rispetto a quando configuri le scadenze di conferma utilizzando la console o l'interfaccia a riga di comando a livello di abbonamento. La libreria client di alto livello implementa anche il supporto di funzionalità come la consegna ordinata, la consegna exactly-once e il controllo del flusso.

Ti consigliamo di utilizzare il pull asincrono e l'API StreamingPull con la libreria client di alto livello. Non tutti i linguaggi supportati per Google Cloud supportano anche l'API Pull nella libreria client di alto livello.

Per utilizzare le librerie client di alto livello, consulta Librerie client Pub/Sub.

Libreria client Pub/Sub generata automaticamente di basso livello

È disponibile una libreria client di basso livello nei casi in cui è necessario utilizzare Esegui il pull dell'API direttamente. Puoi utilizzare l'elaborazione sincrona o asincrona con la libreria client generata automaticamente di basso livello. Devi programmare manualmente funzionalità come consegna ordinata, consegna "exactly-once", controllo del flusso e gestione dei leasing quando utilizzi la libreria client di basso livello generata automaticamente.

Puoi utilizzare il modello di elaborazione sincrona quando utilizzi la libreria client di basso livello autogenerata per tutti i linguaggi supportati. Puoi utilizzare la libreria client di basso livello generata automaticamente e il pull sincrono nei casi in cui sia opportuno utilizzare direttamente l'API Pull. Ad esempio, potresti avere già una logica di applicazione basata su questo modello.

Per utilizzare direttamente le librerie client di basso livello generate automaticamente, consulta Panoramica delle API Pub/Sub.

Esempi di codice della libreria client

Esempi di codice della libreria client StreamingPull e di alto livello


Prima di provare questo esempio, segui le istruzioni di configurazione C++ riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";


Prima di provare questo esempio, segui le istruzioni di configurazione C# riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API C# Pub/Sub.

using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesAsyncSample
    public async Task<int> PullMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        int messageCount = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;


Prima di provare questo esempio, segui le istruzioni di configurazione di Go in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (


func pullMsgs(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		atomic.AddInt32(&received, 1)
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil


Prima di provare questo esempio, segui le istruzioni di configurazione di Java riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Pub/Sub.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeAsyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.


Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Node.js Pub/Sub.

 * TODO(developer): Uncomment these variables before running the sample.
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId, timeout) {
  // References an existing subscription; if you are unsure if the
  // subscription will exist, try the optimisticSubscribe sample.
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${}:`);
    console.log(`\tData: ${}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);


Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Node.js Pub/Sub.

 * TODO(developer): Uncomment these variables before running the sample.
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Message} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId: string, timeout: number) {
  // References an existing subscription; if you are unsure if the
  // subscription will exist, try the optimisticSubscribe sample.
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = (message: Message) => {
    console.log(`Received message ${}:`);
    console.log(`\tData: ${}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);


Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Python Pub/Sub.

from concurrent.futures import TimeoutError
from import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.


Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Ruby Pub/Sub.

# subscription_id = "your-subscription-id"

pubsub =

subscription = pubsub.subscription subscription_id
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{}"

# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60

Recuperare gli attributi personalizzati utilizzando la libreria client di alto livello

Gli esempi riportati di seguito mostrano come eseguire il pull dei messaggi in modo asincrono e recuperarli gli attributi personalizzati dei metadati.


Prima di provare questo esempio, segui le istruzioni di configurazione C++ riportate nella Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message with attributes:\n";
        for (auto& kv : m.attributes()) {
          std::cout << "  " << kv.first << ": " << kv.second << "\n";


Prima di provare questo esempio, segui le istruzioni di configurazione C# riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API C# Pub/Sub.

using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesWithCustomAttributesAsyncSample
    public async Task<List<PubsubMessage>> PullMessagesWithCustomAttributesAsync(string projectId, string subscriptionId, bool acknowledge)
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);

        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        var messages = new List<PubsubMessage>();
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            if (message.Attributes != null)
                foreach (var attribute in message.Attributes)
                    Console.WriteLine($"{attribute.Key} = {attribute.Value}");
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        // Run for 7 seconds.
        await Task.Delay(7000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messages;


Prima di provare questo esempio, segui le istruzioni di configurazione di Go riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (


func pullMsgsCustomAttributes(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// Receive blocks until the context is cancelled or an error occurs.
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message :%q\n", string(msg.Data))
		fmt.Fprintln(w, "Attributes:")
		for key, value := range msg.Attributes {
			fmt.Fprintf(w, "%s = %s\n", key, value)
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)

	return nil


Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Pub/Sub.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithCustomAttributesExample(projectId, subscriptionId);

  public static void subscribeWithCustomAttributesExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          // Print message attributes.
              .forEach((key, value) -> System.out.println(key + " = " + value));

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.


Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Node.js Pub/Sub.

 * TODO(developer): Uncomment these variables before running the sample.
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenWithCustomAttributes(subscriptionNameOrId, timeout) {
  // References an existing subscription, e.g. "my-subscription"
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  const messageHandler = message => {
      `Received message: id ${}, data ${
      }, attributes: ${JSON.stringify(message.attributes)}`

    // "Ack" (acknowledge receipt of) the message

  // Wait a while for the subscription to run. (Part of the sample only.)
  subscription.on('message', messageHandler);
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
  }, timeout * 1000);


Prima di provare questo esempio, segui le istruzioni di configurazione di Python riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub per Python.

from concurrent.futures import TimeoutError
from import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {!r}.")
    if message.attributes:
        for key in message.attributes:
            value = message.attributes.get(key)
            print(f"{key}: {value}")

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.


Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Ruby Pub/Sub.

# subscription_id = "your-subscription-id"

pubsub =

subscription = pubsub.subscription subscription_id
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{}"
  unless received_message.attributes.empty?
    puts "Attributes:"
    received_message.attributes.each do |key, value|
      puts "#{key}: #{value}"

# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60

Gestire gli errori utilizzando la libreria client di alto livello

Gli esempi riportati di seguito mostrano come gestire gli errori che si verificano quando la sottoscrizione ai messaggi.


Prima di provare questo esempio, segui le istruzioni per la configurazione di C++ in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber
      .Subscribe([&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
      // Setup an error handler for the subscription session
      .then([](future<google::cloud::Status> f) {
        std::cout << "Subscription session result: " << f.get() << "\n";


Prima di provare questo esempio, segui le istruzioni di configurazione di Go riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

import (


func pullMsgsError(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	defer client.Close()

	// If the service returns a non-retryable error, Receive returns that error after
	// all of the outstanding calls to the handler have returned.
	err = client.Subscription(subID).Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
	if err != nil {
		return fmt.Errorf("Receive: %w", err)
	return nil


Prima di provare questo esempio, segui le istruzioni di configurazione di Java in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Pub/Sub.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithErrorListenerExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithErrorListenerExample(projectId, subscriptionId);

  public static void subscribeWithErrorListenerExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());

    Subscriber subscriber = null;
    try {
      // Provides an executor service for processing messages.
      ExecutorProvider executorProvider =

      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)

      // Listen for unrecoverable failures. Rebuild a subscriber and restart subscribing
      // when the current subscriber encounters permanent errors.
          new Subscriber.Listener() {
            public void failed(Subscriber.State from, Throwable failure) {
              if (!executorProvider.getExecutor().isShutdown()) {
                subscribeWithErrorListenerExample(projectId, subscriptionId);

      // Start the subscriber.
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.


Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub per Node.js.

 * TODO(developer): Uncomment these variables before running the sample.
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForErrors(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  const messageHandler = message => {
    // Do something with the message
    console.log(`Message: ${message}`);

    // "Ack" (acknowledge receipt of) the message

  // Create an event handler to handle errors
  const errorHandler = error => {
    // Do something with the error
    console.error(`ERROR: ${error}`);
    throw error;

  // Listen for new messages/errors until timeout is hit
  subscription.on('message', messageHandler);
  subscription.on('error', errorHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    subscription.removeListener('error', errorHandler);
  }, timeout * 1000);


Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub per Python.

from import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    # When `timeout` is not set, result() will block indefinitely,
    # unless an exception is encountered first.
    except Exception as e:
            f"Listening for messages on {subscription_path} threw an exception: {e}."
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.


Prima di provare questo esempio, segui le istruzioni di configurazione di Go riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub Go.

# subscription_id = "your-subscription-id"

pubsub =

subscription = pubsub.subscription subscription_id
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{}"
# Propagate expection from child threads to the main thread as soon as it is
# raised. Exceptions happened in the callback thread are collected in the
# callback thread pool and do not propagate to the main thread
Thread.abort_on_exception = true

  # Let the main thread sleep for 60 seconds so the thread for listening
  # messages does not quit
  sleep 60
rescue StandardError => e
  puts "Exception #{e.inspect}: #{e.message}"
  raise "Stopped listening for messages."

Esempi di codice di pull unari

Ecco un codice campione tira e riconoscere un numero fisso di messaggi.


Prima di provare questo esempio, segui le istruzioni di configurazione C++ riportate nella Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub C++.

[](google::cloud::pubsub::Subscriber subscriber) {
  auto response = subscriber.Pull();
  if (!response) throw std::move(response).status();
  std::cout << "Received message " << response->message << "\n";


Prima di provare questo esempio, segui le istruzioni di configurazione C# riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API C# Pub/Sub.

using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Linq;
using System.Threading;

public class PullMessagesSyncSample
    public int PullMessagesSync(string projectId, string subscriptionId, bool acknowledge)
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();
        int messageCount = 0;
            // Pull messages from server,
            // allowing an immediate response if there are no messages.
            PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);
            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
                string text = msg.Message.Data.ToStringUtf8();
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");
                Interlocked.Increment(ref messageCount);
            // If acknowledgement required, send to server.
            if (acknowledge && messageCount > 0)
                subscriberClient.Acknowledge(subscriptionName, response.ReceivedMessages.Select(msg => msg.AckId));
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
        return messageCount;


Prima di provare questo esempio, segui le istruzioni di configurazione di Java riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Pub/Sub.

import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    subscribeSyncExample(projectId, subscriptionId, numOfMessages);

  public static void subscribeSyncExample(
      String projectId, String subscriptionId, Integer numOfMessages) throws IOException {
    SubscriberStubSettings subscriberStubSettings =
                    .setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
      PullRequest pullRequest =

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

      // Stop the program if the pull response is empty to avoid acknowledging
      // an empty list of ack IDs.
      if (pullResponse.getReceivedMessagesList().isEmpty()) {
        System.out.println("No message was pulled. Exiting.");

      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        // Handle received message
        // ...

      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.


Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js riportate nella Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Node.js Pub/Sub.

 * TODO(developer): Uncomment these variables before running the sample.
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPull(projectId, subscriptionNameOrId) {
  // The low level API client requires a name only.
  const formattedSubscription =
    subscriptionNameOrId.indexOf('/') >= 0
      ? subscriptionNameOrId
      : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const request = {
    subscription: formattedSubscription,
    maxMessages: 10,

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Process the messages.
  const ackIds = [];
  for (const message of response.receivedMessages || []) {
    console.log(`Received message: ${}`);
    if (message.ackId) {

  if (ackIds.length !== 0) {
    // Acknowledge all of the messages. You could also acknowledge
    // these individually, but this is more efficient.
    const ackRequest = {
      subscription: formattedSubscription,
      ackIds: ackIds,

    await subClient.acknowledge(ackRequest);



Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Pub/Sub per Node.js.

use Google\Cloud\PubSub\PubSubClient;

 * Pulls all Pub/Sub messages for a subscription.
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
function pull_messages($projectId, $subscriptionName)
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.


Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby riportate nella guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Ruby Pub/Sub.

# subscription_id = "your-subscription-id"

pubsub =

subscription = pubsub.subscription subscription_id
subscription.pull(immediate: false).each do |message|
  puts "Message pulled: #{}"




  "returnImmediately": "false",
  "maxMessages": "1"


200 OK

  "receivedMessages": [{
    "message": {
      "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
      "messageId": "19917247034"



  "ackIds": [


Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Guida rapida all'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Python Pub/Sub.

from google.api_core import retry
from import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)


# Wrap the subscriber in a 'with' block to automatically call close() to
# close the underlying gRPC channel when done.
with subscriber:
    # The subscriber pulls a specific number of messages. The actual
    # number of messages pulled may be smaller than max_messages.
    response = subscriber.pull(
        request={"subscription": subscription_path, "max_messages": NUM_MESSAGES},

    if len(response.received_messages) == 0:

    ack_ids = []
    for received_message in response.received_messages:
        print(f"Received: {}.")

    # Acknowledges the received messages so they will not be sent again.
        request={"subscription": subscription_path, "ack_ids": ack_ids}

        f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."

Pub/Sub consegna un elenco di messaggi. Se l'elenco contiene più messaggi, Pub/Sub li ordina con la stessa chiave di ordinamento. Di seguito sono riportate alcune avvertenze importanti:

  • L'impostazione di un valore per max_messages nella richiesta non garantisce che vengano restituiti max_messages, anche se sono presenti così tanti messaggi nel backlog. L'API Pull di Pub/Sub potrebbe restituire meno di max_messages per ridurre la latenza di recapito dei messaggi subito disponibili per la pubblicazione.

  • Una risposta pull che viene fornita con 0 messaggi non deve essere utilizzata come indicatore per assicurarti che non ci siano messaggi nel backlog. È possibile ricevere una risposta con 0 messaggi e una richiesta successiva che restituisce messaggi.

  • Per ottenere una bassa latenza di consegna dei messaggi con la modalità pull unario, essenziale avere molte richieste di pull contemporaneamente in sospeso. Come aumenta la velocità effettiva dell'argomento, sono necessarie più richieste di pull. In generale, la modalità StreamingPull è preferibile per le applicazioni sensibili alla latenza.

Quote e limiti

Sia le connessioni pull che StreamingPull sono soggette a quote e limiti. Per ulteriori informazioni, consulta Quote e limiti di Pub/Sub.

