En este documento, se describe cómo escribir datos de Dataflow en Apache Kafka.
El conector de E/S de Apache Beam Kafka (KafkaIO
) está disponible de forma nativa para Java y también para Python y Go con el framework de canalizaciones de varios lenguajes de Apache Beam.
Para las canalizaciones de Java, considera usar el conector de E/S administrado para leer desde Kafka.
Procesamiento “exactamente una vez”
De forma predeterminada, el conector KafkaIO
no proporciona semántica de exactamente una vez para las operaciones de escritura. Eso significa que los datos se pueden escribir en tu tema de Kafka varias veces. Para habilitar las operaciones de escritura de exactamente una vez, llama al método withEOS
. Las operaciones de escritura exactamente una vez garantizan que los datos se escriban en el tema de Kafka de destino exactamente una vez.
Sin embargo, también aumenta el costo de la canalización y disminuye la capacidad de procesamiento.
Si no tienes requisitos estrictos para la semántica de exactamente una vez y la lógica de tu canalización puede controlar registros duplicados, considera habilitar el modo de al menos una vez para toda la canalización para reducir los costos. Para obtener más información, consulta Configura el modo de transmisión de la canalización.
Drenes de canalizaciones
Si drenas la canalización, no se garantiza la semántica de exactamente una vez. La única garantía es que no se pierdan los datos confirmados. Como resultado, es posible que algunos datos se procesen mientras se drena la canalización, sin que se confirmen los desfases de lectura en Kafka. Para lograr la semántica de al menos una vez para Kafka cuando modificas una canalización, actualiza la canalización en lugar de cancelar el trabajo y comenzar uno nuevo.
Ajusta Kafka para la semántica de “exactamente una vez”
Ajustar transaction.max.timeout.ms
y transactional.id.expiration.ms
puede complementar tu estrategia general de tolerancia a fallas y entrega exactamente una vez.
Sin embargo, su impacto depende de la naturaleza de la interrupción y de tu configuración
específica. Establece transaction.max.timeout.ms
cerca del tiempo de retención de tus temas de Kafka para evitar la duplicación de datos causada por interrupciones del agente de Kafka.
Si un agente de Kafka deja de estar disponible temporalmente (por ejemplo, debido a una partición de red o una falla de nodo) y un productor tiene transacciones en curso, es posible que esas transacciones se agoten. Aumentar el valor de transaction.max.timeout.ms
les da a las transacciones más tiempo para completarse después de que se recupera un agente, lo que podría evitar la necesidad de reiniciar las transacciones y volver a enviar los mensajes. Esta mitigación ayuda de forma indirecta a mantener la semántica de exactamente una vez, ya que reduce la posibilidad de mensajes duplicados causados por reinicios de transacciones. Por otro lado, un tiempo de vencimiento más corto puede ayudar a limpiar los IDs de transacción inactivos con mayor rapidez, lo que reduce el uso potencial de recursos.
Configura Herramientas de redes
De forma predeterminada, Dataflow inicia instancias dentro de la red de nube privada virtual (VPC) predeterminada. Según tu configuración de Kafka, es posible que debas configurar una red y una subred diferentes para Dataflow. Para obtener más información, consulta Especifica una red y una subred. Cuando configures la red, crea reglas de firewall que permitan que las máquinas de trabajador de Dataflow lleguen a los agentes de Kafka.
Si usas los Controles del servicio de VPC, coloca el clúster de Kafka dentro del perímetro de los Controles del servicio de VPC o extiende los perímetros a la VPN autorizada o a Cloud Interconnect.
Si tu clúster de Kafka se implementa fuera de Google Cloud, debes crear una conexión de red entre Dataflow y el clúster de Kafka. Existen varias opciones de herramientas de redes con diferentes compensaciones:
- Conéctate con un espacio de direcciones RFC 1918 compartido mediante una de las siguientes opciones:
- Accede al clúster de Kafka alojado de forma externa a través de direcciones IP públicas con una de las siguientes opciones:
- Internet pública
- Intercambio de tráfico directo
- Intercambio de tráfico con proveedores
La interconexión dedicada es la mejor opción para un rendimiento y una confiabilidad predecibles, pero puede tardar más en configurarse porque los terceros deben aprovisionar los circuitos nuevos. Con una topología según la IP pública, puedes comenzar con rapidez porque hay poco trabajo de herramientas de redes para hacer.
En las siguientes dos secciones, se describen estas opciones con más detalle.
Espacio de direcciones de RFC 1918 compartido
La interconexión dedicada y la VPN de IPsec te brindan acceso directo a las direcciones IP RFC 1918 en la nube privada virtual (VPC), lo que puede simplificar la configuración de Kafka. Si usas una topología basada en VPN, considera configurar una VPN de alta capacidad de procesamiento.
De forma predeterminada, Dataflow inicia instancias en la red de VPC predeterminada. En una topología de red privada con rutas definidas de forma explícita en Cloud Router que conectan subredes en Google Cloud a ese clúster de Kafka, necesitas más control sobre dónde ubicar las instancias de Dataflow. Puedes usar Dataflow para configurar los parámetros de ejecución network
y subnetwork
.
Asegúrate de que la subred correspondiente tenga suficientes direcciones IP disponibles para que Dataflow inicie instancias mientras intenta escalar horizontalmente. Además, cuando crees una red independiente para iniciar las instancias de Dataflow, asegúrate de tener una regla de firewall que habilite el tráfico de TCP entre todas las máquinas virtuales del proyecto. La red predeterminada ya tiene configurada esta regla de firewall.
Espacio de direcciones IP públicas
Esta arquitectura usa la seguridad de la capa de transporte (TLS) para proteger el tráfico entre clientes externos y Kafka, y usa texto simple en la comunicación entre agentes. Cuando el objeto de escucha de Kafka se vincula a una interfaz de red que se usa para la comunicación interna y externa, la configuración del objeto de escucha es sencilla. Sin embargo, en muchas situaciones, las direcciones anunciadas de forma externa de los agentes de Kafka en el clúster difieren de las interfaces de red internas que usa Kafka. En tales casos, puedes usar la propiedad advertised.listeners
:
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
Los clientes externos se conectan mediante el puerto 9093 a través de un canal SSL y los clientes internos se conectan mediante el puerto 9092 a través de un canal de texto simple. Cuando especifiques una dirección en advertised.listeners
, usa nombres de DNS (kafkabroker-n.mydomain.com
, en esta muestra) que se resuelven en la misma instancia para el tráfico interno y externo. Es posible que el uso de direcciones IP públicas no funcione porque las direcciones no pueden resolverse para el tráfico interno.
Logging
El registro desde KafkaIO
puede ser bastante detallado. Considera reducir el nivel de registro
en producción de la siguiente manera:
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
Para obtener más información, consulta Cómo configurar los niveles de registro del trabajador de canalización.