Conectar o Pub/Sub ao Apache Kafka

Neste documento, descrevemos como integrar o Apache Kafka e o Pub/Sub usando o conector de Kafka do grupo do Pub/Sub.

Sobre o conector de Kafka do grupo do Pub/Sub

O Apache Kafka é uma plataforma de código aberto para eventos de streaming. É comum usadas em arquiteturas distribuídas para permitir a comunicação entre acoplados. O Pub/Sub é um serviço gerenciado para enviar e receber mensagens de forma assíncrona. Assim como no Kafka, é possível usar Pub/Sub para comunicação entre componentes em sua nuvem do Terraform.

Com o conector de Kafka do grupo Pub/Sub, é possível integrar esses dois sistemas. Os seguintes conectores são empacotados no JAR do conector:

  • O conector do coletor lê registros de um ou mais tópicos do Kafka e os publica no Pub/Sub.
  • O conector de origem lê mensagens de um tópico do Pub/Sub. e os publica no Kafka.

Veja alguns cenários em que é possível usar o conector de Kafka do grupo do Pub/Sub:

  • Você está migrando uma arquitetura baseada em Kafka para o Google Cloud.
  • Você tem um sistema de front-end que armazena eventos no Kafka fora Google Cloud, mas também o usa para executar alguns dos back-ends que precisam receber os eventos do Kafka.
  • Você coleta registros de uma solução Kafka local e os envia para Google Cloud para análise de dados.
  • Você tem um sistema de front-end que usa o Google Cloud, mas também armazena dados no local usando Kafka.

O conector exige Kafka Connect, que é um framework para streaming de dados entre o Kafka e outros sistemas. Para usar é necessário executar o Kafka Connect junto com o cluster do Kafka.

Para seguir este documento, você precisa conhecer o Kafka e o Pub/Sub Antes de ler este documento, é uma boa ideia conclua uma das Guias de início rápido do Pub/Sub.

O conector do Pub/Sub não dá suporte a nenhuma integração entre as ACLs do Google Cloud IAM e do Kafka Connect.

Começar a usar o conector

Esta seção mostra as seguintes tarefas:

  1. Configurar o conector de Kafka do grupo do Pub/Sub
  2. Enviar eventos do Kafka para o Pub/Sub.
  3. Envie mensagens do Pub/Sub para o Kafka.
.

Pré-requisitos

Instale o Kafka

Siga o Guia de início rápido do Apache Kafka para instalar um Kafka de nó único na máquina local. Conclua essas etapas em guia de início rápido:

  1. Faça o download da versão mais recente do Kafka e extraia-o.
  2. Inicie o ambiente Kafka.
  3. Criar um tópico Kafka.

Autenticar

O conector de Kafka do grupo do Pub/Sub precisa ser autenticado com o Pub/Sub para para enviar e receber mensagens do Pub/Sub. Para configurar a autenticação, siga estas etapas:

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. Instale a CLI do Google Cloud.
  3. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  4. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  5. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login
  6. Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua EMAIL_ADDRESS pelo seu endereço de e-mail.
    • Substitua ROLE por cada papel individual.
  7. Instale a CLI do Google Cloud.
  8. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  9. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  10. Crie as credenciais de autenticação para sua Conta do Google:

    gcloud auth application-default login
  11. Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • Substitua PROJECT_ID pela ID do seu projeto.
    • Substitua EMAIL_ADDRESS pelo seu endereço de e-mail.
    • Substitua ROLE por cada papel individual.

Baixar o JAR do conector

Faça o download do arquivo JAR do conector na sua máquina local. Para mais informações, consulte Adquirir o conector no arquivo readme do GitHub.

Copie os arquivos de configuração do conector

  1. Clone ou faça o download do Repositório do GitHub para o conector.

    git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
    cd java-pubsub-group-kafka-connector
    
  2. Copie o conteúdo do diretório config para o subdiretório config de da instalação do Kafka.

    cp config/* [path to Kafka installation]/config/
    

Esses arquivos contêm as configurações do conector.

Atualizar a configuração do Kafka Connect

  1. Navegue até o diretório que contém o binário do Kafka Connect que você baixado.
  2. No diretório binário do Kafka Connect, abra o arquivo chamado config/connect-standalone.properties em um editor de texto.
  3. Se a plugin.path property estiver comentada, remova a marca de comentário.
  4. Atualize o plugin.path property para incluir o caminho para o JAR do conector.

    Exemplo:

    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
    
  5. Defina a propriedade offset.storage.file.filename como um nome de arquivo local. Em modo autônomo, o Kafka usa esse arquivo para armazenar dados de deslocamento.

    Exemplo:

    offset.storage.file.filename=/tmp/connect.offsets
    

Encaminhar eventos do Kafka para o Pub/Sub

Esta seção descreve como iniciar o conector do coletor, publicar eventos no Kafka e ler as mensagens encaminhadas do Pub/Sub.

  1. Use a Google Cloud CLI para criar um tópico do Pub/Sub com uma assinatura.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Substitua:

    • PUBSUB_TOPIC: o nome de um tópico do Pub/Sub para para receber as mensagens do Kafka.
    • PUBSUB_SUBSCRIPTION: o nome de um Pub/Sub uma assinatura para o tópico.
  2. Abra o arquivo /config/cps-sink-connector.properties em um editor de texto. Adicionar valores para as seguintes propriedades, que estão marcadas como "TODO" no comentários:

    topics=KAFKA_TOPICS
    cps.project=PROJECT_ID
    cps.topic=PUBSUB_TOPIC
    

    Substitua:

    • KAFKA_TOPICS: uma lista separada por vírgulas de tópicos do Kafka para leitura se originou.
    • PROJECT_ID: o projeto do Google Cloud que contém seu tópico do Pub/Sub.
    • PUBSUB_TOPIC: o tópico do Pub/Sub que receberá o do Kafka.
  3. No diretório Kafka, execute o seguinte comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-sink-connector.properties
    
  4. Siga as etapas no Guia de início rápido do Apache Kafka para gravar alguns eventos no seu tópico Kafka.

  5. Use a CLI gcloud para ler os eventos do Pub/Sub.

    gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
    

Encaminhar mensagens do Pub/Sub para o Kafka

Esta seção descreve como iniciar o conector de origem, publicar mensagens no Pub/Sub e ler as mensagens encaminhadas do Kafka.

  1. Use a CLI gcloud para criar um tópico do Pub/Sub com uma assinatura.

    gcloud pubsub topics create PUBSUB_TOPIC
    gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
    

    Substitua:

    • PUBSUB_TOPIC: o nome de um tópico do Pub/Sub.
    • PUBSUB_SUBSCRIPTION: o nome de um Pub/Sub assinatura.
  2. Abra o arquivo chamado /config/cps-source-connector.properties em um texto editor. Adicione valores para as seguintes propriedades, que estão marcadas como "TODO" em os comentários:

    kafka.topic=KAFKA_TOPIC
    cps.project=PROJECT_ID
    cps.subscription=PUBSUB_SUBSCRIPTION
    

    Substitua:

    • KAFKA_TOPIC: os tópicos do Kafka que receberão o Mensagens do Pub/Sub.
    • PROJECT_ID: o projeto do Google Cloud que contém seu tópico do Pub/Sub.
    • PUBSUB_TOPIC: o tópico do Pub/Sub.
  3. No diretório Kafka, execute o seguinte comando:

    bin/connect-standalone.sh \
      config/connect-standalone.properties \
      config/cps-source-connector.properties
    
  4. Use a CLI gcloud para publicar uma mensagem no Pub/Sub.

    gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    
  5. Leia a mensagem do Kafka. Siga as etapas no Guia de início rápido do Apache Kafka para ler as mensagens do tópico Kafka.

Conversão de mensagem

um registro Kafka; contém uma chave e um valor, que são matrizes de bytes de comprimento variável. Opcionalmente, um O registro Kafka também pode ter cabeçalhos, que são pares de chave-valor. Um Mensagem do Pub/Sub tem duas partes principais: o corpo da mensagem e zero ou mais atributos de chave-valor.

O Kafka Connect usa conversores para serializar chaves e valores de e para o Kafka. Para controlar a serialização, defina as seguintes propriedades no conector arquivos de configuração:

  • key.converter: o conversor usado para serializar chaves de registro.
  • value.converter: o conversor usado para serializar valores de registro.

O corpo de uma mensagem do Pub/Sub é um objeto ByteString. Portanto, a conversão mais eficiente é copiar o payload diretamente. Por esse motivo, recomendamos o uso de um conversor que produz tipos de dados primitivos (número inteiro, flutuante, string ou esquema de bytes) sempre que possível, para evitar a desserialização e reserializando o mesmo corpo da mensagem.

Conversão do Kafka para o Pub/Sub

O conector do coletor converte os registros do Kafka em mensagens do Pub/Sub da seguinte forma:

  • A chave de registro do Kafka é armazenada como um atributo chamado "key" no Mensagem do Pub/Sub.
  • Por padrão, o conector descarta todos os cabeçalhos no registro Kafka. No entanto, se Você define a opção de configuração headers.publish como true, o conector grava os cabeçalhos como atributos do Pub/Sub. O conector pula os cabeçalhos que excederem o Pub/Sub limites de atributos de mensagem.
  • Para esquemas de número inteiro, flutuante, string e bytes, o conector transmite os bytes. do valor do registro Kafka diretamente na mensagem do Pub/Sub body.
  • Para esquemas struct, o conector grava cada campo como um atributo do Mensagem do Pub/Sub. Por exemplo, se o campo for { "id"=123 }, a mensagem resultante do Pub/Sub tem um atributo "id"="123". O é sempre convertido em uma string. Os tipos Map e struct não são compatíveis como tipos de campo em um struct.
  • Para esquemas de mapas, o conector grava cada par de chave-valor como um atributo de a mensagem do Pub/Sub. Por exemplo, se o mapa for {"alice"=1,"bob"=2}, a mensagem resultante do Pub/Sub terá duas atributos "alice"="1" e "bob"="2". As chaves e os valores são convertidos em strings.

Os esquemas de struct e mapa têm alguns outros comportamentos:

  • Opcionalmente, você pode especificar um campo struct ou chave de mapa específico como corpo da mensagem definindo a propriedade de configuração messageBodyName. O do campo ou da chave é armazenado como ByteString no corpo da mensagem. Se você não definir messageBodyName, o corpo da mensagem ficará vazio para struct e esquemas de mapa.

  • Para valores de matriz, o conector aceita apenas tipos de matriz primitiva. O sequência de valores na matriz é concatenada em uma única ByteString objeto.

Conversão do Pub/Sub para o Kafka

O conector de origem converte mensagens do Pub/Sub em registros Kafka da seguinte forma:

  • Chave de gravação do Kafka: por padrão, a chave é definida como null. Opcionalmente, você pode especificar um atributo de mensagem do Pub/Sub para usar como chave, definindo a opção de configuração kafka.key.attribute. Nesse caso, o conector procura um atributo com esse nome e define a chave de registro como o . Se o atributo especificado não estiver presente, a chave de registro será Defina como null.

  • Valor do registro Kafka: O conector grava o valor do registro da seguinte forma:

    • Se a mensagem do Pub/Sub não tiver atributos personalizados, o conector grava o corpo da mensagem do Pub/Sub diretamente no registro do Kafka como um tipo byte[], usando o conversor especificado por value.converter.

    • Se a mensagem do Pub/Sub tiver atributos personalizados e kafka.record.headers for false, o conector grava uma estrutura no . O struct contém um campo para cada atributo e um campo chamado "message", cujo valor é o corpo da mensagem do Pub/Sub (armazenados como bytes):

      {
        "message": "<Pub/Sub message body>",
        "<attribute-1>": "<value-1>",
        "<attribute-2>": "<value-2>",
        ....
      }
      

      Nesse caso, é necessário usar um value.converter compatível com Esquemas struct, como org.apache.kafka.connect.json.JsonConverter.

    • Se a mensagem do Pub/Sub tiver atributos personalizados e kafka.record.headers for true, o conector grava os atributos como Cabeçalhos de registro Kafka. Ele grava o corpo da mensagem do Pub/Sub diretamente ao valor do registro Kafka como um tipo byte[], usando o conversor especificado por value.converter.

  • Cabeçalhos de registro Kafka. Por padrão, os cabeçalhos ficam vazios, a menos que você defina kafka.record.headers para true.

.

Opções de configuração

Além das configurações fornecidas pela API Kafka Connect, a O conector de Kafka do grupo do Pub/Sub oferece suporte às configurações a seguir.

Opções de configuração do conector do coletor

O conector do coletor é compatível com as opções de configuração a seguir.

Configuração Tipo de dados Descrição
connector.class String Obrigatório. A classe Java do conector. Para conector do coletor do Pub/Sub, o valor precisa ser com.google.pubsub.kafka.sink.CloudPubSubSinkConnector:
cps.endpoint String

O endpoint do Pub/Sub a ser usado.

Padrão: "pubsub.googleapis.com:443".

cps.project String Obrigatório. O Google Cloud que contém os tópico do Pub/Sub.
cps.topic String Obrigatório. O tópico do Pub/Sub a ser publicado O Kafka faz os registros.
gcp.credentials.file.path String Opcional. O caminho para um arquivo que armazena as credenciais do Google Cloud para autenticar o Pub/Sub Lite.
gcp.credentials.json String Opcional. Um blob JSON que contém o Google Cloud para para autenticar o Pub/Sub Lite.
headers.publish Boolean

Quando true, inclua quaisquer cabeçalhos de registros Kafka como Atributos de mensagem do Pub/Sub.

Padrão: false.

maxBufferBytes Long

O número máximo de bytes a serem recebidos em uma partição do Kafka de tópico antes de publicá-los no Pub/Sub.

Padrão: 10000000.

maxBufferSize Integer

O número máximo de registros a serem recebidos em uma partição de tópico do Kafka antes de publicá-los no Pub/Sub.

Padrão: 100;

maxDelayThresholdMs Integer

O tempo máximo de espera para alcançar maxBufferSize ou maxBufferBytes antes da publicação registros pendentes no Pub/Sub, em milissegundos.

Padrão: 100;

maxOutstandingMessages Long

O número máximo de registros que podem estar pendentes, incluindo lotes incompletos e pendentes, antes que o editor bloqueie mais publicação.

Padrão: Long.MAX_VALUE.

maxOutstandingRequestBytes Long

O número máximo de bytes totais que podem estar pendentes, incluindo lotes incompletos e pendentes, antes que o editor bloqueie mais publicação.

Padrão: Long.MAX_VALUE.

maxRequestTimeoutMs Integer

O tempo limite para solicitações de publicação individuais no Pub/Sub em milissegundos.

Padrão: 10000.

maxTotalTimeoutMs Integer

O tempo limite total, em milissegundos, de uma chamada para publicar Pub/Sub, incluindo novas tentativas.

Padrão: 60.000.

metadata.publish Boolean

Quando true, inclua o tópico Kafka, a partição, o deslocamento, e carimbo de data/hora como atributos de mensagem do Pub/Sub.

Padrão: false.

messageBodyName String

Ao usar um esquema de valor de struct ou mapa, especifica o nome de um campo ou chave para usar como o corpo da mensagem do Pub/Sub. Consulte Conversão do Kafka para o Pub/Sub.

Padrão: "cps_message_body".

orderingKeySource String

Especifica como definir a chave de ordem no Pub/Sub mensagem. Pode ser um dos seguintes valores:

  • none: não defina a chave de ordem.
  • key: usa a chave do registro Kafka como chave de ordem.
  • partition: usa o número da partição, convertido em um string, como a chave de ordenação. Use essa configuração apenas para baixa capacidade de processamento ou tópicos com milhares de partições.

Padrão: none.

topics String Obrigatório. Uma lista separada por vírgulas de tópicos Kafka para ler.

Opções de configuração do conector de origem

O conector de origem é compatível com as opções de configuração a seguir.

Configuração Tipo de dados Descrição
connector.class String Obrigatório. A classe Java do conector. Para conector de origem do Pub/Sub, o valor precisa ser com.google.pubsub.kafka.source.CloudPubSubSourceConnector:
cps.endpoint String

O endpoint do Pub/Sub a ser usado.

Padrão: "pubsub.googleapis.com:443".

cps.makeOrderingKeyAttribute Boolean

Quando true, grave a chave de ordem no registro do Kafka. usando o mesmo formato dos atributos de mensagem do Pub/Sub. Consulte Conversão do Pub/Sub para registros Kafka (em inglês).

Padrão: false.

cps.maxBatchSize Integer

O número máximo de mensagens a serem agrupadas por solicitação de envio para Pub/Sub

Padrão: 100

cps.project String Obrigatório. O projeto do Google Cloud que contém tópico do Pub/Sub.
cps.subscription String Obrigatório. O nome do Pub/Sub assinatura da qual receber mensagens.
gcp.credentials.file.path String Opcional. O caminho para um arquivo que armazena as credenciais do Google Cloud para autenticar o Pub/Sub Lite.
gcp.credentials.json String Opcional. Um blob JSON que contém o Google Cloud para para autenticar o Pub/Sub Lite.
kafka.key.attribute String

O atributo de mensagem do Pub/Sub que será usado como chave para de mensagens publicadas no Kafka. Se definido como "orderingKey", use o chave de ordem da mensagem. Se for null, os registros do Kafka não terão de dados.

Padrão: null.

kafka.partition.count Integer

O número de partições do Kafka para o tópico Kafka em que as mensagens forem publicados. Este parâmetro será ignorado se o esquema de partição for "kafka_partitioner":

Padrão: 1;

kafka.partition.scheme String

O esquema para atribuir uma mensagem a uma partição no Kafka. Pode ser um dos seguintes valores:

  • round_robin: atribuir partições em um round-robin de maneira
  • hash_key: encontrar a partição fazendo o hash do registro de dados.
  • hash_value: encontrar a partição fazendo o hash do registro .
  • kafka_partitioner: delega a lógica de particionamento ao é um produtor Kafka. Por padrão, o produtor Kafka detecta automaticamente de partições e executa uma tabela de partição de murmur ou round-robin, dependendo da inclusão de uma chave de registro.
  • ordering_key: usa o código hash do código de uma mensagem chave de ordem. Se nenhuma chave de ordem estiver presente, use round_robin:

Padrão: round_robin.

kafka.record.headers Boolean

Se true, grave os atributos de mensagem do Pub/Sub como cabeçalhos do Kafka.

kafka.topic String Obrigatório. O tópico Kafka que recebe mensagens Pub/Sub

Como receber suporte

Se precisar de ajuda, crie um tíquete de suporte. Para perguntas e discussões gerais, crie um problema no repositório do GitHub.

A seguir