Neste documento, descrevemos como integrar o Apache Kafka e o Pub/Sub usando o conector de Kafka do grupo do Pub/Sub.
Sobre o conector de Kafka do grupo do Pub/Sub
O Apache Kafka é uma plataforma de código aberto para eventos de streaming. Ele é comumente usado em arquiteturas distribuídas para permitir a comunicação entre componentes com acoplamento flexível. O Pub/Sub é um serviço gerenciado para enviar e receber mensagens de modo assíncrono. Assim como no Kafka, é possível usar o Pub/Sub para a comunicação entre componentes da arquitetura em nuvem.
Com o conector de Kafka do grupo Pub/Sub, é possível integrar esses dois sistemas. Os seguintes conectores são empacotados no JAR do conector:
- O conector do coletor lê registros de um ou mais tópicos do Kafka e os publica no Pub/Sub.
- O conector de origem lê mensagens de um tópico do Pub/Sub e as publica no Kafka.
Veja alguns cenários em que é possível usar o conector de Kafka do grupo do Pub/Sub:
- Você está migrando uma arquitetura baseada em Kafka para o Google Cloud.
- Você tem um sistema de front-end que armazena eventos no Kafka fora do Google Cloud, mas também usa o Google Cloud para executar alguns dos serviços de back-end, que precisam receber os eventos do Kafka.
- Você coleta registros de uma solução Kafka local e os envia ao Google Cloud para análise de dados.
- Você tem um sistema de front-end que usa o Google Cloud, mas também armazena dados no local usando o Kafka.
O conector requer o Kafka Connect, que é um framework para streaming de dados entre o Kafka e outros sistemas. Para usar o conector, execute o Kafka Connect junto com o cluster do Kafka.
Neste documento, você precisa conhecer o Kafka e o Pub/Sub. Antes de ler este documento, é recomendável concluir um dos guias de início rápido do Pub/Sub.
O conector do Pub/Sub não é compatível com a integração entre as ACLs do Google Cloud IAM e do Kafka Connect.
Começar a usar o conector
Nesta seção, explicamos as seguintes tarefas:- Configurar o conector de Kafka do grupo do Pub/Sub
- Enviar eventos do Kafka para o Pub/Sub.
- Envie mensagens do Pub/Sub para o Kafka.
Pré-requisitos
Instale o Kafka
Siga o guia de início rápido do Apache Kafka para instalar um Kafka de nó único na máquina local. Conclua estas etapas no guia de início rápido:
- Faça o download da versão mais recente do Kafka e extraia-o.
- Inicie o ambiente Kafka.
- Criar um tópico Kafka.
Autenticar
O conector do Kafka do grupo do Pub/Sub precisa ser autenticado com o Pub/Sub para enviar e receber mensagens do Pub/Sub. Para configurar a autenticação, siga estas etapas:
- Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
- Instale a CLI do Google Cloud.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init
-
Crie ou selecione um projeto do Google Cloud.
-
Crie um projeto do Google Cloud:
gcloud projects create PROJECT_ID
Substitua
PROJECT_ID
por um nome para o projeto do Google Cloud que você está criando. -
Selecione o projeto do Google Cloud que você criou:
gcloud config set project PROJECT_ID
Substitua
PROJECT_ID
pelo nome do projeto do Google Cloud.
-
-
Crie as credenciais de autenticação para sua Conta do Google:
gcloud auth application-default login
-
Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Substitua
PROJECT_ID
pela ID do seu projeto. - Substitua
EMAIL_ADDRESS
pelo seu endereço de e-mail. - Substitua
ROLE
por cada papel individual.
- Substitua
- Instale a CLI do Google Cloud.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init
-
Crie ou selecione um projeto do Google Cloud.
-
Crie um projeto do Google Cloud:
gcloud projects create PROJECT_ID
Substitua
PROJECT_ID
por um nome para o projeto do Google Cloud que você está criando. -
Selecione o projeto do Google Cloud que você criou:
gcloud config set project PROJECT_ID
Substitua
PROJECT_ID
pelo nome do projeto do Google Cloud.
-
-
Crie as credenciais de autenticação para sua Conta do Google:
gcloud auth application-default login
-
Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Substitua
PROJECT_ID
pela ID do seu projeto. - Substitua
EMAIL_ADDRESS
pelo seu endereço de e-mail. - Substitua
ROLE
por cada papel individual.
- Substitua
Baixar o JAR do conector
Faça o download do arquivo JAR do conector na sua máquina local. Para mais informações, consulte Adquirir o conector no arquivo readme do GitHub.
Copie os arquivos de configuração do conector
Clone ou faça o download do repositório do GitHub para o conector.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copie o conteúdo do diretório
config
para o subdiretórioconfig
da instalação do Kafka.cp config/* [path to Kafka installation]/config/
Esses arquivos contêm as configurações do conector.
Atualizar a configuração do Kafka Connect
- Navegue até o diretório que contém o binário do Kafka Connect que você fez download.
- No diretório binário do Kafka Connect, abra o arquivo chamado
config/connect-standalone.properties
em um editor de texto. - Se a
plugin.path property
estiver comentada, remova a marca de comentário. Atualize o
plugin.path property
para incluir o caminho para o JAR do conector.Exemplo:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Defina a propriedade
offset.storage.file.filename
como um nome de arquivo local. No modo independente, o Kafka usa esse arquivo para armazenar dados de deslocamento.Exemplo:
offset.storage.file.filename=/tmp/connect.offsets
Encaminhar eventos do Kafka para o Pub/Sub
Esta seção descreve como iniciar o conector do coletor, publicar eventos no Kafka e, em seguida, ler as mensagens encaminhadas do Pub/Sub.
Use a Google Cloud CLI para criar um tópico do Pub/Sub com uma assinatura.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Substitua:
- PUBSUB_TOPIC: o nome de um tópico do Pub/Sub para receber as mensagens do Kafka.
- PUBSUB_SUBSCRIPTION: o nome de uma assinatura Pub/Sub do tópico.
Abra o arquivo
/config/cps-sink-connector.properties
em um editor de texto. Adicione valores para as propriedades abaixo, marcadas como"TODO"
nos comentários:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Substitua:
- KAFKA_TOPICS: uma lista separada por vírgulas de tópicos do Kafka para leitura.
- PROJECT_ID: o projeto do Google Cloud que contém seu tópico do Pub/Sub.
- PUBSUB_TOPIC: o tópico do Pub/Sub para receber as mensagens do Kafka.
No diretório Kafka, execute o seguinte comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Siga as etapas do guia de início rápido do Apache Kafka para gravar alguns eventos no tópico do Kafka.
Use a CLI gcloud para ler os eventos do Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Encaminhar mensagens do Pub/Sub para o Kafka
Esta seção descreve como iniciar o conector de origem, publicar mensagens no Pub/Sub e ler as mensagens encaminhadas do Kafka.
Use a CLI gcloud para criar um tópico do Pub/Sub com uma assinatura.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Substitua:
- PUBSUB_TOPIC: o nome de um tópico do Pub/Sub.
- PUBSUB_SUBSCRIPTION: o nome de uma assinatura do Pub/Sub.
Abra o arquivo chamado
/config/cps-source-connector.properties
em um editor de texto. Adicione valores para as propriedades abaixo, que estão marcadas como"TODO"
nos comentários:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Substitua:
- KAFKA_TOPIC: os tópicos do Kafka para receber as mensagens do Pub/Sub.
- PROJECT_ID: o projeto do Google Cloud que contém seu tópico do Pub/Sub.
- PUBSUB_TOPIC: o tópico do Pub/Sub.
No diretório Kafka, execute o seguinte comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Use a CLI gcloud para publicar uma mensagem no Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Leia a mensagem do Kafka. Siga as etapas do guia de início rápido do Apache Kafka para ler as mensagens do tópico Kafka.
Conversão de mensagem
Um registro Kafka contém uma chave e um valor, que são matrizes de bytes de comprimento variável. Opcionalmente, um registro Kafka também pode ter cabeçalhos, que são pares de chave-valor. Uma mensagem do Pub/Sub tem duas partes principais: o corpo da mensagem e zero ou mais atributos de chave-valor.
O Kafka Connect usa conversores para serializar chaves e valores de e para o Kafka. Para controlar a serialização, defina as seguintes propriedades nos arquivos de configuração do conector:
key.converter
: o conversor usado para serializar chaves de registro.value.converter
: o conversor usado para serializar valores de registro.
O corpo de uma mensagem do Pub/Sub é um objeto ByteString
. Portanto, a conversão mais eficiente é copiar o payload diretamente. Por esse motivo, recomendamos o uso de um conversor que produza tipos de dados primitivos (número inteiro, flutuante, string ou esquema de bytes) sempre que possível para evitar a desserialização e reserialização do mesmo corpo da mensagem.
Conversão do Kafka para o Pub/Sub
O conector do coletor converte registros do Kafka em mensagens do Pub/Sub da seguinte maneira:
- A chave de registro do Kafka é armazenada como um atributo chamado
"key"
na mensagem do Pub/Sub. - Por padrão, o conector descarta todos os cabeçalhos no registro Kafka. No entanto, se você definir a opção de configuração
headers.publish
comotrue
, o conector gravará os cabeçalhos como atributos do Pub/Sub. O conector ignora os cabeçalhos que excedem os limites de atributos de mensagens do Pub/Sub. - Para esquemas de número inteiro, flutuante, string e bytes, o conector transmite os bytes do valor do registro do Kafka diretamente para o corpo da mensagem do Pub/Sub.
- Para esquemas de estrutura, o conector grava cada campo como um atributo da mensagem do Pub/Sub. Por exemplo, se o campo for
{ "id"=123 }
, a mensagem do Pub/Sub resultante terá um atributo"id"="123"
. O valor do campo é sempre convertido em uma string. Os tipos de mapa e struct não são aceitos como tipos de campo em um struct. - Para esquemas de mapas, o conector grava cada par de chave-valor como um atributo da mensagem do Pub/Sub. Por exemplo, se o mapa for
{"alice"=1,"bob"=2}
, a mensagem resultante do Pub/Sub terá dois atributos,"alice"="1"
e"bob"="2"
. As chaves e os valores são convertidos em strings.
Os esquemas de struct e mapa têm alguns outros comportamentos:
Também é possível especificar um campo de struct ou uma chave de mapa específico como o corpo da mensagem, definindo a propriedade de configuração
messageBodyName
. O valor do campo ou da chave é armazenado comoByteString
no corpo da mensagem. Se você não definirmessageBodyName
, o corpo da mensagem ficará vazio para esquemas de struct e mapa.Para valores de matriz, o conector aceita apenas tipos de matriz primitiva. A sequência de valores na matriz é concatenada em um único objeto
ByteString
.
Conversão do Pub/Sub para o Kafka
O conector de origem converte mensagens do Pub/Sub em registros Kafka da seguinte maneira:
Chave de gravação do Kafka: por padrão, a chave é definida como
null
. Opcionalmente, especifique um atributo de mensagem do Pub/Sub para usar como a chave, definindo a opção de configuraçãokafka.key.attribute
. Nesse caso, ele procura um atributo com esse nome e define a chave de registro como o valor do atributo. Se o atributo especificado não estiver presente, a chave de registro será definida comonull
.Valor do registro Kafka: O conector grava o valor do registro da seguinte forma:
Se a mensagem do Pub/Sub não tiver atributos personalizados, o conector gravará o corpo da mensagem do Pub/Sub diretamente no valor do registro do Kafka como um tipo
byte[]
, usando o conversor especificado porvalue.converter
.Se a mensagem do Pub/Sub tiver atributos personalizados e
kafka.record.headers
forfalse
, o conector gravará uma estrutura no valor do registro. O struct contém um campo para cada atributo e um campo chamado"message"
cujo valor é o corpo da mensagem do Pub/Sub (armazenado como bytes):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
Nesse caso, é necessário usar um
value.converter
compatível com esquemas destruct
, comoorg.apache.kafka.connect.json.JsonConverter
.Se a mensagem do Pub/Sub tiver atributos personalizados e
kafka.record.headers
fortrue
, o conector gravará os atributos como cabeçalhos de registro Kafka. Ele grava o corpo da mensagem do Pub/Sub diretamente no valor do registro do Kafka como um tipobyte[]
, usando o conversor especificado porvalue.converter
.
Cabeçalhos de registro Kafka. Por padrão, os cabeçalhos ficam vazios, a menos que você defina
kafka.record.headers
comotrue
.
Opções de configuração
Além das configurações fornecidas pela API Kafka Connect, o conector de Kafka do grupo Pub/Sub é compatível com as configurações a seguir.
Opções de configuração do conector do coletor
O conector do coletor é compatível com as opções de configuração a seguir.
Configuração | Tipo de dados | Descrição |
---|---|---|
connector.class |
String |
Obrigatório. A classe Java do conector. Para o conector do coletor do Pub/Sub, o valor precisa ser com.google.pubsub.kafka.sink.CloudPubSubSinkConnector .
|
cps.endpoint |
String |
O endpoint do Pub/Sub a ser usado. Padrão: |
cps.project |
String |
Obrigatório. O Google Cloud que contém o tópico do Pub/Sub. |
cps.topic |
String |
Obrigatório. O tópico do Pub/Sub em que os registros do Kafka serão publicados. |
gcp.credentials.file.path |
String |
Opcional. O caminho para um arquivo que armazena credenciais do Google Cloud para autenticar o Pub/Sub Lite. |
gcp.credentials.json |
String |
Opcional. Um blob JSON que contém o Google Cloud para autenticar o Pub/Sub Lite. |
headers.publish |
Boolean |
Quando Padrão: |
maxBufferBytes |
Long |
O número máximo de bytes a serem recebidos em uma partição do Kafka de tópico antes de publicá-los no Pub/Sub. Padrão: 10000000. |
maxBufferSize |
Integer |
O número máximo de registros a serem recebidos em uma partição de tópico do Kafka antes de publicá-los no Pub/Sub. Padrão: 100; |
maxDelayThresholdMs |
Integer |
O tempo máximo de espera para alcançar
Padrão: 100; |
maxOutstandingMessages |
Long |
O número máximo de registros que podem estar pendentes, incluindo lotes incompletos e pendentes, antes que o editor bloqueie outras publicações. Padrão: |
maxOutstandingRequestBytes |
Long |
É o número máximo total de bytes que podem estar pendentes, incluindo lotes incompletos e pendentes, antes que o editor bloqueie publicações. Padrão: |
maxRequestTimeoutMs |
Integer |
O tempo limite para solicitações de publicação individuais no Pub/Sub, em milissegundos. Padrão: 10000. |
maxTotalTimeoutMs |
Integer |
O tempo limite total, em milissegundos, de uma chamada ser publicada no Pub/Sub, incluindo novas tentativas. Padrão: 60.000. |
metadata.publish |
Boolean |
Quando Padrão: |
messageBodyName |
String |
Ao usar um esquema de struct ou de valor de mapa, especifica o nome de um campo ou chave para usar como o corpo da mensagem do Pub/Sub. Consulte Conversão do Kafka para o Pub/Sub. Padrão: |
orderingKeySource |
String |
Especifica como definir a chave de ordem na mensagem do Pub/Sub. Pode ser um dos seguintes valores:
Padrão: |
topics |
String |
Obrigatório. Uma lista separada por vírgulas de tópicos do Kafka para leitura. |
Opções de configuração do conector de origem
O conector de origem é compatível com as opções de configuração a seguir.
Configuração | Tipo de dados | Descrição |
---|---|---|
connector.class |
String |
Obrigatório. A classe Java do conector. Para o conector de origem do Pub/Sub, o valor precisa ser com.google.pubsub.kafka.source.CloudPubSubSourceConnector .
|
cps.endpoint |
String |
O endpoint do Pub/Sub a ser usado. Padrão: |
cps.makeOrderingKeyAttribute |
Boolean |
Quando Padrão: |
cps.maxBatchSize |
Integer |
O número máximo de mensagens a serem agrupadas por solicitação de envio para o Pub/Sub. Padrão: 100 |
cps.project |
String |
Obrigatório. O projeto do Google Cloud que contém o tópico do Pub/Sub. |
cps.subscription |
String |
Obrigatório. O nome da assinatura do Pub/Sub que receberá as mensagens recebidas. |
gcp.credentials.file.path |
String |
Opcional. O caminho para um arquivo que armazena credenciais do Google Cloud para autenticar o Pub/Sub Lite. |
gcp.credentials.json |
String |
Opcional. Um blob JSON que contém o Google Cloud para autenticar o Pub/Sub Lite. |
kafka.key.attribute |
String |
O atributo de mensagem do Pub/Sub a ser usado como chave para
mensagens publicadas no Kafka. Se definido como Padrão: |
kafka.partition.count |
Integer |
O número de partições do Kafka para o tópico Kafka em que as mensagens
são publicadas. Esse parâmetro será ignorado se o esquema de partição for
Padrão: 1; |
kafka.partition.scheme |
String |
O esquema para atribuir uma mensagem a uma partição no Kafka. Pode ser um dos seguintes valores:
Padrão: |
kafka.record.headers |
Boolean |
Se |
kafka.topic |
String |
Obrigatório. O tópico Kafka que recebe mensagens do Pub/Sub. |
Como receber suporte
Se precisar de ajuda, crie um tíquete de suporte. Para perguntas e discussões gerais, crie um problema no repositório do GitHub.
A seguir
- Entenda as diferenças entre o Kafka e o Pub/Sub.
- Saiba mais sobre o conector de Kafka do grupo do Pub/Sub.
- Consulte o repositório do GitHub para o conector de Kafka do grupo do Pub/Sub.