Este documento descreve como integrar o Apache Kafka e o Pub/Sub através do conetor do grupo Kafka do Pub/Sub.
Acerca do conetor Kafka do grupo Pub/Sub
O Apache Kafka é uma plataforma de código aberto para eventos de streaming. É usado comumente em arquiteturas distribuídas para permitir a comunicação entre componentes fracamente acoplados. O Pub/Sub é um serviço gerido para enviar e receber mensagens de forma assíncrona. Tal como com o Kafka, pode usar o Pub/Sub para comunicar entre componentes na sua arquitetura de nuvem.
O conetor Kafka do grupo Pub/Sub permite-lhe integrar estes dois sistemas. Os seguintes conetores estão incluídos no JAR do conetor:
- O conetor de destino lê registos de um ou mais tópicos do Kafka e publica-os no Pub/Sub.
- O conetor de origem lê mensagens de um tópico do Pub/Sub e publica-as no Kafka.
Seguem-se alguns cenários em que pode usar o conetor Kafka do grupo Pub/Sub:
- Está a migrar uma arquitetura baseada no Kafka para o Google Cloud.
- Tem um sistema de front-end que armazena eventos no Kafka fora do Google Cloud, mas também usa o Google Cloud para executar alguns dos seus serviços de back-end, que precisam de receber os eventos do Kafka.
- Recolhe registos de uma solução Kafka nas instalações e envia-os para o Google Cloud para análise de dados.
- Tem um sistema de front-end que usa o Google Cloud, mas também armazena dados nas instalações através do Kafka.
O conetor requer o Kafka Connect, que é uma framework para fazer stream de dados entre o Kafka e outros sistemas. Para usar o conetor, tem de executar o Kafka Connect juntamente com o cluster Kafka.
Este documento pressupõe que tem conhecimentos sobre o Kafka e o Pub/Sub. Antes de ler este documento, é aconselhável concluir um dos inícios rápidos do Pub/Sub.
O conetor do Pub/Sub não suporta nenhuma integração entre o IAM e as ACLs do Kafka Connect. Google Cloud
Comece a usar o conetor
Esta secção explica as seguintes tarefas:- Configure o conetor Kafka do grupo Pub/Sub.
- Envie eventos do Kafka para o Pub/Sub.
- Envie mensagens do Pub/Sub para o Kafka.
Pré-requisitos
Instale o Kafka
Siga o início rápido do Apache Kafka para instalar um Kafka de nó único na sua máquina local. Conclua estes passos no início rápido:
- Transfira a versão mais recente do Kafka e extraia-a.
- Inicie o ambiente Kafka.
- Crie um tópico do Kafka.
Autenticar
O conetor Kafka do grupo Pub/Sub tem de ser autenticado com o Pub/Sub para enviar e receber mensagens do Pub/Sub. Para configurar a autenticação, siga estes passos:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
Se estiver a usar um fornecedor de identidade (IdP) externo, tem primeiro de iniciar sessão na CLI gcloud com a sua identidade federada.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init
-
Create or select a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator
(
roles/resourcemanager.projectCreator
), which contains theresourcemanager.projects.create
permission. Learn how to grant roles.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
Clone ou transfira o repositório do GitHub para o conector.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copie o conteúdo do diretório
config
para o subdiretórioconfig
da sua instalação do Kafka.cp config/* [path to Kafka installation]/config/
- Navegue para o diretório que contém o ficheiro binário do Kafka Connect que transferiu.
- No diretório binário do Kafka Connect, abra o ficheiro denominado
config/connect-standalone.properties
num editor de texto. - Se a linha
plugin.path property
estiver comentada, remova o comentário. Atualize o
plugin.path property
para incluir o caminho para o JAR do conetor.Exemplo:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Defina a propriedade
offset.storage.file.filename
para um nome de ficheiro local. No modo autónomo, o Kafka usa este ficheiro para armazenar dados de desvio.Exemplo:
offset.storage.file.filename=/tmp/connect.offsets
Use a Google Cloud CLI para criar um tópico do Pub/Sub com uma subscrição.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Substitua o seguinte:
- PUBSUB_TOPIC: O nome de um tópico Pub/Sub para receber as mensagens do Kafka.
- PUBSUB_SUBSCRIPTION: o nome de uma subscrição do Pub/Sub para o tópico.
Abra o ficheiro
/config/cps-sink-connector.properties
num editor de texto. Adicione valores para as seguintes propriedades, que estão marcadas com"TODO"
nos comentários:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Substitua o seguinte:
- KAFKA_TOPICS: uma lista separada por vírgulas de tópicos do Kafka a partir dos quais ler.
- PROJECT_ID: O projeto Google Cloud que contém o seu tópico do Pub/Sub.
- PUBSUB_TOPIC: o tópico do Pub/Sub para receber as mensagens do Kafka.
No diretório do Kafka, execute o seguinte comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Siga os passos no Início rápido do Apache Kafka para escrever alguns eventos no seu tópico do Kafka.
Use a CLI gcloud para ler os eventos do Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Use a CLI gcloud para criar um tópico Pub/Sub com uma subscrição.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Substitua o seguinte:
- PUBSUB_TOPIC: o nome de um tópico do Pub/Sub.
- PUBSUB_SUBSCRIPTION: O nome de uma subscrição do Pub/Sub.
Abra o ficheiro denominado
/config/cps-source-connector.properties
num editor de texto. Adicione valores para as seguintes propriedades, que estão marcadas com"TODO"
nos comentários:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Substitua o seguinte:
- KAFKA_TOPIC: os tópicos do Kafka para receber as mensagens do Pub/Sub.
- PROJECT_ID: O projeto Google Cloud que contém o seu tópico do Pub/Sub.
- PUBSUB_TOPIC: o tópico do Pub/Sub.
No diretório do Kafka, execute o seguinte comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Use a CLI gcloud para publicar uma mensagem no Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Lê a mensagem de Kafka. Siga os passos no guia de início rápido do Apache Kafka para ler as mensagens do tópico do Kafka.
key.converter
: O conversor usado para serializar chaves de registo.value.converter
: o conversor usado para serializar valores de registos.- A chave do registo do Kafka é armazenada como um atributo denominado
"key"
na mensagem do Pub/Sub. - Por predefinição, o conetor elimina todos os cabeçalhos no registo do Kafka. No entanto, se definir a opção de configuração
headers.publish
comotrue
, o conetor escreve os cabeçalhos como atributos do Pub/Sub. O conetor ignora todos os cabeçalhos que excedam os limites de atributos de mensagens do Pub/Sub. - Para esquemas de números inteiros, de vírgula flutuante, de strings e de bytes, o conetor transmite os bytes do valor do registo do Kafka diretamente para o corpo da mensagem do Pub/Sub.
- Para esquemas de struct, o conetor escreve cada campo como um atributo da mensagem do Pub/Sub. Por exemplo, se o campo for
{ "id"=123 }
, a mensagem do Pub/Sub resultante tem um atributo"id"="123"
. O valor do campo é sempre convertido numa string. Os tipos de mapa e struct não são suportados como tipos de campos numa struct. - Para esquemas de mapas, o conector escreve cada par de chave-valor como um atributo da mensagem do Pub/Sub. Por exemplo, se o mapa for {"foo":"bar"}, a mensagem Pub/Sub resultante tem dois atributos, foo e bar.
{"alice"=1,"bob"=2}
"alice"="1"
"bob"="2"
As chaves e os valores são convertidos em strings. Opcionalmente, pode especificar um campo struct ou uma chave de mapa específica para ser o corpo da mensagem, definindo a propriedade de configuração
messageBodyName
. O valor do campo ou da chave é armazenado como umByteString
no corpo da mensagem. Se não definirmessageBodyName
, o corpo da mensagem fica vazio para esquemas de estrutura e de mapa.Para valores de matriz, o conetor só suporta tipos de matriz primitivos. A sequência de valores na matriz é concatenada num único objeto
ByteString
.Chave de registo do Kafka: por predefinição, a chave está definida como
null
. Opcionalmente, pode especificar um atributo de mensagem do Pub/Sub para usar como chave, definindo a opção de configuraçãokafka.key.attribute
. Nesse caso, o conector procura um atributo com esse nome e define a chave do registo para o valor do atributo. Se o atributo especificado não estiver presente, a chave de registo é definida comonull
.Valor do registo do Kafka. O conetor escreve o valor do registo da seguinte forma:
Se a mensagem do Pub/Sub não tiver atributos personalizados, o conetor escreve o corpo da mensagem do Pub/Sub diretamente no valor do registo do Kafka como um tipo
byte[]
, usando o conversor especificado porvalue.converter
.Se a mensagem do Pub/Sub tiver atributos personalizados e
kafka.record.headers
forfalse
, o conector escreve uma struct no valor do registo. A struct contém um campo para cada atributo e um campo denominado"message"
cujo valor é o corpo da mensagem do Pub/Sub (armazenado como bytes):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
Neste caso, tem de usar um
value.converter
compatível com esquemasstruct
, comoorg.apache.kafka.connect.json.JsonConverter
.Se a mensagem do Pub/Sub tiver atributos personalizados e
kafka.record.headers
fortrue
, o conetor escreve os atributos como cabeçalhos de registos do Kafka. Escreve o corpo da mensagem do Pub/Sub diretamente no valor do registo do Kafka como um tipobyte[]
, usando o conversor especificado porvalue.converter
.
Cabeçalhos de registos do Kafka. Por predefinição, os cabeçalhos estão vazios, a menos que defina
kafka.record.headers
comotrue
.- Compreenda as diferenças entre o Kafka e o Pub/Sub.
- Saiba mais sobre o conetor do Kafka do grupo Pub/Sub.
- Consulte o repositório do GitHub do conetor Kafka do grupo Pub/Sub.
Transfira o JAR do conetor
Transfira o ficheiro JAR do conetor para a sua máquina local. Para mais informações, consulte a secção Adquira o conector no ficheiro Readme do GitHub.
Copie os ficheiros de configuração do conetor
Estes ficheiros contêm definições de configuração para o conetor.
Atualize a configuração do Kafka Connect
Encaminhe eventos do Kafka para o Pub/Sub
Esta secção descreve como iniciar o conetor de destino, publicar eventos no Kafka e, em seguida, ler as mensagens encaminhadas do Pub/Sub.
Encaminhe mensagens do Pub/Sub para o Kafka
Esta secção descreve como iniciar o conetor de origem, publicar mensagens no Pub/Sub e ler as mensagens encaminhadas do Kafka.
Conversão de mensagens
Um registo do Kafka contém uma chave e um valor, que são matrizes de bytes de comprimento variável. Opcionalmente, um registo do Kafka também pode ter cabeçalhos, que são pares de chave-valor. Uma mensagem do Pub/Sub tem duas partes principais: o corpo da mensagem e zero ou mais atributos de chave/valor.
O Kafka Connect usa conversores para serializar chaves e valores para e a partir do Kafka. Para controlar a serialização, defina as seguintes propriedades nos ficheiros de configuração do conector:
O corpo de uma mensagem Pub/Sub é um objeto ByteString
, pelo que a conversão mais eficiente é copiar diretamente a carga útil. Por esse motivo, recomendamos a utilização de um conversor que produza tipos de dados primitivos (esquema de números inteiros, de vírgula flutuante, de strings ou de bytes) sempre que possível, para evitar a desserialização e a reserialização do mesmo corpo da mensagem.
Conversão de Kafka para Pub/Sub
O conetor de destino converte os registos do Kafka em mensagens do Pub/Sub da seguinte forma:
Os esquemas de estruturas e mapas têm alguns comportamentos adicionais:
Conversão do Pub/Sub para o Kafka
O conetor de origem converte as mensagens do Pub/Sub em registos do Kafka da seguinte forma:
Opções de configuração
Além das configurações fornecidas pela API Kafka Connect, o conetor Kafka do grupo Pub/Sub suporta a configuração de origem e destino, conforme descrito em Configurações do conetor Pub/Sub.
Receber apoio técnico
Se precisar de ajuda, crie um pedido de apoio técnico. Para perguntas e debates gerais, crie um problema no repositório do GitHub.