Actualiza una canalización existente

En este documento, se describe cómo actualizar un trabajo de transmisión en curso. Recomendamos actualizar tu trabajo de Dataflow existente por los siguientes motivos:

  • Quieres mejorar tu código de canalización.
  • Quieres solucionar errores en el código de tu canalización.
  • Quieres actualizar tu canalización a fin de administrar cambios en el formato de datos o justificar la versión o cualquier otro tipo de cambios en tu fuente de datos.
  • Quieres aplicar un parche a una vulnerabilidad de seguridad relacionada con Container-Optimized OS para todos los trabajadores de Dataflow.
  • Quieres escalar una canalización de Apache Beam de transmisión para usar una cantidad diferente de trabajadores.

Puedes actualizar trabajos de dos maneras:

  • Actualización de trabajo en tránsito: Para los trabajos de transmisión que usan Streaming Engine, puedes actualizar las opciones de trabajo demin-num-workers y max-num-workers sin detener el trabajo ni cambiar el ID del mismo.
  • Trabajo de reemplazo: Para ejecutar el código de canalización actualizado o actualizar las opciones de trabajo que las actualizaciones de trabajo en tránsito no admiten, ejecuta un trabajo nuevo que reemplace el actual. Para verificar si un trabajo de reemplazo es válido, valida el gráfico de trabajoantes de iniciar el trabajo nuevo.

Cuando actualizas tu trabajo, el servicio de Dataflow realiza una verificación de compatibilidad entre tu trabajo que se encuentra ejecutado y tu trabajo de reemplazo potencial. La verificación de compatibilidad garantiza que se puedan transferir elementos como la información de estado intermedio y los datos almacenados en búfer de tu trabajo anterior a tu trabajo de reemplazo.

También puedes usar la infraestructura de registro incorporada del SDK de Apache Beam para registrar información cuando actualizas el trabajo. Para obtener más información, consulta Trabaja con registros de canalización. Para identificar problemas con el código de la canalización, usa el nivel de registro DEBUG.

Actualización de la opción de trabajo en tránsito

Para un trabajo de transmisión que usa Streaming Engine, puedes actualizar las siguientes opciones de trabajo sin detener el trabajo ni cambiar su ID:

  • min-num-workers: es la cantidad mínima de instancias de Compute Engine.
  • max-num-workers: es la cantidad máxima de instancias de Compute Engine.

Para otras actualizaciones de trabajos, debes reemplazar el trabajo actual con el trabajo actualizado. Para obtener más información, consulta Inicia un trabajo de reemplazo.

Realizar una actualización en tránsito

Para ejecutar una actualización de la opción de trabajo en tránsito, realiza los siguientes pasos.

gcloud

Usa el comando gcloud dataflow jobs update-options:

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  JOB_ID

Reemplaza lo siguiente:

  • REGION: es el ID de la región del trabajo
  • JOB_ID: es el ID del trabajo que se actualizará

También puedes actualizar --min-num-workers y --max-num-workers de forma individual.

REST

Usa el método projects.locations.jobs.update:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS
  }
}

Reemplaza lo siguiente:

  • PROJECT_ID: el ID del proyecto de Google Cloud del trabajo de Dataflow
  • REGION: es el ID de la región del trabajo
  • JOB_ID: es el ID del trabajo que se actualizará
  • MINIMUM_WORKERS: es la cantidad mínima de instancias de Compute Engine
  • MAXIMUM_WORKERS: es la cantidad máxima de instancias de Compute Engine

También puedes actualizar min_num_workers y max_num_workers de forma individual. Especifica qué parámetros actualizar en el parámetro de búsqueda updateMask e incluye los valores actualizados en el campo runtimeUpdatableParams del cuerpo de la solicitud. En el siguiente ejemplo, se actualiza min_num_workers:

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

Un trabajo debe estar en estado de ejecución a fin de ser apto para las actualizaciones en tránsito. Se produce un error si el trabajo no se inició o ya se canceló. Del mismo modo, si inicias un trabajo de reemplazo, espera a que comience a ejecutarse antes de enviar cualquier actualización en tránsito al trabajo nuevo.

Después de enviar una solicitud de actualización, te recomendamos que esperes a que se complete antes de enviar otra actualización. Consulta los registros de trabajo para ver cuándo se completa la solicitud.

Valida un trabajo de reemplazo

Para verificar la validez de un trabajo de reemplazo, valida el gráfico del trabajo antes de iniciar el trabajo nuevo. En Dataflow, un gráfico del trabajo es una representación gráfica de una canalización. Al validar el gráfico del trabajo, reduces el riesgo de que la canalización encuentre errores o que falle luego de la actualización. Además, puedes validar las actualizaciones sin necesidad de detener el trabajo original, para que no experimente ningún tiempo de inactividad.

Para validar tu gráfico del trabajo, sigue los pasos para iniciar un trabajo de reemplazo. Incluye la opción de servicio de Dataflow graph_validate_only en el comando update.

Java

  • Pasa la opción --update.
  • Establece la opción --jobName en PipelineOptions, en el mismo nombre que el trabajo que quieres actualizar.
  • Establece la opción --region en la misma región que la del trabajo que deseas actualizar.
  • Incluye la opción de servicio --dataflowServiceOptions=graph_validate_only.
  • Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción --transformNameMapping.
  • Si envías un trabajo de reemplazo que usa una versión posterior del SDK de Apache Beam, configura --updateCompatibilityVersion en la versión del SDK de Apache Beam que se usó en el trabajo original.

Python

  • Pasa la opción --update.
  • Establece la opción --job_name en PipelineOptions, en el mismo nombre que el trabajo que quieres actualizar.
  • Establece la opción --region en la misma región que la del trabajo que deseas actualizar.
  • Incluye la opción de servicio --dataflow_service_options=graph_validate_only.
  • Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción --transform_name_mapping.
  • Si envías un trabajo de reemplazo que usa una versión posterior del SDK de Apache Beam, configura --updateCompatibilityVersion en la versión del SDK de Apache Beam que se usó en el trabajo original.

Go

  • Pasa la opción --update.
  • Establece la opción --job_name en el mismo nombre que el trabajo que deseas actualizar.
  • Establece la opción --region en la misma región que la del trabajo que deseas actualizar.
  • Incluye la opción de servicio --dataflow_service_options=graph_validate_only.
  • Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción --transform_name_mapping.

gcloud

Para validar el gráfico del trabajo para un trabajo de plantilla de Flex, usa el comando gcloud dataflow flex-template run con la opción additional-experiments:

  • Pasa la opción --update.
  • Configura JOB_NAME con el mismo nombre que el trabajo que deseas actualizar.
  • Establece la opción --region en la misma región que la del trabajo que deseas actualizar.
  • Incluye la opción --additional-experiments=graph_validate_only.
  • Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción --transform-name-mappings.

Por ejemplo:

gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only

Reemplaza JOB_NAME por el nombre del trabajo que deseas actualizar.

REST

Usa el campo additionalExperiments en el objeto FlexTemplateRuntimeEnvironment (plantillas de Flex) o RuntimeEnvironment.

{
  additionalExperiments : ["graph_validate_only"]
  ...
}

La opción de servicio graph_validate_only solo valida las actualizaciones de canalización. No uses esta opción al crear o inicializar canalizaciones. Para actualizar tu canalización, inicia un trabajo de reemplazo sin la opción de servicio graph_validate_only.

Cuando se realiza la validación del gráfico del trabajo con éxito, tanto el estado del trabajo como sus registros muestran los siguientes estados:

  • El estado del trabajo es JOB_STATE_DONE.
  • En la consola de Google Cloud, el Estado del trabajo es Succeeded.
  • El siguiente mensaje aparece en los registros de trabajos:

    Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
    

Cuando falla la validación del gráfico del trabajo, el estado y los registros del mismo muestran los siguientes estados:

  • El estado del trabajo es JOB_STATE_FAILED.
  • En la consola de Google Cloud, el Estado del trabajo es Failed.
  • Aparecerá un mensaje que describe el error de incompatibilidad en los registros de trabajos. El contenido del mensaje depende del error.

Inicia un trabajo de reemplazo

Puedes reemplazar un trabajo existente por los siguientes motivos:

  • Para ejecutar el código de canalización actualizado.
  • Para actualizar las opciones de trabajo que no admiten actualizaciones en tránsito.

Para verificar la validez de un trabajo de reemplazo, valida el gráfico del trabajoantes de iniciar el trabajo nuevo.

Cuando inicies un trabajo de reemplazo, configura las siguientes opciones de canalización para realizar el proceso de actualización además de las opciones habituales del trabajo:

Java

  • Pasa la opción --update.
  • Establece la opción --jobName en PipelineOptions, en el mismo nombre que el trabajo que quieres actualizar.
  • Establece la opción --region en la misma región que la del trabajo que deseas actualizar.
  • Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción --transformNameMapping.
  • Si envías un trabajo de reemplazo que usa una versión posterior del SDK de Apache Beam, configura --updateCompatibilityVersion en la versión del SDK de Apache Beam que se usó en el trabajo original.

Python

  • Pasa la opción --update.
  • Establece la opción --job_name en PipelineOptions, en el mismo nombre que el trabajo que quieres actualizar.
  • Establece la opción --region en la misma región que la del trabajo que deseas actualizar.
  • Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción --transform_name_mapping.
  • Si envías un trabajo de reemplazo que usa una versión posterior del SDK de Apache Beam, configura --updateCompatibilityVersion en la versión del SDK de Apache Beam que se usó en el trabajo original.

Go

  • Pasa la opción --update.
  • Establece la opción --job_name en el mismo nombre que el trabajo que deseas actualizar.
  • Establece la opción --region en la misma región que la del trabajo que deseas actualizar.
  • Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción --transform_name_mapping.

gcloud

Para actualizar un trabajo de plantilla flexible con la CLI de gcloud, usa el gcloud dataflow flex-template run comando. No se admite la actualización de otros trabajos mediante la CLI de gcloud.

  • Pasa la opción --update.
  • Configura JOB_NAME con el mismo nombre que el trabajo que deseas actualizar.
  • Establece la opción --region en la misma región que la del trabajo que deseas actualizar.
  • Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción --transform-name-mappings.

REST

En estas instrucciones, se muestra cómo actualizar los trabajos que no son de plantillas a través de la API de REST. Para usar la API de REST para actualizar un trabajo de plantilla clásica, consulta Actualiza un trabajo de transmisión de una plantilla personalizada. Para usar la API de REST para actualizar un trabajo de plantilla de Flex, consulta Actualiza un trabajo de plantilla de Flex.

  1. Recupera el recurso job para el trabajo que deseas reemplazar a través del método projects.locations.jobs.get. Incluye el parámetro de consulta view con el valor JOB_VIEW_DESCRIPTION. Incluir JOB_VIEW_DESCRIPTION limita la cantidad de datos en la respuesta para que la solicitud posterior no exceda los límites de tamaño. Si necesitas información más detallada del trabajo, usa el valor JOB_VIEW_ALL.

    GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
    

    Reemplaza los siguientes valores:

    • PROJECT_ID: el ID del proyecto de Google Cloud del trabajo de Dataflow
    • REGION: la región del trabajo que deseas actualizar
    • JOB_ID: el ID del trabajo que deseas actualizar
  2. Para actualizar el trabajo, usa el método projects.locations.jobs.create. En el cuerpo de la solicitud, usa el recurso job que recuperaste.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    {
      "id": JOB_ID,
      "replaceJobId": JOB_ID,
      "name": JOB_NAME,
      "type": "JOB_TYPE_STREAMING",
      "transformNameMapping": {
        string: string,
        ...
      },
    }
    

    Reemplaza lo siguiente:

    • JOB_ID: el mismo ID del trabajo que el ID del trabajo que deseas actualizar.
    • JOB_NAME: el mismo nombre del trabajo que el nombre del trabajo que deseas actualizar.

    Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con el campo transformNameMapping.

  3. Opcional: para enviar tu solicitud con curl (Linux, macOS o Cloud Shell), guarda la solicitud en un archivo JSON y, luego, ejecuta el siguiente comando:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    

    Reemplaza FILE_PATH por la ruta de acceso al archivo JSON que contiene el cuerpo de la solicitud.

Especifica el nombre de tu trabajo de reemplazo

Java

Cuando inicies tu trabajo de reemplazo, el valor que apruebes para la opción --jobName debe coincidir exactamente con el nombre del trabajo que deseas reemplazar.

Python

Cuando inicies tu trabajo de reemplazo, el valor que apruebes para la opción --job_name debe coincidir exactamente con el nombre del trabajo que deseas reemplazar.

Go

Cuando inicies tu trabajo de reemplazo, el valor que apruebes para la opción --job_name debe coincidir exactamente con el nombre del trabajo que deseas reemplazar.

gcloud

Cuando inicies tu trabajo de reemplazo, el JOB_NAME debe coincidir exactamente con el nombre del trabajo que deseas reemplazar.

REST

Establece el valor del campo replaceJobId como el mismo ID del trabajo que el trabajo que deseas actualizar. Para encontrar el valor de nombre del trabajo correcto, selecciona tu trabajo anterior en la interfaz de supervisión de Dataflow. Luego, en el panel lateral Información del trabajo, busca el campo ID del trabajo.

Para encontrar el valor de nombre del trabajo correcto, selecciona tu trabajo anterior en la interfaz de supervisión de Dataflow. Luego, en el panel lateral Información del trabajo (Job info), busca el campo Nombre del trabajo (Job name):

El panel lateral de información del trabajo para un trabajo de Dataflow en ejecución.
Figura 1: El panel lateral de información del trabajo para un trabajo en ejecución de Dataflow con el campo Nombre del trabajo.

Como alternativa, consulta una lista de trabajos existentes mediante la interfaz de línea de comandos de Dataflow. Ingresa el comando gcloud dataflow jobs list en tu shell o ventana de la terminal a fin de obtener una lista de trabajos de Dataflow en tu proyecto de Google Cloud y busca el campo NAME para el trabajo que deseas reemplazar:

JOB_ID                                    NAME                        TYPE       CREATION_TIME        STATE    REGION
2020-12-28_12_01_09-yourdataflowjobid     ps-topic                    Streaming  2020-12-28 20:01:10  Running  us-central1

Crea una asignación de transformaciones

Si tu canalización de reemplazo cambia algún nombre de transformación de los de tu canalización anterior, el servicio de Dataflow requiere una asignación de transformaciones. La asignación de transformaciones asigna las transformaciones nombradas en el código de tu canalización anterior a nombres en el código de tu canalización de reemplazo.

Java

Pasa la asignación mediante la opción de línea de comandos --transformNameMapping, con el siguiente formato general:

--transformNameMapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Solo debes proporcionar entradas de asignación en --transformNameMapping para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.

Cuando ejecutas con --transformNameMapping, quizás necesites escapar las comillas de forma adecuada para tu shell. Por ejemplo, de la siguiente manera en Bash:

--transformNameMapping='{"oldTransform1":"newTransform1",...}'

Python

Pasa la asignación mediante la opción de línea de comandos --transform_name_mapping, con el siguiente formato general:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Solo debes proporcionar entradas de asignación en --transform_name_mapping para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.

Cuando ejecutas con --transform_name_mapping, quizás necesites escapar las comillas de forma adecuada para tu shell. Por ejemplo, de la siguiente manera en Bash:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

Go

Pasa la asignación mediante la opción de línea de comandos --transform_name_mapping, con el siguiente formato general:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Solo debes proporcionar entradas de asignación en --transform_name_mapping para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.

Cuando ejecutas con --transform_name_mapping, quizás necesites escapar las comillas de forma adecuada para tu shell. Por ejemplo, de la siguiente manera en Bash:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

gcloud

Pasa la asignación a través de la opción --transform-name-mappings con el siguiente formato general:

--transform-name-mappings= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Solo debes proporcionar entradas de asignación en --transform-name-mappings para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.

Cuando ejecutas con --transform-name-mappings, quizás necesites escapar las comillas de forma adecuada para tu shell. Por ejemplo, de la siguiente manera en Bash:

--transform-name-mappings='{"oldTransform1":"newTransform1",...}'

REST

Pasa la asignación a través del campo transformNameMapping, con el siguiente formato general:

"transformNameMapping": {
  oldTransform1: newTransform1,
  oldTransform2: newTransform2,
  ...
}

Solo debes proporcionar entradas de asignación en transformNameMapping para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.

Determina los nombres de las transformaciones

El nombre de la transformación en cada instancia en la asignación es el nombre que suministraste cuando aplicaste la transformación en tu canalización. Por ejemplo:

Java

  .apply("FormatResults", ParDo
    .of(new DoFn<KV<String, Long>>, String>() {
      ...
     }
  }))

Python

  | 'FormatResults' >> beam.ParDo(MyDoFn())

Go

  // In Go, this is always the package-qualified name of the DoFn itself.
  // For example, if the FormatResults DoFn is in the main package, its name
  // is "main.FormatResults".
  beam.ParDo(s, FormatResults, results)

También puedes obtener los nombres de las transformaciones de tu trabajo anterior en el gráfico de ejecución del trabajo en la interfaz de supervisión de Dataflow:

El grafo de ejecución para una canalización de WordCount.
Figura 2: Grafo de ejecución para una canalización de WordCount como se muestra en la interfaz de supervisión de Dataflow.

Nombres de transformación compuestos

Los nombres de transformaciones son jerárquicos: se basan en la jerarquía de transformaciones en tu canalización. Si tu canalización tiene una transformación compuesta, las transformaciones anidadas se nombran en términos de la transformación que las contiene. Por ejemplo, supongamos que tu canalización contiene una transformación compuesta llamada CountWidgets, que contiene una transformación interna llamada Parse. El nombre completo de tu transformación es CountWidgets/Parse y debes especificar ese nombre completo en tu asignación de transformaciones.

Si tu canalización nueva asigna una transformación compuesta con un nombre diferente, se renombrarán todas las transformaciones anidadas de forma automática. Debes especificar los nombres cambiados para las transformaciones internas en tu asignación de transformaciones.

Refactoriza la jerarquía de transformaciones

Si tu canalización de reemplazo usa una jerarquía de transformaciones diferente de la de tu canalización anterior, debes declarar la asignación de forma explícita. Es posible que tengas una jerarquía de transformaciones diferente porque refactorizaste tus transformaciones compuestas, o tu canalización depende de una transformación compuesta por una biblioteca que cambió.

Por ejemplo, tu canalización anterior aplicó una transformación compuesta, CountWidgets, que contenía una transformación interna llamada Parse. La canalización de reemplazo refactoriza CountWidgets y anida Parse dentro de otra transformación llamada Scan. Para que tu actualización tenga éxito, debes asignar de forma explícita el nombre completo de la transformación en la canalización anterior (CountWidgets/Parse) al nombre de la transformación en la canalización nueva (CountWidgets/Scan/Parse):

Java

--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que tu canalización de reemplazo quita la transformación CountWidgets/Parse en su totalidad:

--transformNameMapping={"CountWidgets/Parse":""}

Python

--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que tu canalización de reemplazo quita la transformación CountWidgets/Parse en su totalidad:

--transform_name_mapping={"CountWidgets/Parse":""}

Go

--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}

Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que tu canalización de reemplazo quita la transformación CountWidgets/Parse en su totalidad:

--transform_name_mapping={"CountWidgets/main.Parse":""}

gcloud

--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que tu canalización de reemplazo quita la transformación CountWidgets/Parse en su totalidad:

--transform-name-mappings={"CountWidgets/main.Parse":""}

REST

"transformNameMapping": {
  CountWidgets/Parse: CountWidgets/Scan/Parse
}

Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que tu canalización de reemplazo quita la transformación CountWidgets/Parse en su totalidad:

"transformNameMapping": {
  CountWidgets/main.Parse: null
}

Los efectos de reemplazar un trabajo

Cuando reemplazas un trabajo existente, uno nuevo ejecuta el código de la canalización actualizada. El servicio de Dataflow retiene el nombre del trabajo, pero ejecuta el trabajo de reemplazo con un ID de trabajo actualizado. Este proceso puede causar tiempo de inactividad mientras se detiene el trabajo existente, se ejecuta la verificación de compatibilidad y comienza el trabajo nuevo.

El trabajo de reemplazo conserva los siguientes elementos:

Datos de estado intermedios

Se conservan los datos de estado intermedios del trabajo anterior. En los datos de estado, no se incluyen las cachés en la memoria. Si deseas conservar los datos de la caché en la memoria cuando actualizas tu canalización, como solución alternativa, refactoriza tu canalización para convertir las cachés en datos de estado o entradas complementarias. Si deseas obtener más información sobre las entradas complementarias, consulta Patrones de entrada complementaria en la documentación de Apache Beam.

Las canalizaciones de transmisión tienen límites de tamaño para ValueState y entradas complementarias. Por lo tanto, si tienes memorias caché grandes que deseas conservar, es posible que debas usar almacenamiento externo, como Memorystore o Bigtable.

Datos en tránsito

Las transformaciones en tu canalización nueva procesarán los datos “en tránsito”. Sin embargo, las transformaciones adicionales que agregas a tu código de la canalización de reemplazo pueden o no tener efecto según la ubicación en la que se almacenan los registros. En este ejemplo, la canalización existente tiene las siguientes transformaciones:

Java

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Format' >> FormatStrings()

Go

   beam.ParDo(s, ReadStrings)
   beam.ParDo(s, FormatStrings)

Puedes reemplazar tu trabajo por códigos de canalización nuevos, como se indica a continuación:

Java

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Remove' >> RemoveStringsStartingWithA()
    | 'Format' >> FormatStrings()

Go

  beam.ParDo(s, ReadStrings)
  beam.ParDo(s, RemoveStringsStartingWithA)
  beam.ParDo(s, FormatStrings)

Si bien agregaste una transformación para filtrar las strings que comienzan con la letra “A”, la siguiente transformación (FormatStrings) quizás vea strings en tránsito o almacenadas en búfer que comienzan con “A” transferidas del trabajo anterior.

Cambia el sistema de ventanas

Puedes cambiar las estrategias de renderización en ventanas y de activadores para los elementos PCollection en tu canalización de reemplazo, pero ten cuidado. Los cambios en las estrategias de activadores y del sistema de ventanas no afectan los datos que ya están almacenados en búfer o en tránsito.

Recomendamos que intentes realizar solo cambios pequeños en el sistema de ventanas de tu canalización, como cambiar la duración de ventanas fijas o de período variable. Realizar cambios grandes en el sistema de ventanas o activadores, como cambiar el algoritmo del sistema de ventanas, puede tener un impacto impredecible en el resultado de tu canalización.

Verifica la compatibilidad de trabajos

Cuando inicias tu trabajo de reemplazo, el servicio de Dataflow realiza una verificación de compatibilidad entre tu trabajo de reemplazo y tu trabajo anterior. Si se aprueba la verificación de compatibilidad, tu trabajo anterior se detiene. A continuación, se inicia tu trabajo de reemplazo en el servicio de Dataflow bajo el mismo nombre. Si la verificación de compatibilidad falla, tu trabajo anterior continúa en ejecución en el servicio de Dataflow y tu trabajo de reemplazo muestra un error.

Java

Debido a una limitación, debes usar la ejecución de bloqueo para ver los errores de intento de actualización en tu consola o terminal. La solución alternativa actual consta de los siguientes pasos:

  1. Usa pipeline.run().waitUntilFinish() en el código de tu canalización.
  2. Ejecuta el programa de tu canalización de reemplazo con la opción --update.
  3. Espera a que el trabajo de reemplazo apruebe con éxito la verificación de compatibilidad.
  4. Usa Ctrl+C para salir del proceso de ejecución de bloqueo.

De manera alternativa, puedes supervisar el estado de tu trabajo de reemplazo en la interfaz de supervisión de Dataflow. Si tu trabajo se inició con éxito, también aprobó la verificación de compatibilidad.

Python

Debido a una limitación, debes usar la ejecución de bloqueo para ver los errores de intento de actualización en tu consola o terminal. La solución alternativa actual consta de los siguientes pasos:

  1. Usa pipeline.run().wait_until_finish() en el código de tu canalización.
  2. Ejecuta el programa de tu canalización de reemplazo con la opción --update.
  3. Espera a que el trabajo de reemplazo apruebe con éxito la verificación de compatibilidad.
  4. Usa Ctrl+C para salir del proceso de ejecución de bloqueo.

De manera alternativa, puedes supervisar el estado de tu trabajo de reemplazo en la interfaz de supervisión de Dataflow. Si tu trabajo se inició con éxito, también aprobó la verificación de compatibilidad.

Go

Debido a una limitación, debes usar la ejecución de bloqueo para ver los errores de intento de actualización en tu consola o terminal. En particular, debes especificar la ejecución sin bloqueo a través de las marcas --execute_async o --async. La solución alternativa actual consta de los siguientes pasos:

  1. Ejecuta el programa de tu canalización de reemplazo con la opción --update y sin las marcas --execute_async o --async.
  2. Espera a que el trabajo de reemplazo apruebe con éxito la verificación de compatibilidad.
  3. Usa Ctrl+C para salir del proceso de ejecución de bloqueo.

gcloud

Debido a una limitación, debes usar la ejecución de bloqueo para ver los errores de intento de actualización en tu consola o terminal. La solución alternativa actual consta de los siguientes pasos:

  1. Para las canalizaciones de Java, usa pipeline.run().waitUntilFinish() en el código de tu canalización. Para las canalizaciones de Python, usa pipeline.run().wait_until_finish() en el código de tu canalización. Para las canalizaciones de Go, sigue los pasos de la pestaña Go.
  2. Ejecuta el programa de tu canalización de reemplazo con la opción --update.
  3. Espera a que el trabajo de reemplazo apruebe con éxito la verificación de compatibilidad.
  4. Usa Ctrl+C para salir del proceso de ejecución de bloqueo.

REST

Debido a una limitación, debes usar la ejecución de bloqueo para ver los errores de intento de actualización en tu consola o terminal. La solución alternativa actual consta de los siguientes pasos:

  • Para las canalizaciones de Java, usa pipeline.run().waitUntilFinish() en el código de tu canalización. Para las canalizaciones de Python, usa pipeline.run().wait_until_finish() en el código de tu canalización. Para las canalizaciones de Go, sigue los pasos de la pestaña Go.
  • Ejecuta el programa de tu canalización de reemplazo con el campo replaceJobId.
  • Espera a que el trabajo de reemplazo apruebe con éxito la verificación de compatibilidad.
  • Usa Ctrl+C para salir del proceso de ejecución de bloqueo.

La verificación de compatibilidad usa la asignación de transformaciones proporcionada para garantizar que Dataflow pueda transferir datos de estado intermedios de los pasos en tu trabajo anterior a tu trabajo de reemplazo. La verificación de compatibilidad también garantiza que los PCollection en tu canalización usen los mismos codificadores. Cambiar un Coder puede ocasionar que falle la verificación de compatibilidad, ya que algunos datos en tránsito o registros almacenados en búfer pueden no estar serializados de forma correcta en la canalización de reemplazo.

Evita errores de compatibilidad

Algunas diferencias entre tu canalización anterior y tu canalización de reemplazo pueden ocasionar que falle la verificación de compatibilidad. Estas diferencias incluyen los siguientes puntos:

  • Cambiar el grafo de la canalización sin proporcionar una asignación. Cuando actualizas un trabajo, Dataflow intenta hacer coincidir las transformaciones en tu trabajo anterior con las transformaciones en el trabajo de reemplazo. Este proceso de coincidencia ayuda a Dataflow a transferir datos de estado intermedios para cada paso. Si cambias el nombre o quitas pasos, debes proporcionar una asignación de transformación para que Dataflow pueda hacer coincidir los datos de estado según corresponda.
  • Cambiar las entradas adicionales de un paso. Agregar entradas adicionales a una transformación de tu canalización de reemplazo o quitarlas de ella hará que falle la verificación de compatibilidad.
  • Cambiar el codificador de un paso. Cuando actualizas un trabajo, Dataflow conserva los registros de datos almacenados en búfer y los administra en el trabajo de reemplazo. Por ejemplo, los datos almacenados en búfer pueden ocurrir mientras se resuelve la renderización en ventanas. Si el trabajo de reemplazo usa una codificación de datos diferente o incompatible, Dataflow no podrá serializar o deserializar estos registros.
  • Quita una operación “con estado” de tu canalización. Si quitas las operaciones con estado de tu canalización, es posible que el trabajo de reemplazo falle en la verificación de compatibilidad. Dataflow puede fusionar varios pasos para lograr una mayor eficiencia. Si quitas una operación dependiente del estado de un paso fusionado, la verificación fallará. Las operaciones con estado incluyen los siguientes elementos:

    • Transformaciones que producen o consumen entradas adicionales
    • Lecturas de E/S
    • Transformaciones que usan un estado clave
    • Transformaciones con ventanas combinadas
  • Cambiar variables con estado DoFn. En los trabajos de transmisión en curso, si la canalización incluye DoFn con estado, cambiar las variables DoFn con estado puede hacer que la canalización falle.

  • Intentar ejecutar tu trabajo de reemplazo en una zona geográfica diferente. Ejecuta tu trabajo de reemplazo en la misma zona en la que ejecutaste tu trabajo anterior.

Actualiza esquemas

Apache Beam permite que las PCollection tengan esquemas con campos con nombre, en cuyo caso no se necesitan codificadores explícitos. Si los nombres y tipos de campo de un esquema determinado no se modifican (incluidos los campos anidados), ese esquema no hará que falle la verificación de actualización. Sin embargo, la actualización aún puede bloquearse si otros segmentos de la canalización nueva no son compatibles.

Esquemas de evolución

A menudo, es necesario mejorar el esquema de una PCollection debido a los requisitos empresariales cambiantes. El servicio de Dataflow permite realizar los siguientes cambios en un esquema cuando se actualiza la canalización:

  • Agregar uno o más campos nuevos a un esquema, incluidos los campos anidados
  • Hacer que un tipo de campo obligatorio (no anulable) sea opcional (anulable).

No se permite cambiar campos, quitar nombres de campos o cambiar tipos de campos durante la actualización.

Pasa datos adicionales a una operación ParDo existente

Puedes pasar datos adicionales (fuera de banda) a una operación ParDo existente mediante uno de los siguientes métodos, según tu caso práctico:

  • Serializa la información como campos en tu subclase DoFn.
  • Cualquier variable a la que se haga referencia a través de los métodos en una DoFn anónima se serializa de forma automática.
  • Calcula los datos dentro de DoFn.startBundle().
  • Pasa datos mediante ParDo.withSideInputs.

Si deseas obtener más información, consulta las siguientes páginas: