Conectar o Pub/Sub Lite ao Apache Kafka

Neste documento, descrevemos como integrar o Apache Kafka e o Pub/Sub Lite usando o conector do Kafka do grupo do Pub/Sub.

Sobre o conector do Kafka do grupo do Pub/Sub

O Apache Kafka é uma plataforma de código aberto para eventos de streaming. Ele é comumente usado em arquiteturas distribuídas para permitir a comunicação entre componentes acoplados com flexibilidade. O Pub/Sub Lite é um serviço gerenciado para enviar e receber mensagens de forma assíncrona. Assim como acontece com o Kafka, é possível usar o Pub/Sub Lite para fazer a comunicação entre componentes na sua arquitetura de nuvem.

O conector de Kafka do grupo do Pub/Sub permite integrar esses dois sistemas. Os seguintes conectores são empacotados no JAR do conector:

  • O conector do coletor lê registros de um ou mais tópicos do Kafka e os publica no Pub/Sub Lite.
  • O conector de origem lê mensagens de um tópico do Pub/Sub Lite e as publica no Kafka.

Veja alguns cenários em que é possível usar o conector de Kafka do grupo do Pub/Sub:

  • Você está migrando uma arquitetura baseada em Kafka para o Google Cloud.
  • Você tem um sistema de front-end que armazena eventos no Kafka fora do Google Cloud, mas também usa o Google Cloud para executar alguns dos seus serviços de back-end que precisam receber os eventos do Kafka.
  • Você coleta registros de uma solução local do Kafka e os envia ao Google Cloud para análise de dados.
  • Você tem um sistema de front-end que usa o Google Cloud, mas também armazena dados no local usando o Kafka.

O conector requer o Kafka Connect (em inglês), que é um framework para fazer streaming de dados entre o Kafka e outros sistemas. Para usar o conector, é necessário executar o Kafka Connect com o cluster do Kafka.

Neste documento, presumimos que você esteja familiarizado com o Kafka e o Pub/Sub Lite. Para começar a usar o Pub/Sub Lite, consulte Publicar e receber mensagens no Pub/Sub Lite usando o console do Google Cloud.

Introdução ao conector do Kafka do grupo do Pub/Sub

Esta seção mostra as tarefas a seguir:

  1. Configure o conector do Kafka do grupo do Pub/Sub.
  2. Envie eventos do Kafka para o Pub/Sub Lite.
  3. Envie mensagens do Pub/Sub Lite para o Kafka.

Pré-requisitos

Instale o Kafka

Siga o guia de início rápido do Apache Kafka para instalar um Kafka de nó único na máquina local. Conclua estas etapas no guia de início rápido:

  1. Faça o download da versão mais recente do Kafka e extraia-a.
  2. Inicie o ambiente Kafka.
  3. Criar um tópico do Kafka.

Autenticar

O conector de Kafka do grupo do Pub/Sub precisa ser autenticado com o Pub/Sub para enviar e receber mensagens do Pub/Sub. Para configurar a autenticação, siga estas etapas:

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. Instale a CLI do Google Cloud.
  3. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  4. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  5. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login
  6. Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua EMAIL_ADDRESS pelo seu endereço de e-mail.
    • Substitua ROLE por cada papel individual.
  7. Instale a CLI do Google Cloud.
  8. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  9. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  10. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login
  11. Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua EMAIL_ADDRESS pelo seu endereço de e-mail.
    • Substitua ROLE por cada papel individual.

Fazer o download do JAR do conector

Faça o download do arquivo JAR do conector na sua máquina local. Para mais informações, consulte Adquirir o conector no arquivo readme do GitHub.

Copiar os arquivos de configuração do conector

  1. Clone ou faça o download do repositório do GitHub para o conector.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copie o conteúdo do diretório config para o subdiretório config da instalação do Kafka.

    cp config/* [path to Kafka installation]/config/
    

Esses arquivos contêm as definições de configuração do conector.

Atualizar a configuração do Kafka Connect

  1. Navegue até o diretório que contém o binário do Kafka Connect que você fez o download.
  2. No diretório binário do Kafka Connect, abra o arquivo chamado config/connect-standalone.properties em um editor de texto.
  3. Remova a marca de comentário de plugin.path property se houver algum comentário.
  4. Atualize o plugin.path property para incluir o caminho para o JAR do conector.

    Exemplo:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Defina a propriedade offset.storage.file.filename como um nome de arquivo local. No modo independente, o Kafka usa esse arquivo para armazenar dados de deslocamento.

    Exemplo:

    offset.storage.file.filename=/tmp/connect.offsets
    

Encaminhar eventos do Kafka para o Pub/Sub Lite

Nesta seção, descrevemos como iniciar o conector do coletor, publicar eventos no Kafka e ler as mensagens encaminhadas do Pub/Sub Lite.

  1. Use o Google Cloud CLI para criar uma reserva do Pub/Sub Lite.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    Substitua:

    • RESERVATION_NAME: o nome da reserva do Pub/Sub Lite.
    • LOCATION: o local da reserva.
  2. Use o Google Cloud CLI para criar um tópico do Pub/Sub Lite com uma assinatura.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    Substitua:

    • LITE_TOPIC: o nome do tópico do Pub/Sub Lite para receber mensagens do Kafka.
    • LOCATION: a localização do tema. O valor precisa corresponder ao local da reserva.
    • RESERVATION_NAME: o nome da reserva do Pub/Sub Lite.
    • LITE_SUBSCRIPTION: o nome de uma assinatura do Pub/Sub Lite para o tópico.
  3. Abra o arquivo /config/pubsub-lite-sink-connector.properties em um editor de texto. Adicione valores para as seguintes propriedades, que estão marcadas como "TODO" nos comentários:

    topics=KAFKA_TOPICS
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.topic=LITE_TOPIC
    

    Substitua:

    • KAFKA_TOPICS: uma lista separada por vírgulas de tópicos do Kafka para leitura.
    • PROJECT_ID: o projeto do Google Cloud que contém o tópico do Pub/Sub Lite.
    • LOCATION: o local do tópico do Pub/Sub Lite.
    • LITE_TOPIC: o tópico do Pub/Sub Lite para receber mensagens do Kafka.
  4. No diretório do Kafka, execute o seguinte comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-sink-connector.properties
    
  5. Siga as etapas no guia de início rápido do Apache Kafka para gravar alguns eventos no tópico do Kafka.

  6. Inscreva-se na assinatura do Pub/Sub Lite usando um dos métodos mostrados em Como receber mensagens de assinaturas do Lite.

Encaminhar mensagens do Pub/Sub Lite para o Kafka

Nesta seção, descrevemos como iniciar o conector de origem, publicar mensagens no Pub/Sub Lite e ler as mensagens encaminhadas do Kafka.

  1. Use o Google Cloud CLI para criar uma reserva do Pub/Sub Lite.

    gcloud pubsub lite-reservations create RESERVATION_NAME \
    --location=LOCATION \
    --throughput-capacity=4
    

    Substitua:

    • RESERVATION_NAME: o nome da reserva do Pub/Sub Lite.
    • LOCATION: o local da reserva.
  2. Use o Google Cloud CLI para criar um tópico do Pub/Sub Lite com uma assinatura.

    gcloud pubsub lite-topics create LITE_TOPIC \
    --location=LOCATION \
    --partitions=2 \
    --per-partition-bytes=30GiB \
    --throughput-reservation=RESERVATION_NAME
    
    gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
    --location=LOCATION \
    --topic=LITE_TOPIC
    

    Substitua:

    • LITE_TOPIC: o nome do tópico do Pub/Sub Lite.
    • LOCATION: a localização do tema. O valor precisa corresponder ao local da reserva.
    • RESERVATION_NAME: o nome da reserva do Pub/Sub Lite.
    • LITE_SUBSCRIPTION: o nome de uma assinatura do Pub/Sub Lite para o tópico.
  3. Abra o arquivo /config/pubsub-lite-source-connector.properties em um editor de texto. Adicione valores para as seguintes propriedades, marcadas como "TODO" nos comentários:

    topic=KAFKA_TOPIC
    pubsublite.project=PROJECT_ID
    pubsublite.location=LOCATION
    pubsublite.subscription=LITE_SUBSCRIPTION
    

    Substitua:

    • KAFKA_TOPIC: os tópicos do Kafka para receber as mensagens do Pub/Sub.
    • PROJECT_ID: o projeto do Google Cloud que contém o tópico do Pub/Sub.
    • LOCATION: o local do tópico do Pub/Sub Lite.
    • LITE_SUBSCRIPTION: o tópico do Pub/Sub Lite.
  4. No diretório do Kafka, execute o seguinte comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/pubsub-lite-source-connector.properties
    
  5. Publique mensagens no tópico do Pub/Sub Lite usando qualquer um dos métodos mostrados em Como publicar mensagens em tópicos do Lite.

  6. Leia a mensagem do Kafka. Siga as etapas no Guia de início rápido do Apache Kafka para ler as mensagens no tópico do Kafka.

Conversão de mensagem

Um registro Kafka contém uma chave e um valor, que são matrizes de bytes de comprimento variável. Opcionalmente, um registro Kafka também pode ter cabeçalhos, que são pares de chave-valor. Uma mensagem do Pub/Sub Lite tem os seguintes campos:

  • key: chave da mensagem (bytes)
  • data: dados da mensagem (bytes)
  • attributes: zero ou mais atributos. Cada atributo é um mapa (key,values[]). Um único atributo pode ter vários valores.
  • event_time: um carimbo de data/hora do evento opcional fornecido pelo usuário.

O Kafka Connect usa conversores para serializar chaves e valores de e para o Kafka. Para controlar a serialização, defina as seguintes propriedades nos arquivos de configuração do conector:

  • key.converter: o conversor usado para serializar chaves de registro.
  • value.converter: o conversor usado para serializar valores de registro.

Conversão do Kafka para o Pub/Sub Lite

O conector do coletor converte registros Kafka em mensagens do Pub/Sub Lite da seguinte maneira.

Registro do Kafka (SinkRecord) Mensagem do Pub/Sub Lite
Chave key
Valor data
Cabeçalhos attributes
Carimbo de data/hora eventTime
Tipo de carimbo de data/hora attributes["x-goog-pubsublite-source-kafka-event-time-type"]
Tópico attributes["x-goog-pubsublite-source-kafka-topic"]
Partição attributes["x-goog-pubsublite-source-kafka-offset"]
Deslocamento attributes["x-goog-pubsublite-source-kafka-partition"]

As chaves, os valores e os cabeçalhos são codificados da seguinte maneira:

  • Esquemas nulos são tratados como esquemas de string.
  • Os payloads de bytes são gravados diretamente, sem conversão.
  • Payloads de string, número inteiro e ponto flutuante são codificados em uma sequência de bytes UTF-8.
  • Todos os outros payloads são codificados em um tipo Value de buffer de protocolo e convertidos em uma string de bytes.
    • Os campos de string aninhados são codificados em um protobuf Value.
    • Os campos de bytes aninhados são codificados em um protobuf Value que contém os bytes codificados em base64.
    • Campos numéricos aninhados são codificados como duplos em um protobuf Value.
    • Mapas com chaves de matriz, mapa ou struct não são aceitos.

Conversão do Pub/Sub Lite para o Kafka

O conector de origem converte mensagens do Pub/Sub Lite em registros Kafka da seguinte maneira:

Mensagem do Pub/Sub Lite Registro do Kafka (SourceRecord)
key Chave
data Valor
attributes Cabeçalhos
event_time Carimbo de data/hora Se event_time não estiver presente, o horário de publicação será usado.

Opções de configuração

Além das configurações fornecidas pela API Kafka Connect, o conector é compatível com as configurações do Pub/Sub Lite a seguir.

Opções de configuração do conector do coletor

O conector do coletor é compatível com as opções de configuração a seguir.

Configuração Tipo de dados Descrição
connector.class String Obrigatório. A classe Java do conector. Para o conector do coletor do Pub/Sub Lite, o valor precisa ser com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector.
gcp.credentials.file.path String Opcional. O caminho para um arquivo que armazena credenciais do Google Cloud para autenticar o Pub/Sub Lite.
gcp.credentials.json String Opcional. Um blob JSON que contém o Google Cloud para autenticar o Pub/Sub Lite.
pubsublite.location String Obrigatório. O local do tópico do Pub/Sub Lite.
pubsublite.project String Obrigatório. O Google Cloud que contém o tópico do Pub/Sub Lite.
pubsublite.topic String Obrigatório. O tópico do Pub/Sub Lite em que os registros do Kafka serão publicados.
topics String Obrigatório. Uma lista separada por vírgulas de tópicos do Kafka para leitura.

Opções de configuração do conector de origem

O conector de origem é compatível com as opções de configuração a seguir.

Configuração Tipo de dados Descrição
connector.class String Obrigatório. A classe Java do conector. Para o conector de origem do Pub/Sub Lite, o valor precisa ser com.google.pubsublite.kafka.source.PubSubLiteSourceConnector.
gcp.credentials.file.path String Opcional. O caminho para um arquivo que armazena credenciais do Google Cloud para autenticar o Pub/Sub Lite.
gcp.credentials.json String Opcional. Um blob JSON que contém o Google Cloud para autenticar o Pub/Sub Lite.
kafka.topic String Obrigatório. O tópico do Kafka que recebe mensagens do Pub/Sub Lite.
pubsublite.location String Obrigatório. O local do tópico do Pub/Sub Lite.
pubsublite.partition_flow_control.bytes Long

O número máximo de bytes pendentes por partição do Pub/Sub Lite.

Padrão: 20.000.000

pubsublite.partition_flow_control.messages Long

O número máximo de mensagens pendentes por partição do Pub/Sub Lite.

Padrão: Long.MAX_VALUE

pubsublite.project String Obrigatório. O projeto do Google Cloud que contém o tópico do Pub/Sub Lite.
pubsublite.subscription String Obrigatório. O nome da assinatura do Pub/Sub Lite da qual receber mensagens.

A seguir