Neste documento, descrevemos como integrar o Apache Kafka e o Pub/Sub usando o conector de Kafka do grupo do Pub/Sub.
Sobre o conector de Kafka do grupo do Pub/Sub
O Apache Kafka é uma plataforma de código aberto para eventos de streaming. É comum usadas em arquiteturas distribuídas para permitir a comunicação entre acoplados. O Pub/Sub é um serviço gerenciado para enviar e receber mensagens de forma assíncrona. Assim como no Kafka, é possível usar Pub/Sub para comunicação entre componentes em sua nuvem do Terraform.
Com o conector de Kafka do grupo Pub/Sub, é possível integrar esses dois sistemas. Os seguintes conectores são empacotados no JAR do conector:
- O conector do coletor lê registros de um ou mais tópicos do Kafka e os publica no Pub/Sub.
- O conector de origem lê mensagens de um tópico do Pub/Sub. e os publica no Kafka.
Veja alguns cenários em que é possível usar o conector de Kafka do grupo do Pub/Sub:
- Você está migrando uma arquitetura baseada em Kafka para o Google Cloud.
- Você tem um sistema de front-end que armazena eventos no Kafka fora Google Cloud, mas também o usa para executar alguns dos back-ends que precisam receber os eventos do Kafka.
- Você coleta registros de uma solução Kafka local e os envia para Google Cloud para análise de dados.
- Você tem um sistema de front-end que usa o Google Cloud, mas também armazena dados no local usando Kafka.
O conector exige Kafka Connect, que é um framework para streaming de dados entre o Kafka e outros sistemas. Para usar é necessário executar o Kafka Connect junto com o cluster do Kafka.
Para seguir este documento, é necessário ter familiaridade com o Kafka Pub/Sub Antes de ler este documento, é uma boa ideia conclua uma das Guias de início rápido do Pub/Sub.
O conector do Pub/Sub não dá suporte a nenhuma integração entre as ACLs do Google Cloud IAM e do Kafka Connect.
Começar a usar o conector
Esta seção mostra as seguintes tarefas:- Configurar o conector de Kafka do grupo do Pub/Sub
- Enviar eventos do Kafka para o Pub/Sub.
- Envie mensagens do Pub/Sub para o Kafka.
Pré-requisitos
Instale o Kafka
Siga o Guia de início rápido do Apache Kafka para instalar um Kafka de nó único na máquina local. Conclua essas etapas em guia de início rápido:
- Faça o download da versão mais recente do Kafka e extraia o arquivo.
- Inicie o ambiente Kafka.
- Criar um tópico Kafka.
Autenticar
O conector de Kafka do grupo do Pub/Sub precisa ser autenticado com o Pub/Sub para para enviar e receber mensagens do Pub/Sub. Para configurar a autenticação, siga estas etapas:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Baixe o JAR do conector
Faça o download do arquivo JAR do conector na sua máquina local. Para mais informações, consulte Adquirir o conector no arquivo readme do GitHub.
Copie os arquivos de configuração do conector
Clone ou faça o download do Repositório do GitHub para o conector.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copie o conteúdo do diretório
config
para o subdiretórioconfig
de da instalação do Kafka.cp config/* [path to Kafka installation]/config/
Esses arquivos contêm as configurações do conector.
Atualizar a configuração do Kafka Connect
- Navegue até o diretório que contém o binário do Kafka Connect que você baixado.
- No diretório binário do Kafka Connect, abra o arquivo chamado
config/connect-standalone.properties
em um editor de texto. - Se a
plugin.path property
estiver comentada, remova a marca de comentário. Atualize o
plugin.path property
para incluir o caminho para o JAR do conector.Exemplo:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Defina a propriedade
offset.storage.file.filename
como um nome de arquivo local. Em modo autônomo, o Kafka usa esse arquivo para armazenar dados de deslocamento.Exemplo:
offset.storage.file.filename=/tmp/connect.offsets
Encaminhar eventos do Kafka para o Pub/Sub
Esta seção descreve como iniciar o conector do coletor, publicar eventos no Kafka e ler as mensagens encaminhadas do Pub/Sub.
Use a Google Cloud CLI para criar um tópico do Pub/Sub com uma assinatura.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Substitua:
- PUBSUB_TOPIC: o nome de um tópico do Pub/Sub para para receber as mensagens do Kafka.
- PUBSUB_SUBSCRIPTION: o nome de um Pub/Sub uma assinatura para o tópico.
Abra o arquivo
/config/cps-sink-connector.properties
em um editor de texto. Adicionar valores para as seguintes propriedades, que estão marcadas como"TODO"
no comentários:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Substitua:
- KAFKA_TOPICS: uma lista separada por vírgulas de tópicos do Kafka para leitura se originou.
- PROJECT_ID: o projeto do Google Cloud que contém seu tópico do Pub/Sub.
- PUBSUB_TOPIC: o tópico do Pub/Sub que receberá o do Kafka.
No diretório Kafka, execute o seguinte comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Siga as etapas no Guia de início rápido do Apache Kafka para gravar alguns eventos no seu tópico Kafka.
Use a CLI gcloud para ler os eventos do Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Encaminhar mensagens do Pub/Sub para o Kafka
Esta seção descreve como iniciar o conector de origem, publicar mensagens no Pub/Sub e ler as mensagens encaminhadas do Kafka.
Use a CLI gcloud para criar um tópico do Pub/Sub com uma assinatura.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Substitua:
- PUBSUB_TOPIC: o nome de um tópico do Pub/Sub.
- PUBSUB_SUBSCRIPTION: o nome de um Pub/Sub assinatura.
Abra o arquivo chamado
/config/cps-source-connector.properties
em um texto editor. Adicione valores para as seguintes propriedades, que estão marcadas como"TODO"
em os comentários:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Substitua:
- KAFKA_TOPIC: os tópicos do Kafka que receberão o Mensagens do Pub/Sub.
- PROJECT_ID: o projeto do Google Cloud que contém seu tópico do Pub/Sub.
- PUBSUB_TOPIC: o tópico do Pub/Sub.
No diretório Kafka, execute o seguinte comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Use a CLI gcloud para publicar uma mensagem no Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Leia a mensagem do Kafka. Siga as etapas no Guia de início rápido do Apache Kafka para ler as mensagens do tópico Kafka.
Conversão de mensagem
um registro Kafka; contém uma chave e um valor, que são matrizes de bytes de comprimento variável. Opcionalmente, um O registro Kafka também pode ter cabeçalhos, que são pares de chave-valor. Um Mensagem do Pub/Sub tem duas partes principais: o corpo da mensagem e zero ou mais atributos de chave-valor.
O Kafka Connect usa conversores para serializar chaves e valores de e para o Kafka. Para controlar a serialização, defina as seguintes propriedades no conector arquivos de configuração:
key.converter
: o conversor usado para serializar chaves de registro.value.converter
: o conversor usado para serializar valores de registro.
O corpo de uma mensagem do Pub/Sub é um objeto ByteString
. Portanto,
a conversão mais eficiente é copiar o payload diretamente. Por esse motivo,
recomendamos o uso de um conversor que produz tipos de dados primitivos (número inteiro, flutuante,
string ou esquema de bytes) sempre que possível, para evitar a desserialização e
reserializando o mesmo corpo da mensagem.
Conversão do Kafka para o Pub/Sub
O conector do coletor converte os registros do Kafka em mensagens do Pub/Sub da seguinte forma:
- A chave de registro do Kafka é armazenada como um atributo chamado
"key"
no Mensagem do Pub/Sub. - Por padrão, o conector descarta todos os cabeçalhos no registro Kafka. No entanto, se
Você define a opção de configuração
headers.publish
comotrue
, o conector grava os cabeçalhos como atributos do Pub/Sub. O conector pula os cabeçalhos que excederem o Pub/Sub limites de atributos de mensagem. - Para esquemas de número inteiro, flutuante, string e bytes, o conector transmite os bytes. do valor do registro Kafka diretamente na mensagem do Pub/Sub corpo
- Para esquemas struct, o conector grava cada campo como um atributo do
Mensagem do Pub/Sub. Por exemplo, se o campo for
{ "id"=123 }
, a mensagem resultante do Pub/Sub tem um atributo"id"="123"
. O valor do campo é sempre convertido em uma string. Os tipos Map e struct não são compatíveis como tipos de campo em um struct. - Para esquemas de mapas, o conector grava cada par de chave-valor como um atributo de
a mensagem do Pub/Sub. Por exemplo, se o mapa for
{"alice"=1,"bob"=2}
, a mensagem resultante do Pub/Sub terá duas atributos"alice"="1"
e"bob"="2"
. As chaves e os valores são convertidos em strings.
Os esquemas de struct e mapa têm alguns outros comportamentos:
Opcionalmente, você pode especificar um campo struct ou chave de mapa específico como corpo da mensagem definindo a propriedade de configuração
messageBodyName
. O do campo ou da chave é armazenado comoByteString
no corpo da mensagem. Se você não definirmessageBodyName
, o corpo da mensagem ficará vazio para struct e esquemas de mapa.Para valores de matriz, o conector aceita apenas tipos de matriz primitiva. O sequência de valores na matriz é concatenada em uma única
ByteString
objeto.
Conversão do Pub/Sub para o Kafka
O conector de origem converte mensagens do Pub/Sub em registros Kafka da seguinte forma:
Chave de gravação do Kafka: por padrão, a chave é definida como
null
. Opcionalmente, você pode especificar um atributo de mensagem do Pub/Sub para usar como chave, definindo a opção de configuraçãokafka.key.attribute
. Nesse caso, o conector procura um atributo com esse nome e define a chave de registro como o . Se o atributo especificado não estiver presente, a chave de registro será Defina comonull
.Valor do registro Kafka: O conector grava o valor do registro da seguinte forma:
Se a mensagem do Pub/Sub não tiver atributos personalizados, o conector grava o corpo da mensagem do Pub/Sub diretamente no registro do Kafka como um tipo
byte[]
, usando o conversor especificado porvalue.converter
.Se a mensagem do Pub/Sub tiver atributos personalizados e
kafka.record.headers
forfalse
, o conector grava uma estrutura no . O struct contém um campo para cada atributo e um campo chamado"message"
, cujo valor é o corpo da mensagem do Pub/Sub (armazenados como bytes):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
Nesse caso, é necessário usar um
value.converter
compatível com Esquemasstruct
, comoorg.apache.kafka.connect.json.JsonConverter
.Se a mensagem do Pub/Sub tiver atributos personalizados e
kafka.record.headers
fortrue
, o conector grava os atributos como Cabeçalhos de registro Kafka. Ele grava o corpo da mensagem do Pub/Sub diretamente ao valor do registro Kafka como um tipobyte[]
, usando o conversor especificado porvalue.converter
.
Cabeçalhos de registro Kafka. Por padrão, os cabeçalhos ficam vazios, a menos que você defina
kafka.record.headers
paratrue
.
Opções de configuração
Além das configurações fornecidas pela API Kafka Connect, a O conector de Kafka do grupo Pub/Sub oferece suporte à configuração do coletor e da origem conforme descritos em Configurações do conector do Pub/Sub.
Como receber suporte
Se precisar de ajuda, crie um tíquete de suporte. Para perguntas e discussões gerais, crie um problema no repositório do GitHub.
A seguir
- Entenda as diferenças entre o Kafka e o Pub/Sub.
- Saiba mais sobre o conector de Kafka do grupo do Pub/Sub.
- Consulte o conector do Kafka do grupo do Pub/Sub Repositório do GitHub.