Guida rapida: pubblica e ricevi messaggi in Pub/Sub utilizzando una libreria client

Pubblica e ricevi messaggi in Pub/Sub utilizzando una libreria client

Il servizio Pub/Sub consente alle applicazioni di scambiare messaggi in modo affidabile, rapido e asincrono. Di seguito è riportata la sequenza degli eventi:

  1. Un producer di dati pubblica un messaggio in un argomento Pub/Sub.
  2. Un client sottoscrittore crea una sottoscrizione a quell'argomento e consuma messaggi dalla sottoscrizione.

Puoi configurare un ambiente Pub/Sub utilizzando uno dei seguenti metodi: console Google Cloud, Cloud Shell, librerie client o API REST. Questa pagina mostra come iniziare a pubblicare messaggi con Pub/Sub utilizzando le librerie client.

Pub/Sub offre una libreria client generata automaticamente e di alto livello. Per impostazione predefinita, come in questa guida rapida, è consigliabile la libreria client di alto livello.


Per seguire le indicazioni dettagliate per questa attività direttamente nella console Google Cloud, fai clic su Procedura guidata:

Procedura guidata


Prima di iniziare

  1. Accedi al tuo account Google Cloud. Se non conosci Google Cloud, crea un account per valutare le prestazioni dei nostri prodotti in scenari reali. I nuovi clienti ricevono anche 300 $di crediti gratuiti per l'esecuzione, il test e il deployment dei carichi di lavoro.
  2. Installa Google Cloud CLI.
  3. Per inizializzare l'interfaccia a riga di comando gcloud, esegui il comando seguente:

    gcloud init
  4. Crea o seleziona un progetto Google Cloud.

    • Crea un progetto Google Cloud:

      gcloud projects create PROJECT_ID
    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID
  5. Verifica che la fatturazione sia attivata per il tuo progetto Google Cloud. Scopri come verificare se la fatturazione è abilitata per un progetto.

  6. Attiva Pub/Sub API.

    gcloud services enable pubsub.googleapis.com
  7. Crea le credenziali di autenticazione per il tuo Account Google:

    gcloud auth application-default login
  8. Concedi i ruoli al tuo Account Google. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci EMAIL_ADDRESS con il tuo indirizzo email.
    • Sostituisci ROLE con ogni ruolo.
  9. Installa Google Cloud CLI.
  10. Per inizializzare l'interfaccia a riga di comando gcloud, esegui il comando seguente:

    gcloud init
  11. Crea o seleziona un progetto Google Cloud.

    • Crea un progetto Google Cloud:

      gcloud projects create PROJECT_ID
    • Seleziona il progetto Google Cloud che hai creato:

      gcloud config set project PROJECT_ID
  12. Verifica che la fatturazione sia attivata per il tuo progetto Google Cloud. Scopri come verificare se la fatturazione è abilitata per un progetto.

  13. Attiva Pub/Sub API.

    gcloud services enable pubsub.googleapis.com
  14. Crea le credenziali di autenticazione per il tuo Account Google:

    gcloud auth application-default login
  15. Concedi i ruoli al tuo Account Google. Esegui il comando seguente una volta per ciascuno dei seguenti ruoli IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci EMAIL_ADDRESS con il tuo indirizzo email.
    • Sostituisci ROLE con ogni ruolo.

Installazione delle librerie client

I seguenti esempi mostrano come installare le librerie client:

Python

Per saperne di più sulla configurazione del tuo ambiente di sviluppo Python, consulta la guida alla configurazione dell'ambiente di sviluppo di Python.

# ensure that you are using virtualenv
# as described in the python dev setup guide

pip install --upgrade google-cloud-pubsub

C++

Per maggiori informazioni sull'installazione della libreria C++, consulta GitHub README.

C#

Install-Package Google.Cloud.PubSub.V1 -Pre

Go

go get cloud.google.com/go/pubsub

Java

Se utilizzi Maven, aggiungi quanto segue al file pom.xml. Per ulteriori informazioni sui BOM, consulta la pagina BOM delle librerie Google Cloud Platform.

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>26.0.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-pubsub</artifactId>
  </dependency>

</dependencies>

Se utilizzi Gradle, aggiungi quanto segue alle dipendenze:

implementation platform('com.google.cloud:libraries-bom:26.0.0')

implementation 'com.google.cloud:google-cloud-pubsub'

Se utilizzi sbt, aggiungi quanto segue alle dipendenze:

libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.120.1"

Se utilizzi Visual Studio Code, IntelliJ o Eclipse, puoi aggiungere librerie client al progetto utilizzando i seguenti plug-in IDE:

I plug-in offrono funzionalità aggiuntive, come la gestione delle chiavi per gli account di servizio. Per informazioni dettagliate, consulta la documentazione di ogni plug-in.

Node.js

npm install --save @google-cloud/pubsub

PHP

composer require google/cloud-pubsub

Ruby

gem install google-cloud-pubsub

Crea un argomento e una sottoscrizione

Dopo aver creato un argomento, puoi iscriverti o pubblicarlo.

Utilizza il seguente comando gcloud pubsub topics create per creare un argomento denominato my-topic. Non modificare il nome dell'argomento, perché fa riferimento al resto del tutorial.

gcloud pubsub topics create my-topic

Utilizza il comando gcloud pubsub subscription create per creare una sottoscrizione. Solo i messaggi pubblicati nell'argomento dopo la creazione della sottoscrizione sono disponibili per le applicazioni abbonato.

gcloud pubsub subscriptions create my-sub --topic my-topic

Per maggiori informazioni sulla denominazione di argomenti e iscrizioni, consulta Nomi delle risorse.

Pubblica dei messaggi

Prima di eseguire i seguenti esempi, assicurati di rimuovere il commento e inserire uno dei valori obbligatori contrassegnati nel codice. Questa operazione è necessaria per collegare l'esempio al progetto e le risorse Pub/Sub che hai creato in precedenza.

Utilizza my-topic per l'ID argomento.

Python

from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data)
    print(future.result())

print(f"Published messages to {topic_path}.")

C++

#include "google/cloud/pubsub/publisher.h"
#include <iostream>

int main(int argc, char* argv[]) try {
  if (argc != 3) {
    std::cerr << "Usage: " << argv[0] << " <project-id> <topic-id>\n";
    return 1;
  }

  std::string const project_id = argv[1];
  std::string const topic_id = argv[2];

  // Create a namespace alias to make the code easier to read.
  namespace pubsub = ::google::cloud::pubsub;
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(pubsub::Topic(project_id, topic_id)));
  auto id =
      publisher
          .Publish(pubsub::MessageBuilder{}.SetData("Hello World!").Build())
          .get();
  if (!id) throw std::move(id).status();
  std::cout << "Hello World published with id=" << *id << "\n";

  return 0;
} catch (google::cloud::Status const& status) {
  std::cerr << "google::cloud::Status thrown: " << status << "\n";
  return 1;
}

C#


using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PublishMessagesAsyncSample
{
    public async Task<int> PublishMessagesAsync(string projectId, string topicId, IEnumerable<string> messageTexts)
    {
        TopicName topicName = TopicName.FromProjectTopic(projectId, topicId);
        PublisherClient publisher = await PublisherClient.CreateAsync(topicName);

        int publishedMessageCount = 0;
        var publishTasks = messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                Console.WriteLine($"Published message {message}");
                Interlocked.Increment(ref publishedMessageCount);
            }
            catch (Exception exception)
            {
                Console.WriteLine($"An error ocurred when publishing message {text}: {exception.Message}");
            }
        });
        await Task.WhenAll(publishTasks);
        return publishedMessageCount;
    }
}

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publish(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub: NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	result := t.Publish(ctx, &pubsub.Message{
		Data: []byte(msg),
	})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("pubsub: result.Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java


import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublisherExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publisherExample(projectId, topicId);
  }

  public static void publisherExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);

    Publisher publisher = null;
    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      String message = "Hello World!";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      // Once published, returns a server-assigned message id (unique within the topic)
      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const data = JSON.stringify({foo: 'bar'});

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishMessage(topicNameOrId, data) {
  // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
  const dataBuffer = Buffer.from(data);

  try {
    const messageId = await pubSubClient
      .topic(topicNameOrId)
      .publishMessage({data: dataBuffer});
    console.log(`Message ${messageId} published.`);
  } catch (error) {
    console.error(`Received error while publishing: ${error.message}`);
    process.exitCode = 1;
  }
}

PHP

use Google\Cloud\PubSub\MessageBuilder;
use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $topic = $pubsub->topic($topicName);
    $topic->publish((new MessageBuilder)->setData($message)->build());

    print('Message published' . PHP_EOL);
}

Ruby

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id
topic.publish "This is a test message."

puts "Message published."

Ricevere messaggi

Configura un sottoscrittore per il pull dei messaggi appena pubblicati. Ogni sottoscrittore deve confermare ciascun messaggio entro un periodo di tempo configurabile. I messaggi non confermati vengono ricaricati. Tieni presente che Pub/Sub consegna occasionalmente un messaggio più di una volta per garantire che tutti i messaggi arrivino a un sottoscrittore almeno una volta.

Prima di eseguire i seguenti esempi, assicurati di rimuovere il commento e inserire uno dei valori obbligatori contrassegnati nel codice. Questa operazione è necessaria per collegare l'esempio al progetto e le risorse Pub/Sub che hai creato in precedenza

Usa my-sub come ID abbonamento.

Python

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

C++

namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};

C#


using Google.Cloud.PubSub.V1;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesAsyncSample
{
    public async Task<int> PullMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberClient subscriber = await SubscriberClient.CreateAsync(subscriptionName);
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        int messageCount = 0;
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = System.Text.Encoding.UTF8.GetString(message.Data.ToArray());
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Go

import (
	"context"
	"fmt"
	"io"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
)

func pullMsgs(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)

	// Receive messages for 10 seconds, which simplifies testing.
	// Comment this out in production, since `Receive` should
	// be used as a long running operation.
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var received int32
	err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		atomic.AddInt32(&received, 1)
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	fmt.Fprintf(w, "Received %d messages\n", received)

	return nil
}

Java


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeAsyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);
  }

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

function listenForMessages(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = message => {
    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);
    messageCount += 1;

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Ruby

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Com'è andata?

Eseguire la pulizia (facoltativo)

  1. Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa guida, puoi utilizzare la riga di comando per eliminare l'argomento e la sottoscrizione.
      gcloud pubsub subscriptions delete my-sub
      gcloud pubsub topics delete my-topic
    
  2. (Facoltativo) Revoca le credenziali di autenticazione che hai creato ed elimina il file delle credenziali locale.

    gcloud auth application-default revoke
  3. Facoltativo: revoca le credenziali dall'interfaccia a riga di comando gcloud.

    gcloud auth revoke

Passaggi successivi