Actualiza una canalización existente

Los SDK de Apache Beam proporcionan una forma de actualizar un trabajo de transmisión en curso en el servicio gestionado de Cloud Dataflow con un nuevo código de canalización.

Existen varias razones por las que quizás quieres actualizar tu trabajo actual de Cloud Dataflow:

  • Quieres mejorar tu código de canalización.
  • Quieres solucionar errores en el código de tu canalización.
  • Quieres actualizar tu canalización a fin de administrar cambios en el formato de datos o justificar la versión o cualquier otro tipo de cambios en tu fuente de datos.

Cuando actualizas tu trabajo, el servicio de Cloud Dataflow realiza una verificación de compatibilidad entre tu trabajo actual en ejecución y tu potencial trabajo de reemplazo. La verificación de compatibilidad garantiza que se pueden transferir elementos como la información de estado intermedio y los datos almacenados en búfer de tu trabajo anterior a tu trabajo de reemplazo.

El proceso de actualización y sus efectos

Cuando actualizas un trabajo en el servicio de Cloud Dataflow, reemplazas el trabajo existente con uno nuevo que ejecuta el código de la canalización actualizado. El servicio de Cloud Dataflow retiene el nombre del trabajo, pero ejecuta el trabajo de reemplazo con un ID de trabajo actualizado.

El trabajo de reemplazo conserva todos los datos de estado intermedios del trabajo anterior y los registros de datos almacenados en búfer o metadatos aún “en tránsito” del trabajo anterior. Por ejemplo, algunos registros en tu canalización pueden almacenarse en búfer mientras se resuelve una ventana.

Datos en tránsito

Las transformaciones en tu canalización nueva procesarán los datos “en tránsito”. Sin embargo, las transformaciones adicionales que agregas a tu código de la canalización de reemplazo pueden o no tener efecto según la ubicación en la que se almacenan los registros. Por ejemplo, supongamos que tu canalización existente tiene las siguientes transformaciones:

Java: SDK 2.x

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Format' >> FormatStrings()

Java: SDK 1.x

  p.apply(ReadStrings().named("Read"))
   .apply(FormatStrings().named("Format"));

Puedes reemplazar tu trabajo con códigos de canalización nuevos, como se indica a continuación:

Java: SDK 2.x

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Remove', RemoveStringsStartingWithA()
    | 'Format' >> FormatStrings()

Java: SDK 1.x

  p.apply(ReadStrings().named("Read"))
   .apply(RemoveStringsStartingWithA().named("Remove"))
   .apply(FormatStrings().named("Format"));

Si bien agregaste una transformación para filtrar las strings que comienzan con la letra “A”, la siguiente transformación (FormatStrings) quizás vea strings en tránsito o almacenadas en búfer que comienzan con “A” transferidas del trabajo anterior.

Cambios en el sistema de ventanas

Puedes cambiar las estrategias del sistema de ventanas y de activadores para las PCollection en tu canalización de reemplazo, pero ten cuidado. Los cambios en las estrategias de activadores y del sistema de ventanas no afectarán a los datos que ya están almacenados en búfer o en tránsito.

Recomendamos que intentes realizar solo cambios pequeños en el sistema de ventanas de tu canalización, como cambiar la duración de ventanas fijas o de período variable. Realizar cambios grandes en el sistema de ventanas o activadores, como cambiar el algoritmo del sistema de ventanas, puede tener un impacto impredecible en el resultado de tu canalización.

Inicia tu trabajo de reemplazo

A fin de actualizar tu trabajo, necesitarás iniciar uno nuevo para reemplazar el trabajo en ejecución. Cuando inicies tu trabajo de reemplazo, necesitarás establecer las siguientes opciones de canalización para realizar el proceso de actualización además de las opciones regulares del trabajo:

Java

  • Pasa la opción --update.
  • Establecer la opción --jobName en PipelineOptions en el mismo nombre que el trabajo que quieres actualizar
  • Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción --transformNameMapping.

Python

  • Pasa la opción --update.
  • Establecer la opción --job_name en PipelineOptions en el mismo nombre que el trabajo que quieres actualizar
  • Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción --transform_name_mapping.

Cómo especificar tu nombre de trabajo de reemplazo

Java

Cuando inicies tu trabajo de reemplazo, el valor que apruebes para la opción --jobName debe coincidir exactamente con el nombre del trabajo que deseas reemplazar.

Python

Cuando inicies tu trabajo de reemplazo, el valor que apruebes para la opción --job_name debe coincidir exactamente con el nombre del trabajo que deseas reemplazar.

A fin de encontrar el valor de nombre del trabajo correcto, selecciona tu trabajo anterior en la interfaz de supervisión de Cloud Dataflow y encuentra el campo Nombre del trabajo en la pestaña Resumen:

Figura 1: La pestaña Resumen para un trabajo en ejecución de Cloud Dataflow, con el campo Nombre del trabajo resaltado.

De manera alternativa, puedes consultar una lista de trabajos existentes con la interfaz de línea de comandos de Cloud Dataflow. Ingresa el comando gcloud alpha dataflow jobs list en tu shell o ventana de terminal para obtener una lista de trabajos de Cloud Dataflow en tu proyecto de Google Cloud Platform y busca el campo NAME para el trabajo que deseas reemplazar:

ID                                        NAME                                 TYPE       CREATION_TIME        STATUS
2015-07-28_17_02_27-7257409117866690674   windowedwordcount-johndoe-0729000214 Streaming  2015-07-28 17:02:28  Running

Crea la asignación de transformaciones

Java

Si tu canalización de reemplazo cambió algún nombre de transformación de tu canalización anterior, el servicio de Cloud Dataflow requiere una asignación de transformaciones. La asignación de transformaciones asigna las transformaciones nombradas en el código de tu canalización anterior a nombres en el código de tu canalización de reemplazo. Puedes pasar la asignación mediante la opción de línea de comandos --transformNameMapping, con el siguiente formato general:

--transformNameMapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Solo debes proporcionar entradas de asignación en --transformNameMapping para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.

Python

Si tu canalización de reemplazo cambió algún nombre de transformación de tu canalización anterior, el servicio de Cloud Dataflow requiere una asignación de transformaciones. La asignación de transformaciones asigna las transformaciones nombradas en el código de tu canalización anterior a nombres en el código de tu canalización de reemplazo. Puedes pasar la asignación mediante la opción de línea de comandos --transform_name_mapping, con el siguiente formato general:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Solo debes proporcionar entradas de asignación en --transform_name_mapping para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.

Determina los nombres de las transformaciones

El nombre de la transformación en cada instancia en la asignación es el nombre que suministraste cuando aplicaste la transformación en tu canalización. Por ejemplo:

Java: SDK 2.x

.apply("FormatResults", ParDo
  .of(new DoFn<KV<String, Long>>, String>() {
    ...
   }
}))

Python

| 'FormatResults' >> beam.ParDo(MyDoFn())

Java: SDK 1.x

.apply(ParDo
  .named("FormatResults")
  .of(new DoFn<KV<String, Long>>, String>() {
    ...
   }
}))

También puedes obtener los nombres de las transformaciones de tu trabajo anterior si examinas el grafo de ejecución del trabajo en la interfaz de supervisión de Cloud Dataflow:

Figura 2: El grafo de ejecución para una canalización de WordCount como se muestra en la interfaz de supervisión de Cloud Dataflow.

Denomina las transformaciones compuestas

Los nombres de transformaciones son jerárquicos: se basan en la jerarquía de transformaciones en tu canalización. Si tu canalización tiene una transformación compuesta, las transformaciones anidadas se nombran en términos de la transformación que las contiene. Por ejemplo, supongamos que tu canalización contiene una transformación compuesta llamada CountWidgets, que contiene una transformación interna llamada Parse. El nombre completo de la transformación interna será CountWidgets/Parse y debes especificar ese nombre completo en tu asignación de transformaciones.

Si tu canalización nueva asigna una transformación compuesta con un nombre diferente, se renombrarán todas las transformaciones anidadas de forma automática. Deberás especificar los nombres cambiados para las transformaciones internas en tu asignación de transformaciones.

Refactoriza la jerarquía de transformaciones

Si tu canalización de reemplazo usa una jerarquía de transformaciones diferente a la de tu canalización anterior (p. ej., debido a que refactorizaste tus transformaciones compuestas, o tu canalización depende de una transformación compuesta por una biblioteca que cambió), deberás declarar la asignación de forma explícita.

Por ejemplo, supongamos que tu canalización anterior aplicó una transformación compuesta, CountWidgets, que contenía una transformación interna llamada Parse. Ahora, supongamos que tu canalización de reemplazo refactoriza CountWidgets y anida Parse dentro de otra transformación llamada Scan. Para que tu actualización tenga éxito, debes asignar de forma explícita el nombre completo de la transformación de la canalización anterior (CountWidgets/Parse) al nombre de la transformación de la canalización nueva (CountWidgets/Scan/Parse):

Java

--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que nuestra canalización de reemplazo quita la transformación de CountWidgets/Parse en su totalidad:

--transformNameMapping={"CountWidgets/Parse":""}

Python

--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que nuestra canalización de reemplazo quita la transformación de CountWidgets/Parse en su totalidad:

--transform_name_mapping={"CountWidgets/Parse":""}

Verifica la compatibilidad de trabajos

Cuando inicias tu trabajo de reemplazo, el servicio de Cloud Dataflow realiza una verificación de compatibilidad entre tu trabajo de reemplazo y tu trabajo anterior. Si se aprueba la verificación de compatibilidad, tu trabajo anterior se detendrá. A continuación, se iniciará tu trabajo de reemplazo en el servicio de Cloud Dataflow bajo el mismo nombre. Si la verificación de compatibilidad falla, tu trabajo anterior seguirá en ejecución en el servicio de Cloud Dataflow y tu trabajo de reemplazo mostrará un error.

Java

Python

La verificación de compatibilidad garantiza que el servicio de Cloud Dataflow pueda transferir datos de estado intermedios de los pasos en tu trabajo anterior a tu trabajo de reemplazo, como se especifica en la asignación de transformaciones que proporciones. La verificación de compatibilidad también garantiza que las PCollection de tu canalización usen los mismos codificadores. Cambiar un Coder puede ocasionar que falle la verificación de compatibilidad, ya que algunos datos en tránsito o registros almacenados en búfer pueden no estar serializados de forma correcta en la canalización de reemplazo.

Evita errores de compatibilidad

Algunas diferencias entre tu canalización anterior y tu canalización de reemplazo pueden ocasionar que falle la verificación de compatibilidad. Estas diferencias incluyen los siguientes puntos:

  • Cambiar el grafo de la canalización sin proporcionar una asignación. Cuando actualizas un trabajo, el servicio de Cloud Dataflow intenta hacer coincidir las transformaciones en tu trabajo anterior con las de tu trabajo de reemplazo a fin de transferir los datos de estado intermedios para cada paso. Si renombraste o quitaste pasos, deberás proporcionar una asignación de transformaciones con el fin de que Cloud Dataflow pueda hacer coincidir los datos de estado de forma apropiada.
  • Cambiar las entradas adicionales de un paso. Agregar entradas adicionales a una transformación de tu canalización de reemplazo o quitarlas de ella hará que falle la verificación de compatibilidad.
  • Cambiar el codificador de un paso. Cuando actualizas un trabajo, el servicio de Cloud Dataflow conserva los registros de datos almacenados en búfer en ese momento (por ejemplo, mientras se resuelve el sistema de ventanas) y los administra en el trabajo de reemplazo. Si el trabajo de reemplazo usa una codificación de datos diferente o incompatible, el servicio de Cloud Dataflow no podrá serializar o deserializar estos registros.
  • Quitaste una operación “con estado” de tu canalización. Tu trabajo de reemplazo puede fallar la verificación de compatibilidad de Cloud Dataflow si quitas ciertas operaciones con estado de tu canalización. El servicio de Cloud Dataflow puede fusionar múltiples pasos para lograr una mayor eficiencia. Si quitaste una operación dependiente del estado de un paso fusionado, la verificación fallará. Las operaciones con estado incluyen los siguientes elementos:

    • Transformaciones que producen o consumen entradas adicionales
    • Lecturas de E/S
    • Transformaciones que usan un estado clave
    • Transformaciones con ventanas combinadas
  • Intentas ejecutar tu trabajo de reemplazo en una zona geográfica diferente. Debes ejecutar tu trabajo de reemplazo en la misma zona en la que ejecutabas tu trabajo anterior.

¿Te sirvió esta página? Envíanos tu opinión:

Enviar comentarios sobre…

¿Necesitas ayuda? Visita nuestra página de asistencia.