Neste documento, descrevemos como integrar o Apache Kafka e o Pub/Sub usando o conector de Kafka do grupo do Pub/Sub.
Sobre o conector de Kafka do grupo do Pub/Sub
O Apache Kafka é uma plataforma de código aberto para eventos de streaming. É comum usadas em arquiteturas distribuídas para permitir a comunicação entre acoplados. O Pub/Sub é um serviço gerenciado para enviar e receber mensagens de forma assíncrona. Assim como no Kafka, é possível usar Pub/Sub para comunicação entre componentes em sua nuvem do Terraform.
Com o conector de Kafka do grupo Pub/Sub, é possível integrar esses dois sistemas. Os seguintes conectores são empacotados no JAR do conector:
- O conector do coletor lê registros de um ou mais tópicos do Kafka e os publica no Pub/Sub.
- O conector de origem lê mensagens de um tópico do Pub/Sub. e os publica no Kafka.
Veja alguns cenários em que é possível usar o conector de Kafka do grupo do Pub/Sub:
- Você está migrando uma arquitetura baseada em Kafka para o Google Cloud.
- Você tem um sistema de front-end que armazena eventos no Kafka fora Google Cloud, mas também o usa para executar alguns dos back-ends que precisam receber os eventos do Kafka.
- Você coleta registros de uma solução Kafka local e os envia para Google Cloud para análise de dados.
- Você tem um sistema de front-end que usa o Google Cloud, mas também armazena dados no local usando Kafka.
O conector exige Kafka Connect, que é um framework para streaming de dados entre o Kafka e outros sistemas. Para usar é necessário executar o Kafka Connect junto com o cluster do Kafka.
Para seguir este documento, você precisa conhecer o Kafka e o Pub/Sub Antes de ler este documento, é uma boa ideia conclua uma das Guias de início rápido do Pub/Sub.
O conector do Pub/Sub não dá suporte a nenhuma integração entre as ACLs do Google Cloud IAM e do Kafka Connect.
Começar a usar o conector
Esta seção mostra as seguintes tarefas:- Configurar o conector de Kafka do grupo do Pub/Sub
- Enviar eventos do Kafka para o Pub/Sub.
- Envie mensagens do Pub/Sub para o Kafka.
Pré-requisitos
Instale o Kafka
Siga o Guia de início rápido do Apache Kafka para instalar um Kafka de nó único na máquina local. Conclua essas etapas em guia de início rápido:
- Faça o download da versão mais recente do Kafka e extraia-o.
- Inicie o ambiente Kafka.
- Criar um tópico Kafka.
Autenticar
O conector de Kafka do grupo do Pub/Sub precisa ser autenticado com o Pub/Sub para para enviar e receber mensagens do Pub/Sub. Para configurar a autenticação, siga estas etapas:
- Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
- Instale a CLI do Google Cloud.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init
-
Crie ou selecione um projeto do Google Cloud.
-
Crie um projeto do Google Cloud:
gcloud projects create PROJECT_ID
Substitua
PROJECT_ID
por um nome para o projeto do Google Cloud que você está criando. -
Selecione o projeto do Google Cloud que você criou:
gcloud config set project PROJECT_ID
Substitua
PROJECT_ID
pelo nome do projeto do Google Cloud.
-
-
Crie as credenciais de autenticação para sua Conta do Google:
gcloud auth application-default login
-
Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Substitua
PROJECT_ID
pela ID do seu projeto. - Substitua
EMAIL_ADDRESS
pelo seu endereço de e-mail. - Substitua
ROLE
por cada papel individual.
- Substitua
- Instale a CLI do Google Cloud.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init
-
Crie ou selecione um projeto do Google Cloud.
-
Crie um projeto do Google Cloud:
gcloud projects create PROJECT_ID
Substitua
PROJECT_ID
por um nome para o projeto do Google Cloud que você está criando. -
Selecione o projeto do Google Cloud que você criou:
gcloud config set project PROJECT_ID
Substitua
PROJECT_ID
pelo nome do projeto do Google Cloud.
-
-
Crie as credenciais de autenticação para sua Conta do Google:
gcloud auth application-default login
-
Atribua os papéis à sua Conta do Google. Execute uma vez o seguinte comando para cada um dos seguintes papéis do IAM:
roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
- Substitua
PROJECT_ID
pela ID do seu projeto. - Substitua
EMAIL_ADDRESS
pelo seu endereço de e-mail. - Substitua
ROLE
por cada papel individual.
- Substitua
Baixar o JAR do conector
Faça o download do arquivo JAR do conector na sua máquina local. Para mais informações, consulte Adquirir o conector no arquivo readme do GitHub.
Copie os arquivos de configuração do conector
Clone ou faça o download do Repositório do GitHub para o conector.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector
Copie o conteúdo do diretório
config
para o subdiretórioconfig
de da instalação do Kafka.cp config/* [path to Kafka installation]/config/
Esses arquivos contêm as configurações do conector.
Atualizar a configuração do Kafka Connect
- Navegue até o diretório que contém o binário do Kafka Connect que você baixado.
- No diretório binário do Kafka Connect, abra o arquivo chamado
config/connect-standalone.properties
em um editor de texto. - Se a
plugin.path property
estiver comentada, remova a marca de comentário. Atualize o
plugin.path property
para incluir o caminho para o JAR do conector.Exemplo:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
Defina a propriedade
offset.storage.file.filename
como um nome de arquivo local. Em modo autônomo, o Kafka usa esse arquivo para armazenar dados de deslocamento.Exemplo:
offset.storage.file.filename=/tmp/connect.offsets
Encaminhar eventos do Kafka para o Pub/Sub
Esta seção descreve como iniciar o conector do coletor, publicar eventos no Kafka e ler as mensagens encaminhadas do Pub/Sub.
Use a Google Cloud CLI para criar um tópico do Pub/Sub com uma assinatura.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Substitua:
- PUBSUB_TOPIC: o nome de um tópico do Pub/Sub para para receber as mensagens do Kafka.
- PUBSUB_SUBSCRIPTION: o nome de um Pub/Sub uma assinatura para o tópico.
Abra o arquivo
/config/cps-sink-connector.properties
em um editor de texto. Adicionar valores para as seguintes propriedades, que estão marcadas como"TODO"
no comentários:topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
Substitua:
- KAFKA_TOPICS: uma lista separada por vírgulas de tópicos do Kafka para leitura se originou.
- PROJECT_ID: o projeto do Google Cloud que contém seu tópico do Pub/Sub.
- PUBSUB_TOPIC: o tópico do Pub/Sub que receberá o do Kafka.
No diretório Kafka, execute o seguinte comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties
Siga as etapas no Guia de início rápido do Apache Kafka para gravar alguns eventos no seu tópico Kafka.
Use a CLI gcloud para ler os eventos do Pub/Sub.
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
Encaminhar mensagens do Pub/Sub para o Kafka
Esta seção descreve como iniciar o conector de origem, publicar mensagens no Pub/Sub e ler as mensagens encaminhadas do Kafka.
Use a CLI gcloud para criar um tópico do Pub/Sub com uma assinatura.
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
Substitua:
- PUBSUB_TOPIC: o nome de um tópico do Pub/Sub.
- PUBSUB_SUBSCRIPTION: o nome de um Pub/Sub assinatura.
Abra o arquivo chamado
/config/cps-source-connector.properties
em um texto editor. Adicione valores para as seguintes propriedades, que estão marcadas como"TODO"
em os comentários:kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
Substitua:
- KAFKA_TOPIC: os tópicos do Kafka que receberão o Mensagens do Pub/Sub.
- PROJECT_ID: o projeto do Google Cloud que contém seu tópico do Pub/Sub.
- PUBSUB_TOPIC: o tópico do Pub/Sub.
No diretório Kafka, execute o seguinte comando:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties
Use a CLI gcloud para publicar uma mensagem no Pub/Sub.
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
Leia a mensagem do Kafka. Siga as etapas no Guia de início rápido do Apache Kafka para ler as mensagens do tópico Kafka.
Conversão de mensagem
um registro Kafka; contém uma chave e um valor, que são matrizes de bytes de comprimento variável. Opcionalmente, um O registro Kafka também pode ter cabeçalhos, que são pares de chave-valor. Um Mensagem do Pub/Sub tem duas partes principais: o corpo da mensagem e zero ou mais atributos de chave-valor.
O Kafka Connect usa conversores para serializar chaves e valores de e para o Kafka. Para controlar a serialização, defina as seguintes propriedades no conector arquivos de configuração:
key.converter
: o conversor usado para serializar chaves de registro.value.converter
: o conversor usado para serializar valores de registro.
O corpo de uma mensagem do Pub/Sub é um objeto ByteString
. Portanto,
a conversão mais eficiente é copiar o payload diretamente. Por esse motivo,
recomendamos o uso de um conversor que produz tipos de dados primitivos (número inteiro, flutuante,
string ou esquema de bytes) sempre que possível, para evitar a desserialização e
reserializando o mesmo corpo da mensagem.
Conversão do Kafka para o Pub/Sub
O conector do coletor converte os registros do Kafka em mensagens do Pub/Sub da seguinte forma:
- A chave de registro do Kafka é armazenada como um atributo chamado
"key"
no Mensagem do Pub/Sub. - Por padrão, o conector descarta todos os cabeçalhos no registro Kafka. No entanto, se
Você define a opção de configuração
headers.publish
comotrue
, o conector grava os cabeçalhos como atributos do Pub/Sub. O conector pula os cabeçalhos que excederem o Pub/Sub limites de atributos de mensagem. - Para esquemas de número inteiro, flutuante, string e bytes, o conector transmite os bytes. do valor do registro Kafka diretamente na mensagem do Pub/Sub body.
- Para esquemas struct, o conector grava cada campo como um atributo do
Mensagem do Pub/Sub. Por exemplo, se o campo for
{ "id"=123 }
, a mensagem resultante do Pub/Sub tem um atributo"id"="123"
. O é sempre convertido em uma string. Os tipos Map e struct não são compatíveis como tipos de campo em um struct. - Para esquemas de mapas, o conector grava cada par de chave-valor como um atributo de
a mensagem do Pub/Sub. Por exemplo, se o mapa for
{"alice"=1,"bob"=2}
, a mensagem resultante do Pub/Sub terá duas atributos"alice"="1"
e"bob"="2"
. As chaves e os valores são convertidos em strings.
Os esquemas de struct e mapa têm alguns outros comportamentos:
Opcionalmente, você pode especificar um campo struct ou chave de mapa específico como corpo da mensagem definindo a propriedade de configuração
messageBodyName
. O do campo ou da chave é armazenado comoByteString
no corpo da mensagem. Se você não definirmessageBodyName
, o corpo da mensagem ficará vazio para struct e esquemas de mapa.Para valores de matriz, o conector aceita apenas tipos de matriz primitiva. O sequência de valores na matriz é concatenada em uma única
ByteString
objeto.
Conversão do Pub/Sub para o Kafka
O conector de origem converte mensagens do Pub/Sub em registros Kafka da seguinte forma:
Chave de gravação do Kafka: por padrão, a chave é definida como
null
. Opcionalmente, você pode especificar um atributo de mensagem do Pub/Sub para usar como chave, definindo a opção de configuraçãokafka.key.attribute
. Nesse caso, o conector procura um atributo com esse nome e define a chave de registro como o . Se o atributo especificado não estiver presente, a chave de registro será Defina comonull
.Valor do registro Kafka: O conector grava o valor do registro da seguinte forma:
Se a mensagem do Pub/Sub não tiver atributos personalizados, o conector grava o corpo da mensagem do Pub/Sub diretamente no registro do Kafka como um tipo
byte[]
, usando o conversor especificado porvalue.converter
.Se a mensagem do Pub/Sub tiver atributos personalizados e
kafka.record.headers
forfalse
, o conector grava uma estrutura no . O struct contém um campo para cada atributo e um campo chamado"message"
, cujo valor é o corpo da mensagem do Pub/Sub (armazenados como bytes):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }
Nesse caso, é necessário usar um
value.converter
compatível com Esquemasstruct
, comoorg.apache.kafka.connect.json.JsonConverter
.Se a mensagem do Pub/Sub tiver atributos personalizados e
kafka.record.headers
fortrue
, o conector grava os atributos como Cabeçalhos de registro Kafka. Ele grava o corpo da mensagem do Pub/Sub diretamente ao valor do registro Kafka como um tipobyte[]
, usando o conversor especificado porvalue.converter
.
Cabeçalhos de registro Kafka. Por padrão, os cabeçalhos ficam vazios, a menos que você defina
kafka.record.headers
paratrue
.
Opções de configuração
Além das configurações fornecidas pela API Kafka Connect, a O conector de Kafka do grupo do Pub/Sub oferece suporte às configurações a seguir.
Opções de configuração do conector do coletor
O conector do coletor é compatível com as opções de configuração a seguir.
Configuração | Tipo de dados | Descrição |
---|---|---|
connector.class |
String |
Obrigatório. A classe Java do conector. Para
conector do coletor do Pub/Sub, o valor precisa ser
com.google.pubsub.kafka.sink.CloudPubSubSinkConnector :
|
cps.endpoint |
String |
O endpoint do Pub/Sub a ser usado. Padrão: |
cps.project |
String |
Obrigatório. O Google Cloud que contém os tópico do Pub/Sub. |
cps.topic |
String |
Obrigatório. O tópico do Pub/Sub a ser publicado O Kafka faz os registros. |
gcp.credentials.file.path |
String |
Opcional. O caminho para um arquivo que armazena as credenciais do Google Cloud para autenticar o Pub/Sub Lite. |
gcp.credentials.json |
String |
Opcional. Um blob JSON que contém o Google Cloud para para autenticar o Pub/Sub Lite. |
headers.publish |
Boolean |
Quando Padrão: |
maxBufferBytes |
Long |
O número máximo de bytes a serem recebidos em uma partição do Kafka de tópico antes de publicá-los no Pub/Sub. Padrão: 10000000. |
maxBufferSize |
Integer |
O número máximo de registros a serem recebidos em uma partição de tópico do Kafka antes de publicá-los no Pub/Sub. Padrão: 100; |
maxDelayThresholdMs |
Integer |
O tempo máximo de espera para alcançar
Padrão: 100; |
maxOutstandingMessages |
Long |
O número máximo de registros que podem estar pendentes, incluindo lotes incompletos e pendentes, antes que o editor bloqueie mais publicação. Padrão: |
maxOutstandingRequestBytes |
Long |
O número máximo de bytes totais que podem estar pendentes, incluindo lotes incompletos e pendentes, antes que o editor bloqueie mais publicação. Padrão: |
maxRequestTimeoutMs |
Integer |
O tempo limite para solicitações de publicação individuais no Pub/Sub em milissegundos. Padrão: 10000. |
maxTotalTimeoutMs |
Integer |
O tempo limite total, em milissegundos, de uma chamada para publicar Pub/Sub, incluindo novas tentativas. Padrão: 60.000. |
metadata.publish |
Boolean |
Quando Padrão: |
messageBodyName |
String |
Ao usar um esquema de valor de struct ou mapa, especifica o nome de um campo ou chave para usar como o corpo da mensagem do Pub/Sub. Consulte Conversão do Kafka para o Pub/Sub. Padrão: |
orderingKeySource |
String |
Especifica como definir a chave de ordem no Pub/Sub mensagem. Pode ser um dos seguintes valores:
Padrão: |
topics |
String |
Obrigatório. Uma lista separada por vírgulas de tópicos Kafka para ler. |
Opções de configuração do conector de origem
O conector de origem é compatível com as opções de configuração a seguir.
Configuração | Tipo de dados | Descrição |
---|---|---|
connector.class |
String |
Obrigatório. A classe Java do conector. Para
conector de origem do Pub/Sub, o valor precisa ser
com.google.pubsub.kafka.source.CloudPubSubSourceConnector :
|
cps.endpoint |
String |
O endpoint do Pub/Sub a ser usado. Padrão: |
cps.makeOrderingKeyAttribute |
Boolean |
Quando Padrão: |
cps.maxBatchSize |
Integer |
O número máximo de mensagens a serem agrupadas por solicitação de envio para Pub/Sub Padrão: 100 |
cps.project |
String |
Obrigatório. O projeto do Google Cloud que contém tópico do Pub/Sub. |
cps.subscription |
String |
Obrigatório. O nome do Pub/Sub assinatura da qual receber mensagens. |
gcp.credentials.file.path |
String |
Opcional. O caminho para um arquivo que armazena as credenciais do Google Cloud para autenticar o Pub/Sub Lite. |
gcp.credentials.json |
String |
Opcional. Um blob JSON que contém o Google Cloud para para autenticar o Pub/Sub Lite. |
kafka.key.attribute |
String |
O atributo de mensagem do Pub/Sub que será usado como chave para
de mensagens publicadas no Kafka. Se definido como Padrão: |
kafka.partition.count |
Integer |
O número de partições do Kafka para o tópico Kafka em que as mensagens
forem publicados. Este parâmetro será ignorado se o esquema de partição for
Padrão: 1; |
kafka.partition.scheme |
String |
O esquema para atribuir uma mensagem a uma partição no Kafka. Pode ser um dos seguintes valores:
Padrão: |
kafka.record.headers |
Boolean |
Se |
kafka.topic |
String |
Obrigatório. O tópico Kafka que recebe mensagens Pub/Sub |
Como receber suporte
Se precisar de ajuda, crie um tíquete de suporte. Para perguntas e discussões gerais, crie um problema no repositório do GitHub.
A seguir
- Entenda as diferenças entre o Kafka e o Pub/Sub.
- Saiba mais sobre o conector de Kafka do grupo do Pub/Sub.
- Consulte o conector do Kafka do grupo do Pub/Sub Repositório do GitHub.