Guia de início rápido: como usar bibliotecas de cliente

O serviço Cloud Pub/Sub permite que os aplicativos troquem mensagens de maneira confiável, rápida e assíncrona. Para que isso aconteça, um produtor de dados publica mensagens em um tópico do Cloud Pub/Sub. Depois, um cliente do assinante cria uma assinatura no tópico para consumir as respectivas mensagens. O Cloud Pub/Sub preserva as mensagens que não puderam ser entregues de maneira confiável por até sete dias. Veja nesta página como começar a publicar mensagens com o Cloud Pub/Sub usando bibliotecas de cliente.

Antes de começar

  1. Faça login na sua Conta do Google.

    Se você ainda não tiver uma, inscreva-se.

  2. {% dynamic if "no_credentials" in setvar.task_params %}{% dynamic setvar credential_type %}NO_AUTH{% dynamic endsetvar %}{% dynamic if not setvar.redirect_url %}{% dynamic setvar redirect_url %}https://console.cloud.google.com{% dynamic endsetvar %}{% dynamic endif %}{% dynamic endif %}{% dynamic if setvar.in_henhouse_no_auth_whitelist %}{% dynamic if not setvar.credential_type %}{% dynamic setvar credential_type %}NO_AUTH{% dynamic endsetvar %}{% dynamic endif %}{% dynamic elif setvar.in_henhouse_service_account_whitelist %}{% dynamic if not setvar.credential_type %}{% dynamic setvar credential_type %}SERVICE_ACCOUNT{% dynamic endsetvar %}{% dynamic endif %}{% dynamic endif %}{% dynamic if not setvar.service_account_roles and setvar.credential_type == "SERVICE_ACCOUNT" %}{% dynamic setvar service_account_roles %}{% dynamic endsetvar %}{% dynamic endif %}{% dynamic setvar console %}{% dynamic if "no_steps" not in setvar.task_params %}
  3. {% dynamic endif %}{% dynamic if setvar.api_list %}{% dynamic if setvar.in_henhouse_no_auth_whitelist or setvar.in_henhouse_service_account_whitelist %} Configure um projeto do Console do GCP.

    Configurar um projeto

    Clique para:

    • criar ou selecionar um projeto;
    • ativar {% dynamic if setvar.api_names %}{% dynamic print setvar.api_names %}{% dynamic else %}obrigatória{% dynamic endif %}{% dynamic if "," in setvar.api_list %} APIs{% dynamic elif "API" in setvar.api_names %}{% dynamic else %} API{% dynamic endif %} para o projeto;
    • {% dynamic if setvar.credential_type == 'SERVICE_ACCOUNT' %}
    • criar uma conta de serviço;
    • fazer o download de uma chave privada como JSON.
    • {% dynamic endif %}

    É possível ver e gerenciar esses recursos a qualquer momento no Console do GCP.

    {% dynamic else %}{% dynamic if "no_text" not in setvar.task_params %} Ativar {% dynamic if setvar.api_names %}{% dynamic print setvar.api_names %}{% dynamic else %}obrigatória{% dynamic endif %}{% dynamic if "," in setvar.api_list %} APIs{% dynamic elif "API" in setvar.api_names %}{% dynamic else %} API{% dynamic endif %}. {% dynamic endif %}

    Ativar {% dynamic if "," in setvar.api_list %} as APIs{% dynamic else %} a API{% dynamic endif %}

    {% dynamic endif %}{% dynamic endif %}{% dynamic if "no_steps" not in setvar.task_params %}
  4. {% dynamic endif %}{% dynamic endsetvar %}{% dynamic print setvar.console %}
  5. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS para o caminho do arquivo JSON que contém a chave da sua conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

  6. Instale e inicialize o SDK do Cloud.

Instalação

Se você estiver usando o SDK (em vez do Cloud Shell), instale as bibliotecas de cliente na linguagem de programação de sua preferência.

C#

Install-Package Google.Cloud.PubSub.V1 -Pre

Go

go get -u cloud.google.com/go/pubsub

Java

Se você estiver usando o Maven, adicione ao seu arquivo pom.xml:
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>1.61.0</version>
</dependency>
Se você estiver usando o Gradle, adicione às suas dependências:
compile 'com.google.cloud:google-cloud-pubsub:1.61.0'
Se você estiver usando o SBT, adicione às suas dependências:
libraryDependencies += "com.google.cloud" % "google-cloud-pubsub" % "1.61.0"

Caso você esteja usando o IntelliJ ou o Eclipse, poderá adicionar bibliotecas de cliente ao seu projeto usando estes plug-ins de ambiente de desenvolvimento integrado:

Os plug-ins também oferecem outras funcionalidades, como gerenciamento de chaves de contas de serviço. Consulte a documentação de cada plug-in para mais detalhes.

Node.js

npm install --save @google-cloud/pubsub

PHP

composer require google/cloud-pubsub

Python

Para mais informações sobre a configuração do ambiente de desenvolvimento Python, consulte o Guia de configuração do ambiente de desenvolvimento Python.

# ensure that you are using virtualenv
# as described in the python dev setup guide

pip install --upgrade google-cloud-pubsub

Ruby

gem install google-cloud-pubsub

Criar um tópico e uma assinatura

Depois de criar um tópico, é possível assiná-lo ou publicar nele.

Use o comando gcloud pubsub topics create para criar um tópico:

gcloud pubsub topics create my-topic

Use o comando gcloud pubsub subscriptions create para criar uma assinatura. Somente mensagens publicadas no tópico após a criação da assinatura estarão disponíveis para aplicativos do assinante.

gcloud pubsub subscriptions create my-sub --topic my-topic

Para mais informações sobre como nomear tópicos e assinaturas, consulte Nomes dos recursos.

Publicar mensagens

Agora, você pode publicar mensagens no tópico:

C#

// PublisherClient collects messages into appropriately sized
// batches.
var publishTasks =
    messageTexts.Select(text => publisher.PublishAsync(text));
foreach (Task<string> task in publishTasks)
{
    string message = await task;
    await Console.Out.WriteLineAsync($"Published message {message}");
}

Go

Antes de testar este exemplo, siga as instruções de configuração do Go no Guia de início rápido do Cloud Pub/Sub: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Cloud Pub/Sub para Go (em inglês).

t := client.Topic(topic)
result := t.Publish(ctx, &pubsub.Message{
	Data: []byte(msg),
})
// Block until the result is returned and a server-generated
// ID is returned for the published message.
id, err := result.Get(ctx)
if err != nil {
	return err
}
fmt.Printf("Published a message; msg ID: %v\n", id)

Java

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PublisherExample {

  // use the default project id
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

  /** Publish messages to a topic.
   * @param args topic name, number of messages
   */
  public static void main(String... args) throws Exception {
    // topic id, eg. "my-topic"
    String topicId = args[0];
    int messageCount = Integer.parseInt(args[1]);
    ProjectTopicName topicName = ProjectTopicName.of(PROJECT_ID, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Create a publisher instance with default settings bound to the topic
      publisher = Publisher.newBuilder(topicName).build();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // convert message to bytes
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
            .setData(data)
            .build();

        // Schedule a message to be published. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      // Wait on any pending requests
      List<String> messageIds = ApiFutures.allAsList(futures).get();

      for (String messageId : messageIds) {
        System.out.println(messageId);
      }

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
      }
    }
  }
}

Node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const topicName = 'my-topic';
// const data = JSON.stringify({ foo: 'bar' });

// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
const dataBuffer = Buffer.from(data);

const messageId = await pubsub.topic(topicName).publish(dataBuffer);
console.log(`Message ${messageId} published.`);

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Publishes a message for a Pub/Sub topic.
 *
 * @param string $projectId  The Google project ID.
 * @param string $topicName  The Pub/Sub topic name.
 * @param string $message  The message to publish.
 */
function publish_message($projectId, $topicName, $message)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $topic = $pubsub->topic($topicName);
    $topic->publish(['data' => $message]);
    print('Message published' . PHP_EOL);
}

Python

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

publisher = pubsub_v1.PublisherClient()
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_name}`
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
    data = u'Message number {}'.format(n)
    # Data must be a bytestring
    data = data.encode('utf-8')
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data=data)
    print('Published {} of message ID {}.'.format(data, future.result()))

print('Published messages.')

Ruby

# project_id = "Your Google Cloud Project ID"
# topic_name = "Your Pubsub topic name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

topic = pubsub.topic topic_name
topic.publish "This is a test message."

puts "Message published."

Receber mensagens

Agora, configure um assinante para solicitar as mensagens que você acabou de publicar. Todos os assinantes precisam confirmar o recebimento de cada mensagem dentro de um determinado período de tempo configurável. As mensagens não confirmadas serão reenviadas. Observe que o Cloud Pub/Sub de vez em quando entrega as mensagens mais de uma vez para garantir que todas as mensagens foram entregues ao assinante. Veja um exemplo de como as mensagens são recebidas e confirmadas:

C#

// SubscriberClient runs your message handle function on multiple
// threads to maximize throughput.
subscriber.StartAsync(
    async (PubsubMessage message, CancellationToken cancel) =>
    {
        string text =
            Encoding.UTF8.GetString(message.Data.ToArray());
        await Console.Out.WriteLineAsync(
            $"Message {message.MessageId}: {text}");
        return acknowledge ? SubscriberClient.Reply.Ack
            : SubscriberClient.Reply.Nack;
    });
// Run for 3 seconds.
await Task.Delay(3000);
await subscriber.StopAsync(CancellationToken.None);

Go

// Consume 10 messages.
var mu sync.Mutex
received := 0
sub := client.Subscription(subName)
cctx, cancel := context.WithCancel(ctx)
err := sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
	msg.Ack()
	fmt.Printf("Got message: %q\n", string(msg.Data))
	mu.Lock()
	defer mu.Unlock()
	received++
	if received == 10 {
		cancel()
	}
})
if err != nil {
	return err
}

Java

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class SubscriberExample {

  // use the default project id
  private static final String PROJECT_ID = ServiceOptions.getDefaultProjectId();

  private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();

  static class MessageReceiverExample implements MessageReceiver {

    @Override
    public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
      messages.offer(message);
      consumer.ack();
    }
  }

  /** Receive messages over a subscription. */
  public static void main(String... args) throws Exception {
    // set subscriber id, eg. my-sub
    String subscriptionId = args[0];
    ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(
        PROJECT_ID, subscriptionId);
    Subscriber subscriber = null;
    try {
      // create a subscriber bound to the asynchronous message receiver
      subscriber =
          Subscriber.newBuilder(subscriptionName, new MessageReceiverExample()).build();
      subscriber.startAsync().awaitRunning();
      // Continue to listen to messages
      while (true) {
        PubsubMessage message = messages.take();
        System.out.println("Message Id: " + message.getMessageId());
        System.out.println("Data: " + message.getData().toStringUtf8());
      }
    } finally {
      if (subscriber != null) {
        subscriber.stopAsync();
      }
    }
  }
}

Node.js

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client
const pubsub = new PubSub();

/**
 * TODO(developer): Uncomment the following lines to run the sample.
 */
// const subscriptionName = 'my-sub';
// const timeout = 60;

// References an existing subscription
const subscription = pubsub.subscription(subscriptionName);

// Create an event handler to handle messages
let messageCount = 0;
const messageHandler = message => {
  console.log(`Received message ${message.id}:`);
  console.log(`\tData: ${message.data}`);
  console.log(`\tAttributes: ${message.attributes}`);
  messageCount += 1;

  // "Ack" (acknowledge receipt of) the message
  message.ack();
};

// Listen for new messages until timeout is hit
subscription.on(`message`, messageHandler);

setTimeout(() => {
  subscription.removeListener('message', messageHandler);
  console.log(`${messageCount} message(s) received.`);
}, timeout * 1000);

PHP

use Google\Cloud\PubSub\PubSubClient;

/**
 * Pulls all Pub/Sub messages for a subscription.
 *
 * @param string $projectId  The Google project ID.
 * @param string $subscriptionName  The Pub/Sub subscription name.
 */
function pull_messages($projectId, $subscriptionName)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);
    $subscription = $pubsub->subscription($subscriptionName);
    foreach ($subscription->pull() as $message) {
        printf('Message: %s' . PHP_EOL, $message->data());
        // Acknowledge the Pub/Sub message has been received, so it will not be pulled multiple times.
        $subscription->acknowledge($message);
    }
}

Python

import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
    project_id, subscription_name)

def callback(message):
    print('Received message: {}'.format(message))
    message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
    time.sleep(60)

Ruby

# project_id        = "Your Google Cloud Project ID"
# subscription_name = "Your Pubsub subscription name"
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new project: project_id

subscription = pubsub.subscription subscription_name
subscriber   = subscription.listen do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

Como foi?

Limpeza (opcional)

Para evitar que os recursos usados neste guia sejam cobrados na conta do Google Cloud Platform, você pode excluir o tópico e a assinatura.
  gcloud pubsub subscriptions delete my-sub
  gcloud pubsub topics delete my-topic

A seguir

Para ver exemplos completos que você pode criar e executar, consulte os tutoriais do Cloud Pub/Sub.

Para ver detalhes sobre a biblioteca de cliente da sua linguagem preferida, consulte Bibliotecas de cliente.

Para uma visão geral do Cloud Pub/Sub, veja O que é o Cloud Pub/Sub?

Para mais informações sobre os conceitos discutidos nesta página, consulte os guias do editor e do assinante.

Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…

Documentação do Cloud Pub/Sub