Actualiza una canalización existente

Los SDK de Apache Beam proporcionan una forma de actualizar un trabajo de transmisión continuo en el servicio administrado de Dataflow con un nuevo código de canalización.

Existen varios motivos por los que es posible que desee actualizar su trabajo existente de Dataflow:

  • Quieres mejorar tu código de canalización.
  • Quieres solucionar errores en el código de tu canalización.
  • Quieres actualizar tu canalización a fin de administrar cambios en el formato de datos o justificar la versión o cualquier otro tipo de cambios en tu fuente de datos.

Cuando actualiza su trabajo, el servicio de Dataflow realiza una verificación de compatibilidad entre su trabajo actual y su posible trabajo de reemplazo. La verificación de compatibilidad garantiza que se pueden transferir elementos como la información de estado intermedio y los datos almacenados en búfer de tu trabajo anterior a tu trabajo de reemplazo.

El proceso de actualización y sus efectos

Cuando actualiza un trabajo en el servicio de Dataflow, reemplaza el trabajo existente por un nuevo trabajo que ejecute el código de canalización actualizado. El servicio de Dataflow conserva el nombre del trabajo, pero ejecuta el trabajo de reemplazo con un Job ID actualizado.

El trabajo de reemplazo conserva todos los datos de estado intermedios del trabajo anterior y los registros de datos almacenados en búfer o metadatos aún “en tránsito” del trabajo anterior. Por ejemplo, algunos registros en tu canalización pueden almacenarse en búfer mientras se resuelve una ventana.

Datos en tránsito

Las transformaciones en tu canalización nueva procesarán los datos “en tránsito”. Sin embargo, las transformaciones adicionales que agregas a tu código de la canalización de reemplazo pueden o no tener efecto según la ubicación en la que se almacenan los registros. Por ejemplo, supongamos que tu canalización existente tiene las siguientes transformaciones:

Java: SDK 2.x

      p.apply("Read", ReadStrings())
       .apply("Format", FormatStrings());
    

Python

      p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
        | 'Format' >> FormatStrings()
    

Java: SDK 1.x

Puedes reemplazar tu trabajo con códigos de canalización nuevos, como se indica a continuación:

Java: SDK 2.x

      p.apply("Read", ReadStrings())
       .apply("Remove", RemoveStringsStartingWithA())
       .apply("Format", FormatStrings());
    

Python

      p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
        | 'Remove', RemoveStringsStartingWithA()
        | 'Format' >> FormatStrings()
    

Java: SDK 1.x

Si bien agregaste una transformación para filtrar las strings que comienzan con la letra “A”, la siguiente transformación (FormatStrings) quizás vea strings en tránsito o almacenadas en búfer que comienzan con “A” transferidas del trabajo anterior.

Cambios en el sistema de ventanas

Puedes cambiar las estrategias del sistema de ventanas y de activadores para las PCollection en tu canalización de reemplazo, pero ten cuidado. Los cambios en las estrategias de activadores y del sistema de ventanas no afectarán a los datos que ya están almacenados en búfer o en tránsito.

Recomendamos que intentes realizar solo cambios pequeños en el sistema de ventanas de tu canalización, como cambiar la duración de ventanas fijas o de período variable. Realizar cambios grandes en el sistema de ventanas o activadores, como cambiar el algoritmo del sistema de ventanas, puede tener un impacto impredecible en el resultado de tu canalización.

Inicia tu trabajo de reemplazo

A fin de actualizar tu trabajo, necesitarás iniciar uno nuevo para reemplazar el trabajo en ejecución. Cuando inicies tu trabajo de reemplazo, necesitarás establecer las siguientes opciones de canalización para realizar el proceso de actualización además de las opciones regulares del trabajo:

Java

  • Pasa la opción --update.
  • Establecer la opción --jobName en PipelineOptions en el mismo nombre que el trabajo que quieres actualizar
  • Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción --transformNameMapping.

Python

  • Pasa la opción --update.
  • Establecer la opción --job_name en PipelineOptions en el mismo nombre que el trabajo que quieres actualizar
  • Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción --transform_name_mapping.

Cómo especificar tu nombre de trabajo de reemplazo

Java

Cuando inicies tu trabajo de reemplazo, el valor que apruebes para la opción --jobName debe coincidir exactamente con el nombre del trabajo que deseas reemplazar.

Python

Cuando inicies tu trabajo de reemplazo, el valor que apruebes para la opción --job_name debe coincidir exactamente con el nombre del trabajo que deseas reemplazar.

Para encontrar el valor correcto del nombre del trabajo, seleccione su trabajo anterior en la interfaz de Dataflow Monitoring y busque el campo Job Name en la pestaña Summary:

Figura 1: La pestaña Resumen para un trabajo en ejecución de Cloud Dataflow, con el campo Nombre del trabajo resaltado.

Como alternativa, puede consultar una lista de trabajos existentes mediante la interfaz de línea de comandos de Dataflow. Ingrese el comando gcloud beta dataflow jobs list en su shell o ventana de terminal para obtener una lista de trabajos de Dataflow en su proyecto de Google Cloud y busque el campo NAME para el trabajo que desea reemplazar:

    ID                                        NAME                                 TYPE       CREATION_TIME        STATUS
    2015-07-28_17_02_27-7257409117866690674   windowedwordcount-johndoe-0729000214 Streaming  2015-07-28 17:02:28  Running
    

Crea la asignación de transformaciones

Java

Si la canalización de reemplazo cambió algún nombre de transformación de los de tu canalización anterior, el servicio de Dataflow requiere una asignación de transformación. La asignación de transformaciones asigna las transformaciones nombradas en el código de tu canalización anterior a nombres en el código de tu canalización de reemplazo. Puedes pasar la asignación mediante la opción de línea de comandos --transformNameMapping, con el siguiente formato general:

    --transformNameMapping= .
    {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
    

Solo debes proporcionar entradas de asignación en --transformNameMapping para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.

Python

Si la canalización de reemplazo cambió algún nombre de transformación de los de tu canalización anterior, el servicio de Dataflow requiere una asignación de transformación. La asignación de transformaciones asigna las transformaciones nombradas en el código de tu canalización anterior a nombres en el código de tu canalización de reemplazo. Puedes pasar la asignación mediante la opción de línea de comandos --transform_name_mapping, con el siguiente formato general:

    --transform_name_mapping= .
    {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
    

Solo debes proporcionar entradas de asignación en --transform_name_mapping para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.

Determina los nombres de las transformaciones

El nombre de la transformación en cada instancia en la asignación es el nombre que suministraste cuando aplicaste la transformación en tu canalización. Por ejemplo:

Java: SDK 2.x

    .apply("FormatResults", ParDo
      .of(new DoFn<KV<String, Long>>, String>() {
        ...
       }
    }))
    

Python

    | 'FormatResults' >> beam.ParDo(MyDoFn())
    

Java: SDK 1.x

También puede obtener los nombres de las transformaciones de su trabajo anterior examinando el gráfico de ejecución del trabajo en la interfaz de Dataflow Monitoring:

Figura 2: El grafo de ejecución para una canalización de WordCount como se muestra en la interfaz de supervisión de Cloud Dataflow.

Denomina las transformaciones compuestas

Los nombres de transformaciones son jerárquicos: se basan en la jerarquía de transformaciones en tu canalización. Si tu canalización tiene una transformación compuesta, las transformaciones anidadas se nombran en términos de la transformación que las contiene. Por ejemplo, supongamos que tu canalización contiene una transformación compuesta llamada CountWidgets, que contiene una transformación interna llamada Parse. El nombre completo de la transformación interna será CountWidgets/Parse y debes especificar ese nombre completo en tu asignación de transformaciones.

Si tu canalización nueva asigna una transformación compuesta con un nombre diferente, se renombrarán todas las transformaciones anidadas de forma automática. Deberás especificar los nombres cambiados para las transformaciones internas en tu asignación de transformaciones.

Refactoriza la jerarquía de transformaciones

Si tu canalización de reemplazo usa una jerarquía de transformaciones diferente a la de tu canalización anterior (p. ej., debido a que refactorizaste tus transformaciones compuestas, o tu canalización depende de una transformación compuesta por una biblioteca que cambió), deberás declarar la asignación de forma explícita.

Por ejemplo, supongamos que tu canalización anterior aplicó una transformación compuesta, CountWidgets, que contenía una transformación interna llamada Parse. Ahora, supongamos que tu canalización de reemplazo refactoriza CountWidgets y anida Parse dentro de otra transformación llamada Scan. Para que tu actualización tenga éxito, debes asignar de forma explícita el nombre completo de la transformación de la canalización anterior (CountWidgets/Parse) al nombre de la transformación de la canalización nueva (CountWidgets/Scan/Parse):

Java

    --transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
    

Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que nuestra canalización de reemplazo quita la transformación de CountWidgets/Parse en su totalidad:

    --transformNameMapping={"CountWidgets/Parse":""}
    

Python

    --transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
    

Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que nuestra canalización de reemplazo quita la transformación de CountWidgets/Parse en su totalidad:

    --transform_name_mapping={"CountWidgets/Parse":""}
    

Verifica la compatibilidad de trabajos

Cuando inicia el trabajo de reemplazo, el servicio de Dataflow realiza una verificación de compatibilidad entre su trabajo de reemplazo y su trabajo anterior. Si se aprueba la verificación de compatibilidad, tu trabajo anterior se detendrá. El trabajo de reemplazo se iniciará en el servicio de Dataflow y conservará el mismo nombre de trabajo. Si la verificación de compatibilidad falla, su trabajo anterior continuará ejecutándose en el servicio de Dataflow y su trabajo de reemplazo mostrará un error.

Java

Python

La verificación de compatibilidad garantiza que el servicio de Dataflow pueda transferir datos de estado intermedio de los pasos del trabajo anterior al trabajo de reemplazo, según lo especificado por la asignación de transformación que proporcione. La verificación de compatibilidad también garantiza que las PCollection de tu canalización usen los mismos codificadores. Cambiar un Coder puede ocasionar que falle la verificación de compatibilidad, ya que algunos datos en tránsito o registros almacenados en búfer pueden no estar serializados de forma correcta en la canalización de reemplazo.

Evita errores de compatibilidad

Algunas diferencias entre tu canalización anterior y tu canalización de reemplazo pueden ocasionar que falle la verificación de compatibilidad. Estas diferencias incluyen los siguientes puntos:

  • Cambiar el grafo de la canalización sin proporcionar una asignación. Cuando actualizas un trabajo, el servicio de Dataflow intenta hacer coincidir las transformaciones de tu trabajo anterior con las transformaciones del trabajo de reemplazo para transferir datos de estado intermedios para cada paso. Si renunciaste o quitaste algún paso, deberás proporcionar una asignación de transformación para que Dataflow pueda hacer coincidir los datos de estado según corresponda.
  • Cambiar las entradas adicionales de un paso. Agregar entradas adicionales a una transformación de tu canalización de reemplazo o quitarlas de ella hará que falle la verificación de compatibilidad.
  • Cambiar el codificador de un paso. Cuando actualizas un trabajo, el servicio de Dataflow conserva todos los registros almacenados en búfer (por ejemplo, mientras windowing se resuelve) y los controla en el trabajo de reemplazo. Si el trabajo de reemplazo utiliza una codificación de datos diferente o incompatible, el servicio de Dataflow no podrá serializar ni deserializar estos registros.
  • Quitaste una operación “con estado” de tu canalización. Es posible que el trabajo de reemplazo no realice la verificación de compatibilidad de Dataflow si quitas ciertas operaciones con estado de tu canalización. El servicio de Dataflow puede fusionar varios pasos para aumentar la eficacia. Si quitaste una operación dependiente del estado de un paso fusionado, la verificación fallará. Las operaciones con estado incluyen los siguientes elementos:

    • Transformaciones que producen o consumen entradas adicionales
    • Lecturas de E/S
    • Transformaciones que usan un estado clave
    • Transformaciones con ventanas combinadas
  • Intentas ejecutar tu trabajo de reemplazo en una zona geográfica diferente. Debes ejecutar tu trabajo de reemplazo en la misma zona en la que ejecutabas tu trabajo anterior.