Este documento descreve como gravar dados do Dataflow no Apache Kafka.
O conector de E/S do Apache Beam Kafka (KafkaIO
) está disponível de forma nativa para
Java
e também para
Python
e Go
usando o framework de pipelines em vários idiomas do Apache Beam.
Para pipelines em Java, use o conector de E/S gerenciado para ler do Kafka.
Processamento único
Por padrão, o conector KafkaIO
não fornece
semântica exatamente uma vez para gravações. Isso
significa que os dados podem ser gravados no tópico do Kafka várias vezes. Para ativar
gravações exatamente únicas, chame o método withEOS
. As gravações exatamente uma vez
garantem que os dados sejam gravados no tópico de destino do Kafka exatamente uma vez.
No entanto, isso também aumenta o custo do pipeline e diminui a capacidade.
Se você não tiver requisitos rígidos para a semântica exatamente uma vez e a lógica no pipeline puder lidar com registros duplicados, ative o modo pelo menos uma vez em todo o pipeline para reduzir custos. Para mais informações, consulte Definir o modo de streaming do pipeline.
Drenagem de pipeline
Se você esgotar o pipeline, a semântica exatamente uma vez não será garantida. A única garantia é que nenhum dado confirmado é perdido. Como resultado, alguns dados podem ser processados enquanto o pipeline está sendo esvaziado, sem a confirmação de deslocamentos de leitura de volta para o Kafka. Para alcançar a semântica pelo menos uma vez no Kafka ao modificar um pipeline, atualize o pipeline em vez de cancelar o job e iniciar um novo.
Ajustar o Kafka para a semântica "exatamente uma vez"
Ajustar transaction.max.timeout.ms
e transactional.id.expiration.ms
pode
complementar sua estratégia de tolerância a falhas e entrega exatamente uma vez.
No entanto, o impacto deles depende da natureza da interrupção e da sua configuração específica. Defina transaction.max.timeout.ms
próximo ao tempo de retenção dos
seus tópicos do Kafka para evitar a duplicação de dados causada por falhas do agente do Kafka.
Se um broker do Kafka ficar temporariamente indisponível (por exemplo, devido a uma falha de partição
de rede ou de nó) e um produtor tiver transações em andamento, elas
poderão expirar. Aumentar o valor de
transaction.max.timeout.ms
dá mais tempo para que as transações sejam concluídas após a
recuperação de um broker, evitando a necessidade de reiniciar transações e
reenviar mensagens. Essa mitigação ajuda indiretamente a manter a semântica exatamente uma vez, reduzindo a chance de mensagens duplicadas causadas por reinicializações de transação. Por outro lado, um tempo de expiração mais curto pode ajudar a limpar
IDs de transação inativos com mais rapidez, reduzindo o uso de recursos.
Configurar rede
Por padrão, o Dataflow inicia instâncias na rede padrão de nuvem privada virtual (VPC). Dependendo da configuração do Kafka, talvez seja necessário configurar uma rede e uma sub-rede diferentes para o Dataflow. Para mais informações, consulte Especificar uma rede e uma sub-rede. Ao configurar a rede, crie regras de firewall que permitam que as máquinas de worker do Dataflow alcancem os agentes do Kafka.
Se você estiver usando o VPC Service Controls, coloque o cluster do Kafka dentro do perímetro do VPC Service Controls ou estenda os perímetros para a VPN autorizada ou o Cloud Interconnect.
Se o cluster do Kafka for implantado fora do Google Cloud, crie uma conexão de rede entre o Dataflow e o cluster do Kafka. Existem várias opções de rede com diferentes vantagens e desvantagens:
- Conecte-se usando um espaço de endereço RFC 1918 compartilhado usando uma das seguintes opções:
- Acesse seu cluster do Kafka hospedado externamente com endereços IP públicos usando uma das seguintes opções:
- Internet pública
- Peering direto
- Peering de operadora
A Interconexão dedicada é a melhor opção para desempenho e confiabilidade previsíveis, mas pode levar mais tempo para configurar porque os novos circuitos precisam ser provisionados por terceiros. Com uma topologia baseada em IP público e pouco trabalho em rede, é possível dar os primeiros passos rapidamente.
As próximas duas seções descrevem essas opções em mais detalhes.
Espaço de endereço do RFC 1918 compartilhado
A Interconexão dedicada e a VPN IPsec oferecem acesso direto aos endereços IP RFC 1918 na sua nuvem privada virtual (VPC), o que pode simplificar a configuração do Kafka. Se você estiver usando uma topologia baseada em VPN, configure uma VPN de alta capacidade.
Por padrão, o Dataflow inicia instâncias na
rede VPC padrão. Em uma topologia de rede privada com rotas explicitamente definidas no Cloud Router, que conectam sub-redes do Google Cloud ao cluster do Kafka, você precisa de mais controle sobre onde localizar as instâncias do Dataflow. É possível
usar o Dataflow para configurar os parâmetros de execução
de network
e subnetwork
.
Verifique se a sub-rede correspondente tem endereços IP suficientes disponíveis para que o Dataflow possa iniciar instâncias enquanto tenta escalonar horizontalmente. Além disso, ao criar uma rede separada para iniciar as instâncias do Dataflow, verifique se você tem uma regra de firewall que ativa o tráfego TCP entre todas as máquinas virtuais no projeto. A rede padrão já tem essa regra de firewall configurada.
Espaço de endereço IP público
Essa arquitetura usa o Transport Layer Security (TLS, na sigla em inglês) para proteger o tráfego entre clientes externos e o Kafka e usa tráfego não criptografado para comunicação entre agentes. Quando o listener do Kafka se vincula a uma interface de rede usada
para comunicação interna e externa,
a configuração dele é simples. No entanto, em muitos cenários, os endereços anunciados externamente dos agentes do Kafka
no cluster são diferentes das interfaces de rede internas
usadas pelo Kafka. Nesses cenários, é possível usar a propriedade
advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
Os clientes externos se conectam pela porta 9093 por um canal "SSL", e os clientes internos
se conectam pela porta 9092 por um canal de texto simples. Ao especificar um endereço em advertised.listeners
,
use nomes de DNS (kafkabroker-n.mydomain.com
, nesta amostra) que resolvem para a mesma instância
para tráfego externo e interno. O uso de endereços IP públicos pode não funcionar porque os endereços podem apresentar falha ao resolver o tráfego interno.
Geração de registros
O registro de KafkaIO
pode ser bastante detalhado Considere reduzir o nível de registro
na produção da seguinte maneira:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Para mais informações, consulte Definir níveis de registro do worker do pipeline.