Ligue o Pub/Sub ao Apache Kafka

Este documento descreve como integrar o Apache Kafka e o Pub/Sub através do conetor do grupo Kafka do Pub/Sub.

Acerca do conetor Kafka do grupo Pub/Sub

O Apache Kafka é uma plataforma de código aberto para eventos de streaming. É usado comumente em arquiteturas distribuídas para permitir a comunicação entre componentes fracamente acoplados. O Pub/Sub é um serviço gerido para enviar e receber mensagens de forma assíncrona. Tal como com o Kafka, pode usar o Pub/Sub para comunicar entre componentes na sua arquitetura de nuvem.

O conetor Kafka do grupo Pub/Sub permite-lhe integrar estes dois sistemas. Os seguintes conetores estão incluídos no JAR do conetor:

  • O conetor de destino lê registos de um ou mais tópicos do Kafka e publica-os no Pub/Sub.
  • O conetor de origem lê mensagens de um tópico do Pub/Sub e publica-as no Kafka.

Seguem-se alguns cenários em que pode usar o conetor Kafka do grupo Pub/Sub:

  • Está a migrar uma arquitetura baseada no Kafka para o Google Cloud.
  • Tem um sistema de front-end que armazena eventos no Kafka fora do Google Cloud, mas também usa o Google Cloud para executar alguns dos seus serviços de back-end, que precisam de receber os eventos do Kafka.
  • Recolhe registos de uma solução Kafka nas instalações e envia-os para o Google Cloud para análise de dados.
  • Tem um sistema de front-end que usa o Google Cloud, mas também armazena dados nas instalações através do Kafka.

O conetor requer o Kafka Connect, que é uma framework para fazer stream de dados entre o Kafka e outros sistemas. Para usar o conetor, tem de executar o Kafka Connect juntamente com o cluster Kafka.

Este documento pressupõe que tem conhecimentos sobre o Kafka e o Pub/Sub. Antes de ler este documento, é aconselhável concluir um dos inícios rápidos do Pub/Sub.

O conetor do Pub/Sub não suporta nenhuma integração entre o IAM e as ACLs do Kafka Connect. Google Cloud

Comece a usar o conetor

Esta secção explica as seguintes tarefas:

  1. Configure o conetor Kafka do grupo Pub/Sub.
  2. Envie eventos do Kafka para o Pub/Sub.
  3. Envie mensagens do Pub/Sub para o Kafka.

Pré-requisitos

Instale o Kafka

Siga o início rápido do Apache Kafka para instalar um Kafka de nó único na sua máquina local. Conclua estes passos no início rápido:

  1. Transfira a versão mais recente do Kafka e extraia-a.
  2. Inicie o ambiente Kafka.
  3. Crie um tópico do Kafka.

Autenticar

O conetor Kafka do grupo Pub/Sub tem de ser autenticado com o Pub/Sub para enviar e receber mensagens do Pub/Sub. Para configurar a autenticação, siga estes passos:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  4. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  5. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  8. Install the Google Cloud CLI.

  9. Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.

  10. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  11. Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  13. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  14. Transfira o JAR do conetor

    Transfira o ficheiro JAR do conetor para a sua máquina local. Para mais informações, consulte a secção Adquira o conector no ficheiro Readme do GitHub.

    Copie os ficheiros de configuração do conetor

    1. Clone ou transfira o repositório do GitHub para o conector.

      git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
      cd java-pubsub-group-kafka-connector
      
    2. Copie o conteúdo do diretório config para o subdiretório config da sua instalação do Kafka.

      cp config/* [path to Kafka installation]/config/
      

    Estes ficheiros contêm definições de configuração para o conetor.

    Atualize a configuração do Kafka Connect

    1. Navegue para o diretório que contém o ficheiro binário do Kafka Connect que transferiu.
    2. No diretório binário do Kafka Connect, abra o ficheiro denominado config/connect-standalone.properties num editor de texto.
    3. Se a linha plugin.path property estiver comentada, remova o comentário.
    4. Atualize o plugin.path property para incluir o caminho para o JAR do conetor.

      Exemplo:

      plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
      
    5. Defina a propriedade offset.storage.file.filename para um nome de ficheiro local. No modo autónomo, o Kafka usa este ficheiro para armazenar dados de desvio.

      Exemplo:

      offset.storage.file.filename=/tmp/connect.offsets
      

    Encaminhe eventos do Kafka para o Pub/Sub

    Esta secção descreve como iniciar o conetor de destino, publicar eventos no Kafka e, em seguida, ler as mensagens encaminhadas do Pub/Sub.

    1. Use a Google Cloud CLI para criar um tópico do Pub/Sub com uma subscrição.

      gcloud pubsub topics create PUBSUB_TOPIC
      gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

      Substitua o seguinte:

      • PUBSUB_TOPIC: O nome de um tópico Pub/Sub para receber as mensagens do Kafka.
      • PUBSUB_SUBSCRIPTION: o nome de uma subscrição do Pub/Sub para o tópico.
    2. Abra o ficheiro /config/cps-sink-connector.properties num editor de texto. Adicione valores para as seguintes propriedades, que estão marcadas com "TODO" nos comentários:

      topics=KAFKA_TOPICS
      cps.project=PROJECT_ID
      cps.topic=PUBSUB_TOPIC

      Substitua o seguinte:

      • KAFKA_TOPICS: uma lista separada por vírgulas de tópicos do Kafka a partir dos quais ler.
      • PROJECT_ID: O projeto Google Cloud que contém o seu tópico do Pub/Sub.
      • PUBSUB_TOPIC: o tópico do Pub/Sub para receber as mensagens do Kafka.
    3. No diretório do Kafka, execute o seguinte comando:

      bin/connect-standalone.sh \
        config/connect-standalone.properties \
        config/cps-sink-connector.properties
      
    4. Siga os passos no Início rápido do Apache Kafka para escrever alguns eventos no seu tópico do Kafka.

    5. Use a CLI gcloud para ler os eventos do Pub/Sub.

      gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

    Encaminhe mensagens do Pub/Sub para o Kafka

    Esta secção descreve como iniciar o conetor de origem, publicar mensagens no Pub/Sub e ler as mensagens encaminhadas do Kafka.

    1. Use a CLI gcloud para criar um tópico Pub/Sub com uma subscrição.

      gcloud pubsub topics create PUBSUB_TOPIC
      gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

      Substitua o seguinte:

      • PUBSUB_TOPIC: o nome de um tópico do Pub/Sub.
      • PUBSUB_SUBSCRIPTION: O nome de uma subscrição do Pub/Sub.
    2. Abra o ficheiro denominado /config/cps-source-connector.properties num editor de texto. Adicione valores para as seguintes propriedades, que estão marcadas com "TODO" nos comentários:

      kafka.topic=KAFKA_TOPIC
      cps.project=PROJECT_ID
      cps.subscription=PUBSUB_SUBSCRIPTION

      Substitua o seguinte:

      • KAFKA_TOPIC: os tópicos do Kafka para receber as mensagens do Pub/Sub.
      • PROJECT_ID: O projeto Google Cloud que contém o seu tópico do Pub/Sub.
      • PUBSUB_TOPIC: o tópico do Pub/Sub.
    3. No diretório do Kafka, execute o seguinte comando:

      bin/connect-standalone.sh \
        config/connect-standalone.properties \
        config/cps-source-connector.properties
      
    4. Use a CLI gcloud para publicar uma mensagem no Pub/Sub.

      gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    5. Lê a mensagem de Kafka. Siga os passos no guia de início rápido do Apache Kafka para ler as mensagens do tópico do Kafka.

    Conversão de mensagens

    Um registo do Kafka contém uma chave e um valor, que são matrizes de bytes de comprimento variável. Opcionalmente, um registo do Kafka também pode ter cabeçalhos, que são pares de chave-valor. Uma mensagem do Pub/Sub tem duas partes principais: o corpo da mensagem e zero ou mais atributos de chave/valor.

    O Kafka Connect usa conversores para serializar chaves e valores para e a partir do Kafka. Para controlar a serialização, defina as seguintes propriedades nos ficheiros de configuração do conector:

    • key.converter: O conversor usado para serializar chaves de registo.
    • value.converter: o conversor usado para serializar valores de registos.

    O corpo de uma mensagem Pub/Sub é um objeto ByteString, pelo que a conversão mais eficiente é copiar diretamente a carga útil. Por esse motivo, recomendamos a utilização de um conversor que produza tipos de dados primitivos (esquema de números inteiros, de vírgula flutuante, de strings ou de bytes) sempre que possível, para evitar a desserialização e a reserialização do mesmo corpo da mensagem.

    Conversão de Kafka para Pub/Sub

    O conetor de destino converte os registos do Kafka em mensagens do Pub/Sub da seguinte forma:

    • A chave do registo do Kafka é armazenada como um atributo denominado "key" na mensagem do Pub/Sub.
    • Por predefinição, o conetor elimina todos os cabeçalhos no registo do Kafka. No entanto, se definir a opção de configuração headers.publish como true, o conetor escreve os cabeçalhos como atributos do Pub/Sub. O conetor ignora todos os cabeçalhos que excedam os limites de atributos de mensagens do Pub/Sub.
    • Para esquemas de números inteiros, de vírgula flutuante, de strings e de bytes, o conetor transmite os bytes do valor do registo do Kafka diretamente para o corpo da mensagem do Pub/Sub.
    • Para esquemas de struct, o conetor escreve cada campo como um atributo da mensagem do Pub/Sub. Por exemplo, se o campo for { "id"=123 }, a mensagem do Pub/Sub resultante tem um atributo "id"="123". O valor do campo é sempre convertido numa string. Os tipos de mapa e struct não são suportados como tipos de campos numa struct.
    • Para esquemas de mapas, o conector escreve cada par de chave-valor como um atributo da mensagem do Pub/Sub. Por exemplo, se o mapa for {"foo":"bar"}, a mensagem Pub/Sub resultante tem dois atributos, foo e bar.{"alice"=1,"bob"=2}"alice"="1""bob"="2" As chaves e os valores são convertidos em strings.

    Os esquemas de estruturas e mapas têm alguns comportamentos adicionais:

    • Opcionalmente, pode especificar um campo struct ou uma chave de mapa específica para ser o corpo da mensagem, definindo a propriedade de configuração messageBodyName. O valor do campo ou da chave é armazenado como um ByteString no corpo da mensagem. Se não definir messageBodyName, o corpo da mensagem fica vazio para esquemas de estrutura e de mapa.

    • Para valores de matriz, o conetor só suporta tipos de matriz primitivos. A sequência de valores na matriz é concatenada num único objeto ByteString.

    Conversão do Pub/Sub para o Kafka

    O conetor de origem converte as mensagens do Pub/Sub em registos do Kafka da seguinte forma:

    • Chave de registo do Kafka: por predefinição, a chave está definida como null. Opcionalmente, pode especificar um atributo de mensagem do Pub/Sub para usar como chave, definindo a opção de configuração kafka.key.attribute. Nesse caso, o conector procura um atributo com esse nome e define a chave do registo para o valor do atributo. Se o atributo especificado não estiver presente, a chave de registo é definida como null.

    • Valor do registo do Kafka. O conetor escreve o valor do registo da seguinte forma:

      • Se a mensagem do Pub/Sub não tiver atributos personalizados, o conetor escreve o corpo da mensagem do Pub/Sub diretamente no valor do registo do Kafka como um tipo byte[], usando o conversor especificado por value.converter.

      • Se a mensagem do Pub/Sub tiver atributos personalizados e kafka.record.headers for false, o conector escreve uma struct no valor do registo. A struct contém um campo para cada atributo e um campo denominado "message" cujo valor é o corpo da mensagem do Pub/Sub (armazenado como bytes):

        {
          "message": "<Pub/Sub message body>",
          "<attribute-1>": "<value-1>",
          "<attribute-2>": "<value-2>",
          ....
        }
        

        Neste caso, tem de usar um value.converter compatível com esquemas struct, como org.apache.kafka.connect.json.JsonConverter.

      • Se a mensagem do Pub/Sub tiver atributos personalizados e kafka.record.headers for true, o conetor escreve os atributos como cabeçalhos de registos do Kafka. Escreve o corpo da mensagem do Pub/Sub diretamente no valor do registo do Kafka como um tipo byte[], usando o conversor especificado por value.converter.

    • Cabeçalhos de registos do Kafka. Por predefinição, os cabeçalhos estão vazios, a menos que defina kafka.record.headers como true.

    Opções de configuração

    Além das configurações fornecidas pela API Kafka Connect, o conetor Kafka do grupo Pub/Sub suporta a configuração de origem e destino, conforme descrito em Configurações do conetor Pub/Sub.

    Receber apoio técnico

    Se precisar de ajuda, crie um pedido de apoio técnico. Para perguntas e debates gerais, crie um problema no repositório do GitHub.

    O que se segue?