En las canalizaciones de streaming con un gran volumen de datos de entrada, suele haber un equilibrio entre el coste y la latencia. Para mantener una latencia baja, Dataflow debe añadir trabajadores a medida que aumenta el volumen de tráfico. Otro factor es la rapidez con la que la canalización debe aumentar o reducir su escala en respuesta a los cambios en la tasa de datos de entrada.
El autoescalador de Dataflow tiene ajustes predeterminados que son adecuados para muchas cargas de trabajo. Sin embargo, puede que quieras ajustar este comportamiento en tu caso concreto. Por ejemplo, puede que te parezca aceptable una latencia media más alta para reducir los costes o que quieras que Dataflow escale más rápido en respuesta a los picos de tráfico.
Para optimizar el autoescalado horizontal, puedes ajustar los siguientes parámetros:
- Intervalo de autoescalado: el número mínimo y máximo de trabajadores que se van a asignar.
- Sugerencia de uso de los trabajadores: uso de CPU objetivo de los trabajadores.
- Sugerencia de paralelismo de los trabajadores: número de paralelismo objetivo de los trabajadores.
Definir el intervalo de autoescalado
Cuando creas una tarea de streaming, puedes definir el número inicial de trabajadores y el número máximo de trabajadores. Para ello, especifica las siguientes opciones de la canalización:
Java
--numWorkers
: número inicial de trabajadores disponibles cuando se inicia la canalización--maxNumWorkers
: número máximo de trabajadores disponibles para tu canalización
Python
--num_workers
: número inicial de trabajadores disponibles cuando se inicia la canalización--max_num_workers
: número máximo de trabajadores disponibles para tu canalización
Go
--num_workers
: número inicial de trabajadores disponibles cuando se inicia la canalización--max_num_workers
: número máximo de trabajadores disponibles para tu canalización
En las tareas de streaming que usan Streaming Engine, la marca --maxNumWorkers
es opcional. El valor predeterminado es 100
. En el caso de las tareas de streaming que no usen Streaming Engine, --maxNumWorkers
es obligatorio cuando el autoescalado horizontal está habilitado.
El valor inicial de --maxNumWorkers
también determina cuántos discos persistentes se asignan al trabajo.
Las canalizaciones se implementan con un grupo fijo de discos persistentes, cuyo número es igual a --maxNumWorkers
. Durante la transmisión, los discos persistentes se redistribuyen de forma que cada trabajador recibe el mismo número de discos conectados.
Si asignas el valor --maxNumWorkers
, asegúrate de que proporcione suficientes discos para tu
pipeline. Ten en cuenta el crecimiento futuro al definir el valor inicial. Para obtener información sobre el rendimiento de Persistent Disk, consulta Configurar Persistent Disk y las VMs.
Dataflow factura el uso de Persistent Disk y tiene cuotas de Compute Engine, incluidas las cuotas de Persistent Disk.
De forma predeterminada, el número mínimo de trabajadores es 1 para las tareas de streaming que usan Streaming Engine y (maxNumWorkers
/15), redondeado hacia arriba, para las tareas que no usan Streaming Engine.
Actualizar el intervalo de escalado automático
En las tareas que usan Streaming Engine, puedes ajustar el número mínimo y máximo de trabajadores sin detener ni sustituir la tarea. Para ajustar estos valores, usa una actualización de trabajo en curso. Actualiza las siguientes opciones de trabajo:
--min-num-workers
: número mínimo de trabajadores.--max-num-workers
: número máximo de trabajadores.
gcloud
Usa el comando gcloud dataflow jobs update-options
:
gcloud dataflow jobs update-options \ --region=REGION \ --min-num-workers=MINIMUM_WORKERS \ --max-num-workers=MAXIMUM_WORKERS \ JOB_ID
Haz los cambios siguientes:
- REGION: el ID de región del endpoint regional de la tarea
- MINIMUM_WORKERS: número mínimo de instancias de Compute Engine
- MAXIMUM_WORKERS: número máximo de instancias de Compute Engine
- JOB_ID: el ID del trabajo que se va a actualizar
También puedes actualizar --min-num-workers
y --max-num-workers
de forma individual.
REST
Usa el método projects.locations.jobs.update
:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": MINIMUM_WORKERS, "max_num_workers": MAXIMUM_WORKERS } }
Haz los cambios siguientes:
- PROJECT_ID: el Google Cloud ID de proyecto de la tarea de Dataflow
- REGION: el ID de región del endpoint regional de la tarea
- JOB_ID: el ID del trabajo que se va a actualizar
- MINIMUM_WORKERS: número mínimo de instancias de Compute Engine
- MAXIMUM_WORKERS: número máximo de instancias de Compute Engine
También puedes actualizar min_num_workers
y max_num_workers
de forma individual.
Especifica qué parámetros quieres actualizar en el parámetro de consulta updateMask
e incluye los valores actualizados en el campo runtimeUpdatableParams
del cuerpo de la solicitud. En el siguiente ejemplo se actualiza min_num_workers
:
PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": 5 } }
En el caso de las tareas que no usan Streaming Engine, puedes sustituir la tarea actual por un valor actualizado de maxNumWorkers
.
Si actualizas una tarea de streaming que no usa Streaming Engine, la tarea actualizada tendrá el escalado automático horizontal inhabilitado de forma predeterminada. Para mantener habilitado el autoescalado, especifica --autoscalingAlgorithm
y --maxNumWorkers
en la tarea actualizada.
Definir la sugerencia de utilización del trabajador
Dataflow usa el uso medio de la CPU como señal para aplicar el autoescalado horizontal. De forma predeterminada, Dataflow establece un uso de CPU objetivo de 0,8. Si la utilización se sale de este intervalo, Dataflow puede añadir o quitar trabajadores.
Para tener más control sobre el comportamiento del autoescalado, puedes definir la utilización de CPU objetivo en un valor del intervalo [0,1, 0,9].
Define un valor de uso de CPU inferior si quieres conseguir latencias máximas más bajas. Un valor más bajo permite que Dataflow escale horizontalmente de forma más agresiva en respuesta al aumento de la utilización de los trabajadores y que escale verticalmente de forma más conservadora para mejorar la estabilidad. Un valor más bajo también proporciona más margen cuando la canalización se ejecuta en un estado estable, lo que suele dar lugar a una latencia de cola más baja. La latencia de cola mide los tiempos de espera más largos antes de que se procese un nuevo registro.
Define un valor más alto si quieres ahorrar recursos y mantener los costes más bajos cuando haya picos de tráfico. Un valor más alto evita un aumento excesivo de la escala, pero a costa de una latencia mayor.
Para configurar la sugerencia de utilización al ejecutar un trabajo que no sea de plantilla, define la worker_utilization_hint
opción de servicio. En el caso de un trabajo de plantilla, actualiza la sugerencia de utilización, ya que no se admiten opciones de servicio.
En el siguiente ejemplo se muestra cómo usar worker_utilization_hint
:
Java
--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION
Sustituye TARGET_UTILIZATION por un valor del intervalo [0,1, 0,9].
Python
--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION
Sustituye TARGET_UTILIZATION por un valor del intervalo [0,1, 0,9].
Go
--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION
Sustituye TARGET_UTILIZATION por un valor del intervalo [0,1, 0,9].
En el caso de las nuevas, te recomendamos que hagas pruebas con cargas realistas usando la configuración predeterminada. Después, evalúa el comportamiento del escalado automático en tu pipeline y haz los ajustes necesarios.
La sugerencia de utilización es solo uno de los factores que usa Dataflow para decidir si debe escalar los trabajadores. Otros factores, como el trabajo pendiente y las claves disponibles, pueden anular el valor de la pista. Además, la sugerencia no es un objetivo estricto. El escalador automático intenta mantener el uso de la CPU dentro del intervalo del valor de la sugerencia, pero la métrica de uso agregada puede ser superior o inferior. Para obtener más información, consulta Heurísticas de autoescalado de streaming.
Actualizar la sugerencia de utilización
Para actualizar la sugerencia de uso mientras se está ejecutando un trabajo, realiza una actualización en curso de la siguiente manera:
gcloud
Usa el comando
gcloud dataflow jobs update-options
:
gcloud dataflow jobs update-options \ --region=REGION \ --worker-utilization-hint=TARGET_UTILIZATION \ JOB_ID
Haz los cambios siguientes:
- REGION: el ID de región del endpoint regional de la tarea
- JOB_ID: el ID del trabajo que se va a actualizar
- TARGET_UTILIZATION: un valor en el intervalo [0,1; 0,9]
Para restablecer la sugerencia de utilización al valor predeterminado, usa el siguiente comando de gcloud:
gcloud dataflow jobs update-options \ --unset-worker-utilization-hint \ --region=REGION \ --project=PROJECT_ID \ JOB_ID
REST
Usa el método projects.locations.jobs.update
:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint { "runtime_updatable_params": { "worker_utilization_hint": TARGET_UTILIZATION } }
Haz los cambios siguientes:
- PROJECT_ID: el ID de proyecto del trabajo de Dataflow. Google Cloud
- REGION: el ID de región del punto de conexión regional de la tarea.
- JOB_ID: el ID del trabajo que se va a actualizar.
- TARGET_UTILIZATION: un valor en el intervalo [0,1; 0,9]
Definir una sugerencia de paralelismo de los trabajadores
Para gestionar el autoescalado con operaciones largas que no dependan tanto de las CPUs, como las cargas de trabajo intensivas de aprendizaje automático, puedes definir la sugerencia de paralelismo de los trabajadores mediante las sugerencias de recursos de Apache Beam. Estas sugerencias cambian el autoescalado a un modo diferente optimizado para cargas de trabajo que requieren mucha GPU o transformaciones con tiempos de procesamiento largos.
En el siguiente ejemplo se muestra cómo adjuntar una sugerencia de paralelismo a una transformación:
Java
pcoll.apply(MyCompositeTransform.of(...)
.setResourceHints(
ResourceHints.create()
.withMaxActiveBundlesPerWorker(TARGET_PARALLELISM_PER_WORKER)))
Sustituye TARGET_PARALLELISM_PER_WORKER por un valor adecuado para tu caso práctico. Para obtener información general, consulta cómo elegir un buen valor inicial.
Python
pcoll | MyPTransform().with_resource_hints(
max_active_bundles_per_worker="TARGET_PARALLELISM_PER_WORKER")
Sustituye TARGET_PARALLELISM_PER_WORKER por un valor adecuado para tu caso práctico. Para obtener información general, consulta cómo elegir un buen valor inicial.
Elige el valor de la sugerencia de paralelismo de los trabajadores
En los casos prácticos de aprendizaje automático, un buen valor inicial es equivalente al número de modelos que se ejecutan en paralelo en cada trabajador. Este valor está limitado por la capacidad de los aceleradores del trabajador y el tamaño del modelo.
En otros casos prácticos, la canalización está limitada por la memoria o por la CPU. En el caso de las canalizaciones con limitaciones de memoria, usa el límite de memoria para calcular el procesamiento paralelo máximo. En el caso de las canalizaciones con uso intensivo de CPU, se recomienda mantener la política de escalado automático predeterminada en lugar de proporcionar una sugerencia de paralelismo.
Es posible ajustar el valor para adaptarlo a las necesidades de procesamiento de otras fases, como la escritura en un receptor. Si aumentas el valor en 1 o 2 cuando el paralelismo de tu modelo es 2, se reconocerá el tiempo de procesamiento más rápido de la escritura en el receptor, ya que se le dará más margen para tener en cuenta el procesamiento realizado en otras fases. Si tu canalización no incluye un paso de aleatorización y las transformaciones se combinan en una sola fase, no es necesario que ajustes el valor de otras transformaciones.
Este valor también se puede ajustar para simular los efectos de los retrasos aceptables. Por ejemplo, si acepta un retraso máximo de 10 minutos y el tiempo de procesamiento medio de su modelo es de 1 minuto, puede aumentar el valor en 1 si el número máximo de trabajadores es 10.
Heurísticas de autoescalado con uso intensivo de GPU
En la configuración que requiere un uso intensivo de la GPU, indicada mediante la sugerencia de paralelismo de la configuración, Dataflow tiene en cuenta varios factores al ajustar la escala automáticamente. Entre estos factores se incluyen los siguientes:
- Claves disponibles. Las claves son la unidad fundamental de paralelismo en Dataflow.
- Número máximo de paquetes activos por trabajador. Esto sugiere el número máximo ideal de paralelismo de procesamiento en el trabajador.
La idea general que subyace a las decisiones de escalado es calcular los workers necesarios para gestionar la carga actual, tal como indican las claves disponibles. Por ejemplo, si hay 100 claves disponibles para procesarse y el paralelismo máximo por trabajador es 10, deberías tener 10 trabajadores en total.
Si tu canalización es compleja y tiene una carga de trabajo pesada que requiere muchos recursos de GPU y numerosas transformaciones que requieren muchos recursos de CPU, te recomendamos que habilites la opción Ajuste adecuado. De esta forma, el servicio puede distinguir entre el trabajo que requiere mucha CPU y el que requiere mucha GPU, y, a continuación, escalar cada grupo de trabajadores en consecuencia.
Heurísticas de autoescalado de streaming
En el caso de las canalizaciones de streaming, el objetivo del autoescalado horizontal es minimizar el trabajo pendiente y, al mismo tiempo, maximizar el uso y el rendimiento de los trabajadores, así como reaccionar rápidamente a los picos de carga.
Dataflow tiene en cuenta varios factores al ajustar la escala automáticamente, entre los que se incluyen los siguientes:
Registro de trabajo pendiente. El tiempo de trabajo pendiente estimado se calcula a partir del rendimiento y de los bytes pendientes de procesar de la fuente de entrada. Una canalización se considera atrasada cuando el tiempo de atraso estimado supera los 15 segundos.
Uso de CPU objetivo: El objetivo predeterminado del uso medio de CPU es 0,8. Puedes anular este valor.
Claves disponibles. Las claves son la unidad fundamental de paralelismo en Dataflow.
En algunos casos, Dataflow usa los siguientes factores en las decisiones de escalado automático. Si estos factores se usan en tu trabajo, puedes ver esa información en la pestaña de métricas Escalado automático.
La limitación basada en claves usa el número de claves de procesamiento que recibe la tarea para calcular el límite de los trabajadores del usuario, ya que cada clave solo puede procesarla un trabajador a la vez.
Amortiguación de la reducción de escala. Si Dataflow detecta que se han tomado decisiones de escalado automático inestables, reduce la velocidad de reducción para mejorar la estabilidad.
La mejora de la resolución basada en la CPU usa un uso elevado de la CPU como criterio de mejora de la resolución.
En el caso de las tareas de streaming que no usan Streaming Engine, el escalado puede estar limitado por el número de discos persistentes. Para obtener más información, consulta Definir el intervalo de escalado automático.
Autoescalado con uso intensivo de GPU, si se habilita configurando la sugerencia de paralelismo de los trabajadores. Para obtener más información, consulta Heurísticas de autoescalado intensivas en GPU.
Mejora de la resolución. Si una canalización de streaming permanece con un retraso y paralelismo suficientes en los trabajadores durante varios minutos, Dataflow aumenta la escala. Dataflow intenta borrar el trabajo pendiente en un plazo de aproximadamente 150 segundos después de aumentar la escala, dado el rendimiento actual por trabajador. Si hay un retraso, pero el trabajador no tiene suficiente paralelismo para trabajadores adicionales, la canalización no se amplía. (Aumentar el número de trabajadores por encima del número de claves disponibles para el procesamiento en paralelo no ayuda a procesar el trabajo pendiente más rápido).
Reducción del escalado: cuando la herramienta de escalado automático toma una decisión de reducir el escalado, el factor de mayor prioridad es el trabajo pendiente. El escalador automático tiene como objetivo un backlog de no más de 15 segundos. Si el trabajo pendiente se reduce a menos de 10 segundos y la utilización media de los trabajadores es inferior al objetivo de utilización de la CPU, Dataflow reduce la escala. Mientras la cartera de pedidos sea aceptable, el escalador automático intentará mantener la utilización de la CPU cerca del objetivo de utilización de la CPU. Sin embargo, si la utilización ya está lo suficientemente cerca del objetivo, es posible que la herramienta de adaptación dinámica mantenga el número de trabajadores sin cambios, ya que cada paso de reducción tiene un coste.
Streaming Engine también usa una técnica de autoescalado predictivo basada en el retraso del temporizador. Los datos ilimitados de una canalización de streaming se dividen en ventanas agrupadas por marcas de tiempo. Al final de una ventana, se activan los temporizadores de cada clave que se está procesando en esa ventana. La activación de un temporizador indica que el periodo ha finalizado para una clave determinada. Streaming Engine puede medir el trabajo pendiente del temporizador y predecir cuántos temporizadores se activarán al final de una ventana. Al usar el backlog del temporizador como señal, Dataflow puede estimar la cantidad de procesamiento que debe realizarse cuando se activen los temporizadores futuros. En función de la carga futura estimada, Dataflow se adapta dinámicamente con antelación para satisfacer la demanda prevista.
Métricas
Para consultar los límites de escalado automático actuales de una tarea, consulta las siguientes métricas:
job/max_worker_instances_limit
: número máximo de trabajadores.job/min_worker_instances_limit
: número mínimo de trabajadores.
Para obtener información sobre la utilización de los trabajadores, consulta las siguientes métricas:
job/aggregated_worker_utilization
: el uso agregado de los trabajadores.job/worker_utilization_hint
: Sugerencia de utilización del trabajador actual.
Para obtener información valiosa sobre el comportamiento de la herramienta de escalado automático, consulta la siguiente métrica:
job.worker_utilization_hint_is_actively_used
: indica si el escalador automático está usando activamente la sugerencia de utilización del trabajador. Si otros factores anulan la sugerencia cuando se muestrea esta métrica, el valor esfalse
.job/horizontal_worker_scaling
: describe las decisiones que ha tomado la herramienta de ajuste automático. Esta métrica contiene las siguientes etiquetas:direction
: especifica si la herramienta de escalado automático ha aumentado o reducido la escala, o si no ha tomado ninguna medida.rationale
: especifica el motivo de la decisión de la herramienta de adaptación dinámica.
Para obtener más información, consulta las métricas de Cloud Monitoring. Estas métricas también se muestran en los gráficos de monitorización del ajuste de escala automático.