Guia de início rápido: publicar e receber mensagens no Pub/Sub Lite usando a API Apache Kafka

Publique e receba mensagens no Pub/Sub Lite usando a API Apache Kafka

Nesta página, mostramos como usar o Pub/Sub Lite Kafka Shim para publicar e receber mensagens do Pub/Sub Lite.

O Pub/Sub Lite Kafka Shim é uma biblioteca Java que facilita o trabalho dos usuários da biblioteca de cliente Java (em inglês) do Apache Kafka com o Pub/Sub Lite. Isso acontece por meio da implementação da API Producer e da API Consumer (links em inglês).

Isso é possível porque os tópicos do Pub/Sub Lite, como os do Apache Kafka, são registros particionados que rastreiam o progresso do consumidor com deslocamentos numéricos.

Embora os dois sistemas sejam semelhantes, há algumas diferenças práticas importantes:

  • Um tópico do Pub/Sub Lite equivale a um do Kafka. No entanto, um tópico do Lite inclui capacidade configurável de armazenamento e processamento em cada partição do tópico. Já a capacidade do tópico do Kafka é determinada pelas configurações do cluster do Kafka.
  • Uma assinatura do Pub/Sub Lite equivale a um grupo de consumidores do Kafka. Uma assinatura do Lite é um recurso nomeado de primeira ordem do Google Cloud Platform que representa mensagens de partições de tópicos do Lite que podem ser lidas pelos assinantes. Da mesma forma, um grupo de consumidores do Kafka é composto por consumidores que podem ler dados das partições de um tópico do Kafka.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. Instale a CLI do Google Cloud.
  3. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  4. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  5. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  6. Ative a API Pub/Sub Lite:

    gcloud services enable pubsublite.googleapis.com
  7. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login
  8. Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua EMAIL_ADDRESS pelo seu endereço de e-mail.
    • Substitua ROLE por cada papel individual.
  9. Instale a CLI do Google Cloud.
  10. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  11. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  12. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  13. Ative a API Pub/Sub Lite:

    gcloud services enable pubsublite.googleapis.com
  14. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login
  15. Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua EMAIL_ADDRESS pelo seu endereço de e-mail.
    • Substitua ROLE por cada papel individual.

Instale a biblioteca de cliente

As amostras a seguir mostram como instalar as bibliotecas de cliente pubsublite-kafka.

Java

Se você estiver usando o Maven sem BOM, adicione isto às dependências:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-kafka</artifactId>
  <version>1.1.1</version>
</dependency>

Se você estiver usando o Gradle, adicione isto às dependências:

implementation 'com.google.cloud:pubsublite-kafka:1.1.2'

Se você estiver usando o sbt, adicione o seguinte às suas dependências:

libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.1.2"

Se você estiver usando o Visual Studio Code, o IntelliJ ou o Eclipse, poderá adicionar bibliotecas de cliente ao projeto usando estes plug-ins de IDE:

Os plug-ins também oferecem outras funcionalidades, como gerenciamento de chaves de contas de serviço. Consulte a documentação de cada plug-in para mais detalhes.

Criar um tópico e uma assinatura do Lite

Crie um tópico e uma assinatura do Lite usando os comandos a seguir.

gcloud pubsub lite-topics create LITE_TOPIC_NAME \
    --location=LITE_LOCATION \
    --partitions=1 \
    --per-partition-bytes=30GiB

gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION_NAME \
    --location=LITE_LOCATION \
    --topic=LITE_TOPIC_NAME \
    --starting-offset=end \
    --delivery-requirement=deliver-after-stored

Substitua:

  • LITE_TOPIC_NAME: o nome do novo tópico do Lite.
  • LITE_SUBSCRIPTION_NAME: o nome da nova assinatura do Lite.
  • LITE_LOCATION: o local em que você está criando o tópico e a assinatura do Lite. Escolha um local do Pub/Sub Lite compatível. Especifique também uma zona para a região. Por exemplo, us-central1-a.

Publicar mensagens no Pub/Sub Lite

Crie um produtor do Pub/Sub Lite para enviar dados ao tópico do Lite. Essa classe implementa org.apache.kafka.clients.Producer, que é a mesma interface que KafkaProducer (links em inglês).

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic.
    String topicId = "your-topic-id";
    // Using the project number is required for constructing a Pub/Sub Lite
    // topic path that the Kafka producer can use.
    long projectNumber = Long.parseLong("123456789");

    producerExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  public static void producerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws InterruptedException, ExecutionException {
    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    ProducerSettings producerSettings =
        ProducerSettings.newBuilder().setTopicPath(topicPath).build();

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    try (Producer<byte[], byte[]> producer = producerSettings.instantiate()) {
      for (long i = 0L; i < 10L; i++) {
        String key = "demo";
        Future<RecordMetadata> future =
            producer.send(
                new ProducerRecord(
                    topicPath.toString(), key.getBytes(), ("message-" + i).getBytes()));
        futures.add(future);
      }
      for (Future<RecordMetadata> future : futures) {
        RecordMetadata meta = future.get();
        System.out.println(meta.offset());
      }
    }
    System.out.printf("Published 10 messages to %s%n", topicPath.toString());
  }
}

Receber mensagens do Pub/Sub Lite

Crie um consumidor do Pub/Sub Lite para receber dados da assinatura do Lite. Essa classe implementa org.apache.kafka.clients.consumer.Consumer, que é a mesma interface que KafkaConsumer (links em inglês).

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.kafka.ConsumerSettings;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ConsumerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic and subscription.
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    // Using the project number here is required for constructing a Pub/Sub Lite
    // topic path that the Kafka consumer can use.
    long projectNumber = Long.parseLong("123456789");

    consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
  }

  public static void consumerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {

    CloudZone location = CloudZone.of(CloudRegion.of(cloudRegion), zoneId);

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    SubscriptionPath subscription =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(50 * 1024 * 1024L)
            // 10,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(10000L)
            .build();

    ConsumerSettings settings =
        ConsumerSettings.newBuilder()
            .setSubscriptionPath(subscription)
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .setAutocommit(true)
            .build();

    Set<ConsumerRecord<byte[], byte[]>> hashSet = new HashSet<>();
    try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
      // The consumer can only subscribe to the topic that it is associated to.
      // If this is the only subscriber for this subscription, it will take up
      // to 90s for the subscriber to warm up.
      consumer.subscribe(Arrays.asList(topicPath.toString()));
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          long offset = record.offset();
          String value = Base64.getEncoder().encodeToString(record.value());
          hashSet.add(record);
          System.out.printf("Received %s: %s%n", offset, value);
        }
        // Early exit. Remove entirely to keep the consumer alive indefinitely.
        if (hashSet.size() >= 10) {
          System.out.println("Received 10 messages.");
          break;
        }
      }
    }
  }
}

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Cloud com esses recursos.

  1. Exclua o tópico e a assinatura do Lite.

    gcloud pubsub lite-topics delete LITE_TOPIC_NAME
    gcloud pubsub lite-subscriptions delete LITE_SUBSCRIPTION_NAME
    

  2. Exclua a conta de serviço:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  3. Opcional: revogue as credenciais de autenticação que você criou e exclua o arquivo de credenciais local:

    gcloud auth application-default revoke
  4. Opcional: revogar credenciais da CLI gcloud.

    gcloud auth revoke

A seguir