Actualizar un flujo de procesamiento de streaming

En esta página se ofrecen directrices y recomendaciones para actualizar tus pipelines de streaming. Por ejemplo, puede que tengas que actualizar a una versión más reciente del SDK de Apache Beam o que quieras actualizar el código de tu canalización. Se ofrecen diferentes opciones para adaptarse a distintas situaciones.

Mientras que las canalizaciones por lotes se detienen cuando se completa el trabajo, las canalizaciones de streaming suelen ejecutarse de forma continua para proporcionar un procesamiento ininterrumpido. Por lo tanto, cuando actualices las canalizaciones de streaming, debes tener en cuenta lo siguiente:

  • Es posible que tengas que minimizar o evitar las interrupciones en la canalización. En algunos casos, puede que puedas tolerar una interrupción temporal del procesamiento mientras se implementa una nueva versión de una canalización. En otros casos, es posible que tu aplicación no pueda tolerar ninguna interrupción.
  • Los procesos de actualización de la canalización deben gestionar los cambios de esquema de forma que se minimicen las interrupciones en el procesamiento de mensajes y en otros sistemas conectados. Por ejemplo, si cambia el esquema de los mensajes de una canalización de procesamiento de eventos, es posible que también sea necesario cambiar el esquema de los receptores de datos posteriores.

Puede usar uno de los siguientes métodos para actualizar las canalizaciones de streaming, en función de su canalización y de los requisitos de actualización:

Para obtener más información sobre los problemas que pueden surgir durante una actualización y cómo prevenirlos, consulta los artículos Validar un trabajo de sustitución y Comprobación de compatibilidad de trabajos.

Prácticas recomendadas

  • Actualiza la versión del SDK de Apache Beam por separado de cualquier cambio en el código de la canalización.
  • Prueba tu canal después de cada cambio antes de hacer más actualizaciones.
  • Actualiza periódicamente la versión del SDK de Apache Beam que usa tu flujo de procesamiento.
  • Utiliza métodos automatizados siempre que sea posible, como las actualizaciones durante el vuelo o las actualizaciones automatizadas de la canalización paralela.

Realizar actualizaciones durante el vuelo

Puedes actualizar algunas canalizaciones de streaming en curso sin detener el trabajo. Esta situación se denomina "actualización de una tarea en curso". Las actualizaciones de los trabajos durante el vuelo solo están disponibles en determinadas circunstancias:

  • El trabajo debe usar Streaming Engine.
  • El trabajo debe estar en el estado de ejecución.
  • Solo vas a cambiar el número de trabajadores que usa la tarea.

Para obtener más información, consulta la sección Definir el intervalo de autoescalado de la página Autoescalado horizontal.

Para obtener instrucciones sobre cómo actualizar un trabajo en curso, consulta Actualizar un flujo de procesamiento.

Iniciar una tarea de sustitución

Si el trabajo actualizado es compatible con el trabajo actual, puedes actualizar tu pipeline con la opción update. Cuando sustituyes un trabajo, se ejecuta un nuevo trabajo con el código de la canalización actualizado. El servicio Dataflow conserva el nombre de la tarea, pero ejecuta la tarea de sustitución con un ID de tarea actualizado. Este proceso puede provocar un tiempo de inactividad mientras se detiene el trabajo actual, se ejecuta la comprobación de compatibilidad y se inicia el nuevo trabajo. Para obtener más información, consulta Efectos de sustituir un trabajo.

Dataflow realiza una comprobación de compatibilidad para asegurarse de que el código de la canalización actualizado se puede implementar de forma segura en la canalización en ejecución. Algunos cambios en el código provocan que la comprobación de compatibilidad falle, como cuando se añaden o se quitan entradas laterales de un paso. Si la comprobación de compatibilidad falla, no podrás realizar una actualización in situ del trabajo.

Para obtener instrucciones sobre cómo iniciar una tarea de sustitución, consulta Iniciar una tarea de sustitución.

Si la actualización de la canalización no es compatible con el trabajo actual, debes detener y sustituir la canalización. Si tu canal no puede tolerar el tiempo de inactividad, ejecuta canales paralelos.

Detener y sustituir flujos de procesamiento

Si puedes detener el procesamiento temporalmente, puedes cancelar o vaciar la canalización y, a continuación, sustituirla por la canalización actualizada. Si cancelas un flujo de procesamiento, Dataflow detendrá inmediatamente el procesamiento y cerrará los recursos lo antes posible, lo que puede provocar la pérdida de algunos datos que se estén procesando (datos en tránsito). Para evitar la pérdida de datos, en la mayoría de los casos, es preferible drenar. También puedes usar capturas de Dataflow para guardar el estado de un flujo de procesamiento de streaming, lo que te permite iniciar una nueva versión de tu trabajo de Dataflow sin perder el estado. Para obtener más información, consulta el artículo sobre cómo utilizar capturas de Dataflow.

Al drenar una canalización, se cierran inmediatamente todas las ventanas en curso y se activan todos los activadores. Aunque los datos en tránsito no se pierden, el vaciado puede provocar que las ventanas tengan datos incompletos. Si esto ocurre, las ventanas en proceso emiten resultados parciales o incompletos. Para obtener más información, consulta Efectos de agotar un trabajo. Una vez que se haya completado el trabajo, inicia un nuevo trabajo de streaming que contenga el código de la canalización actualizado para que se reanude el procesamiento.

Con este método, se produce un tiempo de inactividad entre el momento en que se detiene el trabajo de streaming y el momento en que la canalización de sustitución está lista para reanudar el procesamiento de datos. Sin embargo, cancelar o agotar una canalización y, a continuación, iniciar una nueva tarea con la canalización actualizada es menos complicado que ejecutar canalizaciones paralelas.

Para obtener instrucciones más detalladas, consulta Drain a Dataflow job (Desactivar un trabajo de Dataflow). Después de completar la tarea actual, inicia una nueva con el mismo nombre.

Reprocesamiento de mensajes con las funciones de instantánea y búsqueda de Pub/Sub

En algunas situaciones, después de sustituir o cancelar una canalización agotada, es posible que tengas que volver a procesar los mensajes de Pub/Sub entregados anteriormente. Por ejemplo, puede que tenga que usar una lógica empresarial actualizada para volver a procesar los datos. Búsqueda de Pub/Sub es una función que te permite reproducir mensajes de una instantánea de Pub/Sub. Puedes usar Pub/Sub Seek con Dataflow para volver a procesar mensajes desde el momento en que se crea la captura de la suscripción.

Durante el desarrollo y las pruebas, también puedes usar Pub/Sub Seek para reproducir los mensajes conocidos repetidamente y verificar la salida de tu canalización. Cuando uses la búsqueda de Pub/Sub, no busques una captura de una suscripción si una canalización está consumiendo la suscripción. Si lo haces, la búsqueda puede invalidar la lógica de la marca de agua de Dataflow y afectar al procesamiento de los mensajes de Pub/Sub, que se lleva a cabo una sola vez.

A continuación, se muestra un flujo de trabajo recomendado de gcloud CLI para usar Pub/Sub Seek con canalizaciones de Dataflow en una ventana de terminal:

  1. Para crear una captura de la suscripción, usa el comando gcloud pubsub snapshots create:

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. Para vaciar o cancelar la canalización, usa el comando gcloud dataflow jobs drain o el comando gcloud dataflow jobs cancel:

    gcloud dataflow jobs drain JOB_ID
    

    o

    gcloud dataflow jobs cancel JOB_ID
    
  3. Para ir a la instantánea, usa el comando gcloud pubsub subscriptions seek:

    gcloud pubsub subscriptions seek SNAPSHOT_NAME
    
  4. Implementa una nueva canalización que consuma la suscripción.

Ejecutar flujos de procesamiento en paralelo

Si necesitas evitar interrupciones en tu flujo de datos durante una actualización, puedes ejecutar flujos de datos paralelos. Este enfoque te permite iniciar una nueva tarea de streaming con el código de la canalización actualizado y ejecutarla en paralelo con la tarea actual. Puedes usar el flujo de trabajo de implementación de actualización de canalización paralela automatizada de Dataflow o realizar los pasos manualmente.

Descripción general de las canalizaciones paralelas

Cuando crees la nueva canalización, usa la misma estrategia de ventanas que en la canalización actual. En el flujo de trabajo manual, deja que la canalización siga ejecutándose hasta que su marca de agua supere la marca de tiempo de la ventana completa más antigua procesada por la canalización actualizada. A continuación, desvía o cancela el flujo de procesamiento. Si utilizas el flujo de trabajo automatizado, este paso se realiza automáticamente. La canalización actualizada sigue ejecutándose en su lugar y se encarga del procesamiento por sí sola.

En el siguiente diagrama se ilustra este proceso.

La canalización B se superpone con la canalización B durante un periodo de 5 minutos.

En el diagrama, Pipeline B es el trabajo actualizado que sustituye a Pipeline A. El valor t es la marca de tiempo de la ventana completa más antigua procesada por Pipeline B. El valor w es la marca de agua de Pipeline A. Para simplificar, se presupone que la marca de agua es perfecta y no hay datos tardíos. El tiempo de procesamiento y el tiempo real se representan en el eje horizontal. Ambas canalizaciones usan ventanas fijas (tumbling) de cinco minutos. Los resultados se activan después de que la marca de agua supere el final de cada ventana.

Como la salida simultánea se produce durante el periodo en el que se superponen las dos canalizaciones, configura las dos canalizaciones para que escriban los resultados en destinos diferentes. Los sistemas posteriores pueden usar una abstracción sobre los dos receptores de destino, como una vista de base de datos, para consultar los resultados combinados. Estos sistemas también pueden usar la abstracción para eliminar los resultados duplicados del periodo superpuesto. Para obtener más información, consulta Gestionar resultados duplicados.

Limitaciones

El uso de actualizaciones de la canalización en paralelo automáticas o manuales tiene las siguientes limitaciones:

  • Solo actualizaciones automatizadas: la nueva tarea paralela debe ser una tarea de Streaming Engine.
  • Los nombres de las tareas antiguas y nuevas deben ser diferentes, ya que no se permiten tareas simultáneas con el mismo nombre.
  • Si se ejecutan dos flujos de trabajo en paralelo con la misma entrada, se pueden duplicar los datos, se pueden producir agregaciones parciales y pueden surgir problemas de orden cuando se insertan datos en el receptor. El sistema receptor debe diseñarse para anticipar y gestionar estos resultados.
  • Cuando se lee de una fuente de Pub/Sub, no se recomienda usar la misma suscripción en varias canalizaciones, ya que puede provocar problemas de precisión. Sin embargo, en algunos casos prácticos, como las canalizaciones de extracción, transformación y carga (ETL), usar la misma suscripción en dos canalizaciones puede reducir la duplicación. Es probable que haya problemas con el escalado automático si proporcionas un valor distinto de cero para la duración de la superposición. Esto se puede mitigar usando la función de actualización de tareas en curso. Para obtener más información, consulta el artículo Ajustar la escalada automática de tus pipelines de streaming de Pub/Sub.
  • En Apache Kafka, puedes minimizar los duplicados habilitando la confirmación de desfase en Kafka. Para habilitar la confirmación de desfase en Kafka, consulta Confirmación en Kafka.

Actualizaciones automatizadas de la canalización paralela

Dataflow ofrece compatibilidad con APIs para iniciar un trabajo de sustitución en paralelo. Esta API de estilo declarativo abstrae el trabajo manual de ejecutar pasos de procedimiento. Declara el trabajo que quieres actualizar y, a continuación, se ejecuta un nuevo trabajo en paralelo con el antiguo. Una vez que la nueva tarea se haya ejecutado durante el tiempo que hayas especificado, se vaciará la antigua. Esta función elimina las pausas de procesamiento durante las actualizaciones y reduce el esfuerzo operativo necesario para actualizar las canalizaciones incompatibles.

Este método de actualización es el más adecuado para las canalizaciones que pueden tolerar algunos duplicados o agregaciones parciales y que no requieren un orden estricto al insertar datos. Es adecuada para flujos de procesamiento ETL, así como para flujos de procesamiento que usan el modo de streaming "al menos una vez" y la transformación Redistribute con la opción de permitir duplicados definida como true.

Enviar una solicitud de actualización de canalización paralela automatizada

Para usar el flujo de trabajo automatizado, inicia un nuevo trabajo de streaming con las siguientes opciones de servicio. Debes iniciar el nuevo trabajo con un nombre diferente al del trabajo antiguo.

Java

--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"

También puedes especificar el ID de la tarea antigua:

--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"

Python

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"

También puedes especificar el ID de la tarea antigua:

--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"

Go

--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"

También puedes especificar el ID de la tarea antigua:

--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"

gcloud

--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"

También puedes especificar el ID de la tarea antigua:

--additional-experiments="parallel_replace_job_id=OLD_JOB_ID" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"

Sustituye las siguientes variables:

  • Debes proporcionar parallel_replace_job_name o parallel_replace_job_id para identificar el trabajo que quieres sustituir.
    • OLD_JOB_NAME: si usas parallel_replace_job_name, el nombre del trabajo que se va a sustituir.
    • OLD_JOB_ID: si usa parallel_replace_job_id, el ID del trabajo que se va a sustituir.
  • Debes proporcionar un valor de parallel_replace_job_min_parallel_pipelines_duration.

    • DURATION: el tiempo mínimo que las dos pipelines se ejecutan en paralelo como un número entero o de coma flotante. Una vez transcurrido este tiempo, se envía una señal de drenaje al trabajo antiguo.

      La duración debe estar entre 0 segundos (0s) y 31 días (744h). Usa s, m y h para especificar segundos, minutos y horas. Por ejemplo, 10m es 10 minutos.

Cuando inicias el nuevo trabajo, Dataflow espera a que se aprovisionen todos los trabajadores antes de empezar a procesar los datos. Para monitorizar el estado de la implementación, consulta los registros de la tarea de Dataflow.

Ejecutar flujos de procesamiento paralelos manualmente

En situaciones más complejas o cuando necesites más control sobre el proceso de actualización, puedes ejecutar manualmente las canalizaciones paralelas. Deja que la canalización siga ejecutándose hasta que su marca de agua supere la marca de tiempo de la ventana completa más antigua procesada por la canalización actualizada. A continuación, vacía o cancela la pipeline actual.

Gestionar resultados duplicados

En el siguiente ejemplo se describe una forma de gestionar la salida duplicada. Las dos canalizaciones escriben la salida en destinos diferentes, usan sistemas posteriores para consultar los resultados y eliminan los resultados duplicados del periodo superpuesto. En este ejemplo se usa un flujo de procesamiento que lee datos de entrada de Pub/Sub, realiza algún procesamiento y escribe los resultados en BigQuery.

  1. En el estado inicial, el flujo de procesamiento en streaming (Pipeline A) está en ejecución y lee mensajes de un tema de Pub/Sub (Topic) mediante una suscripción (Subscription A). Los resultados se escriben en una tabla de BigQuery (Tabla A). Los resultados se consumen a través de una vista de BigQuery, que actúa como fachada para ocultar los cambios de la tabla subyacente. Este proceso es una aplicación de un método de diseño llamado patrón de fachada. En el siguiente diagrama se muestra el estado inicial.

    Una canalización con una suscripción y escritura en una sola tabla de BigQuery.

  2. Crea una suscripción (Suscripción B) para la canalización actualizada. Implementa la canalización actualizada (Canalización B), que lee del tema de Pub/Sub (Tema) mediante la Suscripción B y escribe en una tabla de BigQuery independiente (Tabla B). En el siguiente diagrama se muestra este flujo.

    Dos canalizaciones, cada una con una suscripción. Cada flujo de procesamiento escribe en una tabla de BigQuery independiente. Una vista de fachada lee de ambas tablas.

    En este punto, Pipeline A y Pipeline B se ejecutan en paralelo y escriben los resultados en tablas independientes. Registras el tiempo t como la marca de tiempo de la ventana completa más antigua procesada por Pipeline B.

  3. Cuando la marca de agua de Pipeline A supere el tiempo t, vacía Pipeline A. Cuando vacías el flujo de procesamiento, se cierran todas las ventanas abiertas y se completa el procesamiento de los datos en tránsito. Si la canalización contiene ventanas y es importante que estén completas (suponiendo que no haya datos tardíos), antes de vaciar Canalización A, deja que ambas canalizaciones se ejecuten hasta que tengas ventanas superpuestas completas. Detener la tarea de streaming de Pipeline A después de que se hayan procesado todos los datos en tránsito y se hayan escrito en Table A. En el siguiente diagrama se muestra esta fase.

    El flujo de trabajo A se vacía y deja de leer la suscripción A, así como de enviar datos a la tabla A una vez que se haya completado el vaciado. El segundo flujo de procesamiento se encarga de todo el procesamiento.

  4. En este punto, solo se está ejecutando Pipeline B. Puedes consultar una vista de BigQuery (vista de fachada), que actúa como fachada de Tabla A y Tabla B. En las filas que tengan la misma marca de tiempo en ambas tablas, configura la vista para que devuelva las filas de la tabla B o, si las filas no existen en la tabla B, que vuelva a la tabla A. En el siguiente diagrama se muestra la vista (Vista de fachada) que lee de Tabla A y Tabla B.

    El flujo de procesamiento A se ha eliminado y solo se ejecuta el flujo de procesamiento B.

    En este punto, puedes eliminar la Suscripción A.

Si se detectan problemas con una nueva implementación de una canalización, tener canalizaciones paralelas puede simplificar la reversión. En este ejemplo, puede que quieras mantener Pipeline A en ejecución mientras monitorizas Pipeline B para comprobar que funciona correctamente. Si se produce algún problema con Pipeline B, puedes volver a Pipeline A.

Gestionar mutaciones de esquema

Los sistemas de tratamiento de datos suelen tener que adaptarse a las mutaciones de esquemas a lo largo del tiempo, a veces debido a cambios en los requisitos empresariales y otras veces por motivos técnicos. Aplicar actualizaciones de esquemas suele requerir una planificación y una ejecución cuidadosas para evitar interrupciones en los sistemas de información de la empresa.

Supongamos que tienes una canalización que lee mensajes que contienen cargas útiles de JSON de un tema de Pub/Sub. La canalización convierte cada mensaje en una instancia de TableRow y, a continuación, escribe las filas en una tabla de BigQuery. El esquema de la tabla de salida es similar a los mensajes que procesa la canalización. En el siguiente diagrama, el esquema se denomina Esquema A.

Una canalización que lee una suscripción y escribe en una tabla de salida de BigQuery mediante el esquema A.

Con el tiempo, el esquema de mensajes puede cambiar de formas no triviales. Por ejemplo, se añaden, se quitan o se sustituyen campos. El esquema A evoluciona a un nuevo esquema. En la conversación que sigue, el nuevo esquema se denomina Esquema B. En este caso, Pipeline A debe actualizarse y el esquema de la tabla de salida debe admitir Schema B.

En la tabla de salida, puedes realizar algunas mutaciones de esquema sin tiempo de inactividad. Por ejemplo, puedes añadir campos o cambiar los modos de columna, como cambiar de REQUIRED a NULLABLE, sin que haya tiempo de inactividad. Estas mutaciones no suelen afectar a las consultas. Sin embargo, las mutaciones de esquema que modifican o eliminan campos de esquema provocan errores en las consultas u otras interrupciones. El siguiente enfoque se adapta a los cambios sin necesidad de tiempo de inactividad.

Separa los datos que escribe la canalización en una tabla principal y en una o varias tablas de almacenamiento temporal. La tabla principal almacena el historial de datos escritos por la canalización. Las tablas de almacenamiento provisional almacenan la salida más reciente de la canalización. Puedes definir una vista de fachada de BigQuery sobre las tablas principales y de almacenamiento provisional, lo que permite a los consumidores consultar datos históricos y actualizados.

En el siguiente diagrama se revisa el flujo de la canalización anterior para incluir una tabla de almacenamiento provisional (Tabla de almacenamiento provisional A), una tabla principal y una vista de fachada.

Flujo de procesamiento que lee una suscripción y escribe en una tabla de almacenamiento temporal de BigQuery. Una segunda tabla (principal) tiene la salida de una versión anterior del esquema. Una vista de fachada lee tanto de la tabla de almacenamiento temporal como de la tabla principal.

En el flujo revisado, Pipeline A procesa los mensajes que usan Schema A y escribe el resultado en Staging Table A, que tiene un esquema compatible. La tabla principal contiene datos históricos escritos por versiones anteriores de la pipeline, así como resultados que se combinan periódicamente de la tabla de almacenamiento provisional. Los consumidores pueden consultar datos actualizados, incluidos datos históricos y en tiempo real, mediante la vista de fachada.

Cuando el esquema de mensajes cambia de Esquema A a Esquema B, puede actualizar el código de la canalización para que sea compatible con los mensajes que usen Esquema B. El flujo de procesamiento debe actualizarse con la nueva implementación. Al ejecutar pipelines paralelos, puedes asegurarte de que el procesamiento de datos de streaming continúe sin interrupciones. Si se terminan y se sustituyen las canalizaciones, se produce una interrupción en el procesamiento, ya que no se ejecuta ninguna canalización durante un periodo.

La canalización actualizada escribe en una tabla de almacenamiento provisional adicional (Tabla de almacenamiento provisional B) que usa el Esquema B. Puedes usar un flujo de trabajo orquestado para crear la nueva tabla de staging antes de actualizar la canalización. Actualiza la vista de fachada para incluir los resultados de la nueva tabla de almacenamiento provisional, posiblemente usando un paso de flujo de trabajo relacionado.

En el siguiente diagrama se muestra el flujo actualizado que muestra la tabla de almacenamiento temporal B con el esquema B y cómo se actualiza la vista de fachada para incluir contenido de la tabla principal y de ambas tablas de almacenamiento temporal.

Ahora, la canalización usa el esquema B y escribe en la tabla de almacenamiento provisional B. Una vista de fachada lee de la tabla Principal, la tabla de almacenamiento provisional A y la tabla de almacenamiento provisional B.

Como proceso independiente de la actualización de la canalización, puedes combinar las tablas de almacenamiento provisional en la tabla principal, ya sea periódicamente o según sea necesario. En el siguiente diagrama se muestra cómo se combina Tabla de almacenamiento temporal A con la tabla principal.

La tabla de almacenamiento temporal A se combina con la tabla principal. La vista de fachada lee de la tabla de almacenamiento provisional B y de la tabla principal.

Siguientes pasos