Leer de Apache Kafka a Dataflow

En este documento se describe cómo leer datos de Apache Kafka en Dataflow y se incluyen consejos y prácticas recomendadas para mejorar el rendimiento.

En la mayoría de los casos prácticos, te recomendamos que uses el conector de entrada/salida gestionado para leer datos de Kafka.

Si necesitas una optimización del rendimiento más avanzada, te recomendamos que uses el conector KafkaIO. El conector KafkaIO está disponible para Java o mediante el framework de las canalizaciones multilingüe para Python y Go.

Paralelismo

En las siguientes secciones se describe cómo configurar el paralelismo al leer datos de Kafka.

Información general

El paralelismo está limitado por dos factores: el número máximo de trabajadores (max_num_workers) y el número de particiones de Kafka. Dataflow utiliza de forma predeterminada un factor de ramificación de paralelismo de 4 × max_num_workers. Sin embargo, la distribución se limita al número de particiones. Por ejemplo, si hay 100 vCPUs disponibles, pero la canalización solo lee de 10 particiones de Kafka, el paralelismo máximo es 10.

Para maximizar el paralelismo, se recomienda tener al menos 4 particiones de Kafka max_num_workers. Si tu trabajo usa Runner v2, plantéate aumentar aún más el paralelismo. Un buen punto de partida es tener particiones iguales al doble del número de vCPUs de los trabajadores.

Redistribuir

Si no puedes aumentar el número de particiones, puedes aumentar el paralelismo llamando a KafkaIO.Read.withRedistribute. Este método añade una transformación Redistribute a la canalización, que proporciona una pista a Dataflow para redistribuir y paralelizar los datos de forma más eficiente. Te recomendamos que especifiques el número óptimo de particiones llamando a KafkaIO.Read.withRedistributeNumKeys. Si solo usas KafkaIO.Read.withRedistribute, se pueden generar numerosas claves, lo que puede provocar problemas de rendimiento. Para obtener más información, consulta Identificar fases con un alto paralelismo. Redistribuir los datos añade una sobrecarga adicional para realizar el paso de aleatorización. Para obtener más información, consulta Evitar la fusión.

Para minimizar el coste de la redistribución aleatoria, llama a KafkaIO.Read.withOffsetDeduplication. Este modo minimiza la cantidad de datos que se deben conservar como parte del shuffle, al tiempo que proporciona un procesamiento exactamente una vez.

Si no es necesario que el procesamiento sea exacto, puedes permitir duplicados llamando a KafkaIO.Read.withAllowDuplicates.

En la siguiente tabla se resumen las opciones de redistribución:

Opción Modo de procesamiento Apache Beam Configuración
Redistribuir entrada Exactamente una vez v2.60 o versiones posteriores KafkaIO.Read.withRedistribute()
Permitir duplicados Al menos una vez v2.60 o versiones posteriores KafkaIO.Read.withRedistribute().withAllowDuplicates()
Anulación de duplicación por desplazamiento Exactamente una vez v2.65 o versiones posteriores KafkaIO.Read.withRedistribute().withOffsetDeduplication()

Desviación de carga

Intenta que la carga entre particiones sea relativamente uniforme y no esté sesgada. Si la carga está sesgada, puede provocar que los trabajadores no se utilicen de forma adecuada. Los trabajadores que leen de particiones con una carga más ligera pueden estar relativamente inactivos, mientras que los que leen de particiones con una carga pesada pueden quedarse atrás. Dataflow proporciona métricas de backlog por partición.

Si la carga está sesgada, el balanceo de carga dinámico puede ayudar a distribuir el trabajo. Por ejemplo, Dataflow puede asignar un trabajador para leer de varias particiones de poco volumen y otro trabajador para leer de una sola partición de gran volumen. Sin embargo, dos workers no pueden leer de la misma partición, por lo que una partición con mucha carga puede provocar que la canalización se retrase.

Prácticas recomendadas

En esta sección se incluyen recomendaciones para leer datos de Kafka en Dataflow.

Temas con poco volumen

Un caso habitual es leer de muchos temas de poco volumen al mismo tiempo, por ejemplo, un tema por cliente. Crear tareas de Dataflow independientes para cada tema no es rentable, ya que cada tarea requiere al menos un trabajador completo. En su lugar, considera las siguientes opciones:

  • Combinar temas. Combina temas antes de que se ingieran en Dataflow. Ingerir unos pocos temas de gran volumen es mucho más eficiente que ingerir muchos temas de poco volumen. Cada tema de gran volumen puede gestionarse con una sola tarea de Dataflow que utilice todos sus trabajadores.

  • Leer varios temas. Si no puedes combinar temas antes de ingerirlos en Dataflow, considera la posibilidad de crear una canalización que lea de varios temas. Este enfoque permite que Dataflow asigne varios temas al mismo trabajador. Hay dos formas de implementar este enfoque:

    • Paso de lectura único. Crea una sola instancia del conector KafkaIO y configúrala para que lea varios temas. A continuación, filtra por nombre de tema para aplicar una lógica diferente a cada tema. Para ver un código de ejemplo, consulta Leer de varios temas. Considera esta opción si todos tus temas están ubicados en el mismo clúster. Un inconveniente es que los problemas con un único receptor o transformación pueden provocar que todos los temas acumulen un retraso.

      Para casos prácticos más avanzados, envía un conjunto de objetos KafkaSourceDescriptor que especifiquen los temas de los que se va a leer. Con KafkaSourceDescriptor puedes actualizar la lista de temas más adelante si es necesario. Esta función requiere Java con Runner v2.

    • Leer varios pasos. Para leer temas ubicados en diferentes clústeres, tu canal puede incluir varias instancias de KafkaIO. Mientras el trabajo se está ejecutando, puedes actualizar fuentes concretas mediante mapeados de transformación. Solo se admite la configuración de un nuevo tema o clúster cuando se usa Runner v2. La observabilidad es un posible problema de este enfoque, ya que debes monitorizar cada transformación de lectura en lugar de depender de las métricas a nivel de la canalización.

Volver a enviar a Kafka

De forma predeterminada, el conector KafkaIO no usa desplazamientos de Kafka para monitorizar el progreso y no vuelve a Kafka. Si llamas a commitOffsetsInFinalize, el conector hará todo lo posible para volver a Kafka después de que se hayan confirmado los registros en Dataflow. Es posible que los registros confirmados en Dataflow no se procesen por completo, por lo que, si cancelas la canalización, se puede confirmar un desplazamiento sin que los registros se procesen por completo.

Como enable.auto.commit=True confirma los desplazamientos en cuanto se leen de Kafka sin que Dataflow los procese, no se recomienda usar esta opción. Se recomienda definir tanto enable.auto.commit=False como commitOffsetsInFinalize=True. Si asignas el valor True a enable.auto.commit, se pueden perder datos si la canalización se interrumpe durante el procesamiento. Es posible que se eliminen los registros ya confirmados en Kafka.

Marcas de agua

De forma predeterminada, el conector KafkaIO usa el tiempo de procesamiento actual para asignar la marca de agua de salida y la hora del evento. Para cambiar este comportamiento, llama a withTimestampPolicyFactory y asigna un TimestampPolicy. Beam proporciona implementaciones de TimestampPolicy que calculan la marca de agua en función de la hora de anexión del registro de Kafka o de la hora de creación del mensaje.

Consideraciones sobre el corredor

El conector KafkaIO tiene dos implementaciones subyacentes para las lecturas de Kafka: la versión antigua ReadFromKafkaViaUnbounded y la más reciente ReadFromKafkaViaSDF. Dataflow elige automáticamente la mejor implementación para tu tarea en función del lenguaje del SDK y los requisitos de la tarea. Evita solicitar explícitamente un ejecutor o una implementación de Kafka, a menos que necesites funciones específicas que solo estén disponibles en esa implementación. Para obtener más información sobre cómo elegir un runner, consulta el artículo sobre cómo usar Dataflow Runner v2.

Si tu canal usa withTopic o withTopics, la implementación anterior consulta Kafka en el momento de la creación del canal para obtener las particiones disponibles. La máquina que crea la canalización debe tener permiso para conectarse a Kafka. Si recibes un error de permiso, comprueba que tienes permisos para conectarte a Kafka de forma local. Puedes evitar este problema usando withTopicPartitions, que no se conecta a Kafka durante la creación de la canalización.

Desplegar en producción

Cuando implementes tu solución en producción, te recomendamos que uses plantillas flexibles. Si usas una plantilla flexible, la canalización se inicia desde un entorno coherente, lo que puede ayudar a mitigar los problemas de configuración local.

Los registros de KafkaIO pueden ser bastante detallados. Plantéate reducir el nivel de registro en producción de la siguiente manera:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Para obtener más información, consulta Definir niveles de registro de los elementos de trabajo de la canalización.

Configurar redes

De forma predeterminada, Dataflow inicia instancias en tu red de nube privada virtual (VPC) predeterminada. Según tu configuración de Kafka, es posible que tengas que configurar una red y una subred diferentes para Dataflow. Para obtener más información, consulta Especificar una red y una subred. Cuando configures tu red, crea reglas de cortafuegos que permitan que las máquinas de trabajador de Dataflow accedan a los brokers de Kafka.

Si usas Controles de Servicio de VPC, coloca el clúster de Kafka dentro del perímetro de Controles de Servicio de VPC. De lo contrario, amplía los perímetros a la VPN o a Cloud Interconnect autorizados.

Si tu clúster de Kafka se ha desplegado fuera de Google Cloud, debes crear una conexión de red entre Dataflow y el clúster de Kafka. Hay varias opciones de redes con diferentes ventajas e inconvenientes:

La interconexión dedicada es la mejor opción para disfrutar de un rendimiento y una fiabilidad predecibles, pero puede tardar más en configurarse porque terceros deben aprovisionar los nuevos circuitos. Con una topología basada en IPs públicas, puedes empezar rápidamente porque no es necesario hacer mucho trabajo de redes.

En las dos secciones siguientes se describen estas opciones con más detalle.

Espacio de direcciones RFC 1918 compartido

Tanto Dedicated Interconnect como la VPN IPsec te ofrecen acceso directo a las direcciones IP RFC 1918 de tu nube privada virtual (VPC), lo que puede simplificar tu configuración de Kafka. Si utilizas una topología basada en VPN, te recomendamos que configures una VPN de alto rendimiento.

De forma predeterminada, Dataflow inicia instancias en tu red de VPC predeterminada. En una topología de red privada con rutas definidas explícitamente en Cloud Router que conectan subredes en Google Cloud con ese clúster de Kafka, necesitas más control sobre dónde ubicar tus instancias de Dataflow. Puedes usar Dataflow para configurar los network y subnetwork parámetros de ejecución.

Asegúrate de que la subred correspondiente tenga suficientes direcciones IP disponibles para que Dataflow pueda iniciar instancias cuando intente ampliar la capacidad. Además, cuando crees una red independiente para iniciar tus instancias de Dataflow, asegúrate de que tienes una regla de cortafuegos que habilite el tráfico TCP entre todas las máquinas virtuales del proyecto. La red predeterminada ya tiene configurada esta regla de cortafuegos.

Espacio de direcciones IP públicas

Esta arquitectura usa Seguridad en la capa de transporte (TLS) para proteger el tráfico entre clientes externos y Kafka, y usa tráfico sin cifrar para la comunicación entre brokers. Cuando el receptor de Kafka se vincula a una interfaz de red que se usa tanto para la comunicación interna como para la externa, configurar el receptor es sencillo. Sin embargo, en muchos casos, las direcciones anunciadas externamente de los brokers de Kafka del clúster difieren de las interfaces de red internas que usa Kafka. En estos casos, puedes usar la propiedad advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Los clientes externos se conectan mediante el puerto 9093 a través de un canal "SSL", y los clientes internos se conectan mediante el puerto 9092 a través de un canal de texto sin formato. Cuando especifiques una dirección en advertised.listeners, usa nombres de DNS (kafkabroker-n.mydomain.com en este ejemplo) que se resuelvan en la misma instancia para el tráfico externo e interno. Es posible que el uso de direcciones IP públicas no funcione porque las direcciones no se resuelvan para el tráfico interno.

Ajustar Kafka

La configuración de tu clúster y cliente de Kafka puede influir mucho en el rendimiento. En concreto, es posible que los siguientes ajustes sean demasiado bajos. En esta sección se ofrecen algunos puntos de partida sugeridos, pero debes experimentar con estos valores para tu carga de trabajo concreta.

  • unboundedReaderMaxElements. El valor predeterminado es 10.000. Un valor más alto, como 100.000, puede aumentar el tamaño de los paquetes, lo que puede mejorar el rendimiento significativamente si tu canalización incluye agregaciones. Sin embargo, un valor más alto también puede aumentar la latencia. Para definir el valor, usa setUnboundedReaderMaxElements. Este ajuste no se aplica a Runner v2. En Runner v2, usa la opción de servicio sdf_checkpoint_after_output_bytes Dataflow.

  • unboundedReaderMaxReadTimeMs. El valor predeterminado es 10.000 ms. Un valor más alto, como 20.000 ms, puede aumentar el tamaño del paquete, mientras que un valor más bajo, como 5000 ms, puede reducir la latencia o el trabajo pendiente. Para definir el valor, usa setUnboundedReaderMaxReadTimeMs. Este ajuste no se aplica a Runner v2. En Runner v2, usa la opción de servicio sdf_checkpoint_after_duration Dataflow.

  • max.poll.records. El valor predeterminado es 500. Un valor más alto puede dar mejores resultados al recuperar más registros entrantes a la vez, sobre todo si se usa Runner v2. Para definir el valor, llama a withConsumerConfigUpdates.

  • fetch.max.bytes. El valor predeterminado es 1 MB. Un valor más alto puede mejorar el rendimiento al reducir el número de solicitudes, sobre todo si se usa Runner v2. Sin embargo, si lo configuras con un valor demasiado alto, puede aumentar la latencia, aunque es más probable que el cuello de botella principal se deba al procesamiento posterior. El valor inicial recomendado es 100 MB. Para definir el valor, llama a withConsumerConfigUpdates.

  • max.partition.fetch.bytes. El valor predeterminado es 1 MB. Este parámetro define la cantidad máxima de datos por partición que devuelve el servidor. Si aumenta el valor, puede mejorar el rendimiento, ya que se reduce el número de solicitudes, sobre todo si usa Runner v2. Sin embargo, si lo configuras con un valor demasiado alto, puede aumentar la latencia, aunque es más probable que el cuello de botella principal sea el procesamiento de descarga. El valor inicial recomendado es 100 MB. Para definir el valor, llama a withConsumerConfigUpdates.

  • consumerPollingTimeout. El valor predeterminado es 2 segundos. Si el cliente de consumidor agota el tiempo de espera antes de poder leer ningún registro, prueba a definir un valor más alto. Este ajuste suele ser relevante cuando se realizan lecturas entre regiones o lecturas con una red lenta. Para definir el valor, llama a withConsumerPollingTimeout.

Asegúrate de que receive.buffer.bytes sea lo suficientemente grande para gestionar el tamaño de los mensajes. Si el valor es demasiado pequeño, es posible que los registros muestren que los consumidores se vuelven a crear continuamente y buscan un desplazamiento específico.

Ejemplos

En los siguientes ejemplos de código se muestra cómo crear canalizaciones de Dataflow que leen datos de Kafka. Cuando se usan las credenciales predeterminadas de la aplicación junto con el gestor de retrollamada proporcionado por Google Cloud Managed Service para Apache Kafka, se requiere kafka-clients versión 3.7.0 o posterior.

Leer sobre un solo tema

En este ejemplo se usa el conector de entrada/salida gestionado. Muestra cómo leer de un tema de Kafka y escribir las cargas útiles de los mensajes en archivos de texto.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | beam.managed.Read(
                beam.managed.KAFKA,
                config={
                  "bootstrap_servers": options.bootstrap_server,
                  "topic": options.topic,
                  "data_format": "RAW",
                  "auto_offset_reset_config": "earliest",
                  # The max_read_time_seconds parameter is intended for testing.
                  # Avoid using this parameter in production.
                  "max_read_time_seconds": 5
                }
            )
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

Leer de varios temas

En este ejemplo se usa el conector KafkaIO. Muestra cómo leer de varios temas de Kafka y aplicar una lógica de canalización independiente para cada tema.

Para casos prácticos más avanzados, envía dinámicamente un conjunto de objetos KafkaSourceDescriptor para que puedas actualizar la lista de temas de los que leer. Este enfoque requiere Java con Runner v2.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class KafkaReadTopics {

  public static Pipeline createPipeline(Options options) {
    String topic1 = options.getTopic1();
    String topic2 = options.getTopic2();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    var allTopics = pipeline
        .apply(KafkaIO.<Long, String>read()
            .withTopics(List.of(topic1, topic2))
            .withBootstrapServers(options.getBootstrapServer())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withMaxReadTime(Duration.standardSeconds(10))
            .withStartReadTime(Instant.EPOCH)
        );

    // Create separate pipeline branches for each topic.
    // The first branch filters on topic1.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic1)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic1)
            .withSuffix(".txt")
            .withNumShards(1)
        );

    // The second branch filters on topic2.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic2)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic2)
            .withSuffix(".txt")
            .withNumShards(1)
        );
    return pipeline;
  }
}

Python

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))

Siguientes pasos