Conectar o Pub/Sub ao Apache Kafka

Neste documento, descrevemos como integrar o Apache Kafka e o Pub/Sub usando o conector de Kafka do grupo do Pub/Sub.

Sobre o conector de Kafka do grupo do Pub/Sub

O Apache Kafka é uma plataforma de código aberto para eventos de streaming. Ele é comumente usado em arquiteturas distribuídas para permitir a comunicação entre componentes com acoplamento flexível. O Pub/Sub é um serviço gerenciado para enviar e receber mensagens de modo assíncrono. Assim como no Kafka, é possível usar o Pub/Sub para a comunicação entre componentes da arquitetura em nuvem.

Com o conector de Kafka do grupo Pub/Sub, é possível integrar esses dois sistemas. Os seguintes conectores são empacotados no JAR do conector:

  • O conector do coletor lê registros de um ou mais tópicos do Kafka e os publica no Pub/Sub.
  • O conector de origem lê mensagens de um tópico do Pub/Sub e as publica no Kafka.

Veja alguns cenários em que é possível usar o conector de Kafka do grupo do Pub/Sub:

  • Você está migrando uma arquitetura baseada em Kafka para o Google Cloud.
  • Você tem um sistema de front-end que armazena eventos no Kafka fora do Google Cloud, mas também usa o Google Cloud para executar alguns dos serviços de back-end, que precisam receber os eventos do Kafka.
  • Você coleta registros de uma solução Kafka local e os envia ao Google Cloud para análise de dados.
  • Você tem um sistema de front-end que usa o Google Cloud, mas também armazena dados no local usando o Kafka.

O conector requer o Kafka Connect, que é um framework para streaming de dados entre o Kafka e outros sistemas. Para usar o conector, execute o Kafka Connect junto com o cluster do Kafka.

Neste documento, você precisa conhecer o Kafka e o Pub/Sub. Antes de ler este documento, é recomendável concluir um dos guias de início rápido do Pub/Sub.

O conector do Pub/Sub não é compatível com a integração entre as ACLs do Google Cloud IAM e do Kafka Connect.

Começar a usar o conector

Nesta seção, explicamos as seguintes tarefas:

  1. Configurar o conector de Kafka do grupo do Pub/Sub
  2. Enviar eventos do Kafka para o Pub/Sub.
  3. Envie mensagens do Pub/Sub para o Kafka.

Pré-requisitos

Instale o Kafka

Siga o guia de início rápido do Apache Kafka para instalar um Kafka de nó único na máquina local. Conclua estas etapas no guia de início rápido:

  1. Faça o download da versão mais recente do Kafka e extraia-o.
  2. Inicie o ambiente Kafka.
  3. Criar um tópico Kafka.

Autenticar

O conector do Kafka do grupo do Pub/Sub precisa ser autenticado com o Pub/Sub para enviar e receber mensagens do Pub/Sub. Para configurar a autenticação, siga estas etapas:

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. Instale a CLI do Google Cloud.
  3. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  4. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  5. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login
  6. Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua EMAIL_ADDRESS pelo seu endereço de e-mail.
    • Substitua ROLE por cada papel individual.
  7. Instale a CLI do Google Cloud.
  8. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  9. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  10. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login
  11. Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua EMAIL_ADDRESS pelo seu endereço de e-mail.
    • Substitua ROLE por cada papel individual.

Baixar o JAR do conector

Faça o download do arquivo JAR do conector na sua máquina local. Para mais informações, consulte Adquirir o conector no arquivo readme do GitHub.

Copie os arquivos de configuração do conector

  1. Clone ou faça o download do repositório do GitHub para o conector.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copie o conteúdo do diretório config para o subdiretório config da instalação do Kafka.

    cp config/* [path to Kafka installation]/config/
    

Esses arquivos contêm as configurações do conector.

Atualizar a configuração do Kafka Connect

  1. Navegue até o diretório que contém o binário do Kafka Connect que você fez download.
  2. No diretório binário do Kafka Connect, abra o arquivo chamado config/connect-standalone.properties em um editor de texto.
  3. Se a plugin.path property estiver comentada, remova a marca de comentário.
  4. Atualize o plugin.path property para incluir o caminho para o JAR do conector.

    Exemplo:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Defina a propriedade offset.storage.file.filename como um nome de arquivo local. No modo independente, o Kafka usa esse arquivo para armazenar dados de deslocamento.

    Exemplo:

    offset.storage.file.filename=/tmp/connect.offsets
    

Encaminhar eventos do Kafka para o Pub/Sub

Esta seção descreve como iniciar o conector do coletor, publicar eventos no Kafka e, em seguida, ler as mensagens encaminhadas do Pub/Sub.

  1. Use a Google Cloud CLI para criar um tópico do Pub/Sub com uma assinatura.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Substitua:

    • PUBSUB_TOPIC: o nome de um tópico do Pub/Sub para receber as mensagens do Kafka.
    • PUBSUB_SUBSCRIPTION: o nome de uma assinatura Pub/Sub do tópico.
  2. Abra o arquivo /config/cps-sink-connector.properties em um editor de texto. Adicione valores para as propriedades abaixo, marcadas como "TODO" nos comentários:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    Substitua:

    • KAFKA_TOPICS: uma lista separada por vírgulas de tópicos do Kafka para leitura.
    • PROJECT_ID: o projeto do Google Cloud que contém seu tópico do Pub/Sub.
    • PUBSUB_TOPIC: o tópico do Pub/Sub para receber as mensagens do Kafka.
  3. No diretório Kafka, execute o seguinte comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Siga as etapas do guia de início rápido do Apache Kafka para gravar alguns eventos no tópico do Kafka.

  5. Use a CLI gcloud para ler os eventos do Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

Encaminhar mensagens do Pub/Sub para o Kafka

Esta seção descreve como iniciar o conector de origem, publicar mensagens no Pub/Sub e ler as mensagens encaminhadas do Kafka.

  1. Use a CLI gcloud para criar um tópico do Pub/Sub com uma assinatura.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Substitua:

    • PUBSUB_TOPIC: o nome de um tópico do Pub/Sub.
    • PUBSUB_SUBSCRIPTION: o nome de uma assinatura do Pub/Sub.
  2. Abra o arquivo chamado /config/cps-source-connector.properties em um editor de texto. Adicione valores para as propriedades abaixo, que estão marcadas como "TODO" nos comentários:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    Substitua:

    • KAFKA_TOPIC: os tópicos do Kafka para receber as mensagens do Pub/Sub.
    • PROJECT_ID: o projeto do Google Cloud que contém seu tópico do Pub/Sub.
    • PUBSUB_TOPIC: o tópico do Pub/Sub.
  3. No diretório Kafka, execute o seguinte comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Use a CLI gcloud para publicar uma mensagem no Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Leia a mensagem do Kafka. Siga as etapas do guia de início rápido do Apache Kafka para ler as mensagens do tópico Kafka.

Conversão de mensagem

Um registro Kafka contém uma chave e um valor, que são matrizes de bytes de comprimento variável. Opcionalmente, um registro Kafka também pode ter cabeçalhos, que são pares de chave-valor. Uma mensagem do Pub/Sub tem duas partes principais: o corpo da mensagem e zero ou mais atributos de chave-valor.

O Kafka Connect usa conversores para serializar chaves e valores de e para o Kafka. Para controlar a serialização, defina as seguintes propriedades nos arquivos de configuração do conector:

  • key.converter: o conversor usado para serializar chaves de registro.
  • value.converter: o conversor usado para serializar valores de registro.

O corpo de uma mensagem do Pub/Sub é um objeto ByteString. Portanto, a conversão mais eficiente é copiar o payload diretamente. Por esse motivo, recomendamos o uso de um conversor que produza tipos de dados primitivos (número inteiro, flutuante, string ou esquema de bytes) sempre que possível para evitar a desserialização e reserialização do mesmo corpo da mensagem.

Conversão do Kafka para o Pub/Sub

O conector do coletor converte registros do Kafka em mensagens do Pub/Sub da seguinte maneira:

  • A chave de registro do Kafka é armazenada como um atributo chamado "key" na mensagem do Pub/Sub.
  • Por padrão, o conector descarta todos os cabeçalhos no registro Kafka. No entanto, se você definir a opção de configuração headers.publish como true, o conector gravará os cabeçalhos como atributos do Pub/Sub. O conector ignora os cabeçalhos que excedem os limites de atributos de mensagens do Pub/Sub.
  • Para esquemas de número inteiro, flutuante, string e bytes, o conector transmite os bytes do valor do registro do Kafka diretamente para o corpo da mensagem do Pub/Sub.
  • Para esquemas de estrutura, o conector grava cada campo como um atributo da mensagem do Pub/Sub. Por exemplo, se o campo for { "id"=123 }, a mensagem do Pub/Sub resultante terá um atributo "id"="123". O valor do campo é sempre convertido em uma string. Os tipos de mapa e struct não são aceitos como tipos de campo em um struct.
  • Para esquemas de mapas, o conector grava cada par de chave-valor como um atributo da mensagem do Pub/Sub. Por exemplo, se o mapa for {"alice"=1,"bob"=2}, a mensagem resultante do Pub/Sub terá dois atributos, "alice"="1" e "bob"="2". As chaves e os valores são convertidos em strings.

Os esquemas de struct e mapa têm alguns outros comportamentos:

  • Também é possível especificar um campo de struct ou uma chave de mapa específico como o corpo da mensagem, definindo a propriedade de configuração messageBodyName. O valor do campo ou da chave é armazenado como ByteString no corpo da mensagem. Se você não definir messageBodyName, o corpo da mensagem ficará vazio para esquemas de struct e mapa.

  • Para valores de matriz, o conector aceita apenas tipos de matriz primitiva. A sequência de valores na matriz é concatenada em um único objeto ByteString.

Conversão do Pub/Sub para o Kafka

O conector de origem converte mensagens do Pub/Sub em registros Kafka da seguinte maneira:

  • Chave de gravação do Kafka: por padrão, a chave é definida como null. Opcionalmente, especifique um atributo de mensagem do Pub/Sub para usar como a chave, definindo a opção de configuração kafka.key.attribute. Nesse caso, ele procura um atributo com esse nome e define a chave de registro como o valor do atributo. Se o atributo especificado não estiver presente, a chave de registro será definida como null.

  • Valor do registro Kafka: O conector grava o valor do registro da seguinte forma:

    • Se a mensagem do Pub/Sub não tiver atributos personalizados, o conector gravará o corpo da mensagem do Pub/Sub diretamente no valor do registro do Kafka como um tipo byte[], usando o conversor especificado por value.converter.

    • Se a mensagem do Pub/Sub tiver atributos personalizados e kafka.record.headers for false, o conector gravará uma estrutura no valor do registro. O struct contém um campo para cada atributo e um campo chamado "message" cujo valor é o corpo da mensagem do Pub/Sub (armazenado como bytes):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      Nesse caso, é necessário usar um value.converter compatível com esquemas de struct, como org.apache.kafka.connect.json.JsonConverter.

    • Se a mensagem do Pub/Sub tiver atributos personalizados e kafka.record.headers for true, o conector gravará os atributos como cabeçalhos de registro Kafka. Ele grava o corpo da mensagem do Pub/Sub diretamente no valor do registro do Kafka como um tipo byte[], usando o conversor especificado por value.converter.

  • Cabeçalhos de registro Kafka. Por padrão, os cabeçalhos ficam vazios, a menos que você defina kafka.record.headers como true.

Opções de configuração

Além das configurações fornecidas pela API Kafka Connect, o conector de Kafka do grupo Pub/Sub é compatível com as configurações a seguir.

Opções de configuração do conector do coletor

O conector do coletor é compatível com as opções de configuração a seguir.

Configuração Tipo de dados Descrição
connector.class String Obrigatório. A classe Java do conector. Para o conector do coletor do Pub/Sub, o valor precisa ser com.google.pubsub.kafka.sink.CloudPubSubSinkConnector.
cps.endpoint String

O endpoint do Pub/Sub a ser usado.

Padrão: "pubsub.googleapis.com:443".

cps.project String Obrigatório. O Google Cloud que contém o tópico do Pub/Sub.
cps.topic String Obrigatório. O tópico do Pub/Sub em que os registros do Kafka serão publicados.
gcp.credentials.file.path String Opcional. O caminho para um arquivo que armazena credenciais do Google Cloud para autenticar o Pub/Sub Lite.
gcp.credentials.json String Opcional. Um blob JSON que contém o Google Cloud para autenticar o Pub/Sub Lite.
headers.publish Boolean

Quando true, inclua todos os cabeçalhos de registro do Kafka como atributos de mensagens do Pub/Sub.

Padrão: false.

maxBufferBytes Long

O número máximo de bytes a serem recebidos em uma partição do Kafka de tópico antes de publicá-los no Pub/Sub.

Padrão: 10000000.

maxBufferSize Integer

O número máximo de registros a serem recebidos em uma partição de tópico do Kafka antes de publicá-los no Pub/Sub.

Padrão: 100;

maxDelayThresholdMs Integer

O tempo máximo de espera para alcançar maxBufferSize ou maxBufferBytes antes de publicar registros pendentes no Pub/Sub, em milissegundos.

Padrão: 100;

maxOutstandingMessages Long

O número máximo de registros que podem estar pendentes, incluindo lotes incompletos e pendentes, antes que o editor bloqueie outras publicações.

Padrão: Long.MAX_VALUE.

maxOutstandingRequestBytes Long

É o número máximo total de bytes que podem estar pendentes, incluindo lotes incompletos e pendentes, antes que o editor bloqueie publicações.

Padrão: Long.MAX_VALUE.

maxRequestTimeoutMs Integer

O tempo limite para solicitações de publicação individuais no Pub/Sub, em milissegundos.

Padrão: 10000.

maxTotalTimeoutMs Integer

O tempo limite total, em milissegundos, de uma chamada ser publicada no Pub/Sub, incluindo novas tentativas.

Padrão: 60.000.

metadata.publish Boolean

Quando true, inclua o tópico, a partição, o deslocamento e o carimbo de data/hora do Kafka como atributos de mensagens do Pub/Sub.

Padrão: false.

messageBodyName String

Ao usar um esquema de struct ou de valor de mapa, especifica o nome de um campo ou chave para usar como o corpo da mensagem do Pub/Sub. Consulte Conversão do Kafka para o Pub/Sub.

Padrão: "cps_message_body".

orderingKeySource String

Especifica como definir a chave de ordem na mensagem do Pub/Sub. Pode ser um dos seguintes valores:

  • none: não defina a chave de ordem.
  • key: usa a chave do registro Kafka como chave de ordem.
  • partition: usa o número de partição convertido em uma string como chave de ordem. Use essa configuração apenas para tópicos de baixa capacidade ou tópicos com milhares de partições.

Padrão: none.

topics String Obrigatório. Uma lista separada por vírgulas de tópicos do Kafka para leitura.

Opções de configuração do conector de origem

O conector de origem é compatível com as opções de configuração a seguir.

Configuração Tipo de dados Descrição
connector.class String Obrigatório. A classe Java do conector. Para o conector de origem do Pub/Sub, o valor precisa ser com.google.pubsub.kafka.source.CloudPubSubSourceConnector.
cps.endpoint String

O endpoint do Pub/Sub a ser usado.

Padrão: "pubsub.googleapis.com:443".

cps.makeOrderingKeyAttribute Boolean

Quando true, grave a chave de ordem no registro do Kafka usando o mesmo formato dos atributos de mensagens do Pub/Sub. Consulte Conversão do Pub/Sub para registros do Kafka.

Padrão: false.

cps.maxBatchSize Integer

O número máximo de mensagens a serem agrupadas por solicitação de envio para o Pub/Sub.

Padrão: 100

cps.project String Obrigatório. O projeto do Google Cloud que contém o tópico do Pub/Sub.
cps.subscription String Obrigatório. O nome da assinatura do Pub/Sub que receberá as mensagens recebidas.
gcp.credentials.file.path String Opcional. O caminho para um arquivo que armazena credenciais do Google Cloud para autenticar o Pub/Sub Lite.
gcp.credentials.json String Opcional. Um blob JSON que contém o Google Cloud para autenticar o Pub/Sub Lite.
kafka.key.attribute String

O atributo de mensagem do Pub/Sub a ser usado como chave para mensagens publicadas no Kafka. Se definido como "orderingKey", use a chave de ordem da mensagem. Se for null, os registros do Kafka não terão uma chave.

Padrão: null.

kafka.partition.count Integer

O número de partições do Kafka para o tópico Kafka em que as mensagens são publicadas. Esse parâmetro será ignorado se o esquema de partição for "kafka_partitioner".

Padrão: 1;

kafka.partition.scheme String

O esquema para atribuir uma mensagem a uma partição no Kafka. Pode ser um dos seguintes valores:

  • round_robin: atribui partições em estilo round-robin.
  • hash_key: encontre a partição gerando o hash da chave de registro.
  • hash_value: encontre a partição gerando o hash do valor do registro.
  • kafka_partitioner: delega a lógica de particionamento ao produtor Kafka. Por padrão, o produtor Kafka detecta automaticamente o número de partições e executa o mapeamento de partição baseado em hash de murmur ou o round-robin, dependendo da disponibilidade de uma chave de registro.
  • ordering_key: usa o código hash da chave de ordem de uma mensagem. Se nenhuma chave de ordem estiver presente, use round_robin.

Padrão: round_robin.

kafka.record.headers Boolean

Se true, grave os atributos de mensagem do Pub/Sub como cabeçalhos do Kafka.

kafka.topic String Obrigatório. O tópico Kafka que recebe mensagens do Pub/Sub.

Como receber suporte

Se precisar de ajuda, crie um tíquete de suporte. Para perguntas e discussões gerais, crie um problema no repositório do GitHub.

A seguir