Lee desde Apache Kafka a Dataflow

En este documento, se describe cómo leer datos de Apache Kafka a Dataflow.

El conector de E/S de Apache Beam Kafka (KafkaIO) está disponible de forma nativa para Java y también para Python y Go con el framework de canalizaciones de varios lenguajes de Apache Beam.

Para las canalizaciones de Java, considera usar el conector de E/S administrado para leer desde Kafka.

Paralelismo

El paralelismo está limitado por dos factores: la cantidad máxima de trabajadores (max_num_workers) y la cantidad de particiones de Kafka. Dataflow usa de forma predeterminada un fan-out de paralelismo de 4 × max_num_workers. Sin embargo, el fanout está limitado por la cantidad de particiones. Por ejemplo, si hay 100 CPU virtuales disponibles, pero la canalización solo lee de 10 particiones de Kafka, el paralelismo máximo es 10.

Para maximizar el paralelismo, se recomienda tener al menos 4 particiones de Kafka max_num_workers. Si tu trabajo usa Runner v2, considera establecer un paralelismo aún más alto. Un buen punto de partida es tener particiones iguales al doble de la cantidad de CPU virtuales de trabajador.

Si no puedes aumentar la cantidad de particiones, considera insertar un paso Reshuffle o Redistribute después del paso de lectura de Kafka. Este paso permite que Dataflow redistribuya y paralelice los datos de manera más eficiente, pero agrega una sobrecarga adicional para realizar el paso de mezcla. Para obtener más información, consulta Factores que afectan el paralelismo.

Intenta asegurarte de que la carga entre las particiones sea relativamente uniforme y no esté sesgada. Si la carga está sesgada, puede provocar un uso deficiente de los trabajadores. Los trabajadores que leen de particiones con una carga más ligera pueden estar relativamente inactivos, mientras que los que leen de particiones con una carga pesada pueden quedarse atrás. Dataflow proporciona métricas para el retraso por partición.

Si la carga está sesgada, el balanceo dinámico de trabajo puede ayudar a distribuir el trabajo. Por ejemplo, Dataflow podría asignar un trabajador para leer de varias particiones de bajo volumen y otro para leer de una sola partición de alto volumen. Sin embargo, dos trabajadores no pueden leer de la misma partición, por lo que una partición con mucha carga puede hacer que la canalización se retrase.

Prácticas recomendadas

En esta sección, se incluyen recomendaciones para leer desde Kafka en Dataflow.

Temas de volumen bajo

Una situación común es leer de muchos temas de bajo volumen al mismo tiempo, por ejemplo, un tema por cliente. Crear trabajos de Dataflow independientes para cada tema no es rentable, ya que cada trabajo requiere al menos un trabajador completo. En su lugar, considera las siguientes opciones:

  • Combinar temas Combina temas antes de que se transfieran a Dataflow. Transferir algunos temas de gran volumen es mucho más eficiente que transferir muchos temas de bajo volumen. Cada tema de alto volumen puede controlarse con un solo trabajo de Dataflow que utilice por completo sus trabajadores.

  • Lee varios temas. Si no puedes combinar temas antes de transferirlos a Dataflow, considera crear una canalización que lea de varios temas. Este enfoque permite que Dataflow asigne varios temas al mismo trabajador. Existen dos maneras de implementar este enfoque:

    • Paso de lectura único: Crea una sola instancia del conector KafkaIO y configúralo para que lea varios temas. Luego, filtra por nombre de tema para aplicar una lógica diferente por tema. Para obtener un código de ejemplo, consulta Lee de varios temas. Considera esta opción si todos tus temas se encuentran en el mismo clúster. Una desventaja es que los problemas con un solo sink o transformación pueden hacer que todos los temas acumulen un backlog.

      Para casos de uso más avanzados, pasa un conjunto de objetos KafkaSourceDescriptor que especifiquen los temas de los que se debe leer. El uso de KafkaSourceDescriptor te permite actualizar la lista de temas más adelante si es necesario. Esta función requiere Java con Runner v2.

    • Varios pasos de lectura: Para leer de temas ubicados en diferentes clústeres, tu canalización puede incluir varias instancias de KafkaIO. Mientras se ejecuta el trabajo, puedes actualizar fuentes individuales con asignaciones de transformación. La configuración de un tema o clúster nuevo solo se admite cuando se usa Runner v2. La observabilidad es un posible desafío con este enfoque, ya que debes supervisar cada transformación de lectura individual en lugar de depender de métricas a nivel de la canalización.

Confirmación en Kafka

De forma predeterminada, el conector KafkaIO no usa los desfases de Kafka para hacer un seguimiento del progreso ni vuelve a confirmar en Kafka. Si llamas a commitOffsetsInFinalize, el conector hace un mejor esfuerzo para confirmar en Kafka después de que se confirman los registros en Dataflow. Es posible que los registros confirmados en Dataflow no se procesen por completo, por lo que, si cancelas la canalización, es posible que se confirme un desplazamiento sin que los registros se procesen por completo.

Debido a que la configuración de enable.auto.commit=True confirma los desfases en cuanto se leen de Kafka sin ningún procesamiento de Dataflow, no se recomienda usar esta opción. Se recomienda configurar enable.auto.commit=False y commitOffsetsInFinalize=True. Si configuras enable.auto.commit en True, se pueden perder datos si se interrumpe la canalización durante el procesamiento. Es posible que se descarten los registros que ya se hayan confirmado en Kafka.

Marcas de agua

De forma predeterminada, el conector KafkaIO usa el tiempo de procesamiento actual para asignar la marca de agua de salida y la hora del evento. Para cambiar este comportamiento, llama a withTimestampPolicyFactory y asigna un TimestampPolicy. Beam proporciona implementaciones de TimestampPolicy que calculan la marca de agua en función del tiempo de inserción de registros de Kafka o de la hora de creación del mensaje.

Consideraciones para el corredor

El conector KafkaIO tiene dos implementaciones subyacentes para las operaciones de lectura de Kafka: la más antigua ReadFromKafkaViaUnbounded y la más reciente ReadFromKafkaViaSDF. Dataflow elige automáticamente la mejor implementación para tu trabajo según el lenguaje de tu SDK y los requisitos del trabajo. Evita solicitar explícitamente un ejecutor o una implementación de Kafka, a menos que necesites funciones específicas que solo estén disponibles en esa implementación. Para obtener más información sobre cómo elegir un ejecutor, consulta Usa Dataflow Runner v2.

Si tu canalización usa withTopic o withTopics, la implementación anterior consulta a Kafka en el momento de la construcción de la canalización para las particiones disponibles. La máquina que crea la canalización debe tener permiso para conectarse a Kafka. Si recibes un error de permiso, verifica que tengas permisos para conectarte a Kafka de forma local. Puedes evitar este problema con withTopicPartitions, que no se conecta a Kafka durante el tiempo de construcción de la canalización.

Implementar en producción

Cuando implementes tu solución en producción, te recomendamos que uses plantillas de Flex. Cuando se usa una plantilla de Flex, la canalización se inicia desde un entorno coherente, lo que puede ayudar a mitigar los problemas de configuración local.

El registro desde KafkaIO puede ser bastante detallado. Considera reducir el nivel de registro en producción de la siguiente manera:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Para obtener más información, consulta Configura los niveles de registro del trabajador de canalización.

Configura Herramientas de redes

De forma predeterminada, Dataflow inicia instancias dentro de la red de nube privada virtual (VPC) predeterminada. Según tu configuración de Kafka, es posible que debas configurar una red y una subred diferentes para Dataflow. Para obtener más información, consulta Especifica una red y una subred. Cuando configures la red, crea reglas de firewall que permitan que las máquinas de trabajador de Dataflow lleguen a los agentes de Kafka.

Si usas los Controles del servicio de VPC, coloca el clúster de Kafka dentro del perímetro de los Controles del servicio de VPC o extiende los perímetros a la VPN autorizada o a Cloud Interconnect.

Si tu clúster de Kafka se implementa fuera de Google Cloud, debes crear una conexión de red entre Dataflow y el clúster de Kafka. Existen varias opciones de herramientas de redes con diferentes compensaciones:

La interconexión dedicada es la mejor opción para un rendimiento y una confiabilidad predecibles, pero puede tardar más en configurarse porque los terceros deben aprovisionar los circuitos nuevos. Con una topología según la IP pública, puedes comenzar con rapidez porque hay poco trabajo de herramientas de redes para hacer.

En las siguientes dos secciones, se describen estas opciones con más detalle.

Espacio de direcciones de RFC 1918 compartido

La interconexión dedicada y la VPN de IPsec te brindan acceso directo a las direcciones IP RFC 1918 en la nube privada virtual (VPC), lo que puede simplificar la configuración de Kafka. Si usas una topología basada en VPN, considera configurar una VPN de alta capacidad de procesamiento.

De forma predeterminada, Dataflow inicia instancias en la red de VPC predeterminada. En una topología de red privada con rutas definidas de forma explícita en Cloud Router que conectan subredes en Google Cloud a ese clúster de Kafka, necesitas más control sobre dónde ubicar las instancias de Dataflow. Puedes usar Dataflow para configurar los parámetros de ejecución network y subnetwork.

Asegúrate de que la subred correspondiente tenga suficientes direcciones IP disponibles para que Dataflow inicie instancias mientras intenta escalar horizontalmente. Además, cuando crees una red independiente para iniciar las instancias de Dataflow, asegúrate de tener una regla de firewall que habilite el tráfico de TCP entre todas las máquinas virtuales del proyecto. La red predeterminada ya tiene configurada esta regla de firewall.

Espacio de direcciones IP públicas

Esta arquitectura usa la seguridad de la capa de transporte (TLS) para proteger el tráfico entre clientes externos y Kafka, y usa texto simple en la comunicación entre agentes. Cuando el objeto de escucha de Kafka se vincula a una interfaz de red que se usa para la comunicación interna y externa, la configuración del objeto de escucha es sencilla. Sin embargo, en muchas situaciones, las direcciones anunciadas de forma externa de los agentes de Kafka en el clúster difieren de las interfaces de red internas que usa Kafka. En tales casos, puedes usar la propiedad advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Los clientes externos se conectan mediante el puerto 9093 a través de un canal SSL y los clientes internos se conectan mediante el puerto 9092 a través de un canal de texto simple. Cuando especifiques una dirección en advertised.listeners, usa nombres de DNS (kafkabroker-n.mydomain.com, en esta muestra) que se resuelven en la misma instancia para el tráfico interno y externo. Es posible que el uso de direcciones IP públicas no funcione porque las direcciones no pueden resolverse para el tráfico interno.

Ajusta Kafka

La configuración del clúster de Kafka y del cliente de Kafka puede tener un gran impacto en el rendimiento. En particular, es posible que los siguientes parámetros de configuración sean demasiado bajos. En esta sección, se proporcionan algunos puntos de partida sugeridos, pero debes experimentar con estos valores para tu carga de trabajo en particular.

  • unboundedReaderMaxElements. La configuración predeterminada es 10,000. Un valor más alto, como 100,000, puede aumentar el tamaño de los paquetes, lo que puede mejorar el rendimiento de forma significativa si tu canalización incluye agregaciones. Sin embargo, un valor más alto también podría aumentar la latencia. Para establecer el valor, usa setUnboundedReaderMaxElements. Este parámetro de configuración no se aplica a Runner v2.

  • unboundedReaderMaxReadTimeMs. El valor predeterminado es 10,000 ms. Un valor más alto, como 20,000 ms, puede aumentar el tamaño del paquete, mientras que un valor más bajo, como 5,000 ms, puede reducir la latencia o el retraso. Para establecer el valor, usa setUnboundedReaderMaxReadTimeMs. Este parámetro de configuración no se aplica a Runner v2.

  • max.poll.records. La configuración predeterminada es 500. Un valor más alto podría tener un mejor rendimiento, ya que recupera más registros entrantes juntos, en especial cuando se usa Runner v2. Para establecer el valor, llama a withConsumerConfigUpdates.

  • fetch.max.bytes. Valor predeterminado 1 MB. Un valor más alto podría mejorar la capacidad de procesamiento, ya que reduce la cantidad de solicitudes, especialmente cuando se usa Runner v2. Sin embargo, si lo configuras demasiado alto, es posible que aumente la latencia, aunque es más probable que el procesamiento descendente sea el cuello de botella principal. Un valor inicial recomendado es de 100 MB. Para establecer el valor, llama a withConsumerConfigUpdates.

  • max.partition.fetch.bytes. Valor predeterminado 1 MB. Este parámetro establece la cantidad máxima de datos por partición que muestra el servidor. Aumentar el valor puede mejorar la capacidad de procesamiento, ya que reduce la cantidad de solicitudes, en especial, cuando se usa Runner v2. Sin embargo, si lo configuras demasiado alto, es posible que aumente la latencia, aunque es más probable que el procesamiento descendente sea el cuello de botella principal. Un valor inicial recomendado es de 100 MB. Para establecer el valor, llama a withConsumerConfigUpdates.

  • consumerPollingTimeout. La configuración predeterminada es de 2 segundos. Si el cliente consumidor se agota el tiempo de espera antes de poder leer un registro, intenta establecer un valor más alto. Este parámetro de configuración suele ser relevante cuando se realizan lecturas entre regiones o lecturas con una red lenta. Para establecer el valor, llama a withConsumerPollingTimeout.

Asegúrate de que receive.buffer.bytes sea lo suficientemente grande como para controlar el tamaño de los mensajes. Si el valor es demasiado pequeño, es posible que los registros muestren que los consumidores se vuelven a crear de forma continua y buscan un desplazamiento específico.

Ejemplos

En los siguientes ejemplos de código, se muestra cómo crear canalizaciones de Dataflow que leen desde Kafka.

Lee desde un solo tema

En este ejemplo, se lee desde un tema de Kafka y se escriben las cargas útiles de los mensajes en archivos de texto.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("data_format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | ReadFromKafka(
                consumer_config={"bootstrap.servers": options.bootstrap_server},
                topics=[options.topic],
                with_metadata=False,
                max_num_records=5,
                start_read_time=0,
            )
            # The previous step creates a key-value collection, keyed by message ID.
            # The values are the message payloads.
            | beam.Values()
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

Lee desde varios temas

En este ejemplo, se lee desde varios temas de Kafka y se aplica una lógica de canalización independiente para cada tema.

Para casos de uso más avanzados, pasa de forma dinámica un conjunto de objetos KafkaSourceDescriptor para que puedas actualizar la lista de temas de los que se leerá. Este enfoque requiere Java con Runner v2.

Python

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))