Como usar o Pub/Sub Lite com a API Apache Kafka

Nesta página, mostramos como usar o Pub/Sub Lite do Kafka para publicar mensagens e receber mensagens do Pub/Sub Lite.

O Kafka Kafka do Pub/Sub Lite é uma biblioteca Java que facilita para os usuários da biblioteca de cliente Java Apache Kafka (em inglês) trabalharem com Pub/Sub Lite Isso é feito por meio da implementação da API Producer e da API Consumer.

Isso é possível porque os tópicos do Pub/Sub Lite, como os tópicos do Apache Kafka, são registros particionados que rastreiam o progresso do consumidor com deslocamentos numéricos.

Embora os dois sistemas sejam semelhantes, há algumas diferenças práticas notáveis:

  • Um tópico do Pub/Sub Lite é equivalente a um tópico do Kafka. No entanto, um tópico do Lite tem capacidade configurável e capacidade de armazenamento em cada partição de tópico, enquanto a capacidade do tópico do Kafka é determinada pelas configurações de cluster do Kafka.
  • Uma assinatura do Pub/Sub Lite é equivalente a um grupo de consumidores do Kafka. Uma assinatura do Lite é um recurso de primeira ordem do Google Cloud Platform que representa mensagens de partições de tópico do Lite que podem ser lidas pelos assinantes. Da mesma forma, um grupo de consumidores do Kafka é composto por consumidores que podem ler dados das partições de um tópico do Kafka.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No Console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se o faturamento está ativado para seu projeto na nuvem. Saiba como confirmar se o faturamento está ativado para o projeto.

  4. Ative a API Pub/Sub Lite.

    Ative a API

  5. Crie uma conta de serviço:

    1. No Console do Cloud, acesse a página Criar conta de serviço.

      Acesse Criar conta de serviço
    2. Selecione um projeto.
    3. No campo Nome da conta de serviço, insira um nome. O Console do Cloud preenche o campo ID da conta de serviço com base nesse nome.

      No campo Descrição da conta de serviço, insira uma descrição. Por exemplo, Service account for quickstart.

    4. Clique em Criar.
    5. Clique no campo Selecionar um papel.

      Em Acesso rápido, clique em Básico e em Proprietário.

    6. Clique em Continuar.
    7. Clique em Concluído para terminar a criação da conta de serviço.

      Não feche a janela do navegador. Você vai usá-lo na próxima etapa.

  6. Crie uma chave de conta de serviço:

    1. No Console do Cloud, clique no endereço de e-mail da conta de serviço que você criou.
    2. Clique em Chaves.
    3. Clique em Adicionar chave e em Criar nova chave.
    4. Clique em Criar. O download de um arquivo de chave JSON é feito no seu computador.
    5. Clique em Fechar.
  7. Defina a variável de ambiente GOOGLE_APPLICATION_CREDENTIALS como o caminho do arquivo JSON que contém a chave da conta de serviço. Essa variável só se aplica à sessão de shell atual. Dessa maneira, se você abrir uma nova sessão, defina a variável novamente.

Como instalar o módulo

Java

Se você estiver usando o Maven sem BOM, adicione isto às dependências:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-kafka</artifactId>
  <version>0.2.3</version>
</dependency>

Se você estiver usando o Gradle, adicione isto às dependências:

compile 'com.google.cloud:pubsublite-kafka:0.2.3'

Se você estiver usando o sbt, adicione o seguinte às suas dependências:

libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "0.2.3"

Caso você esteja usando o IntelliJ ou o Eclipse, poderá adicionar bibliotecas de cliente ao seu projeto usando estes plug-ins de ambiente de desenvolvimento integrado:

Os plug-ins também oferecem outras funcionalidades, como gerenciamento de chaves de contas de serviço. Consulte a documentação de cada plug-in para mais detalhes.

Como criar um tópico do Pub/Sub Lite

Crie um tópico do Lite usando as seguintes etapas:

  1. No Console do Cloud, acesse a página Tópicos do Lite.

    Acessar a página de tópicos do Lite

  2. Clique em Criar tópico Lite.

  3. Selecione uma região e uma zona.

  4. Na seção Nome, digite your-lite-topic como o ID do tópico do Lite. O nome do tópico do Lite inclui o ID do tópico do Lite, a zona e o número do projeto.

  5. Clique em Criar.

Como criar uma assinatura do Pub/Sub Lite

Crie uma assinatura do Lite seguindo estas etapas:

  1. No Console do Cloud, acesse a página Assinaturas do Lite.

    Acessar a página "Assinaturas Lite"

  2. Clique em Criar assinatura do Lite.

  3. No campo ID da assinatura do Lite, digite your-lite-subscription.

  4. Selecione um tópico do Lite para receber mensagens.

  5. Na seção Requisito de entrega, selecione Entregar mensagens após o armazenamento.

  6. Clique em Criar.

A assinatura do Lite está na mesma zona que o tópico do Lite.

Como publicar mensagens no Pub/Sub Lite

Construa um produtor do Pub/Sub Lite para enviar dados ao seu tópico do Lite. Essa classe implementa org.apache.kafka.clients.Producer, a mesma interface de KafkaProducer.

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic.
    String topicId = "your-topic-id";
    // Using the project number is required for constructing a Pub/Sub Lite
    // topic path that the Kafka producer can use.
    long projectNumber = Long.parseLong("123456789");

    producerExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  public static void producerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws InterruptedException, ExecutionException {
    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    ProducerSettings producerSettings =
        ProducerSettings.newBuilder().setTopicPath(topicPath).build();

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    try (Producer<byte[], byte[]> producer = producerSettings.instantiate()) {
      for (long i = 0L; i < 10L; i++) {
        String key = "demo";
        Future<RecordMetadata> future =
            producer.send(
                new ProducerRecord(
                    topicPath.toString(), key.getBytes(), ("message-" + i).getBytes()));
        futures.add(future);
      }
      for (Future<RecordMetadata> future : futures) {
        RecordMetadata meta = future.get();
        System.out.println(meta.offset());
      }
    }
    System.out.printf("Published 10 messages to %s%n", topicPath.toString());
  }
}

Como receber mensagens do Pub/Sub Lite

Crie um consumidor Pub/Sub Lite para receber dados da sua assinatura do Lite. Essa classe implementa org.apache.kafka.clients.consumer.Consumer, a mesma interface de KafkaConsumer.

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.kafka.ConsumerSettings;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ConsumerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic and subscription.
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    // Using the project number here is required for constructing a Pub/Sub Lite
    // topic path that the Kafka consumer can use.
    long projectNumber = Long.parseLong("123456789");

    consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
  }

  public static void consumerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {

    CloudZone location = CloudZone.of(CloudRegion.of(cloudRegion), zoneId);

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    SubscriptionPath subscription =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(50 * 1024 * 1024L)
            // 10,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(10000L)
            .build();

    ConsumerSettings settings =
        ConsumerSettings.newBuilder()
            .setSubscriptionPath(subscription)
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .setAutocommit(true)
            .build();

    Set<ConsumerRecord<byte[], byte[]>> hashSet = new HashSet<>();
    try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
      // The consumer can only subscribe to the topic that it is associated to.
      // If this is the only subscriber for this subscription, it will take up
      // to 90s for the subscriber to warm up.
      consumer.subscribe(Arrays.asList(topicPath.toString()));
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          long offset = record.offset();
          String value = Base64.getEncoder().encodeToString(record.value());
          hashSet.add(record);
          System.out.printf("Received %s: %s%n", offset, value);
        }
        // Early exit. Remove entirely to keep the consumer alive indefinitely.
        if (hashSet.size() >= 10) {
          System.out.println("Received 10 messages.");
          break;
        }
      }
    }
  }
}

Limpeza

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados neste guia de início rápido, siga estas etapas:

  1. No Console do Cloud, acesse a página Tópicos do Lite.

    Acessar a página de tópicos do Lite

  2. Clique em seu-tópico.

  3. Na página Detalhes do tópico do Lite, clique em Excluir.

  4. No campo exibido, digite delete para confirmar que você quer excluir o tópico do Lite.

  5. Clique em Excluir.

  6. Repita essas etapas para sua assinatura do Lite.

A seguir