Conectar o Pub/Sub ao Apache Kafka

Neste documento, descrevemos como integrar o Apache Kafka e o Pub/Sub usando o conector do Kafka do grupo do Pub/Sub.

Sobre o conector do Kafka do grupo do Pub/Sub

O Apache Kafka é uma plataforma de código aberto para eventos de streaming. Ele é comumente usado em arquiteturas distribuídas para permitir a comunicação entre componentes acoplados com flexibilidade. O Pub/Sub é um serviço gerenciado para enviar e receber mensagens de maneira assíncrona. Assim como acontece com o Kafka, é possível usar o Pub/Sub para a comunicação entre componentes na sua arquitetura de nuvem.

O conector de Kafka do grupo do Pub/Sub permite integrar esses dois sistemas. Os seguintes conectores são empacotados no JAR do conector:

  • O conector do coletor lê registros de um ou mais tópicos do Kafka e os publica no Pub/Sub.
  • O conector de origem lê mensagens de um tópico do Pub/Sub e as publica no Kafka.

Veja alguns cenários em que é possível usar o conector de Kafka do grupo do Pub/Sub:

  • Você está migrando uma arquitetura baseada em Kafka para o Google Cloud.
  • Você tem um sistema de front-end que armazena eventos no Kafka fora do Google Cloud, mas também usa o Google Cloud para executar alguns dos seus serviços de back-end que precisam receber os eventos do Kafka.
  • Você coleta registros de uma solução local do Kafka e os envia ao Google Cloud para análise de dados.
  • Você tem um sistema de front-end que usa o Google Cloud, mas também armazena dados no local usando o Kafka.

O conector requer o Kafka Connect (em inglês), que é um framework para fazer streaming de dados entre o Kafka e outros sistemas. Para usar o conector, é necessário executar o Kafka Connect com o cluster do Kafka.

Neste documento, presumimos que você esteja familiarizado com o Kafka e o Pub/Sub. Antes de ler este documento, conclua um dos guias de início rápido do Pub/Sub.

O conector do Pub/Sub não oferece suporte a nenhuma integração entre as ACLs do Google Cloud IAM e do Kafka Connect.

Começar a usar o conector

Esta seção mostra as tarefas a seguir:

  1. Configure o conector do Kafka do grupo do Pub/Sub.
  2. Envie eventos do Kafka para o Pub/Sub.
  3. Envie mensagens do Pub/Sub para o Kafka.

Pré-requisitos

Instale o Kafka

Siga o guia de início rápido do Apache Kafka para instalar um Kafka de nó único na máquina local. Conclua estas etapas no guia de início rápido:

  1. Faça o download da versão mais recente do Kafka e extraia-a.
  2. Inicie o ambiente Kafka.
  3. Criar um tópico do Kafka.

Autenticar

O conector de Kafka do grupo do Pub/Sub precisa ser autenticado com o Pub/Sub para enviar e receber mensagens do Pub/Sub. Para configurar a autenticação, siga estas etapas:

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. Instale a CLI do Google Cloud.
  3. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  4. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  5. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login
  6. Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua EMAIL_ADDRESS pelo seu endereço de e-mail.
    • Substitua ROLE por cada papel individual.
  7. Instale a CLI do Google Cloud.
  8. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  9. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  10. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login
  11. Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua EMAIL_ADDRESS pelo seu endereço de e-mail.
    • Substitua ROLE por cada papel individual.

Fazer o download do JAR do conector

Faça o download do arquivo JAR do conector na sua máquina local. Para mais informações, consulte Adquirir o conector no arquivo readme do GitHub.

Copiar os arquivos de configuração do conector

  1. Clone ou faça o download do repositório do GitHub para o conector.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copie o conteúdo do diretório config para o subdiretório config da instalação do Kafka.

    cp config/* [path to Kafka installation]/config/
    

Esses arquivos contêm as definições de configuração do conector.

Atualizar a configuração do Kafka Connect

  1. Navegue até o diretório que contém o binário do Kafka Connect que você fez o download.
  2. No diretório binário do Kafka Connect, abra o arquivo chamado config/connect-standalone.properties em um editor de texto.
  3. Remova a marca de comentário de plugin.path property se houver algum comentário.
  4. Atualize o plugin.path property para incluir o caminho para o JAR do conector.

    Exemplo:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Defina a propriedade offset.storage.file.filename como um nome de arquivo local. No modo independente, o Kafka usa esse arquivo para armazenar dados de deslocamento.

    Exemplo:

    offset.storage.file.filename=/tmp/connect.offsets
    

Encaminhar eventos do Kafka para o Pub/Sub

Nesta seção, descrevemos como iniciar o conector do coletor, publicar eventos no Kafka e ler as mensagens encaminhadas do Pub/Sub.

  1. Use o Google Cloud CLI para criar um tópico do Pub/Sub com uma assinatura.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Substitua:

    • PUBSUB_TOPIC: o nome de um tópico do Pub/Sub para receber as mensagens do Kafka.
    • PUBSUB_SUBSCRIPTION: o nome de uma assinatura do Pub/Sub para o tópico.
  2. Abra o arquivo /config/cps-sink-connector.properties em um editor de texto. Adicione valores para as seguintes propriedades, que estão marcadas como "TODO" nos comentários:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    Substitua:

    • KAFKA_TOPICS: uma lista separada por vírgulas de tópicos do Kafka para leitura.
    • PROJECT_ID: o projeto do Google Cloud que contém o tópico do Pub/Sub.
    • PUBSUB_TOPIC: o tópico do Pub/Sub para receber as mensagens do Kafka.
  3. No diretório do Kafka, execute o seguinte comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Siga as etapas no guia de início rápido do Apache Kafka para gravar alguns eventos no tópico do Kafka.

  5. Use a CLI gcloud para ler os eventos do Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

Encaminhar mensagens do Pub/Sub para o Kafka

Nesta seção, descrevemos como iniciar o conector de origem, publicar mensagens no Pub/Sub e ler as mensagens encaminhadas do Kafka.

  1. Use a CLI gcloud para criar um tópico do Pub/Sub com uma assinatura.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Substitua:

    • PUBSUB_TOPIC: o nome de um tópico do Pub/Sub.
    • PUBSUB_SUBSCRIPTION: o nome de uma assinatura do Pub/Sub.
  2. Abra o arquivo /config/cps-source-connector.properties em um editor de texto. Adicione valores para as seguintes propriedades, marcadas como "TODO" nos comentários:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    Substitua:

    • KAFKA_TOPIC: os tópicos do Kafka para receber as mensagens do Pub/Sub.
    • PROJECT_ID: o projeto do Google Cloud que contém o tópico do Pub/Sub.
    • PUBSUB_TOPIC: o tópico do Pub/Sub.
  3. No diretório do Kafka, execute o seguinte comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Use a CLI gcloud para publicar uma mensagem no Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Leia a mensagem do Kafka. Siga as etapas no Guia de início rápido do Apache Kafka para ler as mensagens no tópico do Kafka.

Conversão de mensagem

Um registro Kafka contém uma chave e um valor, que são matrizes de bytes de comprimento variável. Opcionalmente, um registro Kafka também pode ter cabeçalhos, que são pares de chave-valor. Uma mensagem do Pub/Sub tem duas partes principais: o corpo da mensagem e zero ou mais atributos de chave-valor.

O Kafka Connect usa conversores para serializar chaves e valores de e para o Kafka. Para controlar a serialização, defina as seguintes propriedades nos arquivos de configuração do conector:

  • key.converter: o conversor usado para serializar chaves de registro.
  • value.converter: o conversor usado para serializar valores de registro.

O corpo de uma mensagem do Pub/Sub é um objeto ByteString. Portanto, a conversão mais eficiente é copiar o payload diretamente. Por esse motivo, recomendamos o uso de um conversor que produza tipos de dados primitivos (número inteiro, flutuante, string ou esquema de bytes) sempre que possível, para evitar a desserialização e a reserialização do mesmo corpo da mensagem.

Conversão do Kafka para o Pub/Sub

O conector do coletor converte registros Kafka em mensagens do Pub/Sub da seguinte maneira:

  • A chave de registro do Kafka é armazenada como um atributo chamado "key" na mensagem do Pub/Sub.
  • Por padrão, o conector descarta todos os cabeçalhos no registro do Kafka. No entanto, se você definir a opção de configuração headers.publish como true, o conector gravará os cabeçalhos como atributos do Pub/Sub. O conector pula todos os cabeçalhos que excedem os limites de atributos de mensagens do Pub/Sub.
  • Para esquemas de números inteiros, flutuantes, strings e bytes, o conector transmite os bytes do valor do registro Kafka diretamente para o corpo da mensagem do Pub/Sub.
  • Nos esquemas de struct, o conector grava cada campo como um atributo da mensagem do Pub/Sub. Por exemplo, se o campo for { "id"=123 }, a mensagem do Pub/Sub resultante terá um atributo "id"="123". O valor do campo é sempre convertido em uma string. Os tipos Map e struct não são compatíveis como tipos de campo em um struct.
  • Nos esquemas de mapa, o conector grava cada par de chave-valor como um atributo da mensagem do Pub/Sub. Por exemplo, se o mapa for {"alice"=1,"bob"=2}, a mensagem do Pub/Sub resultante terá dois atributos, "alice"="1" e "bob"="2". As chaves e os valores são convertidos em strings.

Os esquemas struct e de mapa têm alguns comportamentos adicionais:

  • Também é possível especificar um determinado campo struct ou chave de mapa para ser o corpo da mensagem, definindo a propriedade de configuração messageBodyName. O valor do campo ou chave é armazenado como um ByteString no corpo da mensagem. Se você não definir messageBodyName, o corpo da mensagem ficará vazio para os esquemas de struct e mapa.

  • Para valores de matriz, o conector oferece suporte apenas aos tipos de matriz primitiva. A sequência de valores na matriz é concatenada em um único objeto ByteString.

Conversão do Pub/Sub para o Kafka

O conector de origem converte mensagens do Pub/Sub em registros Kafka da seguinte maneira:

  • Chave de registro Kafka: por padrão, a chave é definida como null. Se quiser, especifique um atributo de mensagem do Pub/Sub para usar como a chave definindo a opção de configuração kafka.key.attribute. Nesse caso, o conector procura um atributo com esse nome e define a chave de registro como o valor do atributo. Se o atributo especificado não estiver presente, a chave de registro será definida como null.

  • Valor do registro do Kafka. O conector grava o valor do registro da seguinte maneira:

    • Se a mensagem do Pub/Sub não tiver atributos personalizados, o conector gravará o corpo da mensagem do Pub/Sub diretamente no valor do registro do Kafka como um tipo byte[], usando o conversor especificado por value.converter.

    • Se a mensagem do Pub/Sub tiver atributos personalizados e kafka.record.headers for false, o conector gravará um struct no valor do registro. O struct contém um campo para cada atributo e um campo chamado "message", cujo valor é o corpo da mensagem do Pub/Sub (armazenado como bytes):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      Nesse caso, é necessário usar um value.converter compatível com esquemas de struct, como org.apache.kafka.connect.json.JsonConverter.

    • Se a mensagem do Pub/Sub tiver atributos personalizados e kafka.record.headers for true, o conector gravará os atributos como cabeçalhos de registro Kafka. Ele grava o corpo da mensagem do Pub/Sub diretamente no valor do registro do Kafka como um tipo byte[], usando o conversor especificado por value.converter.

  • Cabeçalhos de registro do Kafka: Por padrão, os cabeçalhos ficam vazios, a menos que você defina kafka.record.headers como true.

Opções de configuração

Além das configurações fornecidas pela API Kafka Connect, o conector do Kafka do grupo Pub/Sub é compatível com as configurações a seguir.

Opções de configuração do conector do coletor

O conector do coletor é compatível com as opções de configuração a seguir.

Configuração Tipo de dados Descrição
connector.class String Obrigatório. A classe Java do conector. Para o conector do coletor do Pub/Sub, o valor precisa ser com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.
cps.endpoint String

O endpoint do Pub/Sub a ser usado.

Padrão: "pubsub.googleapis.com:443".

cps.project String Obrigatório. O Google Cloud que contém o tópico do Pub/Sub.
cps.topic String Obrigatório. O tópico do Pub/Sub em que os registros do Kafka serão publicados.
gcp.credentials.file.path String Opcional. O caminho para um arquivo que armazena credenciais do Google Cloud para autenticar o Pub/Sub Lite.
gcp.credentials.json String Opcional. Um blob JSON que contém o Google Cloud para autenticar o Pub/Sub Lite.
headers.publish Boolean

Quando true, inclua todos os cabeçalhos de registro Kafka como atributos de mensagem do Pub/Sub.

Padrão: false.

maxBufferBytes Long

O número máximo de bytes a serem recebidos em uma partição de tópico do Kafka antes de publicá-los no Pub/Sub.

Padrão: 10.000.000.

maxBufferSize Integer

O número máximo de registros a serem recebidos em uma partição de tópicos do Kafka antes de publicá-los no Pub/Sub.

Padrão: 100;

maxDelayThresholdMs Integer

O tempo máximo de espera para alcançar maxBufferSize ou maxBufferBytes antes de publicar registros pendentes no Pub/Sub, em milissegundos.

Padrão: 100;

maxOutstandingMessages Long

O número máximo de registros que podem estar pendentes, incluindo lotes incompletos e pendentes, antes que o editor bloqueie a publicação contínua.

Padrão: Long.MAX_VALUE.

maxOutstandingRequestBytes Long

O número máximo de bytes totais que podem estar pendentes, incluindo lotes incompletos e pendentes, antes que o editor bloqueie mais publicações.

Padrão: Long.MAX_VALUE.

maxRequestTimeoutMs Integer

Tempo limite das solicitações de publicação individuais no Pub/Sub, em milissegundos.

Padrão: 10000.

maxTotalTimeoutMs Integer

O tempo limite total, em milissegundos, para uma chamada publicar no Pub/Sub, incluindo novas tentativas.

Padrão: 60000.

metadata.publish Boolean

Quando true, inclua o tópico, a partição, o deslocamento e o carimbo de data/hora do Kafka como atributos de mensagem do Pub/Sub.

Padrão: false.

messageBodyName String

Ao usar um esquema de valor de struct ou mapa, especifica o nome de um campo ou chave a ser usado como o corpo da mensagem do Pub/Sub. Consulte Conversão do Kafka para o Pub/Sub.

Padrão: "cps_message_body".

orderingKeySource String

Especifica como definir a chave de ordem na mensagem do Pub/Sub. Pode ser um dos seguintes valores:

  • none: não defina a chave de ordem.
  • key: usa a chave de registro do Kafka como chave de ordenação.
  • partition: usa o número da partição, convertido em string, como chave de ordenação. Use essa configuração apenas para tópicos de baixa capacidade ou com milhares de partições.

Padrão: none.

topics String Obrigatório. Uma lista separada por vírgulas de tópicos do Kafka para leitura.

Opções de configuração do conector de origem

O conector de origem é compatível com as opções de configuração a seguir.

Configuração Tipo de dados Descrição
connector.class String Obrigatório. A classe Java do conector. Para o conector de origem do Pub/Sub, o valor precisa ser com.google.pubsub.kafka.source.CloudPubSubSourceConnector.
cps.endpoint String

O endpoint do Pub/Sub a ser usado.

Padrão: "pubsub.googleapis.com:443".

cps.makeOrderingKeyAttribute Boolean

Quando true, grave a chave de ordem no registro do Kafka usando o mesmo formato dos atributos de mensagem do Pub/Sub. Consulte Conversão do Pub/Sub para registros Kafka.

Padrão: false.

cps.maxBatchSize Integer

O número máximo de mensagens em lote por solicitação de envio para o Pub/Sub.

Padrão: 100

cps.project String Obrigatório. O projeto do Google Cloud que contém o tópico do Pub/Sub.
cps.subscription String Obrigatório. O nome da assinatura do Pub/Sub da qual receber mensagens.
gcp.credentials.file.path String Opcional. O caminho para um arquivo que armazena credenciais do Google Cloud para autenticar o Pub/Sub Lite.
gcp.credentials.json String Opcional. Um blob JSON que contém o Google Cloud para autenticar o Pub/Sub Lite.
kafka.key.attribute String

O atributo de mensagem do Pub/Sub a ser usado como chave para mensagens publicadas no Kafka. Se definido como "orderingKey", use a chave de ordem da mensagem. Se for null, os registros Kafka não terão uma chave.

Padrão: null.

kafka.partition.count Integer

O número de partições do Kafka para o tópico do Kafka em que as mensagens são publicadas. Esse parâmetro será ignorado se o esquema de partição for "kafka_partitioner".

Padrão: 1;

kafka.partition.scheme String

O esquema para atribuir uma mensagem a uma partição no Kafka. Pode ser um dos seguintes valores:

  • round_robin: atribui partições em modo round-robin.
  • hash_key: encontra a partição fazendo o hash da chave de registro.
  • hash_value: encontre a partição fazendo o hash do valor do registro.
  • kafka_partitioner: delega a lógica de particionamento ao produtor do Kafka. Por padrão, o produtor do Kafka detecta automaticamente o número de partições e executa o mapeamento de partição baseado em hash de detecção ou round-robin, dependendo do fornecimento de uma chave de registro.
  • ordering_key: usa o código hash da chave de ordenação de uma mensagem. Se nenhuma chave de ordem estiver presente, use round_robin.

Padrão: round_robin.

kafka.record.headers Boolean

Se true, escreva atributos de mensagem do Pub/Sub como cabeçalhos do Kafka.

kafka.topic String Obrigatório. O tópico do Kafka que recebe mensagens do Pub/Sub.

Como receber suporte

Se precisar de ajuda, crie um tíquete de suporte. Para perguntas e discussões gerais, crie um problema no repositório do GitHub (link em inglês).

A seguir