Início rápido: como usar bibliotecas de cliente

O serviço Pub/Sub permite que os aplicativos troquem mensagens de maneira confiável, rápida e assíncrona. Para que isso aconteça, um produtor de dados publica mensagens em um tópico do Pub/Sub. Depois, um cliente assinante cria uma assinatura para o tópico para então começar a consumir as mensagens da assinatura. O Pub/Sub preserva as mensagens que não puderam ser entregues de maneira confiável por até sete dias. Veja nesta página como começar a publicar mensagens com o Pub/Sub usando bibliotecas de cliente.

Antes de começar

  1. Faça login na sua conta do Google.

    Se você ainda não tiver uma, inscreva-se.

  2. Configure um projeto do Console do Cloud.

    Configurar um projeto

    Clique para:

    • Crie ou selecione um projeto.
    • ativar a API Pub/Sub para esse projeto;
    • criar uma conta de serviço;
    • fazer o download de uma chave privada como JSON.

    É possível visualizar e gerenciar esses recursos a qualquer momento no Console do Cloud.

  3. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

  4. Instale e inicialize o SDK do Cloud..

Como instalar bibliotecas de cliente

Se você estiver usando o SDK (em vez do Cloud Shell), instale as bibliotecas de cliente na linguagem de programação de sua preferência.

Python

Para mais informações sobre a configuração do ambiente de desenvolvimento Python, consulte o Guia de configuração do ambiente de desenvolvimento Python.

# ensure that you are using virtualenv
# as described in the python dev setup guide

pip install --upgrade google-cloud-pubsub

C#

Install-Package Google.Cloud.PubSub.V1 -Pre

Go

go get -u cloud.google.com/go/pubsub

Java

Se você estiver usando o Maven, adicione o código abaixo ao arquivo pom.xml. Para mais informações sobre BOMs, consulte BOM das bibliotecas do Google Cloud Platform.

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>8.0.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-pubsub</artifactId>
  </dependency>

Se você estiver usando o Gradle, adicione isto às dependências:

compile 'com.google.cloud:google-cloud-pubsub:1.107.0'

Se você estiver usando o sbt, adicione o seguinte às suas dependências:

libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.107.0"

Caso você esteja usando o IntelliJ ou o Eclipse, poderá adicionar bibliotecas de cliente ao seu projeto usando estes plug-ins de ambiente de desenvolvimento integrado:

Os plug-ins também oferecem outras funcionalidades, como gerenciamento de chaves de contas de serviço. Consulte a documentação de cada plug-in para mais detalhes.

Node.js

npm install --save @google-cloud/pubsub

PHP

composer require google/cloud-pubsub

Ruby

gem install google-cloud-pubsub

Como criar um tópico e uma assinatura

Depois de criar um tópico, é possível assiná-lo ou publicar nele.

Use o comando gcloud pubsub topics create para criar um tópico:

    gcloud pubsub topics create my-topic
    

Use o comando gcloud pubsub subscriptions create para criar uma assinatura. Somente mensagens publicadas no tópico após a criação da assinatura estarão disponíveis para aplicativos do assinante.

    gcloud pubsub subscriptions create my-sub --topic my-topic
    

Para mais informações sobre como nomear tópicos e assinaturas, consulte Nomes de recursos.

Como publicar mensagens

PYTHON

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO topic_name = "Your Pub/Sub topic name"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_name}`
    topic_path = publisher.topic_path(project_id, topic_name)

    for n in range(1, 10):
        data = u"Message number {}".format(n)
        # Data must be a bytestring
        data = data.encode("utf-8")
        # When you publish a message, the client returns a future.
        future = publisher.publish(topic_path, data=data)
        print(future.result())

    print("Published messages.")

C#

// PublisherClient collects messages into appropriately sized
    // batches.
    var publishTasks =
        messageTexts.Select(async text =>
        {
            try
            {
                string message = await publisher.PublishAsync(text);
                await Console.Out.WriteLineAsync($"Published message {message}");
            }
            catch (Exception exception)
            {
                await Console.Out.WriteLineAsync($"An error ocurred when publishing message {text}:");
                await Console.Out.WriteLineAsync(exception.Message);
            }
        });
    await Task.WhenAll(publishTasks);

GO

import (
    	"context"
    	"fmt"
    	"io"

    	"cloud.google.com/go/pubsub"
    )

    func publish(w io.Writer, projectID, topicID, msg string) error {
    	// projectID := "my-project-id"
    	// topicID := "my-topic"
    	// msg := "Hello World"
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}

    	t := client.Topic(topicID)
    	result := t.Publish(ctx, &pubsub.Message{
    		Data: []byte(msg),
    	})
    	// Block until the result is returned and a server-generated
    	// ID is returned for the published message.
    	id, err := result.Get(ctx)
    	if err != nil {
    		return fmt.Errorf("Get: %v", err)
    	}
    	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
    	return nil
    }
    

JAVA


    import com.google.api.core.ApiFuture;
    import com.google.api.core.ApiFutures;
    import com.google.cloud.ServiceOptions;
    import com.google.cloud.pubsub.v1.Publisher;
    import com.google.protobuf.ByteString;
    import com.google.pubsub.v1.ProjectTopicName;
    import com.google.pubsub.v1.PubsubMessage;
    import java.util.ArrayList;
    import java.util.List;

    public class PublisherExample {

      // use the default project id
      private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

      /**
       * Publish messages to a topic.
       *
       * @param args topic name, number of messages
       */
      public static void main(String... args) throws Exception {
        // topic id, eg. "my-topic"
        String topicId = args[0];
        int messageCount = Integer.parseInt(args[1]);
        ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId);
        Publisher publisher = null;
        List<ApiFuture<String>> futures = new ArrayList<>();

        try {
          // Create a publisher instance with default settings bound to the topic
          publisher = Publisher.newBuilder(topicName).build();

          for (int i = 0; i < messageCount; i++) {
            String message = "message-" + i;

            // convert message to bytes
            ByteString data = ByteString.copyFromUtf8(message);
            PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

            // Schedule a message to be published. Messages are automatically batched.
            ApiFuture<String> future = publisher.publish(pubsubMessage);
            futures.add(future);
          }
        } finally {
          // Wait on any pending requests
          List<String> messageIds = ApiFutures.allAsList(futures).get();

          for (String messageId : messageIds) {
            System.out.println(messageId);
          }

          if (publisher != null) {
            // When finished with the publisher, shutdown to free up resources.
            publisher.shutdown();
          }
        }
      }
    }
    

NODE.JS

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const topicName = 'YOUR_TOPIC_NAME';
    // const data = JSON.stringify({foo: 'bar'});

    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();

    async function publishMessage() {
      /**
       * TODO(developer): Uncomment the following lines to run the sample.
       */
      // const topicName = 'my-topic';

      // Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
      const dataBuffer = Buffer.from(data);

      const messageId = await pubSubClient.topic(topicName).publish(dataBuffer);
      console.log(`Message ${messageId} published.`);
    }

    publishMessage().catch(console.error);

PHP

use Google\Cloud\PubSub\PubSubClient;

    /**
     * Publishes a message for a Pub/Sub topic.
     *
     * @param string $projectId  The Google project ID.
     * @param string $topicName  The Pub/Sub topic name.
     * @param string $message  The message to publish.
     */
    function publish_message($projectId, $topicName, $message)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
        $topic = $pubsub->topic($topicName);
        $topic->publish(['data' => $message]);
        print('Message published' . PHP_EOL);
    }

RUBY

# project_id = "Your Google Cloud Project ID"
    # topic_name = "Your Pubsub topic name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    topic = pubsub.topic topic_name
    topic.publish "This is a test message."

    puts "Message published."

Receber mensagens

Configure um assinante para solicitar as mensagens que você acabou de publicar. Todos os assinantes precisam confirmar o recebimento de cada mensagem dentro de um determinado período de tempo configurável. As mensagens não confirmadas serão reenviadas. Observe que o Pub/Sub de vez em quando entrega as mensagens mais de uma vez para garantir que todas as mensagens foram entregues ao assinante. Veja um exemplo de como as mensagens são recebidas e confirmadas:

PYTHON

from google.cloud import pubsub_v1

    # TODO project_id = "Your Google Cloud Project ID"
    # TODO subscription_name = "Your Pub/Sub subscription name"
    # TODO timeout = 5.0  # "How long the subscriber should listen for
    # messages in seconds"

    subscriber = pubsub_v1.SubscriberClient()
    # The `subscription_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/subscriptions/{subscription_name}`
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    def callback(message):
        print("Received message: {}".format(message))
        message.ack()

    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback
    )
    print("Listening for messages on {}..\n".format(subscription_path))

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=timeout)
        except:  # noqa
            streaming_pull_future.cancel()

C#

// SubscriberClient runs your message handle function on multiple
    // threads to maximize throughput.
    Task startTask = subscriber.StartAsync(
        async (PubsubMessage message, CancellationToken cancel) =>
        {
            string text =
                Encoding.UTF8.GetString(message.Data.ToArray());
            await Console.Out.WriteLineAsync(
                $"Message {message.MessageId}: {text}");
            return acknowledge ? SubscriberClient.Reply.Ack
                : SubscriberClient.Reply.Nack;
        });
    // Run for 3 seconds.
    await Task.Delay(3000);
    await subscriber.StopAsync(CancellationToken.None);

GO

import (
    	"context"
    	"fmt"
    	"io"
    	"sync"

    	"cloud.google.com/go/pubsub"
    )

    func pullMsgs(w io.Writer, projectID, subID string, topic *pubsub.Topic) error {
    	// projectID := "my-project-id"
    	// subID := "my-sub"
    	// topic of type https://godoc.org/cloud.google.com/go/pubsub#Topic
    	ctx := context.Background()
    	client, err := pubsub.NewClient(ctx, projectID)
    	if err != nil {
    		return fmt.Errorf("pubsub.NewClient: %v", err)
    	}

    	// Publish 10 messages on the topic.
    	var results []*pubsub.PublishResult
    	for i := 0; i < 10; i++ {
    		res := topic.Publish(ctx, &pubsub.Message{
    			Data: []byte(fmt.Sprintf("hello world #%d", i)),
    		})
    		results = append(results, res)
    	}

    	// Check that all messages were published.
    	for _, r := range results {
    		_, err := r.Get(ctx)
    		if err != nil {
    			return fmt.Errorf("Get: %v", err)
    		}
    	}
    	// Consume 10 messages.
    	var mu sync.Mutex
    	received := 0
    	sub := client.Subscription(subID)
    	cctx, cancel := context.WithCancel(ctx)
    	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
    		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
    		msg.Ack()
    		mu.Lock()
    		defer mu.Unlock()
    		received++
    		if received == 10 {
    			cancel()
    		}
    	})
    	if err != nil {
    		return fmt.Errorf("Receive: %v", err)
    	}
    	return nil
    }
    

JAVA


    import com.google.cloud.ServiceOptions;
    import com.google.cloud.pubsub.v1.AckReplyConsumer;
    import com.google.cloud.pubsub.v1.MessageReceiver;
    import com.google.cloud.pubsub.v1.Subscriber;
    import com.google.pubsub.v1.ProjectSubscriptionName;
    import com.google.pubsub.v1.PubsubMessage;

    public class SubscriberExample {
      // use the default project id
      private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

      static class MessageReceiverExample implements MessageReceiver {

        @Override
        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
          System.out.println(
              "Message Id: " + message.getMessageId() + " Data: " + message.getData().toStringUtf8());
          // Ack only after all work for the message is complete.
          consumer.ack();
        }
      }

      /** Receive messages over a subscription. */
      public static void main(String... args) throws Exception {
        // set subscriber id, eg. my-sub
        String subscriptionId = args[0];
        ProjectSubscriptionName subscriptionName =
            ProjectSubscriptionName.of(PROJECT_ID, subscriptionId);
        Subscriber subscriber = null;
        try {
          // create a subscriber bound to the asynchronous message receiver
          subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
          subscriber.startAsync().awaitRunning();
          // Allow the subscriber to run indefinitely unless an unrecoverable error occurs.
          subscriber.awaitTerminated();
        } catch (IllegalStateException e) {
          System.out.println("Subscriber unexpectedly stopped: " + e);
        }
      }
    }

NODE.JS

/**
     * TODO(developer): Uncomment these variables before running the sample.
     */
    // const subscriptionName = 'YOUR_SUBSCRIPTION_NAME';
    // const timeout = 60;

    // Imports the Google Cloud client library
    const {PubSub} = require('@google-cloud/pubsub');

    // Creates a client; cache this for further use
    const pubSubClient = new PubSub();

    function listenForMessages() {
      // References an existing subscription
      const subscription = pubSubClient.subscription(subscriptionName);

      // Create an event handler to handle messages
      let messageCount = 0;
      const messageHandler = message => {
        console.log(`Received message ${message.id}:`);
        console.log(`\tData: ${message.data}`);
        console.log(`\tAttributes: ${message.attributes}`);
        messageCount += 1;

        // "Ack" (acknowledge receipt of) the message
        message.ack();
      };

      // Listen for new messages until timeout is hit
      subscription.on('message', messageHandler);

      setTimeout(() => {
        subscription.removeListener('message', messageHandler);
        console.log(`${messageCount} message(s) received.`);
      }, timeout * 1000);
    }

    listenForMessages();

PHP

use Google\Cloud\PubSub\PubSubClient;

    /**
     * Pulls all Pub/Sub messages for a subscription.
     *
     * @param string $projectId  The Google project ID.
     * @param string $subscriptionName  The Pub/Sub subscription name.
     */
    function pull_messages($projectId, $subscriptionName)
    {
        $pubsub = new PubSubClient([
            'projectId' => $projectId,
        ]);
        $subscription = $pubsub->subscription($subscriptionName);
        foreach ($subscription->pull() as $message) {
            printf('Message: %s' . PHP_EOL, $message->data());
            // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
            $subscription->acknowledge($message);
        }
    }

RUBY

# project_id        = "Your Google Cloud Project ID"
    # subscription_name = "Your Pubsub subscription name"
    require "google/cloud/pubsub"

    pubsub = Google::Cloud::Pubsub.new project: project_id

    subscription = pubsub.subscription subscription_name
    subscriber   = subscription.listen do |received_message|
      puts "Received message: #{received_message.data}"
      received_message.acknowledge!
    end

    subscriber.start
    # Let the main thread sleep for 60 seconds so the thread for listening
    # messages does not quit
    sleep 60
    subscriber.stop.wait!

Como foi?

Limpar (opcional)

Para evitar que os recursos usados neste guia sejam cobrados na conta do Google Cloud Platform, use a linha de comando para excluir o tópico e a assinatura.

      gcloud pubsub subscriptions delete my-sub
      gcloud pubsub topics delete my-topic
    

A seguir