Este documento descreve como ler dados do Apache Kafka para o Dataflow.
O conector de E/S do Apache Beam Kafka (KafkaIO
) está disponível de forma nativa para
Java
e também para
Python
e Go
usando o framework de pipelines em vários idiomas do Apache Beam.
Para pipelines em Java, use o conector de E/S gerenciado para ler do Kafka.
Paralelismo
O paralelismo é limitado por dois fatores: o
número máximo de workers
(max_num_workers
) e o número de partições do Kafka. O Dataflow
usa como padrão um fanout de paralelismo de 4 x max_num_workers
. No entanto, o fanout é
limitado pelo número de partições. Por exemplo, se 100 vCPUs estiverem disponíveis,
mas o pipeline só ler de 10 partições do Kafka, o paralelismo máximo será
10.
Para maximizar o paralelismo, é recomendável ter pelo menos 4 x
partições do max_num_workers
Kafka. Se o job usar o
Runner v2, considere definir o paralelismo ainda mais alto.
Um bom ponto de partida é ter partições iguais ao dobro do número de vCPUs
de workers.
Se não for possível aumentar o número de partições, insira uma etapa Reshuffle
ou Redistribute
após a etapa de leitura do Kafka. Essa etapa permite que o Dataflow redistribua e paralelize os dados de maneira mais eficiente, mas adiciona uma sobrecarga adicional para realizar a etapa de embaralhamento. Para
mais informações, consulte
Fatores que afetam o paralelismo.
Tente garantir que a carga entre as partições seja relativamente uniforme e não desequilibrada. Se a carga estiver distorcida, isso pode levar a uma utilização inadequada dos workers. Os workers que leem de partições com carga mais leve podem ficar relativamente ociosos, enquanto os que leem de partições com carga pesada podem ficar para trás. O Dataflow oferece métricas para o backlog de cada partição.
Se a carga estiver distorcida, o rebalanceamento de trabalho dinâmico pode ajudar a distribuir o trabalho. Por exemplo, o Dataflow pode alocar um worker para ler de várias partições de baixo volume e alocar outro worker para ler de uma única partição de alto volume. No entanto, dois workers não podem ler da mesma partição. Portanto, uma partição com carga pesada ainda pode fazer com que o pipeline fique para trás.
Práticas recomendadas
Esta seção contém recomendações para leitura do Kafka no Dataflow.
Tópicos de baixo volume
Um cenário comum é ler muitos tópicos de baixo volume ao mesmo tempo, por exemplo, um tópico por cliente. Criar jobs do Dataflow separados para cada tópico não é eficiente em termos de custo, porque cada job exige pelo menos um worker completo. Em vez disso, considere as seguintes opções:
Mesclar tópicos. Combine os tópicos antes de eles serem transferidos para o Dataflow. A ingestão de alguns tópicos de alto volume é muito mais eficiente do que a de muitos tópicos de baixo volume. Cada tópico de alto volume pode ser gerenciado por um único job do Dataflow que utiliza totalmente os workers.
Leia vários tópicos. Se não for possível combinar os tópicos antes de ingerir no Dataflow, crie um pipeline que leia de vários tópicos. Essa abordagem permite que o Dataflow atribua vários tópicos ao mesmo worker. Há duas maneiras de implementar essa abordagem:
Etapa de leitura única. Crie uma única instância do conector
KafkaIO
e configure-a para ler vários tópicos. Em seguida, filtre por nome de tópico para aplicar uma lógica diferente por tópico. Para conferir um exemplo de código, consulte Ler de vários tópicos. Considere essa opção se todos os tópicos estiverem no mesmo cluster. Uma desvantagem é que problemas com um único sink ou transformação podem fazer com que todos os tópicos acumulem pendências.Para casos de uso mais avançados, transmita um conjunto de objetos
KafkaSourceDescriptor
que especifiquem os tópicos a serem lidos. O uso deKafkaSourceDescriptor
permite atualizar a lista de tópicos mais tarde, se necessário. Este recurso requer Java com o Runner v2.Várias etapas de leitura. Para ler de tópicos localizados em clusters diferentes, o pipeline pode incluir várias instâncias de
KafkaIO
. Enquanto o job está em execução, é possível atualizar origens individuais usando mapeamentos de transformação. A configuração de um novo tópico ou cluster só é possível com o Runner v2. A observabilidade é um possível desafio com essa abordagem, porque você precisa monitorar cada transformação de leitura individual em vez de depender de métricas no nível do pipeline.
Confirmação de volta ao Kafka
Por padrão, o conector KafkaIO
não usa os deslocamentos do Kafka para acompanhar o progresso
e não confirma de volta ao Kafka. Se você chamar
commitOffsetsInFinalize
, o conector fará o possível
para confirmar no Kafka depois que os registros forem confirmados no
Dataflow. Os registros confirmados no Dataflow podem não ser
totalmente processados. Portanto, se você
cancelar o pipeline, um deslocamento
poderá ser confirmado sem que os registros sejam totalmente processados.
Como a configuração enable.auto.commit=True
confirma os deslocamentos assim que eles são lidos do
Kafka sem nenhum processamento pelo Dataflow, não é recomendável usar essa opção.
A recomendação é definir enable.auto.commit=False
e
commitOffsetsInFinalize=True
. Se você definir
enable.auto.commit
como True
, os dados poderão ser perdidos se o pipeline for interrompido
durante o processamento. Os registros já confirmados no Kafka podem ser descartados.
Marcas-d'água
Por padrão, o conector KafkaIO
usa o tempo de processamento atual para atribuir
a marca d'água de saída
e o horário do evento. Para mudar esse comportamento, chame
withTimestampPolicyFactory
e atribua um
TimestampPolicy
. O Beam oferece implementações de TimestampPolicy
que calculam a marca d'água com base no tempo de anexação de registro do Kafka ou na criação da mensagem.
Considerações sobre o runner
O conector KafkaIO
tem duas implementações subjacentes para leituras do Kafka, a
mais antiga ReadFromKafkaViaUnbounded
e a mais recente
ReadFromKafkaViaSDF
. O Dataflow
escolhe automaticamente a melhor implementação para seu job com base na linguagem do SDK
e nos requisitos do job. Evite solicitar explicitamente uma implementação de
runner ou Kafka, a menos que você precise de recursos específicos
disponíveis apenas nessa implementação. Para mais informações sobre como escolher um executor, consulte
Usar o Dataflow Runner v2.
Se o pipeline usa withTopic
ou withTopics
,
a implementação mais antiga consulta o Kafka no momento da construção do pipeline para as
partições disponíveis. A máquina que cria o pipeline precisa ter permissão para se conectar ao Kafka. Se você receber um erro de permissão, verifique se tem
permissões para se conectar ao Kafka localmente. Para evitar esse problema, use
withTopicPartitions
, que não se conecta ao Kafka
no momento da construção do pipeline.
Implantar para a produção
Ao implantar a solução na produção, é recomendável usar modelos flexíveis. Ao usar um modelo Flex, o pipeline é iniciado em um ambiente consistente, o que pode ajudar a reduzir problemas de configuração local.
O registro de KafkaIO
pode ser bastante detalhado Considere reduzir o nível de registro
na produção da seguinte maneira:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Para mais informações, consulte Definir níveis de registro do worker do pipeline.
Configurar rede
Por padrão, o Dataflow inicia instâncias na rede padrão de nuvem privada virtual (VPC). Dependendo da configuração do Kafka, talvez seja necessário configurar uma rede e uma sub-rede diferentes para o Dataflow. Para mais informações, consulte Especificar uma rede e uma sub-rede. Ao configurar a rede, crie regras de firewall que permitam que as máquinas de worker do Dataflow alcancem os agentes do Kafka.
Se você estiver usando o VPC Service Controls, coloque o cluster do Kafka dentro do perímetro do VPC Service Controls ou estenda os perímetros para a VPN autorizada ou o Cloud Interconnect.
Se o cluster do Kafka for implantado fora do Google Cloud, crie uma conexão de rede entre o Dataflow e o cluster do Kafka. Existem várias opções de rede com diferentes vantagens e desvantagens:
- Conecte-se usando um espaço de endereço RFC 1918 compartilhado usando uma das seguintes opções:
- Acesse seu cluster do Kafka hospedado externamente com endereços IP públicos usando uma das seguintes opções:
- Internet pública
- Peering direto
- Peering de operadora
A Interconexão dedicada é a melhor opção para desempenho e confiabilidade previsíveis, mas pode levar mais tempo para configurar porque os novos circuitos precisam ser provisionados por terceiros. Com uma topologia baseada em IP público e pouco trabalho em rede, é possível dar os primeiros passos rapidamente.
As próximas duas seções descrevem essas opções em mais detalhes.
Espaço de endereço do RFC 1918 compartilhado
A Interconexão dedicada e a VPN IPsec oferecem acesso direto aos endereços IP RFC 1918 na sua nuvem privada virtual (VPC), o que pode simplificar a configuração do Kafka. Se você estiver usando uma topologia baseada em VPN, configure uma VPN de alta capacidade.
Por padrão, o Dataflow inicia instâncias na
rede VPC padrão. Em uma topologia de rede privada com rotas explicitamente definidas no Cloud Router, que conectam sub-redes do Google Cloud ao cluster do Kafka, você precisa de mais controle sobre onde localizar as instâncias do Dataflow. É possível
usar o Dataflow para configurar os parâmetros de execução
de network
e subnetwork
.
Verifique se a sub-rede correspondente tem endereços IP suficientes disponíveis para que o Dataflow possa iniciar instâncias enquanto tenta escalonar horizontalmente. Além disso, ao criar uma rede separada para iniciar as instâncias do Dataflow, verifique se você tem uma regra de firewall que ativa o tráfego TCP entre todas as máquinas virtuais no projeto. A rede padrão já tem essa regra de firewall configurada.
Espaço de endereço IP público
Essa arquitetura usa o Transport Layer Security (TLS, na sigla em inglês) para proteger o tráfego entre clientes externos e o Kafka e usa tráfego não criptografado para comunicação entre agentes. Quando o listener do Kafka se vincula a uma interface de rede usada
para comunicação interna e externa,
a configuração dele é simples. No entanto, em muitos cenários, os endereços anunciados externamente dos agentes do Kafka
no cluster são diferentes das interfaces de rede internas
usadas pelo Kafka. Nesses cenários, é possível usar a propriedade
advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
Os clientes externos se conectam pela porta 9093 por um canal "SSL", e os clientes internos
se conectam pela porta 9092 por um canal de texto simples. Ao especificar um endereço em advertised.listeners
,
use nomes de DNS (kafkabroker-n.mydomain.com
, nesta amostra) que resolvem para a mesma instância
para tráfego externo e interno. O uso de endereços IP públicos pode não funcionar porque os endereços podem apresentar falha ao resolver o tráfego interno.
Ajustar o Kafka
As configurações do cluster e do cliente do Kafka podem ter um grande impacto no desempenho. Especificamente, as seguintes configurações podem estar muito baixas: Esta seção dá algumas sugestões de pontos de partida, mas você deve testar esses valores para sua carga de trabalho específica.
unboundedReaderMaxElements
. O padrão é 100. Um valor maior, como 100.000, pode aumentar o tamanho dos pacotes, o que pode melhorar significativamente o desempenho se o pipeline incluir agregações. No entanto, um valor mais alto também pode aumentar a latência. Para definir o valor, usesetUnboundedReaderMaxElements
. Essa configuração não se aplica ao Runner v2.unboundedReaderMaxReadTimeMs
. O padrão é 10.000 milissegundos. Um valor mais alto, como 20.000 mseg, pode aumentar o tamanho do pacote, enquanto um valor mais baixo, como 5.000 mseg, pode reduzir a latência ou o backlog. Para definir o valor, usesetUnboundedReaderMaxReadTimeMs
. Essa configuração não se aplica ao Runner v2.max.poll.records
. O padrão é 500. Um valor mais alto pode ter um desempenho melhor ao recuperar mais registros de entrada juntos, especialmente ao usar o Runner v2. Para definir o valor, chamewithConsumerConfigUpdates
.fetch.max.bytes
. O padrão é 1MB. Um valor maior pode melhorar a capacidade de processamento reduzindo o número de solicitações, especialmente ao usar o Runner v2. No entanto, definir um valor muito alto pode aumentar a latência, embora o processamento downstream seja mais provável de ser o gargalo principal. O valor inicial recomendado é 100 MB. Para definir o valor, chamewithConsumerConfigUpdates
.max.partition.fetch.bytes
. O padrão é 1MB. Esse parâmetro define a quantidade máxima de dados por partição que o servidor retorna. O aumento do valor pode melhorar a capacidade de processamento, reduzindo o número de solicitações, especialmente ao usar o Runner v2. No entanto, definir um valor muito alto pode aumentar a latência, embora o processamento downstream seja mais provável de ser o gargalo principal. O valor inicial recomendado é 100 MB. Para definir o valor, chamewithConsumerConfigUpdates
.consumerPollingTimeout
. O padrão é 2 segundos. Se o tempo do cliente expirar antes que ele possa ler os registros, tente definir um valor maior. Essa configuração é mais relevante ao realizar leituras entre regiões ou leituras com uma rede lenta. Para definir o valor, chamewithConsumerPollingTimeout
.
Verifique se receive.buffer.bytes
é grande o suficiente para processar o tamanho das
mensagens. Se o valor for muito pequeno, os registros poderão mostrar que os consumidores estão
sendo recriados continuamente e buscando um deslocamento específico.
Exemplos
Os exemplos de código abaixo mostram como criar pipelines do Dataflow que leem do Kafka.
Ler em um único tópico
Este exemplo lê de um tópico do Kafka e grava os payloads de mensagem em arquivos de texto.
Java
Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Python
Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Ler de vários tópicos
Este exemplo lê de vários tópicos do Kafka e aplica uma lógica de pipeline separada para cada tópico.
Para casos de uso mais avançados, transmita dinamicamente um conjunto de
objetos KafkaSourceDescriptor
para atualizar
a lista de tópicos a serem lidos. Essa abordagem requer Java com o Runner v2.
Python
Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.