Neste documento, descrevemos como integrar o Apache Kafka e o Pub/Sub usando o conector de grupo do Pub/Sub Kafka.
Sobre o conector de grupo do Pub/Sub do Kafka
O Apache Kafka é uma plataforma de código aberto para streaming de eventos. Geralmente é usado em arquiteturas distribuídas para permitir a comunicação entre componentes acoplados com flexibilidade. O Pub/Sub é um serviço gerenciado para enviar e receber mensagens de forma assíncrona. Assim como no Kafka, é possível usar o Pub/Sub para comunicação entre componentes na sua arquitetura de nuvem.
O conector de grupo do Pub/Sub do Kafka permite integrar esses dois sistemas. Os seguintes conectores são empacotados no JAR do conector:
- O conector do coletor lê os registros de um ou mais tópicos do Kafka e os publica no Pub/Sub.
- O conector de origem lê mensagens de um tópico do Pub/Sub e as publica no Kafka.
Veja alguns cenários em que você pode usar o conector de grupo do Pub/Sub do Kafka:
- Você está migrando para uma arquitetura baseada em Kafka no Google Cloud.
- Você tem um sistema de front-end que armazena eventos no Kafka fora do Google Cloud, mas também usa o Google Cloud para executar alguns dos serviços de back-end, que precisam receber os eventos do Kafka.
- Você coleta registros de uma solução Kafka local e os envia ao Google Cloud para análise de dados.
- Você tem um sistema de front-end que usa o Google Cloud, mas também armazena dados no local usando o Kafka.
O conector requer o Kafka Connect (em inglês), que é um framework para streaming de dados entre o Kafka e outros sistemas. Para usar o conector, execute o Kafka Connect com seu cluster do Kafka.
Neste documento, presumimos que você conhece o Kafka e o Pub/Sub. Antes de ler este documento, conclua um dos guias de início rápido do Pub/Sub.
Começar a usar o conector
Esta seção mostra as seguintes tarefas:- Configurar o conector de Kafka do grupo do Pub/Sub.
- Enviar eventos do Kafka para o Pub/Sub.
- Enviar mensagens do Pub/Sub para o Kafka.
Pré-requisitos
Instalar Kafka
Siga o guia de início rápido do Apache Kafka para instalar um Kafka de nó único na máquina local. Conclua estas etapas no guia de início rápido:
- Faça o download da versão mais recente do Kafka e extraia o arquivo.
- Inicie o ambiente do Kafka.
- Criar um tópico do Kafka
Autenticar
O conector de grupo do Pub/Sub do Kafka precisa se autenticar com o Pub/Sub para enviar e receber mensagens do Pub/Sub. Para configurar a autenticação, siga estas etapas:
- Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
- Instale a CLI do Google Cloud.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init
-
Criar ou selecionar um projeto do Google Cloud.
-
Criar um projeto do Cloud:
gcloud projects create PROJECT_ID
-
Selecionar o projeto do Cloud que você criou:
gcloud config set project PROJECT_ID
-
-
Crie as credenciais de autenticação para sua Conta do Google:
gcloud auth application-default login
-
Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Substitua
PROJECT_ID
pela ID do seu projeto. - Substitua
EMAIL_ADDRESS
pelo seu endereço de e-mail. - Substitua
ROLE
por cada papel individual.
- Substitua
- Instale a CLI do Google Cloud.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init
-
Criar ou selecionar um projeto do Google Cloud.
-
Criar um projeto do Cloud:
gcloud projects create PROJECT_ID
-
Selecionar o projeto do Cloud que você criou:
gcloud config set project PROJECT_ID
-
-
Crie as credenciais de autenticação para sua Conta do Google:
gcloud auth application-default login
-
Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Substitua
PROJECT_ID
pela ID do seu projeto. - Substitua
EMAIL_ADDRESS
pelo seu endereço de e-mail. - Substitua
ROLE
por cada papel individual.
- Substitua
Fazer o download do JAR do conector
Faça o download do arquivo JAR do conector para a máquina local. Para mais informações, consulte Adquirir o conector no readme do GitHub.
Copiar os arquivos de configuração do conector
Clone ou faça o download do repositório do GitHub para o conector.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copie o conteúdo do diretório
config
para o subdiretórioconfig
da instalação do Kafka.cp config/* [path to Kafka installation]/config/
Esses arquivos contêm as configurações do conector.
Atualizar a configuração do Kafka Connect
- Navegue até seu diretório do Kafka.
- Abra o arquivo
config/connect-standalone.properties
em um editor de texto. - Se a
plugin.path property
estiver comentada, remova a marca de comentário. Atualize o
plugin.path property
para incluir o caminho para o JAR do conector.Exemplo:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Defina a propriedade
offset.storage.file.filename
como um nome de arquivo local. No modo independente, o Kafka usa esse arquivo para armazenar dados de deslocamento.Exemplo:
offset.storage.file.filename=/tmp/connect.offsets
Encaminhar eventos do Kafka para o Pub/Sub
Nesta seção, descrevemos como iniciar o conector do coletor, publicar eventos no Kafka e, em seguida, ler as mensagens encaminhadas do Pub/Sub.
Use a Google Cloud CLI para criar um tópico do Pub/Sub com uma assinatura.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Substitua:
- PUBSUB_TOPIC: o nome de um tópico do Pub/Sub para receber as mensagens do Kafka.
- PUBSUB_SUBSCRIPTION: o nome de uma assinatura do Pub/Sub para o tópico.
Abra o arquivo
/config/cps-sink-connector.properties
em um editor de texto. Adicione valores para as seguintes propriedades, marcadas como"TODO"
nos comentários:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Substitua:
- KAFKA_TOPICS: uma lista separada por vírgulas de tópicos do Kafka para leitura.
- PROJECT_ID: o projeto do Google Cloud que contém o tópico do Pub/Sub.
- PUBSUB_TOPIC: o tópico do Pub/Sub para receber as mensagens do Kafka.
No diretório do Kafka, execute o seguinte comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Siga as etapas no guia de início rápido do Apache Kafka para gravar alguns eventos no tópico do Kafka.
Use a CLI gcloud para ler os eventos do Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Encaminhar mensagens do Pub/Sub para o Kafka
Nesta seção, descrevemos como iniciar o conector de origem, publicar mensagens no Pub/Sub e ler as mensagens encaminhadas do Kafka.
Use a CLI gcloud para criar um tópico do Pub/Sub com uma assinatura.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Substitua:
- PUBSUB_TOPIC: o nome de um tópico do Pub/Sub.
- PUBSUB_SUBSCRIPTION: o nome de uma assinatura do Pub/Sub.
Abra o arquivo chamado
/config/cps-source-connector.properties
em um editor de texto. Adicione valores para as seguintes propriedades, marcadas como"TODO"
nos comentários:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Substitua:
- KAFKA_TOPIC: os tópicos do Kafka para receber as mensagens do Pub/Sub.
- PROJECT_ID: o projeto do Google Cloud que contém o tópico do Pub/Sub.
- PUBSUB_TOPIC: o tópico do Pub/Sub.
No diretório do Kafka, execute o seguinte comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Use a CLI gcloud para publicar uma mensagem no Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Leia a mensagem do Kafka. Siga as etapas no guia de início rápido do Apache Kafka para ler as mensagens do tópico do Kafka.
Conversão de mensagem
Um registro do Kafka contém uma chave e um valor, que são matrizes de bytes de comprimento variável. Opcionalmente, um registro Kafka também pode ter cabeçalhos, que são pares de chave-valor. Uma mensagem do Pub/Sub tem duas partes principais: o corpo da mensagem e zero ou mais atributos de chave-valor.
O Kafka Connect usa conversores para serializar chaves e valores de e para o Kafka. Para controlar a serialização, defina as seguintes propriedades nos arquivos de configuração do conector:
key.converter
: o conversor usado para serializar chaves de registro.value.converter
: o conversor usado para serializar valores de registro.
O corpo de uma mensagem do Pub/Sub é um objeto ByteString
. Portanto, a
conversão mais eficiente é copiar o payload diretamente. Por isso, recomendamos
usar um conversor que produza tipos de dados primitivos (esquema inteiro, flutuante,
string ou bytes), sempre que possível, para evitar a desserialização e
reserialização do mesmo corpo da mensagem.
Conversão do Kafka para o Pub/Sub
O conector do coletor converte os registros Kafka em mensagens do Pub/Sub da seguinte maneira:
- A chave de registro do Kafka é armazenada como um atributo chamado
"key"
na mensagem do Pub/Sub. - Por padrão, o conector descarta todos os cabeçalhos no registro Kafka. No entanto, se você definir a opção de configuração
headers.publish
comotrue
, o conector gravará os cabeçalhos como atributos do Pub/Sub. O conector pula todos os cabeçalhos que excedem os limites dos atributos de mensagens do Pub/Sub. - Nos esquemas de número inteiro, float, string e bytes, o conector transmite os bytes do valor do registro Kafka diretamente para o corpo da mensagem do Pub/Sub.
- Nos esquemas de estrutura, o conector grava cada campo como um atributo da mensagem do Pub/Sub. Por exemplo, se o campo for
{ "id"=123 }
, a mensagem do Pub/Sub resultante terá um atributo"id"="123"
. O valor do campo é sempre convertido em uma string. - Nos esquemas do mapa, o conector grava cada par de chave-valor como um atributo da mensagem do Pub/Sub. Por exemplo, se o mapa for
{"alice"=1,"bob"=2}
, a mensagem resultante do Pub/Sub tem dois atributos,"alice"="1"
e"bob"="2"
. As chaves e valores são convertidos em strings.
Os esquemas de estrutura e mapa têm alguns comportamentos adicionais:
Também é possível especificar um campo de struct ou uma chave de mapa específicos para ser o corpo da mensagem, definindo a propriedade de configuração
messageBodyName
. O valor do campo ou da chave é armazenado como umByteString
no corpo da mensagem. Se você não definirmessageBodyName
, o corpo da mensagem estará vazio para esquemas de estrutura e mapa.Para valores de matriz, o conector oferece suporte apenas a tipos de matriz primitiva. A sequência de valores na matriz é concatenada em um único objeto
ByteString
.
Conversão do Pub/Sub para Kafka
O conector de origem converte mensagens do Pub/Sub em registros do Kafka da seguinte maneira:
Chave de registro Kafka: por padrão, a chave é definida como
null
. Se quiser, especifique um atributo de mensagem do Pub/Sub para usar como chave, definindo a opção de configuraçãokafka.key.attribute
. Nesse caso, o conector procura um atributo com esse nome e define a chave de registro para o valor do atributo. Se o atributo especificado não estiver presente, a chave do registro será definida comonull
.Valor do registro Kafka. O conector grava o valor do registro da seguinte forma:
Se a mensagem do Pub/Sub não tiver atributos personalizados, o conector gravará o corpo da mensagem do Pub/Sub diretamente no valor do registro Kafka como um tipo
byte[]
, usando o conversor especificado porvalue.converter
.Se a mensagem do Pub/Sub tiver atributos personalizados e
kafka.record.headers
forfalse
, o conector gravará uma estrutura no valor do registro. O struct contém um campo para cada atributo e um campo chamado"message"
, em que o valor é o corpo da mensagem do Pub/Sub (armazenado como bytes):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
Nesse caso, é necessário usar um
value.converter
compatível com esquemasstruct
, comoorg.apache.kafka.connect.json.JsonConverter
.Se a mensagem do Pub/Sub tiver atributos personalizados e
kafka.record.headers
fortrue
, o conector gravará os atributos como cabeçalhos de registro Kafka. Ele grava o corpo da mensagem do Pub/Sub diretamente no valor de registro Kafka como um tipobyte[]
, usando o conversor especificado porvalue.converter
.
Cabeçalhos de registro do Kafka. Por padrão, os cabeçalhos estão vazios, a menos que você defina
kafka.record.headers
comotrue
.
Opções de configuração
Além das configurações fornecidas pela API Kafka Connect, o conector do Kafka do grupo do Pub/Sub é compatível com as configurações a seguir.
Opções de configuração do conector do coletor
O conector do coletor é compatível com as opções de configuração a seguir.
Configuração | Tipo de dados | Descrição |
---|---|---|
connector.class |
String |
Obrigatório. A classe Java do conector. Para o conector do coletor do Pub/Sub, o valor precisa ser com.google.pubsub.kafka.sink.CloudPubSubSinkConnector .
|
cps.endpoint |
String |
O endpoint do Pub/Sub a ser usado. Padrão: |
cps.project |
String |
Obrigatório. O Google Cloud que contém o tópico do Pub/Sub. |
cps.topic |
String |
Obrigatório. O tópico do Pub/Sub em que os registros do Kafka serão publicados. |
gcp.credentials.file.path |
String |
Opcional. O caminho para um arquivo que armazena as credenciais do Google Cloud para autenticar o Pub/Sub Lite. |
gcp.credentials.json |
String |
Opcional. Um blob JSON que contém o Google Cloud para autenticar o Pub/Sub Lite. |
headers.publish |
Boolean |
Quando Padrão: |
maxBufferBytes |
Long |
O número máximo de bytes a serem recebidos em uma partição do tópico do Kafka antes de publicá-los no Pub/Sub. Padrão: 10000000. |
maxBufferSize |
Integer |
O número máximo de registros que serão recebidos em uma partição do tópico do Kafka antes da publicação no Pub/Sub. Padrão: 100; |
maxDelayThresholdMs |
Integer |
O tempo máximo de espera para alcançar
Padrão: 100; |
maxOutstandingMessages |
Long |
É o número máximo de registros que podem ser pendentes, incluindo lotes incompletos e pendentes, antes que o editor bloqueie a publicação. Padrão: |
maxOutstandingRequestBytes |
Long |
É o número máximo de bytes pendentes que podem estar pendentes, incluindo lotes incompletos e pendentes, antes que o editor bloqueie a publicação. Padrão: |
maxRequestTimeoutMs |
Integer |
Tempo limite para solicitações de publicação individuais para o Pub/Sub, em milissegundos. Padrão: 10000. |
maxTotalTimeoutMs |
Integer |
O tempo limite total, em milissegundos, de uma chamada a ser publicada no Pub/Sub, incluindo novas tentativas. Padrão: 60000. |
metadata.publish |
Boolean |
Quando Padrão: |
messageBodyName |
String |
Ao usar um esquema de valor de struct ou mapa, especifica o nome de um campo ou chave a ser usado como o corpo da mensagem do Pub/Sub. Consulte Conversão do Kafka para o Pub/Sub. Padrão: |
orderingKeySource |
String |
Especifica como definir a chave de ordem na mensagem do Pub/Sub. Pode ser um dos seguintes valores:
Padrão: |
topics |
String |
Obrigatório. Uma lista separada por vírgulas de tópicos do Kafka para leitura. |
Opções de configuração do conector de origem
O conector de origem é compatível com as opções de configuração a seguir.
Configuração | Tipo de dados | Descrição |
---|---|---|
connector.class |
String |
Obrigatório. A classe Java do conector. Para o conector de origem do Pub/Sub, o valor precisa ser com.google.pubsub.kafka.source.CloudPubSubSourceConnector .
|
cps.endpoint |
String |
O endpoint do Pub/Sub a ser usado. Padrão: |
cps.makeOrderingKeyAttribute |
Boolean |
Quando Padrão: |
cps.maxBatchSize |
Integer |
O número máximo de mensagens em lote para solicitações de envio para o Pub/Sub. Padrão: 100 |
cps.project |
String |
Obrigatório. O projeto do Google Cloud que contém o tópico do Pub/Sub. |
cps.subscription |
String |
Obrigatório. O nome da assinatura do Pub/Sub de onde as mensagens serão extraídas. |
gcp.credentials.file.path |
String |
Opcional. O caminho para um arquivo que armazena as credenciais do Google Cloud para autenticar o Pub/Sub Lite. |
gcp.credentials.json |
String |
Opcional. Um blob JSON que contém o Google Cloud para autenticar o Pub/Sub Lite. |
kafka.key.attribute |
String |
O atributo de mensagem do Pub/Sub a ser usado como chave para
mensagens publicadas no Kafka. Se definida como Padrão: |
kafka.partition.count |
Integer |
O número de partições do Kafka para o tópico do Kafka em que as mensagens
são publicadas. Esse parâmetro será ignorado se o esquema da partição for
Padrão: 1; |
kafka.partition.scheme |
String |
O esquema para atribuir uma mensagem a uma partição no Kafka. Pode ser um dos seguintes valores:
Padrão: |
kafka.record.headers |
Boolean |
Se |
kafka.topic |
String |
Obrigatório. O tópico do Kafka que recebe mensagens do Pub/Sub. |
Como receber suporte
Se precisar de ajuda, crie um tíquete de suporte. Para perguntas gerais e discussões, crie um problema no repositório do GitHub (link em inglês).
A seguir
- Entenda as diferenças entre o Kafka e o Pub/Sub.
- Saiba mais sobre o conector de Kafka do grupo do Pub/Sub.
- Consulte o repositório GitHub (em inglês) do conector do Kafka do grupo do Pub/Sub.