Démarrage rapide : utiliser des bibliothèques clientes

Le service Cloud Pub/Sub permet aux applications d'échanger des messages de manière fiable, rapide et asynchrone. Pour cela, un producteur de données publie un message dans un sujet Cloud Pub/Sub. Un client abonné crée ensuite un abonnement associé à ce sujet et utilise les messages de l'abonnement. Cloud Pub/Sub conserve les messages qui n'ont pas pu être distribués de manière fiable pendant sept jours au maximum. Sur cette page, nous vous expliquons comment faire vos premiers pas avec Cloud Pub/Sub pour publier des messages à l'aide de bibliothèques clientes.

Avant de commencer

  1. Connectez-vous à votre compte Google.

    Si vous n'en possédez pas déjà un, vous devez en créer un.

  2. Configurez un projet dans la console GCP.

    Configurer un projet

    Cliquez pour effectuer les opérations suivantes :

    • Créer ou sélectionner un projet
    • Activer Cloud Pub/Subl'API requise pour ce projet
    • Créer un compte de service
    • Télécharger une clé privée au format JSON

    Vous pouvez afficher et gérer ces ressources à tout moment dans la console GCP.

  3. Définissez la variable d'environnement GOOGLE_APPLICATION_CREDENTIALS pour pointer vers le chemin du fichier JSON contenant la clé de votre compte de service. Cette variable ne s'applique qu'à la session de shell actuelle. Par conséquent, si vous ouvrez une nouvelle session, vous devez de nouveau la définir.

  4. Installez et initialisez le SDK Cloud.

Installation

Si vous utilisez le SDK (plutôt que l'environnement Cloud Shell), installez les bibliothèques clientes dans le langage de programmation de votre choix :

C#

Install-Package Google.Cloud.PubSub.V1 -Pre

Go

go get -u cloud.google.com/go/pubsub

Java

Si vous utilisez Maven, ajoutez les lignes suivantes à votre fichier pom.xml :
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>1.75.0</version>
</dependency>
Si vous utilisez Gradle, ajoutez les lignes suivantes à vos dépendances :
compile 'com.google.cloud:google-cloud-pubsub:1.75.0'
Si vous utilisez SBT, ajoutez les lignes suivantes à vos dépendances :
libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.75.0"

Si vous utilisez IntelliJ ou Eclipse, vous pouvez ajouter des bibliothèques clientes à votre projet à l'aide des plug-ins IDE suivants :

Les plug-ins offrent des fonctionnalités supplémentaires, telles que la gestion des clés pour les comptes de service. Reportez-vous à la documentation de chaque plug-in pour plus de détails.

Node.js

npm install --save @google-cloud/pubsub

PHP

composer require google/cloud-pubsub

Python

Pour en savoir plus sur la configuration de votre environnement de développement Python, consultez le guide de configuration d'un environnement de développement Python.

# ensure that you are using virtualenv
# as described in the python dev setup guide

pip install --upgrade google-cloud-pubsub

Ruby

gem install google-cloud-pubsub

Créer un sujet et un abonnement

Après avoir créé un sujet, vous pouvez vous y abonner ou y publier des messages.

Exécutez la commande gcloud pubsub topics create pour créer un sujet :

gcloud pubsub topics create my-topic

Exécutez la commande gcloud pubsub subscriptions create pour créer un abonnement. Seuls les messages publiés dans le sujet après la création de l'abonnement sont disponibles pour les applications d'abonnés.

gcloud pubsub subscriptions create my-sub --topic my-topic

Pour en savoir plus sur l'attribution de nom à vos sujets et à vos abonnements, consultez la section Noms de ressources.

Publier des messages

Vous êtes maintenant prêt à publier des messages dans le sujet :

C#

// PublisherClient collects messages into appropriately sized
// batches.
var publishTasks =
    messageTexts.Select(text => publisher.PublishAsync(text));
foreach (Task<string> task in publishTasks)
{
    string message = await task;
    await Console.Out.WriteLineAsync($"Published message {message}");
}

Go

Avant d'essayer cet exemple, suivez les instructions de configuration de Go décrites dans le Guide de démarrage rapide de Cloud Pub/Sub – Utiliser des bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Cloud Pub/Sub pour Go.

t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte(msg),
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PublisherExample {

  // use the default project id
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

  /** Publish messages to a topic.
   * @param args topic name, number of messages
   */
  public static void main(String... args) throws Exception {
    // topic id, eg. "my-topic"
    String topicId = args[0];
    int messageCount = Integer.parseInt(args[1]);
    ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // convert message to bytes
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
            .setData(data)
            .build();

        // Schedule a message to be published. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      // Wait on any pending requests
      List<String> messageIds = ApiFutures.allAsList(futures).get();

      for (String messageId : messageIds) {
        System.out.println(messageId);
      }

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
      }
    }
  }
}

Node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const messageId = await pubsub.topic(topicName).publish(dataBuffer);
console.log(`Message ${messageId} published.`);

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_name}`
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data)
    print('Published {} of message ID {}.'.format(data, future.result()))

print('Published messages.')

Ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish "This is a test message."

puts "Message published."

Recevoir des messages

Configurez à présent un abonné pour extraire les messages que vous venez de publier. Les abonnés doivent accuser réception de chaque message pendant une période configurable. Les messages non confirmés seront à nouveau distribués. Notez que Cloud Pub/Sub distribue parfois un message plusieurs fois pour garantir que tous les messages parviennent à un abonné au moins une fois. Voici un exemple de la façon dont vous pouvez recevoir et accuser réception des messages :

C#

// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(subName)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	msg.Ack()
	fmt.Printf("Got message: %q\n", string(msg.Data))
	mu.Lock()
	defer mu.Unlock()
	received++
	if received == 10 {
		cancel()
	}
})
if err != nil {
	return err
}

Java

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class SubscriberExample {

  // use the default project id
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

  private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();

  static class MessageReceiverExample implements MessageReceiver {

    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      messages.offer(message);
      consumer.ack();
    }
  }

  /** Receive messages over a subscription. */
  public static void main(String... args) throws Exception {
    // set subscriber id, eg. my-sub
    String subscriptionId = args[0];
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(
        PROJECT_ID, subscriptionId);
    Subscriber subscriber = null;
    try {
      // create a subscriber bound to the asynchronous message receiver
      subscriber =
          Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
      subscriber.startAsync().awaitRunning();
      // Continue to listen to messages
      while (true) {
        PubsubMessage message = messages.take();
        System.out.println("Message Id: " + message.getMessageId());
        System.out.println("Data: " + message.getData().toStringUtf8());
      }
    } finally {
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }
  }
}

Node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Python

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Comment ça s'est passé ?

Nettoyer (facultatif)

Afin d'éviter que des frais ne soient facturés sur votre compte Google Cloud Platform pour les ressources utilisées dans ce guide, vous pouvez supprimer le sujet et l'abonnement.
  gcloud pubsub subscriptions delete my-sub
  gcloud pubsub topics delete my-topic

Étapes suivantes

Pour découvrir des exemples complets que vous pouvez créer et exécuter, consultez les tutoriels Cloud Pub/Sub.

Si vous souhaitez en savoir plus sur la bibliothèque cliente pour le langage de votre choix, consultez la section relative aux bibliothèques clientes.

Pour découvrir une présentation de Cloud Pub/Sub, consultez la page Qu'est-ce que Cloud Pub/Sub ?.

Pour en savoir plus sur les concepts abordés sur cette page, consultez les guides pour les éditeurs et pour les abonnés.

Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Documentation sur Cloud Pub/Sub