Escribir datos de Dataflow en Apache Kafka

En este documento se describe cómo escribir datos de Dataflow en Apache Kafka.

En la mayoría de los casos prácticos, te recomendamos que uses el conector de entrada/salida gestionado para escribir en Kafka.

Si necesitas una optimización del rendimiento más avanzada, te recomendamos que uses el conector KafkaIO. El conector KafkaIO está disponible para Java o mediante el framework de las canalizaciones multilingüe para Python y Go.

Procesamiento exacto

De forma predeterminada, el conector KafkaIO no proporciona semántica de una sola vez para las escrituras. Esto significa que es posible que los datos se escriban en tu tema de Kafka varias veces. Para habilitar las escrituras exactamente una vez, llama al método withEOS. La escritura exactamente una vez garantiza que los datos se escriben en el tema de Kafka de destino exactamente una vez. Sin embargo, también aumenta el coste de la canalización y reduce el rendimiento.

Si no tienes requisitos estrictos para la semántica de exactamente una vez y la lógica de tu canalización puede gestionar registros duplicados, considera habilitar el modo de al menos una vez en toda la canalización para reducir los costes. Para obtener más información, consulta Configurar el modo de streaming de la canalización.

Drenajes de tuberías

Si vacías la canalización, no se garantiza la semántica de exactamente una vez. La única garantía es que no se pierden los datos confirmados. Por lo tanto, es posible que algunos datos se procesen mientras se vacía la canalización, sin que se confirmen los desplazamientos de lectura en Kafka. Para conseguir la semántica de al menos una vez en Kafka al modificar una canalización, actualiza la canalización en lugar de cancelar el trabajo e iniciar uno nuevo.

Ajustar Kafka para que tenga una semántica de entrega única

Ajustar transaction.max.timeout.ms y transactional.id.expiration.ms puede complementar tu estrategia general de tolerancia a fallos y de entrega exactamente una vez. Sin embargo, su impacto depende de la naturaleza de la interrupción y de tu configuración específica. Defina transaction.max.timeout.ms cerca del tiempo de retención de sus temas de Kafka para evitar que se dupliquen los datos debido a interrupciones del broker de Kafka.

Si un broker de Kafka no está disponible temporalmente (por ejemplo, debido a una partición de red o a un fallo de nodo) y un productor tiene transacciones en curso, es posible que esas transacciones agoten el tiempo de espera. Si aumentas el valor de transaction.max.timeout.ms, las transacciones tendrán más tiempo para completarse después de que se recupere un broker, lo que puede evitar que tengas que reiniciar las transacciones y volver a enviar los mensajes. Esta mitigación ayuda indirectamente a mantener la semántica de exactamente una vez, ya que reduce la probabilidad de que se produzcan mensajes duplicados debido a reinicios de transacciones. Por otro lado, un tiempo de vencimiento más corto puede ayudar a limpiar los IDs de transacción inactivos más rápidamente, lo que reduce el uso potencial de recursos.

Configurar redes

De forma predeterminada, Dataflow inicia instancias en tu red de nube privada virtual (VPC) predeterminada. Según tu configuración de Kafka, es posible que tengas que configurar una red y una subred diferentes para Dataflow. Para obtener más información, consulta Especificar una red y una subred. Cuando configures tu red, crea reglas de cortafuegos que permitan que las máquinas de trabajador de Dataflow accedan a los brokers de Kafka.

Si usas Controles de Servicio de VPC, coloca el clúster de Kafka dentro del perímetro de Controles de Servicio de VPC. De lo contrario, amplía los perímetros a la VPN o a Cloud Interconnect autorizados.

Si tu clúster de Kafka se ha desplegado fuera de Google Cloud, debes crear una conexión de red entre Dataflow y el clúster de Kafka. Hay varias opciones de redes con diferentes ventajas e inconvenientes:

La interconexión dedicada es la mejor opción para disfrutar de un rendimiento y una fiabilidad predecibles, pero puede tardar más en configurarse porque terceros deben aprovisionar los nuevos circuitos. Con una topología basada en IPs públicas, puedes empezar rápidamente porque no es necesario hacer mucho trabajo de redes.

En las dos secciones siguientes se describen estas opciones con más detalle.

Espacio de direcciones RFC 1918 compartido

Tanto Dedicated Interconnect como la VPN IPsec te ofrecen acceso directo a las direcciones IP RFC 1918 de tu nube privada virtual (VPC), lo que puede simplificar tu configuración de Kafka. Si utilizas una topología basada en VPN, te recomendamos que configures una VPN de alto rendimiento.

De forma predeterminada, Dataflow inicia instancias en tu red de VPC predeterminada. En una topología de red privada con rutas definidas explícitamente en Cloud Router que conectan subredes en Google Cloud con ese clúster de Kafka, necesitas más control sobre dónde ubicar tus instancias de Dataflow. Puedes usar Dataflow para configurar los network y subnetwork parámetros de ejecución.

Asegúrate de que la subred correspondiente tenga suficientes direcciones IP disponibles para que Dataflow pueda iniciar instancias cuando intente ampliar la capacidad. Además, cuando crees una red independiente para iniciar tus instancias de Dataflow, asegúrate de que tienes una regla de cortafuegos que habilite el tráfico TCP entre todas las máquinas virtuales del proyecto. La red predeterminada ya tiene configurada esta regla de cortafuegos.

Espacio de direcciones IP públicas

Esta arquitectura usa Seguridad en la capa de transporte (TLS) para proteger el tráfico entre clientes externos y Kafka, y usa tráfico sin cifrar para la comunicación entre brokers. Cuando el receptor de Kafka se vincula a una interfaz de red que se usa tanto para la comunicación interna como para la externa, configurar el receptor es sencillo. Sin embargo, en muchos casos, las direcciones anunciadas externamente de los brokers de Kafka del clúster difieren de las interfaces de red internas que usa Kafka. En estos casos, puedes usar la propiedad advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

Los clientes externos se conectan mediante el puerto 9093 a través de un canal "SSL", y los clientes internos se conectan mediante el puerto 9092 a través de un canal de texto sin formato. Cuando especifiques una dirección en advertised.listeners, usa nombres de DNS (kafkabroker-n.mydomain.com en este ejemplo) que se resuelvan en la misma instancia para el tráfico externo e interno. Es posible que el uso de direcciones IP públicas no funcione porque las direcciones no se resuelvan para el tráfico interno.

Almacenamiento de registros

Los registros de KafkaIO pueden ser bastante detallados. Plantéate reducir el nivel de registro en producción de la siguiente manera:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Para obtener más información, consulta Definir niveles de registro de los elementos de trabajo de la canalización.

Siguientes pasos