Gravar o Dataflow no Apache Kafka

Este documento descreve como gravar dados do Dataflow no Apache Kafka.

O conector de E/S do Apache Beam Kafka (KafkaIO) está disponível de forma nativa para Java e também para Python e Go usando o framework de pipelines em vários idiomas do Apache Beam.

Para pipelines em Java, use o conector de E/S gerenciado para ler do Kafka.

Processamento único

Por padrão, o conector KafkaIO não fornece semântica exatamente uma vez para gravações. Isso significa que os dados podem ser gravados no tópico do Kafka várias vezes. Para ativar gravações exatamente únicas, chame o método withEOS. As gravações exatamente uma vez garantem que os dados sejam gravados no tópico de destino do Kafka exatamente uma vez. No entanto, isso também aumenta o custo do pipeline e diminui a capacidade.

Se você não tiver requisitos rígidos para a semântica exatamente uma vez e a lógica no pipeline puder lidar com registros duplicados, ative o modo pelo menos uma vez em todo o pipeline para reduzir custos. Para mais informações, consulte Definir o modo de streaming do pipeline.

Drenagem de pipeline

Se você esgotar o pipeline, a semântica exatamente uma vez não será garantida. A única garantia é que nenhum dado confirmado é perdido. Como resultado, alguns dados podem ser processados enquanto o pipeline está sendo esvaziado, sem a confirmação de deslocamentos de leitura de volta para o Kafka. Para alcançar a semântica pelo menos uma vez no Kafka ao modificar um pipeline, atualize o pipeline em vez de cancelar o job e iniciar um novo.

Ajustar o Kafka para a semântica "exatamente uma vez"

Ajustar transaction.max.timeout.ms e transactional.id.expiration.ms pode complementar sua estratégia de tolerância a falhas e entrega exatamente uma vez. No entanto, o impacto deles depende da natureza da interrupção e da sua configuração específica. Defina transaction.max.timeout.ms próximo ao tempo de retenção dos seus tópicos do Kafka para evitar a duplicação de dados causada por falhas do agente do Kafka.

Se um broker do Kafka ficar temporariamente indisponível (por exemplo, devido a uma falha de partição de rede ou de nó) e um produtor tiver transações em andamento, elas poderão expirar. Aumentar o valor de transaction.max.timeout.ms dá mais tempo para que as transações sejam concluídas após a recuperação de um broker, evitando a necessidade de reiniciar transações e reenviar mensagens. Essa mitigação ajuda indiretamente a manter a semântica exatamente uma vez, reduzindo a chance de mensagens duplicadas causadas por reinicializações de transação. Por outro lado, um tempo de expiração mais curto pode ajudar a limpar IDs de transação inativos com mais rapidez, reduzindo o uso de recursos.

Configurar rede

Por padrão, o Dataflow inicia instâncias na rede padrão de nuvem privada virtual (VPC). Dependendo da configuração do Kafka, talvez seja necessário configurar uma rede e uma sub-rede diferentes para o Dataflow. Para mais informações, consulte Especificar uma rede e uma sub-rede. Ao configurar a rede, crie regras de firewall que permitam que as máquinas de worker do Dataflow alcancem os agentes do Kafka.

Se você estiver usando o VPC Service Controls, coloque o cluster do Kafka dentro do perímetro do VPC Service Controls ou estenda os perímetros para a VPN autorizada ou o Cloud Interconnect.

Se o cluster do Kafka for implantado fora do Google Cloud, crie uma conexão de rede entre o Dataflow e o cluster do Kafka. Existem várias opções de rede com diferentes vantagens e desvantagens:

A Interconexão dedicada é a melhor opção para desempenho e confiabilidade previsíveis, mas pode levar mais tempo para configurar porque os novos circuitos precisam ser provisionados por terceiros. Com uma topologia baseada em IP público e pouco trabalho em rede, é possível dar os primeiros passos rapidamente.

As próximas duas seções descrevem essas opções em mais detalhes.

Espaço de endereço do RFC 1918 compartilhado

A Interconexão dedicada e a VPN IPsec oferecem acesso direto aos endereços IP RFC 1918 na sua nuvem privada virtual (VPC), o que pode simplificar a configuração do Kafka. Se você estiver usando uma topologia baseada em VPN, configure uma VPN de alta capacidade.

Por padrão, o Dataflow inicia instâncias na rede VPC padrão. Em uma topologia de rede privada com rotas explicitamente definidas no Cloud Router, que conectam sub-redes do Google Cloud ao cluster do Kafka, você precisa de mais controle sobre onde localizar as instâncias do Dataflow. É possível usar o Dataflow para configurar os parâmetros de execução de network e subnetwork.

Verifique se a sub-rede correspondente tem endereços IP suficientes disponíveis para que o Dataflow possa iniciar instâncias enquanto tenta escalonar horizontalmente. Além disso, ao criar uma rede separada para iniciar as instâncias do Dataflow, verifique se você tem uma regra de firewall que ativa o tráfego TCP entre todas as máquinas virtuais no projeto. A rede padrão já tem essa regra de firewall configurada.

Espaço de endereço IP público

Essa arquitetura usa o Transport Layer Security (TLS, na sigla em inglês) para proteger o tráfego entre clientes externos e o Kafka e usa tráfego não criptografado para comunicação entre agentes. Quando o listener do Kafka se vincula a uma interface de rede usada para comunicação interna e externa, a configuração dele é simples. No entanto, em muitos cenários, os endereços anunciados externamente dos agentes do Kafka no cluster são diferentes das interfaces de rede internas usadas pelo Kafka. Nesses cenários, é possível usar a propriedade advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Os clientes externos se conectam pela porta 9093 por um canal "SSL", e os clientes internos se conectam pela porta 9092 por um canal de texto simples. Ao especificar um endereço em advertised.listeners, use nomes de DNS (kafkabroker-n.mydomain.com, nesta amostra) que resolvem para a mesma instância para tráfego externo e interno. O uso de endereços IP públicos pode não funcionar porque os endereços podem apresentar falha ao resolver o tráfego interno.

Geração de registros

O registro de KafkaIO pode ser bastante detalhado Considere reduzir o nível de registro na produção da seguinte maneira:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Para mais informações, consulte Definir níveis de registro do worker do pipeline.