En este documento, se describe cómo actualizar un trabajo de transmisión en curso. Recomendamos actualizar tu trabajo de Dataflow existente por los siguientes motivos:
- Quieres mejorar tu código de canalización.
- Quieres solucionar errores en el código de tu canalización.
- Quieres actualizar tu canalización a fin de administrar cambios en el formato de datos o justificar la versión o cualquier otro tipo de cambios en tu fuente de datos.
- Quieres aplicar un parche a una vulnerabilidad de seguridad relacionada con Container-Optimized OS para todos los trabajadores de Dataflow.
- Quieres escalar una canalización de Apache Beam de transmisión para usar una cantidad diferente de trabajadores.
Puedes actualizar trabajos de dos maneras:
- Actualización de trabajo en tránsito: Para los trabajos de transmisión que usan Streaming Engine, puedes actualizar las opciones de trabajo de
min-num-workers
ymax-num-workers
sin detener el trabajo ni cambiar el ID del mismo. - Trabajo de reemplazo: Para ejecutar el código de canalización actualizado o actualizar las opciones de trabajo que las actualizaciones de trabajo en tránsito no admiten, ejecuta un trabajo nuevo que reemplace el actual. Para verificar si un trabajo de reemplazo es válido, valida el gráfico de trabajoantes de iniciar el trabajo nuevo.
Cuando actualizas tu trabajo, el servicio de Dataflow realiza una verificación de compatibilidad entre tu trabajo que se encuentra ejecutado y tu trabajo de reemplazo potencial. La verificación de compatibilidad garantiza que se puedan transferir elementos como la información de estado intermedio y los datos almacenados en búfer de tu trabajo anterior a tu trabajo de reemplazo.
También puedes usar la infraestructura de registro incorporada del SDK de Apache Beam para registrar información cuando actualizas el trabajo. Para obtener más información, consulta Trabaja con registros de canalización.
Para identificar problemas con el código de la canalización, usa el
nivel de registro DEBUG
.
- Para obtener instrucciones sobre cómo actualizar trabajos de transmisión que usan plantillas clásicas, consulta Actualiza un trabajo de transmisión de una plantilla personalizada.
- Para obtener instrucciones sobre cómo actualizar trabajos de transmisión que usan plantillas de Flex, sigue las instrucciones de gcloud CLI en esta página o consulta Actualiza un trabajo de plantilla de Flex.
Actualización de la opción de trabajo en tránsito
Para un trabajo de transmisión que usa Streaming Engine, puedes actualizar las siguientes opciones de trabajo sin detener el trabajo ni cambiar su ID:
min-num-workers
: es la cantidad mínima de instancias de Compute Engine.max-num-workers
: es la cantidad máxima de instancias de Compute Engine.worker-utilization-hint
: es el uso de CPU objetivo, en el rango [0.1, 0.9].
Para otras actualizaciones de trabajos, debes reemplazar el trabajo actual con el trabajo actualizado. Para obtener más información, consulta Inicia un trabajo de reemplazo.
Realizar una actualización en tránsito
Para ejecutar una actualización de la opción de trabajo en tránsito, realiza los siguientes pasos.
gcloud
Usa el comando gcloud dataflow jobs update-options
:
gcloud dataflow jobs update-options \ --region=REGION \ --min-num-workers=MINIMUM_WORKERS \ --max-num-workers=MAXIMUM_WORKERS \ --worker-utilization-hint=TARGET_UTILIZATION \ JOB_ID
Reemplaza lo siguiente:
- REGION: es el ID de la región del trabajo
- MINIMUM_WORKERS: es la cantidad mínima de instancias de Compute Engine
- MAXIMUM_WORKERS: es la cantidad máxima de instancias de Compute Engine
- TARGET_UTILIZATION: un valor en el rango [0.1, 0.9]
- JOB_ID: es el ID del trabajo que se actualizará
También puedes actualizar --min-num-workers
, --max-num-workers
y worker-utilization-hint
de forma individual.
REST
Usa el método projects.locations.jobs.update
:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=MASK { "runtime_updatable_params": { "min_num_workers": MINIMUM_WORKERS, "max_num_workers": MAXIMUM_WORKERS, "worker_utilization_hint": TARGET_UTILIZATION } }
Reemplaza lo siguiente:
- MASK: Es una lista de parámetros separados por comas que se deben actualizar, según lo siguiente:
runtime_updatable_params.max_num_workers
runtime_updatable_params.min_num_workers
runtime_updatable_params.worker_utilization_hint
- PROJECT_ID: el ID del proyecto de Google Cloud del trabajo de Dataflow
- REGION: es el ID de la región del trabajo
- JOB_ID: es el ID del trabajo que se actualizará
- MINIMUM_WORKERS: es la cantidad mínima de instancias de Compute Engine
- MAXIMUM_WORKERS: es la cantidad máxima de instancias de Compute Engine
- TARGET_UTILIZATION: un valor en el rango [0.1, 0.9]
También puedes actualizar min_num_workers
, max_num_workers
y worker_utilization_hint
de forma individual.
Especifica qué parámetros actualizar en el parámetros de consulta updateMask
e incluye los valores actualizados en el campo runtimeUpdatableParams
del cuerpo de la solicitud. En el siguiente ejemplo, se actualiza min_num_workers
:
PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": 5 } }
Un trabajo debe estar en estado de ejecución a fin de ser apto para las actualizaciones en tránsito. Se produce un error si el trabajo no se inició o ya se canceló. Del mismo modo, si inicias un trabajo de reemplazo, espera a que comience a ejecutarse antes de enviar cualquier actualización en tránsito al trabajo nuevo.
Después de enviar una solicitud de actualización, te recomendamos que esperes a que se complete antes de enviar otra actualización. Consulta los registros de trabajo para ver cuándo se completa la solicitud.
Valida un trabajo de reemplazo
Para verificar la validez de un trabajo de reemplazo, valida el gráfico del trabajo antes de iniciar el trabajo nuevo. En Dataflow, un gráfico del trabajo es una representación gráfica de una canalización. Al validar el gráfico del trabajo, reduces el riesgo de que la canalización encuentre errores o que falle luego de la actualización. Además, puedes validar las actualizaciones sin necesidad de detener el trabajo original, para que no experimente ningún tiempo de inactividad.
Para validar tu gráfico del trabajo, sigue los pasos para iniciar un trabajo de reemplazo. Incluye la opción de servicio de Dataflow graph_validate_only
en el comando update.
Java
- Pasa la opción
--update
. - Establece la opción
--jobName
enPipelineOptions
, en el mismo nombre que el trabajo que quieres actualizar. - Establece la opción
--region
en la misma región que la del trabajo que deseas actualizar. - Incluye la opción de servicio
--dataflowServiceOptions=graph_validate_only
. - Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción
--transformNameMapping
. - Si envías un trabajo de reemplazo que usa una versión posterior del SDK de Apache Beam, configura
--updateCompatibilityVersion
en la versión del SDK de Apache Beam que se usó en el trabajo original.
Python
- Pasa la opción
--update
. - Establece la opción
--job_name
enPipelineOptions
, en el mismo nombre que el trabajo que quieres actualizar. - Establece la opción
--region
en la misma región que la del trabajo que deseas actualizar. - Incluye la opción de servicio
--dataflow_service_options=graph_validate_only
. - Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción
--transform_name_mapping
. - Si envías un trabajo de reemplazo que usa una versión posterior del SDK de Apache Beam, configura
--updateCompatibilityVersion
en la versión del SDK de Apache Beam que se usó en el trabajo original.
Go
- Pasa la opción
--update
. - Establece la opción
--job_name
en el mismo nombre que el trabajo que deseas actualizar. - Establece la opción
--region
en la misma región que la del trabajo que deseas actualizar. - Incluye la opción de servicio
--dataflow_service_options=graph_validate_only
. - Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción
--transform_name_mapping
.
gcloud
Para validar el gráfico del trabajo para un trabajo de plantilla de Flex, usa el
comando gcloud dataflow flex-template run
con la opción additional-experiments
:
- Pasa la opción
--update
. - Configura JOB_NAME con el mismo nombre que el trabajo que deseas actualizar.
- Establece la opción
--region
en la misma región que la del trabajo que deseas actualizar. - Incluye la opción
--additional-experiments=graph_validate_only
. - Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción
--transform-name-mappings
.
Por ejemplo:
gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only
Reemplaza JOB_NAME por el nombre del trabajo que deseas actualizar.
REST
Usa el campo additionalExperiments
en el
objeto FlexTemplateRuntimeEnvironment
(plantillas de Flex) o
RuntimeEnvironment
.
{
additionalExperiments : ["graph_validate_only"]
...
}
La opción de servicio graph_validate_only
solo valida las actualizaciones de canalización. No uses esta opción al crear o inicializar canalizaciones. Para actualizar tu canalización, inicia un trabajo de reemplazo sin la opción de servicio graph_validate_only
.
Cuando se realiza la validación del gráfico del trabajo con éxito, tanto el estado del trabajo como sus registros muestran los siguientes estados:
- El estado del trabajo es
JOB_STATE_DONE
. - En la consola de Google Cloud, el Estado del trabajo es
Succeeded
. El siguiente mensaje aparece en los registros de trabajos:
Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
Cuando falla la validación del gráfico del trabajo, el estado y los registros del mismo muestran los siguientes estados:
- El estado del trabajo es
JOB_STATE_FAILED
. - En la consola de Google Cloud, el Estado del trabajo es
Failed
. - Aparecerá un mensaje que describe el error de incompatibilidad en los registros de trabajos. El contenido del mensaje depende del error.
Inicia un trabajo de reemplazo
Puedes reemplazar un trabajo existente por los siguientes motivos:
- Para ejecutar el código de canalización actualizado.
- Para actualizar las opciones de trabajo que no admiten actualizaciones en tránsito.
Para verificar la validez de un trabajo de reemplazo, valida el gráfico del trabajoantes de iniciar el trabajo nuevo.
Cuando inicies un trabajo de reemplazo, configura las siguientes opciones de canalización para realizar el proceso de actualización además de las opciones habituales del trabajo:
Java
- Pasa la opción
--update
. - Establece la opción
--jobName
enPipelineOptions
, en el mismo nombre que el trabajo que quieres actualizar. - Establece la opción
--region
en la misma región que la del trabajo que deseas actualizar. - Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción
--transformNameMapping
. - Si envías un trabajo de reemplazo que usa una versión posterior del SDK de Apache Beam, configura
--updateCompatibilityVersion
en la versión del SDK de Apache Beam que se usó en el trabajo original.
Python
- Pasa la opción
--update
. - Establece la opción
--job_name
enPipelineOptions
, en el mismo nombre que el trabajo que quieres actualizar. - Establece la opción
--region
en la misma región que la del trabajo que deseas actualizar. - Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción
--transform_name_mapping
. - Si envías un trabajo de reemplazo que usa una versión posterior del SDK de Apache Beam, configura
--updateCompatibilityVersion
en la versión del SDK de Apache Beam que se usó en el trabajo original.
Go
- Pasa la opción
--update
. - Establece la opción
--job_name
en el mismo nombre que el trabajo que deseas actualizar. - Establece la opción
--region
en la misma región que la del trabajo que deseas actualizar. - Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción
--transform_name_mapping
.
gcloud
Para actualizar un trabajo de plantilla flexible con la CLI de gcloud, usa el
gcloud dataflow flex-template run
comando. No se admite
la actualización de otros trabajos mediante la CLI de gcloud.
- Pasa la opción
--update
. - Configura JOB_NAME con el mismo nombre que el trabajo que deseas actualizar.
- Establece la opción
--region
en la misma región que la del trabajo que deseas actualizar. - Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con la opción
--transform-name-mappings
.
REST
En estas instrucciones, se muestra cómo actualizar los trabajos que no son de plantillas a través de la API de REST. Para usar la API de REST para actualizar un trabajo de plantilla clásica, consulta Actualiza un trabajo de transmisión de una plantilla personalizada. Para usar la API de REST para actualizar un trabajo de plantilla de Flex, consulta Actualiza un trabajo de plantilla de Flex.
Recupera el recurso
job
para el trabajo que deseas reemplazar a través del métodoprojects.locations.jobs.get
. Incluye el parámetro de consultaview
con el valorJOB_VIEW_DESCRIPTION
. IncluirJOB_VIEW_DESCRIPTION
limita la cantidad de datos en la respuesta para que la solicitud posterior no exceda los límites de tamaño. Si necesitas información más detallada del trabajo, usa el valorJOB_VIEW_ALL
.GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
Reemplaza los siguientes valores:
- PROJECT_ID: el ID del proyecto de Google Cloud del trabajo de Dataflow
- REGION: la región del trabajo que deseas actualizar
- JOB_ID: el ID del trabajo que deseas actualizar
Para actualizar el trabajo, usa el método
projects.locations.jobs.create
. En el cuerpo de la solicitud, usa el recursojob
que recuperaste.POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs { "id": JOB_ID, "replaceJobId": JOB_ID, "name": JOB_NAME, "type": "JOB_TYPE_STREAMING", "transformNameMapping": { string: string, ... }, }
Reemplaza lo siguiente:
- JOB_ID: el mismo ID del trabajo que el ID del trabajo que deseas actualizar.
- JOB_NAME: el mismo nombre del trabajo que el nombre del trabajo que deseas actualizar.
Si cambió algún nombre de transformación en tu canalización, debes suministrar una asignación de transformación y pasarla con el campo
transformNameMapping
.Opcional: para enviar tu solicitud con curl (Linux, macOS o Cloud Shell), guarda la solicitud en un archivo JSON y, luego, ejecuta el siguiente comando:
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
Reemplaza FILE_PATH por la ruta de acceso al archivo JSON que contiene el cuerpo de la solicitud.
Especifica el nombre de tu trabajo de reemplazo
Java
Cuando inicies tu trabajo de reemplazo, el valor que apruebes para la opción --jobName
debe coincidir exactamente con el nombre del trabajo que deseas reemplazar.
Python
Cuando inicies tu trabajo de reemplazo, el valor que apruebes para la opción --job_name
debe coincidir exactamente con el nombre del trabajo que deseas reemplazar.
Go
Cuando inicies tu trabajo de reemplazo, el valor que apruebes para la opción --job_name
debe coincidir exactamente con el nombre del trabajo que deseas reemplazar.
gcloud
Cuando inicies tu trabajo de reemplazo, el JOB_NAME debe coincidir exactamente con el nombre del trabajo que deseas reemplazar.
REST
Establece el valor del campo replaceJobId
como el mismo ID del trabajo que el trabajo que deseas
actualizar. Para encontrar el valor de nombre del trabajo correcto, selecciona tu trabajo anterior en la
interfaz de supervisión de Dataflow.
Luego, en el panel lateral Información del trabajo, busca el campo ID del trabajo.
Para encontrar el valor de nombre del trabajo correcto, selecciona tu trabajo anterior en la interfaz de supervisión de Dataflow. Luego, en el panel lateral Información del trabajo (Job info), busca el campo Nombre del trabajo (Job name):
Como alternativa, consulta una lista de trabajos existentes mediante la interfaz de línea de comandos de Dataflow.
Ingresa el comando gcloud dataflow jobs list
en tu shell o ventana de la terminal a fin de obtener una lista de trabajos de Dataflow en tu proyecto de Google Cloud y busca el campo NAME
para el trabajo que deseas reemplazar:
JOB_ID NAME TYPE CREATION_TIME STATE REGION 2020-12-28_12_01_09-yourdataflowjobid ps-topic Streaming 2020-12-28 20:01:10 Running us-central1
Crea una asignación de transformaciones
Si tu canalización de reemplazo cambia algún nombre de transformación de los de tu canalización anterior, el servicio de Dataflow requiere una asignación de transformaciones. La asignación de transformaciones asigna las transformaciones nombradas en el código de tu canalización anterior a nombres en el código de tu canalización de reemplazo.
Java
Pasa la asignación mediante la opción de línea de comandos --transformNameMapping
, con el siguiente formato general:
--transformNameMapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Solo debes proporcionar entradas de asignación en --transformNameMapping
para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.
Cuando ejecutas con --transformNameMapping
,
quizás necesites escapar
las comillas de forma adecuada para tu shell. Por ejemplo, de la siguiente manera en Bash:
--transformNameMapping='{"oldTransform1":"newTransform1",...}'
Python
Pasa la asignación mediante la opción de línea de comandos --transform_name_mapping
, con el siguiente formato general:
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Solo debes proporcionar entradas de asignación en --transform_name_mapping
para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.
Cuando ejecutas con --transform_name_mapping
,
quizás necesites escapar
las comillas de forma adecuada para tu shell. Por ejemplo, de la siguiente manera en Bash:
--transform_name_mapping='{"oldTransform1":"newTransform1",...}'
Go
Pasa la asignación mediante la opción de línea de comandos --transform_name_mapping
, con el siguiente formato general:
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Solo debes proporcionar entradas de asignación en --transform_name_mapping
para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.
Cuando ejecutas con --transform_name_mapping
,
quizás necesites escapar
las comillas de forma adecuada para tu shell. Por ejemplo, de la siguiente manera en Bash:
--transform_name_mapping='{"oldTransform1":"newTransform1",...}'
gcloud
Pasa la asignación a través de la opción
--transform-name-mappings
con el siguiente formato general:
--transform-name-mappings= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Solo debes proporcionar entradas de asignación en --transform-name-mappings
para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.
Cuando ejecutas con --transform-name-mappings
,
quizás necesites escapar las comillas de forma adecuada para tu shell. Por ejemplo, de la siguiente manera en Bash:
--transform-name-mappings='{"oldTransform1":"newTransform1",...}'
REST
Pasa la asignación a través del campo
transformNameMapping
, con el siguiente formato general:
"transformNameMapping": {
oldTransform1: newTransform1,
oldTransform2: newTransform2,
...
}
Solo debes proporcionar entradas de asignación en transformNameMapping
para los nombres de transformaciones que cambiaron entre tu canalización anterior y tu canalización de reemplazo.
Determina los nombres de las transformaciones
El nombre de la transformación en cada instancia en la asignación es el nombre que suministraste cuando aplicaste la transformación en tu canalización. Por ejemplo:
Java
.apply("FormatResults", ParDo
.of(new DoFn<KV<String, Long>>, String>() {
...
}
}))
Python
| 'FormatResults' >> beam.ParDo(MyDoFn())
Go
// In Go, this is always the package-qualified name of the DoFn itself.
// For example, if the FormatResults DoFn is in the main package, its name
// is "main.FormatResults".
beam.ParDo(s, FormatResults, results)
También puedes obtener los nombres de las transformaciones de tu trabajo anterior en el gráfico de ejecución del trabajo en la interfaz de supervisión de Dataflow:
Nombres de transformación compuestos
Los nombres de transformaciones son jerárquicos: se basan en la jerarquía de transformaciones en tu canalización. Si tu canalización tiene una transformación compuesta, las transformaciones anidadas se nombran en términos de la transformación que las contiene. Por ejemplo, supongamos que tu canalización contiene una transformación compuesta llamada CountWidgets
, que contiene una transformación interna llamada Parse
. El nombre completo de tu transformación es CountWidgets/Parse
y debes especificar ese nombre completo en tu asignación de transformaciones.
Si tu canalización nueva asigna una transformación compuesta con un nombre diferente, se renombrarán todas las transformaciones anidadas de forma automática. Debes especificar los nombres cambiados para las transformaciones internas en tu asignación de transformaciones.
Refactoriza la jerarquía de transformaciones
Si tu canalización de reemplazo usa una jerarquía de transformaciones diferente de la de tu canalización anterior, debes declarar la asignación de forma explícita. Es posible que tengas una jerarquía de transformaciones diferente porque refactorizaste tus transformaciones compuestas, o tu canalización depende de una transformación compuesta por una biblioteca que cambió.
Por ejemplo, tu canalización anterior aplicó una transformación compuesta, CountWidgets
, que contenía una transformación interna llamada Parse
. La canalización de reemplazo refactoriza CountWidgets
y anida Parse
dentro de otra transformación llamada Scan
. Para que tu actualización tenga éxito, debes asignar de forma explícita el nombre completo de la transformación en la canalización anterior (CountWidgets/Parse
) al nombre de la transformación en la canalización nueva (CountWidgets/Scan/Parse
):
Java
--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que tu canalización de reemplazo quita la transformación CountWidgets/Parse
en su totalidad:
--transformNameMapping={"CountWidgets/Parse":""}
Python
--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que tu canalización de reemplazo quita la transformación CountWidgets/Parse
en su totalidad:
--transform_name_mapping={"CountWidgets/Parse":""}
Go
--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}
Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que tu canalización de reemplazo quita la transformación CountWidgets/Parse
en su totalidad:
--transform_name_mapping={"CountWidgets/main.Parse":""}
gcloud
--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que tu canalización de reemplazo quita la transformación CountWidgets/Parse
en su totalidad:
--transform-name-mappings={"CountWidgets/main.Parse":""}
REST
"transformNameMapping": {
CountWidgets/Parse: CountWidgets/Scan/Parse
}
Si borras una transformación en su totalidad en tu canalización de reemplazo, debes proporcionar una asignación nula. Supongamos que tu canalización de reemplazo quita la transformación CountWidgets/Parse
en su totalidad:
"transformNameMapping": {
CountWidgets/main.Parse: null
}
Los efectos de reemplazar un trabajo
Cuando reemplazas un trabajo existente, uno nuevo ejecuta el código de la canalización actualizada. El servicio de Dataflow retiene el nombre del trabajo, pero ejecuta el trabajo de reemplazo con un ID de trabajo actualizado. Este proceso puede causar tiempo de inactividad mientras se detiene el trabajo existente, se ejecuta la verificación de compatibilidad y comienza el trabajo nuevo.
El trabajo de reemplazo conserva los siguientes elementos:
- Datos de estado intermedio del trabajo anterior. Las cachés en memoria no se guardan.
- Registros de datos almacenados en búfer o metadatos actualmente “en tránsito” del trabajo anterior. Por ejemplo, algunos registros en tu canalización pueden almacenarse en búfer mientras se resuelve una ventana.
- Actualizaciones de opciones de trabajos en tránsito que aplicaste al trabajo anterior.
Datos de estado intermedios
Se conservan los datos de estado intermedios del trabajo anterior. En los datos de estado, no se incluyen las cachés en la memoria. Si deseas conservar los datos de la caché en la memoria cuando actualizas tu canalización, como solución alternativa, refactoriza tu canalización para convertir las cachés en datos de estado o entradas complementarias. Si deseas obtener más información sobre las entradas complementarias, consulta Patrones de entradas complementarias en la documentación de Apache Beam.
Las canalizaciones de transmisión tienen límites de tamaño para ValueState
y entradas complementarias.
Por lo tanto, si tienes memorias caché grandes que deseas conservar, es posible que debas usar almacenamiento externo, como Memorystore o Bigtable.
Datos en tránsito
Las transformaciones en tu canalización nueva procesarán los datos “en tránsito”. Sin embargo, las transformaciones adicionales que agregas a tu código de la canalización de reemplazo pueden o no tener efecto según la ubicación en la que se almacenan los registros. En este ejemplo, la canalización existente tiene las siguientes transformaciones:
Java
p.apply("Read", ReadStrings()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Format' >> FormatStrings()
Go
beam.ParDo(s, ReadStrings) beam.ParDo(s, FormatStrings)
Puedes reemplazar tu trabajo por códigos de canalización nuevos, como se indica a continuación:
Java
p.apply("Read", ReadStrings()) .apply("Remove", RemoveStringsStartingWithA()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Remove' >> RemoveStringsStartingWithA() | 'Format' >> FormatStrings()
Go
beam.ParDo(s, ReadStrings) beam.ParDo(s, RemoveStringsStartingWithA) beam.ParDo(s, FormatStrings)
Si bien agregaste una transformación para filtrar las strings que comienzan con la letra “A”, la siguiente transformación (FormatStrings
) quizás vea strings en tránsito o almacenadas en búfer que comienzan con “A” transferidas del trabajo anterior.
Cambia el sistema de ventanas
Puedes cambiar las estrategias de renderización en ventanas y de activadores para los elementos PCollection
en tu canalización de reemplazo, pero ten cuidado.
Los cambios en las estrategias de activadores y del sistema de ventanas no afectan los datos que ya están almacenados en búfer o en tránsito.
Recomendamos que intentes realizar solo cambios pequeños en el sistema de ventanas de tu canalización, como cambiar la duración de ventanas fijas o de período variable. Realizar cambios grandes en el sistema de ventanas o activadores, como cambiar el algoritmo del sistema de ventanas, puede tener un impacto impredecible en el resultado de tu canalización.
Verifica la compatibilidad de trabajos
Cuando inicias tu trabajo de reemplazo, el servicio de Dataflow realiza una verificación de compatibilidad entre tu trabajo de reemplazo y tu trabajo anterior. Si se aprueba la verificación de compatibilidad, tu trabajo anterior se detiene. A continuación, se inicia tu trabajo de reemplazo en el servicio de Dataflow bajo el mismo nombre. Si la verificación de compatibilidad falla, tu trabajo anterior continúa en ejecución en el servicio de Dataflow y tu trabajo de reemplazo muestra un error.
Java
Debido a una limitación, debes usar la ejecución de bloqueo para ver los errores de intento de actualización en tu consola o terminal. La solución alternativa actual consta de los siguientes pasos:
- Usa pipeline.run().waitUntilFinish() en el código de tu canalización.
- Ejecuta el programa de tu canalización de reemplazo con la opción
--update
. - Espera a que el trabajo de reemplazo apruebe con éxito la verificación de compatibilidad.
- Usa
Ctrl+C
para salir del proceso de ejecución de bloqueo.
De manera alternativa, puedes supervisar el estado de tu trabajo de reemplazo en la interfaz de supervisión de Dataflow. Si tu trabajo se inició con éxito, también aprobó la verificación de compatibilidad.
Python
Debido a una limitación, debes usar la ejecución de bloqueo para ver los errores de intento de actualización en tu consola o terminal. La solución alternativa actual consta de los siguientes pasos:
- Usa pipeline.run().wait_until_finish() en el código de tu canalización.
- Ejecuta el programa de tu canalización de reemplazo con la opción
--update
. - Espera a que el trabajo de reemplazo apruebe con éxito la verificación de compatibilidad.
- Usa
Ctrl+C
para salir del proceso de ejecución de bloqueo.
De manera alternativa, puedes supervisar el estado de tu trabajo de reemplazo en la interfaz de supervisión de Dataflow. Si tu trabajo se inició con éxito, también aprobó la verificación de compatibilidad.
Go
Debido a una limitación, debes usar la ejecución de
bloqueo para ver los errores de intento de actualización en tu consola o terminal.
En particular, debes especificar la ejecución sin bloqueo a través de las
marcas --execute_async
o --async
. La solución alternativa actual consta de los siguientes pasos:
- Ejecuta el programa de tu canalización de reemplazo con la opción
--update
y sin las marcas--execute_async
o--async
. - Espera a que el trabajo de reemplazo apruebe con éxito la verificación de compatibilidad.
- Usa
Ctrl+C
para salir del proceso de ejecución de bloqueo.
gcloud
Debido a una limitación, debes usar la ejecución de bloqueo para ver los errores de intento de actualización en tu consola o terminal. La solución alternativa actual consta de los siguientes pasos:
- Para las canalizaciones de Java, usa pipeline.run().waitUntilFinish() en el código de tu canalización. Para las canalizaciones de Python, usa pipeline.run().wait_until_finish() en el código de tu canalización. Para las canalizaciones de Go, sigue los pasos de la pestaña Go.
- Ejecuta el programa de tu canalización de reemplazo con la opción
--update
. - Espera a que el trabajo de reemplazo apruebe con éxito la verificación de compatibilidad.
- Usa
Ctrl+C
para salir del proceso de ejecución de bloqueo.
REST
Debido a una limitación, debes usar la ejecución de bloqueo para ver los errores de intento de actualización en tu consola o terminal. La solución alternativa actual consta de los siguientes pasos:
- Para las canalizaciones de Java, usa pipeline.run().waitUntilFinish() en el código de tu canalización. Para las canalizaciones de Python, usa pipeline.run().wait_until_finish() en el código de tu canalización. Para las canalizaciones de Go, sigue los pasos de la pestaña Go.
- Ejecuta el programa de tu canalización de reemplazo con el campo
replaceJobId
. - Espera a que el trabajo de reemplazo apruebe con éxito la verificación de compatibilidad.
- Usa
Ctrl+C
para salir del proceso de ejecución de bloqueo.
La verificación de compatibilidad usa la asignación de transformaciones proporcionada para garantizar que Dataflow pueda transferir datos de estado intermedios de los pasos en tu trabajo anterior a tu trabajo de reemplazo. La verificación de compatibilidad también garantiza que los PCollection
en tu canalización usen los mismos codificadores.
Cambiar un Coder
puede ocasionar que falle la verificación de compatibilidad, ya que algunos datos en tránsito o registros almacenados en búfer pueden no estar serializados de forma correcta en la canalización de reemplazo.
Evita errores de compatibilidad
Algunas diferencias entre tu canalización anterior y tu canalización de reemplazo pueden ocasionar que falle la verificación de compatibilidad. Estas diferencias incluyen los siguientes puntos:
- Cambiar el grafo de la canalización sin proporcionar una asignación. Cuando actualizas un trabajo, Dataflow intenta hacer coincidir las transformaciones en tu trabajo anterior con las transformaciones en el trabajo de reemplazo. Este proceso de coincidencia ayuda a Dataflow a transferir datos de estado intermedios para cada paso. Si cambias el nombre o quitas pasos, debes proporcionar una asignación de transformación para que Dataflow pueda hacer coincidir los datos de estado según corresponda.
- Cambiar las entradas adicionales de un paso. Agregar entradas adicionales a una transformación de tu canalización de reemplazo o quitarlas de ella hará que falle la verificación de compatibilidad.
- Cambiar el codificador de un paso. Cuando actualizas un trabajo, Dataflow conserva los registros de datos almacenados en búfer y los administra en el trabajo de reemplazo. Por ejemplo, los datos almacenados en búfer pueden ocurrir mientras se resuelve la renderización en ventanas. Si el trabajo de reemplazo usa una codificación de datos diferente o incompatible, Dataflow no podrá serializar o deserializar estos registros.
Quita una operación “con estado” de tu canalización. Si quitas las operaciones con estado de tu canalización, es posible que el trabajo de reemplazo falle en la verificación de compatibilidad. Dataflow puede fusionar varios pasos para lograr una mayor eficiencia. Si quitas una operación dependiente del estado de un paso fusionado, la verificación fallará. Las operaciones con estado incluyen los siguientes elementos:
- Transformaciones que producen o consumen entradas adicionales
- Lecturas de E/S
- Transformaciones que usan un estado clave
- Transformaciones con ventanas combinadas
Cambiar variables con estado
DoFn
. En los trabajos de transmisión en curso, si la canalización incluyeDoFn
con estado, cambiar las variablesDoFn
con estado puede hacer que la canalización falle.Intentar ejecutar tu trabajo de reemplazo en una zona geográfica diferente. Ejecuta tu trabajo de reemplazo en la misma zona en la que ejecutaste tu trabajo anterior.
Actualiza esquemas
Apache Beam permite que las PCollection
tengan esquemas con campos con nombre, en cuyo caso no se necesitan codificadores explícitos. Si los nombres y tipos de campo de un esquema determinado no se modifican (incluidos los campos anidados), ese esquema no hará que falle la verificación de actualización. Sin embargo, la actualización aún puede bloquearse si otros segmentos de la canalización nueva no son compatibles.
Esquemas de evolución
A menudo, es necesario mejorar el esquema de una PCollection
debido a los requisitos empresariales cambiantes. El servicio de Dataflow permite realizar los siguientes cambios en un esquema cuando se actualiza la canalización:
- Agregar uno o más campos nuevos a un esquema, incluidos los campos anidados
- Hacer que un tipo de campo obligatorio (no anulable) sea opcional (anulable).
No se permite cambiar campos, quitar nombres de campos o cambiar tipos de campos durante la actualización.
Pasa datos adicionales a una operación ParDo existente
Puedes pasar datos adicionales (fuera de banda) a una operación ParDo existente mediante uno de los siguientes métodos, según tu caso práctico:
- Serializa la información como campos en tu subclase
DoFn
. - Cualquier variable a la que se haga referencia a través de los métodos en una
DoFn
anónima se serializa de forma automática. - Calcula los datos dentro de
DoFn.startBundle()
. - Pasa datos mediante
ParDo.withSideInputs
.
Si deseas obtener más información, consulta las siguientes páginas:
- Guía de programación de Apache Beam: ParDo, en particular, las secciones sobre la creación de una DoFn y entradas complementarias.
- Referencia del SDK de Apache Beam para Java: ParDo