Ler do Apache Kafka para o Dataflow

Este documento descreve como ler dados do Apache Kafka para o Dataflow.

O conector de E/S do Apache Beam Kafka (KafkaIO) está disponível de forma nativa para Java e também para Python e Go usando o framework de pipelines em vários idiomas do Apache Beam.

Para pipelines em Java, use o conector de E/S gerenciado para ler do Kafka.

Paralelismo

O paralelismo é limitado por dois fatores: o número máximo de workers (max_num_workers) e o número de partições do Kafka. O Dataflow usa como padrão um fanout de paralelismo de 4 x max_num_workers. No entanto, o fanout é limitado pelo número de partições. Por exemplo, se 100 vCPUs estiverem disponíveis, mas o pipeline só ler de 10 partições do Kafka, o paralelismo máximo será 10.

Para maximizar o paralelismo, é recomendável ter pelo menos 4 x partições do max_num_workers Kafka. Se o job usar o Runner v2, considere definir o paralelismo ainda mais alto. Um bom ponto de partida é ter partições iguais ao dobro do número de vCPUs de workers.

Se não for possível aumentar o número de partições, insira uma etapa Reshuffle ou Redistribute após a etapa de leitura do Kafka. Essa etapa permite que o Dataflow redistribua e paralelize os dados de maneira mais eficiente, mas adiciona uma sobrecarga adicional para realizar a etapa de embaralhamento. Para mais informações, consulte Fatores que afetam o paralelismo.

Tente garantir que a carga entre as partições seja relativamente uniforme e não desequilibrada. Se a carga estiver distorcida, isso pode levar a uma utilização inadequada dos workers. Os workers que leem de partições com carga mais leve podem ficar relativamente ociosos, enquanto os que leem de partições com carga pesada podem ficar para trás. O Dataflow oferece métricas para o backlog de cada partição.

Se a carga estiver distorcida, o rebalanceamento de trabalho dinâmico pode ajudar a distribuir o trabalho. Por exemplo, o Dataflow pode alocar um worker para ler de várias partições de baixo volume e alocar outro worker para ler de uma única partição de alto volume. No entanto, dois workers não podem ler da mesma partição. Portanto, uma partição com carga pesada ainda pode fazer com que o pipeline fique para trás.

Práticas recomendadas

Esta seção contém recomendações para leitura do Kafka no Dataflow.

Tópicos de baixo volume

Um cenário comum é ler muitos tópicos de baixo volume ao mesmo tempo, por exemplo, um tópico por cliente. Criar jobs do Dataflow separados para cada tópico não é eficiente em termos de custo, porque cada job exige pelo menos um worker completo. Em vez disso, considere as seguintes opções:

  • Mesclar tópicos. Combine os tópicos antes de eles serem transferidos para o Dataflow. A ingestão de alguns tópicos de alto volume é muito mais eficiente do que a de muitos tópicos de baixo volume. Cada tópico de alto volume pode ser gerenciado por um único job do Dataflow que utiliza totalmente os workers.

  • Leia vários tópicos. Se não for possível combinar os tópicos antes de ingerir no Dataflow, crie um pipeline que leia de vários tópicos. Essa abordagem permite que o Dataflow atribua vários tópicos ao mesmo worker. Há duas maneiras de implementar essa abordagem:

    • Etapa de leitura única. Crie uma única instância do conector KafkaIO e configure-a para ler vários tópicos. Em seguida, filtre por nome de tópico para aplicar uma lógica diferente por tópico. Para conferir um exemplo de código, consulte Ler de vários tópicos. Considere essa opção se todos os tópicos estiverem no mesmo cluster. Uma desvantagem é que problemas com um único sink ou transformação podem fazer com que todos os tópicos acumulem pendências.

      Para casos de uso mais avançados, transmita um conjunto de objetos KafkaSourceDescriptor que especifiquem os tópicos a serem lidos. O uso de KafkaSourceDescriptor permite atualizar a lista de tópicos mais tarde, se necessário. Este recurso requer Java com o Runner v2.

    • Várias etapas de leitura. Para ler de tópicos localizados em clusters diferentes, o pipeline pode incluir várias instâncias de KafkaIO. Enquanto o job está em execução, é possível atualizar origens individuais usando mapeamentos de transformação. A configuração de um novo tópico ou cluster só é possível com o Runner v2. A observabilidade é um possível desafio com essa abordagem, porque você precisa monitorar cada transformação de leitura individual em vez de depender de métricas no nível do pipeline.

Confirmação de volta ao Kafka

Por padrão, o conector KafkaIO não usa os deslocamentos do Kafka para acompanhar o progresso e não confirma de volta ao Kafka. Se você chamar commitOffsetsInFinalize, o conector fará o possível para confirmar no Kafka depois que os registros forem confirmados no Dataflow. Os registros confirmados no Dataflow podem não ser totalmente processados. Portanto, se você cancelar o pipeline, um deslocamento poderá ser confirmado sem que os registros sejam totalmente processados.

Como a configuração enable.auto.commit=True confirma os deslocamentos assim que eles são lidos do Kafka sem nenhum processamento pelo Dataflow, não é recomendável usar essa opção. A recomendação é definir enable.auto.commit=False e commitOffsetsInFinalize=True. Se você definir enable.auto.commit como True, os dados poderão ser perdidos se o pipeline for interrompido durante o processamento. Os registros já confirmados no Kafka podem ser descartados.

Marcas-d'água

Por padrão, o conector KafkaIO usa o tempo de processamento atual para atribuir a marca d'água de saída e o horário do evento. Para mudar esse comportamento, chame withTimestampPolicyFactory e atribua um TimestampPolicy. O Beam oferece implementações de TimestampPolicy que calculam a marca d'água com base no tempo de anexação de registro do Kafka ou na criação da mensagem.

Considerações sobre o runner

O conector KafkaIO tem duas implementações subjacentes para leituras do Kafka, a mais antiga ReadFromKafkaViaUnbounded e a mais recente ReadFromKafkaViaSDF. O Dataflow escolhe automaticamente a melhor implementação para seu job com base na linguagem do SDK e nos requisitos do job. Evite solicitar explicitamente uma implementação de runner ou Kafka, a menos que você precise de recursos específicos disponíveis apenas nessa implementação. Para mais informações sobre como escolher um executor, consulte Usar o Dataflow Runner v2.

Se o pipeline usa withTopic ou withTopics, a implementação mais antiga consulta o Kafka no momento da construção do pipeline para as partições disponíveis. A máquina que cria o pipeline precisa ter permissão para se conectar ao Kafka. Se você receber um erro de permissão, verifique se tem permissões para se conectar ao Kafka localmente. Para evitar esse problema, use withTopicPartitions, que não se conecta ao Kafka no momento da construção do pipeline.

Implantar para a produção

Ao implantar a solução na produção, é recomendável usar modelos flexíveis. Ao usar um modelo Flex, o pipeline é iniciado em um ambiente consistente, o que pode ajudar a reduzir problemas de configuração local.

O registro de KafkaIO pode ser bastante detalhado Considere reduzir o nível de registro na produção da seguinte maneira:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Para mais informações, consulte Definir níveis de registro do worker do pipeline.

Configurar rede

Por padrão, o Dataflow inicia instâncias na rede padrão de nuvem privada virtual (VPC). Dependendo da configuração do Kafka, talvez seja necessário configurar uma rede e uma sub-rede diferentes para o Dataflow. Para mais informações, consulte Especificar uma rede e uma sub-rede. Ao configurar a rede, crie regras de firewall que permitam que as máquinas de worker do Dataflow alcancem os agentes do Kafka.

Se você estiver usando o VPC Service Controls, coloque o cluster do Kafka dentro do perímetro do VPC Service Controls ou estenda os perímetros para a VPN autorizada ou o Cloud Interconnect.

Se o cluster do Kafka for implantado fora do Google Cloud, crie uma conexão de rede entre o Dataflow e o cluster do Kafka. Existem várias opções de rede com diferentes vantagens e desvantagens:

A Interconexão dedicada é a melhor opção para desempenho e confiabilidade previsíveis, mas pode levar mais tempo para configurar porque os novos circuitos precisam ser provisionados por terceiros. Com uma topologia baseada em IP público e pouco trabalho em rede, é possível dar os primeiros passos rapidamente.

As próximas duas seções descrevem essas opções em mais detalhes.

Espaço de endereço do RFC 1918 compartilhado

A Interconexão dedicada e a VPN IPsec oferecem acesso direto aos endereços IP RFC 1918 na sua nuvem privada virtual (VPC), o que pode simplificar a configuração do Kafka. Se você estiver usando uma topologia baseada em VPN, configure uma VPN de alta capacidade.

Por padrão, o Dataflow inicia instâncias na rede VPC padrão. Em uma topologia de rede privada com rotas explicitamente definidas no Cloud Router, que conectam sub-redes do Google Cloud ao cluster do Kafka, você precisa de mais controle sobre onde localizar as instâncias do Dataflow. É possível usar o Dataflow para configurar os parâmetros de execução de network e subnetwork.

Verifique se a sub-rede correspondente tem endereços IP suficientes disponíveis para que o Dataflow possa iniciar instâncias enquanto tenta escalonar horizontalmente. Além disso, ao criar uma rede separada para iniciar as instâncias do Dataflow, verifique se você tem uma regra de firewall que ativa o tráfego TCP entre todas as máquinas virtuais no projeto. A rede padrão já tem essa regra de firewall configurada.

Espaço de endereço IP público

Essa arquitetura usa o Transport Layer Security (TLS, na sigla em inglês) para proteger o tráfego entre clientes externos e o Kafka e usa tráfego não criptografado para comunicação entre agentes. Quando o listener do Kafka se vincula a uma interface de rede usada para comunicação interna e externa, a configuração dele é simples. No entanto, em muitos cenários, os endereços anunciados externamente dos agentes do Kafka no cluster são diferentes das interfaces de rede internas usadas pelo Kafka. Nesses cenários, é possível usar a propriedade advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Os clientes externos se conectam pela porta 9093 por um canal "SSL", e os clientes internos se conectam pela porta 9092 por um canal de texto simples. Ao especificar um endereço em advertised.listeners, use nomes de DNS (kafkabroker-n.mydomain.com, nesta amostra) que resolvem para a mesma instância para tráfego externo e interno. O uso de endereços IP públicos pode não funcionar porque os endereços podem apresentar falha ao resolver o tráfego interno.

Ajustar o Kafka

As configurações do cluster e do cliente do Kafka podem ter um grande impacto no desempenho. Especificamente, as seguintes configurações podem estar muito baixas: Esta seção dá algumas sugestões de pontos de partida, mas você deve testar esses valores para sua carga de trabalho específica.

  • unboundedReaderMaxElements. O padrão é 100. Um valor maior, como 100.000, pode aumentar o tamanho dos pacotes, o que pode melhorar significativamente o desempenho se o pipeline incluir agregações. No entanto, um valor mais alto também pode aumentar a latência. Para definir o valor, use setUnboundedReaderMaxElements. Essa configuração não se aplica ao Runner v2.

  • unboundedReaderMaxReadTimeMs. O padrão é 10.000 milissegundos. Um valor mais alto, como 20.000 mseg, pode aumentar o tamanho do pacote, enquanto um valor mais baixo, como 5.000 mseg, pode reduzir a latência ou o backlog. Para definir o valor, use setUnboundedReaderMaxReadTimeMs. Essa configuração não se aplica ao Runner v2.

  • max.poll.records. O padrão é 500. Um valor mais alto pode ter um desempenho melhor ao recuperar mais registros de entrada juntos, especialmente ao usar o Runner v2. Para definir o valor, chame withConsumerConfigUpdates.

  • fetch.max.bytes. O padrão é 1MB. Um valor maior pode melhorar a capacidade de processamento reduzindo o número de solicitações, especialmente ao usar o Runner v2. No entanto, definir um valor muito alto pode aumentar a latência, embora o processamento downstream seja mais provável de ser o gargalo principal. O valor inicial recomendado é 100 MB. Para definir o valor, chame withConsumerConfigUpdates.

  • max.partition.fetch.bytes. O padrão é 1MB. Esse parâmetro define a quantidade máxima de dados por partição que o servidor retorna. O aumento do valor pode melhorar a capacidade de processamento, reduzindo o número de solicitações, especialmente ao usar o Runner v2. No entanto, definir um valor muito alto pode aumentar a latência, embora o processamento downstream seja mais provável de ser o gargalo principal. O valor inicial recomendado é 100 MB. Para definir o valor, chame withConsumerConfigUpdates.

  • consumerPollingTimeout. O padrão é 2 segundos. Se o tempo do cliente expirar antes que ele possa ler os registros, tente definir um valor maior. Essa configuração é mais relevante ao realizar leituras entre regiões ou leituras com uma rede lenta. Para definir o valor, chame withConsumerPollingTimeout.

Verifique se receive.buffer.bytes é grande o suficiente para processar o tamanho das mensagens. Se o valor for muito pequeno, os registros poderão mostrar que os consumidores estão sendo recriados continuamente e buscando um deslocamento específico.

Exemplos

Os exemplos de código abaixo mostram como criar pipelines do Dataflow que leem do Kafka.

Ler em um único tópico

Este exemplo lê de um tópico do Kafka e grava os payloads de mensagem em arquivos de texto.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("data_format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | ReadFromKafka(
                consumer_config={"bootstrap.servers": options.bootstrap_server},
                topics=[options.topic],
                with_metadata=False,
                max_num_records=5,
                start_read_time=0,
            )
            # The previous step creates a key-value collection, keyed by message ID.
            # The values are the message payloads.
            | beam.Values()
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

Ler de vários tópicos

Este exemplo lê de vários tópicos do Kafka e aplica uma lógica de pipeline separada para cada tópico.

Para casos de uso mais avançados, transmita dinamicamente um conjunto de objetos KafkaSourceDescriptor para atualizar a lista de tópicos a serem lidos. Essa abordagem requer Java com o Runner v2.

Python

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))