Conectar o Pub/Sub ao Apache Kafka

Neste documento, descrevemos como integrar o Apache Kafka e o Pub/Sub usando o conector de Kafka do grupo do Pub/Sub.

Sobre o conector de Kafka do grupo do Pub/Sub

O Apache Kafka é uma plataforma de código aberto para eventos de streaming. É comum usadas em arquiteturas distribuídas para permitir a comunicação entre acoplados. O Pub/Sub é um serviço gerenciado para enviar e receber mensagens de forma assíncrona. Assim como no Kafka, é possível usar Pub/Sub para comunicação entre componentes em sua nuvem do Terraform.

Com o conector de Kafka do grupo Pub/Sub, é possível integrar esses dois sistemas. Os seguintes conectores são empacotados no JAR do conector:

  • O conector do coletor lê registros de um ou mais tópicos do Kafka e os publica no Pub/Sub.
  • O conector de origem lê mensagens de um tópico do Pub/Sub. e os publica no Kafka.

Veja alguns cenários em que é possível usar o conector de Kafka do grupo do Pub/Sub:

  • Você está migrando uma arquitetura baseada em Kafka para o Google Cloud.
  • Você tem um sistema de front-end que armazena eventos no Kafka fora Google Cloud, mas também o usa para executar alguns dos back-ends que precisam receber os eventos do Kafka.
  • Você coleta registros de uma solução Kafka local e os envia para Google Cloud para análise de dados.
  • Você tem um sistema de front-end que usa o Google Cloud, mas também armazena dados no local usando Kafka.

O conector exige Kafka Connect, que é um framework para streaming de dados entre o Kafka e outros sistemas. Para usar é necessário executar o Kafka Connect junto com o cluster do Kafka.

Para seguir este documento, é necessário ter familiaridade com o Kafka Pub/Sub Antes de ler este documento, é uma boa ideia conclua uma das Guias de início rápido do Pub/Sub.

O conector do Pub/Sub não dá suporte a nenhuma integração entre as ACLs do Google Cloud IAM e do Kafka Connect.

Começar a usar o conector

Esta seção mostra as seguintes tarefas:

  1. Configurar o conector de Kafka do grupo do Pub/Sub
  2. Enviar eventos do Kafka para o Pub/Sub.
  3. Envie mensagens do Pub/Sub para o Kafka.

Pré-requisitos

Instale o Kafka

Siga o Guia de início rápido do Apache Kafka para instalar um Kafka de nó único na máquina local. Conclua essas etapas em guia de início rápido:

  1. Faça o download da versão mais recente do Kafka e extraia o arquivo.
  2. Inicie o ambiente Kafka.
  3. Criar um tópico Kafka.

Autenticar

O conector de Kafka do grupo do Pub/Sub precisa ser autenticado com o Pub/Sub para para enviar e receber mensagens do Pub/Sub. Para configurar a autenticação, siga estas etapas:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Create local authentication credentials for your user account:

    gcloud auth application-default login
  6. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Create local authentication credentials for your user account:

    gcloud auth application-default login
  11. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

Baixe o JAR do conector

Faça o download do arquivo JAR do conector na sua máquina local. Para mais informações, consulte Adquirir o conector no arquivo readme do GitHub.

Copie os arquivos de configuração do conector

  1. Clone ou faça o download do Repositório do GitHub para o conector.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copie o conteúdo do diretório config para o subdiretório config de da instalação do Kafka.

    cp config/* [path to Kafka installation]/config/
    

Esses arquivos contêm as configurações do conector.

Atualizar a configuração do Kafka Connect

  1. Navegue até o diretório que contém o binário do Kafka Connect que você baixado.
  2. No diretório binário do Kafka Connect, abra o arquivo chamado config/connect-standalone.properties em um editor de texto.
  3. Se a plugin.path property estiver comentada, remova a marca de comentário.
  4. Atualize o plugin.path property para incluir o caminho para o JAR do conector.

    Exemplo:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Defina a propriedade offset.storage.file.filename como um nome de arquivo local. Em modo autônomo, o Kafka usa esse arquivo para armazenar dados de deslocamento.

    Exemplo:

    offset.storage.file.filename=/tmp/connect.offsets
    

Encaminhar eventos do Kafka para o Pub/Sub

Esta seção descreve como iniciar o conector do coletor, publicar eventos no Kafka e ler as mensagens encaminhadas do Pub/Sub.

  1. Use a Google Cloud CLI para criar um tópico do Pub/Sub com uma assinatura.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Substitua:

    • PUBSUB_TOPIC: o nome de um tópico do Pub/Sub para para receber as mensagens do Kafka.
    • PUBSUB_SUBSCRIPTION: o nome de um Pub/Sub uma assinatura para o tópico.
  2. Abra o arquivo /config/cps-sink-connector.properties em um editor de texto. Adicionar valores para as seguintes propriedades, que estão marcadas como "TODO" no comentários:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    Substitua:

    • KAFKA_TOPICS: uma lista separada por vírgulas de tópicos do Kafka para leitura se originou.
    • PROJECT_ID: o projeto do Google Cloud que contém seu tópico do Pub/Sub.
    • PUBSUB_TOPIC: o tópico do Pub/Sub que receberá o do Kafka.
  3. No diretório Kafka, execute o seguinte comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Siga as etapas no Guia de início rápido do Apache Kafka para gravar alguns eventos no seu tópico Kafka.

  5. Use a CLI gcloud para ler os eventos do Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

Encaminhar mensagens do Pub/Sub para o Kafka

Esta seção descreve como iniciar o conector de origem, publicar mensagens no Pub/Sub e ler as mensagens encaminhadas do Kafka.

  1. Use a CLI gcloud para criar um tópico do Pub/Sub com uma assinatura.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Substitua:

    • PUBSUB_TOPIC: o nome de um tópico do Pub/Sub.
    • PUBSUB_SUBSCRIPTION: o nome de um Pub/Sub assinatura.
  2. Abra o arquivo chamado /config/cps-source-connector.properties em um texto editor. Adicione valores para as seguintes propriedades, que estão marcadas como "TODO" em os comentários:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    Substitua:

    • KAFKA_TOPIC: os tópicos do Kafka que receberão o Mensagens do Pub/Sub.
    • PROJECT_ID: o projeto do Google Cloud que contém seu tópico do Pub/Sub.
    • PUBSUB_TOPIC: o tópico do Pub/Sub.
  3. No diretório Kafka, execute o seguinte comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Use a CLI gcloud para publicar uma mensagem no Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Leia a mensagem do Kafka. Siga as etapas no Guia de início rápido do Apache Kafka para ler as mensagens do tópico Kafka.

Conversão de mensagem

um registro Kafka; contém uma chave e um valor, que são matrizes de bytes de comprimento variável. Opcionalmente, um O registro Kafka também pode ter cabeçalhos, que são pares de chave-valor. Um Mensagem do Pub/Sub tem duas partes principais: o corpo da mensagem e zero ou mais atributos de chave-valor.

O Kafka Connect usa conversores para serializar chaves e valores de e para o Kafka. Para controlar a serialização, defina as seguintes propriedades no conector arquivos de configuração:

  • key.converter: o conversor usado para serializar chaves de registro.
  • value.converter: o conversor usado para serializar valores de registro.

O corpo de uma mensagem do Pub/Sub é um objeto ByteString. Portanto, a conversão mais eficiente é copiar o payload diretamente. Por esse motivo, recomendamos o uso de um conversor que produz tipos de dados primitivos (número inteiro, flutuante, string ou esquema de bytes) sempre que possível, para evitar a desserialização e reserializando o mesmo corpo da mensagem.

Conversão do Kafka para o Pub/Sub

O conector do coletor converte os registros do Kafka em mensagens do Pub/Sub da seguinte forma:

  • A chave de registro do Kafka é armazenada como um atributo chamado "key" no Mensagem do Pub/Sub.
  • Por padrão, o conector descarta todos os cabeçalhos no registro Kafka. No entanto, se Você define a opção de configuração headers.publish como true, o conector grava os cabeçalhos como atributos do Pub/Sub. O conector pula os cabeçalhos que excederem o Pub/Sub limites de atributos de mensagem.
  • Para esquemas de número inteiro, flutuante, string e bytes, o conector transmite os bytes. do valor do registro Kafka diretamente na mensagem do Pub/Sub corpo
  • Para esquemas struct, o conector grava cada campo como um atributo do Mensagem do Pub/Sub. Por exemplo, se o campo for { "id"=123 }, a mensagem resultante do Pub/Sub tem um atributo "id"="123". O valor do campo é sempre convertido em uma string. Os tipos Map e struct não são compatíveis como tipos de campo em um struct.
  • Para esquemas de mapas, o conector grava cada par de chave-valor como um atributo de a mensagem do Pub/Sub. Por exemplo, se o mapa for {"alice"=1,"bob"=2}, a mensagem resultante do Pub/Sub terá duas atributos "alice"="1" e "bob"="2". As chaves e os valores são convertidos em strings.

Os esquemas de struct e mapa têm alguns outros comportamentos:

  • Opcionalmente, você pode especificar um campo struct ou chave de mapa específico como corpo da mensagem definindo a propriedade de configuração messageBodyName. O do campo ou da chave é armazenado como ByteString no corpo da mensagem. Se você não definir messageBodyName, o corpo da mensagem ficará vazio para struct e esquemas de mapa.

  • Para valores de matriz, o conector aceita apenas tipos de matriz primitiva. O sequência de valores na matriz é concatenada em uma única ByteString objeto.

Conversão do Pub/Sub para o Kafka

O conector de origem converte mensagens do Pub/Sub em registros Kafka da seguinte forma:

  • Chave de gravação do Kafka: por padrão, a chave é definida como null. Opcionalmente, você pode especificar um atributo de mensagem do Pub/Sub para usar como chave, definindo a opção de configuração kafka.key.attribute. Nesse caso, o conector procura um atributo com esse nome e define a chave de registro como o . Se o atributo especificado não estiver presente, a chave de registro será Defina como null.

  • Valor do registro Kafka: O conector grava o valor do registro da seguinte forma:

    • Se a mensagem do Pub/Sub não tiver atributos personalizados, o conector grava o corpo da mensagem do Pub/Sub diretamente no registro do Kafka como um tipo byte[], usando o conversor especificado por value.converter.

    • Se a mensagem do Pub/Sub tiver atributos personalizados e kafka.record.headers for false, o conector grava uma estrutura no . O struct contém um campo para cada atributo e um campo chamado "message", cujo valor é o corpo da mensagem do Pub/Sub (armazenados como bytes):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      Nesse caso, é necessário usar um value.converter compatível com Esquemas struct, como org.apache.kafka.connect.json.JsonConverter.

    • Se a mensagem do Pub/Sub tiver atributos personalizados e kafka.record.headers for true, o conector grava os atributos como Cabeçalhos de registro Kafka. Ele grava o corpo da mensagem do Pub/Sub diretamente ao valor do registro Kafka como um tipo byte[], usando o conversor especificado por value.converter.

  • Cabeçalhos de registro Kafka. Por padrão, os cabeçalhos ficam vazios, a menos que você defina kafka.record.headers para true.

Opções de configuração

Além das configurações fornecidas pela API Kafka Connect, a O conector de Kafka do grupo Pub/Sub oferece suporte à configuração do coletor e da origem conforme descritos em Configurações do conector do Pub/Sub.

Como receber suporte

Se precisar de ajuda, crie um tíquete de suporte. Para perguntas e discussões gerais, crie um problema no repositório do GitHub.

A seguir