¿Qué es el autoescalado?
Estimar el número "correcto" de trabajadores (nodos) de un clúster para una carga de trabajo es difícil, y un único tamaño de clúster para toda una canalización a menudo no es lo ideal. El escalado de clústeres iniciado por el usuario aborda parcialmente este problema, pero requiere monitorizar la utilización del clúster e intervenir manualmente.
La API AutoscalingPolicies de Dataproc es un mecanismo de automatización de la gestión de los recursos de clústeres que permite el autoescalado de las VMs de los trabajadores del clúster. Una Autoscaling Policy
es una configuración reutilizable que describe cómo deben escalarse los trabajadores del clúster que utilizan la política de autoescalado. Define los límites de escalado, la frecuencia y la agresividad para ofrecer un control pormenorizado de los recursos del clúster durante toda su vida útil.
Cuándo usar el autoescalado
Usa el autoescalado:
en clústeres que almacenan datos en servicios externos, como Cloud Storage o BigQuery
en clústeres que procesan muchas tareas
para escalar verticalmente clústeres de una sola tarea
con el modo de flexibilidad mejorado para trabajos por lotes de Spark
No se recomienda el autoescalado en los siguientes casos:
HDFS: el autoescalado no está diseñado para escalar HDFS en el clúster porque:
- La utilización de HDFS no es una señal para el autoescalado.
- Los datos de HDFS solo se alojan en trabajadores principales. El número de trabajadores primarios debe ser suficiente para alojar todos los datos de HDFS.
- La retirada de DataNodes de HDFS puede retrasar la eliminación de trabajadores. Los nodos de datos copian los bloques de HDFS en otros nodos de datos antes de que se elimine un trabajador. En función del tamaño de los datos y del factor de replicación, este proceso puede tardar horas.
Etiquetas de nodo de YARN: el autoescalado no admite etiquetas de nodo de YARN ni la propiedad
dataproc:am.primary_only
debido a YARN-9088. YARN informa incorrectamente las métricas del clúster cuando se usan etiquetas de nodo.Spark Structured Streaming: el autoescalado no es compatible con Spark Structured Streaming (consulta Autoescalado y Spark Structured Streaming).
Clústeres inactivos: no se recomienda el autoescalado para reducir el tamaño de un clúster al mínimo cuando esté inactivo. Como crear un clúster nuevo es tan rápido como cambiar el tamaño de uno, te recomendamos que elimines los clústeres inactivos y los vuelvas a crear. Las siguientes herramientas admiten este modelo "efímero":
Usa los flujos de trabajo de Dataproc para programar un conjunto de trabajos en un clúster dedicado y, a continuación, elimina el clúster cuando se hayan completado los trabajos. Para una orquestación más avanzada, usa Cloud Composer, que se basa en Apache Airflow.
En el caso de los clústeres que procesan consultas ad hoc o cargas de trabajo programadas externamente, utiliza la eliminación programada de clústeres para eliminar el clúster después de un periodo de inactividad o una duración especificados, o en un momento concreto.
Cargas de trabajo de diferentes tamaños: cuando se ejecutan tareas pequeñas y grandes en un clúster, la reducción de escala de la desactivación gradual esperará a que finalicen las tareas grandes. Como resultado, una tarea de larga duración retrasará el ajuste automático de los recursos de las tareas más pequeñas que se ejecuten en el clúster hasta que finalice la tarea de larga duración. Para evitar este resultado, agrupa las tareas más pequeñas de tamaño similar en un clúster y aísla cada tarea de larga duración en un clúster independiente.
Habilitar autoescalado
Para habilitar el autoescalado en un clúster, sigue estos pasos:
Cualquiera de las siguientes:
Crear una política de autoescalado
CLI de gcloud
Puedes usar el comando
gcloud dataproc autoscaling-policies import
para crear una política de autoescalado. Lee un archivo YAML local que define una política de autoescalado. El formato y el contenido del archivo deben coincidir con los objetos y campos de configuración definidos por la API REST autoscalingPolicies.
En el siguiente ejemplo de YAML se define una política para clústeres estándar de Dataproc con todos los campos obligatorios. También proporciona los valores minInstances
y maxInstances
de los trabajadores principales, el valor maxInstances
de los trabajadores secundarios (interrumpibles) y especifica un cooldownPeriod
de 4 minutos (el valor predeterminado es de 2 minutos). El workerConfig
configura los
trabajadores principales. En este ejemplo, minInstances
y maxInstances
tienen el mismo valor para evitar que se escale el número de trabajadores principales.
workerConfig: minInstances: 10 maxInstances: 10 secondaryWorkerConfig: maxInstances: 50 basicAlgorithm: cooldownPeriod: 4m yarnConfig: scaleUpFactor: 0.05 scaleDownFactor: 1.0 gracefulDecommissionTimeout: 1h
En el siguiente ejemplo de YAML se define una política para clústeres estándar de Dataproc con todos los campos obligatorios y opcionales de la política de autoescalado.
clusterType: STANDARD workerConfig: minInstances: 10 maxInstances: 10 weight: 1 secondaryWorkerConfig: minInstances: 0 maxInstances: 100 weight: 1 basicAlgorithm: cooldownPeriod: 2m yarnConfig: scaleUpFactor: 0.05 scaleDownFactor: 1.0 scaleUpMinWorkerFraction: 0.0 scaleDownMinWorkerFraction: 0.0 gracefulDecommissionTimeout: 1h
En el siguiente ejemplo de YAML se define una política para clústeres de escalado a cero.
En los clústeres de escala cero, no incluyasworkerConfig
.
clusterType: ZERO_SCALE secondaryWorkerConfig: minInstances: 0 maxInstances: 100 weight: 1 basicAlgorithm: cooldownPeriod: 2m yarnConfig: scaleUpFactor: 0.05 scaleDownFactor: 1.0 scaleUpMinWorkerFraction: 0.0 scaleDownMinWorkerFraction: 0.0 gracefulDecommissionTimeout: 1h
Ejecuta el siguiente comando gcloud
desde un terminal local o en Cloud Shell para crear la política de autoescalado. Asigne un nombre a la política. Este nombre se convertirá en el id
de la política, que podrás usar en comandos gcloud
posteriores para hacer referencia a la política. Usa la marca --source
para especificar la ruta local y el nombre del archivo YAML de la política de autoescalado que quieres importar.
gcloud dataproc autoscaling-policies import policy-name \ --source=filepath/filename.yaml \ --region=region
API REST
Crea una política de autoescalado definiendo un AutoscalingPolicy como parte de una solicitud autoscalingPolicies.create.
Consola
Para crear una política de autoescalado, selecciona CREAR POLÍTICA en la página de Dataproc Políticas de autoescalado con la Google Cloud consola. En la página Crear política, puedes seleccionar un panel de recomendaciones de políticas para rellenar los campos de la política de autoescalado de un tipo de trabajo o un objetivo de escalado específicos.
Crear un clúster de autoescalado
Después de crear una política de autoescalado, crea un clúster que la utilice. El clúster debe estar en la misma región que la política de autoescalado.
CLI de gcloud
Ejecuta el siguiente comando gcloud
desde un terminal local o en Cloud Shell para crear un clúster de escalado automático. Proporciona un nombre para el clúster y usa la marca --autoscaling-policy
para especificar el policy ID (el nombre de la política que especificaste cuando creaste la política) o la política resource URI (resource name) (consulta los campos AutoscalingPolicy id
y name
).
gcloud dataproc clusters create cluster-name \ --autoscaling-policy=policy id or resource URI \ --region=region
API REST
Para crear un clúster de autoescalado, incluye un AutoscalingConfig como parte de una solicitud clusters.create.
Consola
Puedes seleccionar una política de autoescalado para aplicarla a un clúster nuevo desde la sección Política de autoescalado del panel Configurar clúster de la página Crear un clúster de la consola de Google Cloud Dataproc.
Habilitar el autoescalado en un clúster
Después de crear una política de autoescalado, puedes habilitarla en un clúster que ya tengas en la misma región.
CLI de gcloud
Ejecuta el siguiente comando gcloud
desde un terminal local o en Cloud Shell para habilitar una política de autoescalado en un clúster. Proporciona el nombre del clúster y usa la marca --autoscaling-policy
para especificar el policy ID (el nombre de la política que especificaste cuando creaste la política) o el resource URI (resource name) de la política (consulta los campos AutoscalingPolicy id
y name
).
gcloud dataproc clusters update cluster-name \ --autoscaling-policy=policy id or resource URI \ --region=region
API REST
Para habilitar una política de autoescalado en un clúster, define el valor de AutoscalingConfig.policyUri de la política en el updateMask
de una solicitud clusters.patch.
Consola
No se puede habilitar una política de autoescalado en un clúster disponible en la consola de Google Cloud .
Uso de políticas de varios clústeres
Una política de autoescalado define el comportamiento de escalado que se puede aplicar a varios clústeres. Una política de autoescalado se aplica mejor a varios clústeres cuando estos comparten cargas de trabajo similares o ejecutan trabajos con patrones de uso de recursos similares.
Puedes actualizar una política que se esté usando en varios clústeres. Las actualizaciones afectan inmediatamente al comportamiento del autoescalado de todos los clústeres que usan la política (consulta autoscalingPolicies.update). Si no quieres que una actualización de una política se aplique a un clúster que la esté usando, inhabilita el autoescalado en el clúster antes de actualizar la política.
CLI de gcloud
Ejecuta el siguiente comando gcloud
desde un terminal local o en Cloud Shell para inhabilitar el autoescalado en un clúster.
gcloud dataproc clusters update cluster-name --disable-autoscaling \ --region=region
API REST
Para inhabilitar el autoescalado en un clúster, asigna a AutoscalingConfig.policyUri la cadena vacía y define update_mask=config.autoscaling_config.policy_uri
en una solicitud clusters.patch.
Consola
No se puede inhabilitar el autoescalado en un clúster desde la consola de Google Cloud .
- No se puede eliminar una política que se esté usando en uno o varios clústeres (consulta autoscalingPolicies.delete).
Cómo funciona el ajuste de escala automático
La función de autoescalado comprueba las métricas de Hadoop YARN del clúster a medida que transcurre cada periodo de enfriamiento para determinar si se debe escalar el clúster y, si es así, la magnitud de la actualización.
El valor de la métrica de recursos pendientes de YARN (memoria pendiente o núcleos pendientes) determina si se debe aumentar o reducir la escala. Un valor superior a
0
indica que los trabajos de YARN están esperando recursos y que puede ser necesario aumentar la escala. Un valor0
indica que YARN tiene recursos suficientes, por lo que puede que no sea necesario reducir la escala ni hacer otros cambios.Si el recurso pendiente es > 0:
$estimated\_worker\_count =$
\[ \Biggl \lceil AVERAGE\ during\ cooldown\ period\Big(\frac{Pending + Available + Allocated + Reserved}{Resource\ per\ worker}\Big)\Biggr \rceil \]
Si el recurso pendiente es 0:
$estimated\_worker\_count =$
\[ \Biggl \lceil AVERAGE\ during\ cooldown\ period\Big(\frac{Allocated + Reserved}{Resource\ per\ worker}\Big)\Biggr \rceil \]
De forma predeterminada, la herramienta de escalado automático monitoriza el recurso de memoria de YARN. Si habilitas el autoescalado basado en núcleos, se monitorizarán tanto la memoria como los núcleos de YARN:
estimated_worker_count
se evalúa por separado para la memoria y los núcleos, y se selecciona el número de trabajadores resultante más alto.$estimated\_worker\_count =$
\[ max(estimated\_worker\_count\_by\_memory,\ estimated\_worker\_count\_by\_cores) \]
\[ estimated\ \Delta worker = estimated\_worker\_count - current\_worker\_count \]
Teniendo en cuenta el cambio estimado que se necesita en el número de trabajadores, el autoescalado usa un
scaleUpFactor
o unscaleDownFactor
para calcular el cambio real en el número de trabajadores:if estimated Δworkers > 0: actual Δworkers = ROUND_UP(estimated Δworkers * scaleUpFactor) # examples: # ROUND_UP(estimated Δworkers=5 * scaleUpFactor=0.5) = 3 # ROUND_UP(estimated Δworkers=0.8 * scaleUpFactor=0.5) = 1 else: actual Δworkers = ROUND_DOWN(estimated Δworkers * scaleDownFactor) # examples: # ROUND_DOWN(estimated Δworkers=-5 * scaleDownFactor=0.5) = -2 # ROUND_DOWN(estimated Δworkers=-0.8 * scaleDownFactor=0.5) = 0 # ROUND_DOWN(estimated Δworkers=-1.5 * scaleDownFactor=0.5) = 0
Un valor de 1.0 en scaleUpFactor o scaleDownFactor significa que el autoescalado se realizará de forma que el recurso pendiente o disponible sea 0 (utilización perfecta).
Una vez que se calcula el cambio en el número de trabajadores,
scaleUpMinWorkerFraction
yscaleDownMinWorkerFraction
actúan como umbral para determinar si el autoescalado escalará el clúster. Una fracción pequeña significa que el autoescalado debe escalar aunque elΔworkers
sea pequeño. Una fracción mayor significa que el escalado solo debe producirse cuando elΔworkers
sea grande. O BIENIF (Δworkers > scaleUpMinWorkerFraction * current_worker_count) then scale up
IF (abs(Δworkers) > scaleDownMinWorkerFraction * current_worker_count), THEN scale down.
Si el número de trabajadores que se van a escalar es lo suficientemente grande como para activar el escalado, el autoescalado usa los límites
minInstances
maxInstances
deworkerConfig
ysecondaryWorkerConfig
yweight
(proporción de trabajadores principales y secundarios) para determinar cómo dividir el número de trabajadores entre los grupos de instancias de trabajadores principales y secundarios. El resultado de estos cálculos es el cambio final de escalado automático del clúster durante el periodo de escalado.Las solicitudes de reducción de escala del autoescalado se cancelarán en los clústeres creados con versiones de imagen 2.0.57+, 2.1.5+ y posteriores si se cumplen las siguientes condiciones:
- Se está reduciendo la escala con un valor de tiempo de espera de retirada suave distinto de cero y
El número de trabajadores de YARN ACTIVOS ("trabajadores activos") más el cambio en el número total de trabajadores recomendado por el escalador automático (
Δworkers
) es igual o superior aDECOMMISSIONING
trabajadores de YARN ("trabajadores retirados"), como se muestra en la siguiente fórmula:IF (active workers + Δworkers ≥ active workers + decommissioning workers) THEN cancel the scaledown operation
Para ver un ejemplo de cancelación de una reducción horizontal, consulta ¿Cuándo cancela el autoescalado una operación de reducción horizontal?
Recomendaciones de configuración de autoescalado
En esta sección se incluyen recomendaciones para configurar el autoescalado.
Evitar escalar trabajadores principales
Los trabajadores principales ejecutan nodos de datos de HDFS, mientras que los trabajadores secundarios solo realizan cálculos.
El uso de trabajadores secundarios te permite escalar de forma eficiente los recursos de computación sin necesidad de aprovisionar almacenamiento, lo que se traduce en una mayor capacidad de escalado.
Los nodos de nombres de HDFS pueden tener varias condiciones de carrera que provoquen que HDFS se dañe, de modo que la retirada se quede bloqueada indefinidamente. Para evitar este problema, no escale los trabajadores principales. Por ejemplo:
none
workerConfig:
minInstances: 10
maxInstances: 10
secondaryWorkerConfig:
minInstances: 0
maxInstances: 100
Hay que hacer algunas modificaciones en el comando de creación del clúster:
- Asigna a
--num-workers=10
el tamaño del grupo de trabajadores principal de la política de autoescalado. - Define
--secondary-worker-type=non-preemptible
para configurar los trabajadores secundarios como no interrumpibles. (a menos que quieras usar máquinas virtuales interrumpibles). - Copia la configuración de hardware de los trabajadores principales en los secundarios. Por ejemplo, define
--secondary-worker-boot-disk-size=1000GB
para que coincida con--worker-boot-disk-size=1000GB
.
Usar el modo de flexibilidad mejorada en trabajos por lotes de Spark
Usa el modo de flexibilidad mejorada (EFM) con el autoescalado para lo siguiente:
permite reducir la escala del clúster más rápido mientras se ejecutan las tareas
evitar interrupciones en las tareas en ejecución debido a la reducción de la escala del clúster
minimizar las interrupciones de los trabajos en curso debido a la interrupción de los trabajadores secundarios interrumpibles
Si la gestión de errores ampliada está habilitada, el tiempo de espera de retirada gradual de una política de autoescalado debe ser 0s
. La política de autoescalado solo debe autoescalar los trabajadores secundarios.
Elegir un tiempo de espera de la retirada suave
El autoescalado admite el desmantelamiento gradual de YARN al quitar nodos de un clúster. La retirada gradual permite que las aplicaciones terminen de reorganizar los datos entre fases para evitar que se interrumpa el progreso de los trabajos. El tiempo de espera de retirada gradual proporcionado en una política de escalado automático es el límite superior de la duración que YARN esperará a que se ejecuten las aplicaciones (aplicaciones que se estaban ejecutando cuando se inició la retirada) antes de eliminar los nodos.
Si un proceso no se completa en el periodo de tiempo de espera de desactivación gradual especificado, el nodo de trabajo se cerrará de forma forzada, lo que puede provocar la pérdida de datos o la interrupción del servicio. Para evitar esta posibilidad, asigna al tiempo de espera de la retirada suave un valor mayor que el de la tarea más larga que vaya a procesar el clúster. Por ejemplo, si prevés que el trabajo más largo se ejecutará durante una hora, define el tiempo de espera en al menos una hora (1h
).
Considera la posibilidad de migrar los trabajos que duren más de una hora a sus propios clústeres efímeros para evitar que se bloquee la desactivación gradual.
Configurando scaleUpFactor
scaleUpFactor
controla la intensidad con la que la herramienta de adaptación dinámica aumenta el tamaño de un clúster.
Especifica un número entre 0.0
y 1.0
para definir el valor fraccionario del recurso pendiente de YARN que provoca la adición de nodos.
Por ejemplo, si hay 100 contenedores pendientes que solicitan 512 MB cada uno, habrá 50 GB de memoria YARN pendientes. Si scaleUpFactor es 0.5
, la herramienta de adaptación dinámica añadirá suficientes nodos para añadir 25 GB de memoria YARN. Del mismo modo, si es 0.1
, el autoescalador añadirá suficientes nodos para 5 GB. Ten en cuenta que estos valores corresponden a la memoria de YARN, no a la memoria total disponible físicamente en una máquina virtual.
Un buen punto de partida es 0.05
para las tareas de MapReduce y Spark con la asignación dinámica habilitada. En el caso de las tareas de Spark con un número fijo de ejecutores y las tareas de Tez, usa 1.0
. Un valor de scaleUpFactor de 1.0
significa que el autoescalado se realizará de forma que el recurso pendiente o disponible sea 0 (utilización perfecta).
Configurando scaleDownFactor
scaleDownFactor
controla la intensidad con la que la herramienta de adaptación dinámica reduce el tamaño de un clúster. Especifica un número entre 0.0
y 1.0
para definir el valor fraccionario del recurso disponible de YARN que provoca la eliminación del nodo.
Deja este valor en 1.0
en la mayoría de los clústeres de varios trabajos que necesiten aumentar y reducir su escala con frecuencia. Como resultado de la retirada suave, las operaciones de reducción de escala son significativamente más lentas que las de aumento de escala. Si se define scaleDownFactor=1.0
, se establece una tasa de reducción agresiva, lo que minimiza el número de operaciones de reducción necesarias para alcanzar el tamaño de clúster adecuado.
En los clústeres que necesiten más estabilidad, define un valor de scaleDownFactor
más bajo para que la reducción se produzca a un ritmo más lento.
Asigna el valor 0.0
a esta propiedad para evitar que el clúster se reduzca, por ejemplo, cuando utilices clústeres efímeros o de una sola tarea.
Configurar scaleUpMinWorkerFraction
y scaleDownMinWorkerFraction
scaleUpMinWorkerFraction
y scaleDownMinWorkerFraction
se usan con scaleUpFactor
o scaleDownFactor
y tienen los valores predeterminados 0.0
. Representan los umbrales en los que el escalador automático aumentará o reducirá el tamaño del clúster: el aumento o la reducción fraccionarios mínimos del tamaño del clúster necesarios para emitir solicitudes de escalado vertical.
Por ejemplo, la herramienta de ajuste automático de escala no emitirá una solicitud de actualización para añadir 5 trabajadores a un clúster de 100 nodos a menos que scaleUpMinWorkerFraction
sea menor o igual que 0.05
(5%). Si se define como 0.1
, el escalador automático no emitirá la solicitud para aumentar la escala del clúster.
Del mismo modo, si scaleDownMinWorkerFraction
es 0.05
, la herramienta de ajuste automático de escala no se activará hasta que se eliminen al menos 5 nodos.
El valor predeterminado de 0.0
significa que no hay umbral.
Se recomienda encarecidamente definir un valor de scaleDownMinWorkerFractionthresholds
más alto en clústeres grandes (más de 100 nodos) para evitar operaciones de escalado pequeñas e innecesarias.
Elige un periodo de enfriamiento
El cooldownPeriod
define un periodo durante el cual el autoescalador no enviará solicitudes para cambiar el tamaño del clúster. Puedes usarlo para limitar la frecuencia de los cambios de la herramienta de escalado automático en el tamaño del clúster.
El valor mínimo y predeterminado de cooldownPeriod
es de dos minutos. Si se define un cooldownPeriod
más corto en una política, los cambios en la carga de trabajo afectarán más rápidamente al tamaño del clúster, pero es posible que los clústeres se amplíen y se reduzcan innecesariamente. Lo recomendable es asignar a scaleUpMinWorkerFraction
y scaleDownMinWorkerFraction
de una política un valor distinto de cero cuando se use un cooldownPeriod
más corto. De esta forma, el clúster solo se ampliará o reducirá cuando el cambio en la utilización de recursos sea suficiente para justificar una actualización del clúster.
Si tu carga de trabajo es sensible a los cambios en el tamaño del clúster, puedes aumentar el periodo de enfriamiento. Por ejemplo, si estás ejecutando un trabajo de procesamiento por lotes, puedes definir un periodo de enfriamiento de 10 minutos o más. Experimenta con diferentes periodos de enfriamiento para encontrar el valor que mejor se adapte a tu carga de trabajo.
Límites del número de trabajadores y pesos de los grupos
Cada grupo de trabajadores tiene minInstances
y maxInstances
que configuran un límite estricto en el tamaño de cada grupo.
Cada grupo también tiene un parámetro llamado weight
que configura el equilibrio objetivo entre los dos grupos. Ten en cuenta que este parámetro es solo una sugerencia y, si un grupo alcanza su tamaño mínimo o máximo, los nodos solo se añadirán o quitarán del otro grupo. Por lo tanto, weight
casi siempre se puede dejar en el valor predeterminado 1
.
Habilitar el autoescalado basado en núcleos
De forma predeterminada, YARN usa métricas de memoria para la asignación de recursos. En el caso de las aplicaciones que requieren un uso intensivo de la CPU, lo más recomendable es configurar YARN para que use la calculadora de recursos dominante. Para ello, define la siguiente propiedad al crear un clúster:
capacity-scheduler:yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator
Métricas y registros de autoescalado
Los siguientes recursos y herramientas pueden ayudarte a monitorizar las operaciones de escalado automático y su efecto en tu clúster y sus trabajos.
Cloud Monitoring
Usa Cloud Monitoring para lo siguiente:
- Ver las métricas que usa el autoescalado
- Ver el número de gestores de nodos de tu clúster
- Entender por qué se ha escalado o no el clúster con el autoescalado
Cloud Logging
Usa Cloud Logging para ver los registros del escalador automático de Dataproc.
1) Busca los registros de tu clúster.
2) Selecciona dataproc.googleapis.com/autoscaler
.
3) Despliega los mensajes de registro para ver el campo status
. Los registros están en formato JSON, un formato electrónico.
4) Expanda el mensaje de registro para ver las recomendaciones de escalado, las métricas utilizadas para tomar decisiones de escalado, el tamaño original del clúster y el nuevo tamaño de clúster de destino.
Información general: autoescalado con Apache Hadoop y Apache Spark
En las siguientes secciones se explica cómo interactúa (o no) el autoescalado con Hadoop YARN y Hadoop MapReduce, así como con Apache Spark, Spark Streaming y Spark Structured Streaming.
Métricas de Hadoop YARN
El autoescalado se centra en las siguientes métricas de Hadoop YARN:
Allocated resource
hace referencia al total de recursos de YARN que ocupan los contenedores en ejecución en todo el clúster. Si hay 6 contenedores en ejecución que pueden usar hasta 1 unidad de recurso, hay 6 recursos asignados.Available resource
es el recurso de YARN del clúster que no utilizan los contenedores asignados. Si hay 10 unidades de recursos en todos los gestores de nodos y 6 de ellas están asignadas, hay 4 recursos disponibles. Si hay recursos disponibles (sin usar) en el clúster, el autoescalado puede eliminar trabajadores del clúster.Pending resource
es la suma de las solicitudes de recursos de YARN de los contenedores pendientes. Los contenedores pendientes están esperando espacio para ejecutarse en YARN. El recurso pendiente es distinto de cero solo si el recurso disponible es cero o demasiado pequeño para asignarlo al siguiente contenedor. Si hay contenedores pendientes, el autoescalado puede añadir trabajadores al clúster.
Puede ver estas métricas en Cloud Monitoring. De forma predeterminada, la memoria de YARN será 0, 8 * la memoria total del clúster.La memoria restante se reserva para otros daemons y para el uso del sistema operativo, como la caché de páginas. Puede anular el valor predeterminado con el ajuste de configuración de YARN "yarn.nodemanager.resource.memory-mb" (consulte las propiedades relacionadas con Apache Hadoop YARN, HDFS y Spark).
Autoescalado y MapReduce de Hadoop
MapReduce ejecuta cada tarea de asignación y reducción como un contenedor YARN independiente. Cuando empieza un trabajo, MapReduce envía solicitudes de contenedor para cada tarea de mapa, lo que provoca un gran pico en la memoria de YARN pendiente. A medida que se completan las tareas de asignación, la memoria pendiente disminuye.
Cuando se hayan completado mapreduce.job.reduce.slowstart.completedmaps
(95% de forma predeterminada en Dataproc), MapReduce pone en cola las solicitudes de contenedor de todos los reductores, lo que provoca otro pico en la memoria pendiente.
A menos que tus tareas de asignación y reducción tarden varios minutos o más, no definas un valor alto para el ajuste automático de escala scaleUpFactor
. Añadir trabajadores al clúster lleva al menos 1,5 minutos, así que asegúrate de que haya suficiente trabajo pendiente para utilizar el nuevo trabajador durante varios minutos. Un buen punto de partida es asignar a scaleUpFactor
el valor 0,05 (5%) o 0,1 (10%) de la memoria pendiente.
Autoescalado y Spark
Spark añade una capa adicional de programación sobre YARN. En concreto, la asignación dinámica de Spark Core envía solicitudes a YARN para que los contenedores ejecuten los ejecutores de Spark y, a continuación, programa las tareas de Spark en los hilos de esos ejecutores. Los clústeres de Dataproc tienen habilitada la asignación dinámica de forma predeterminada, por lo que los ejecutores se añaden y se eliminan según sea necesario.
Spark siempre pide contenedores a YARN, pero, sin la asignación dinámica, solo pide contenedores al principio de la tarea. Con la asignación dinámica, se eliminarán contenedores o se solicitarán otros nuevos según sea necesario.
Spark empieza con un número reducido de ejecutores (2 en clústeres con autoescalado) y sigue duplicando el número de ejecutores mientras haya tareas pendientes.
De esta forma, se suaviza la memoria pendiente (hay menos picos de memoria pendiente). Te recomendamos que asignes un valor alto al scaleUpFactor
de autoescalado, como 1.0 (100%), para las tareas de Spark.
Inhabilitar la asignación dinámica de Spark
Si ejecutas tareas de Spark independientes que no se benefician de la asignación dinámica de Spark, puedes inhabilitar la asignación dinámica de Spark configurando spark.dynamicAllocation.enabled=false
y spark.executor.instances
.
Puedes seguir usando el escalado automático para aumentar y reducir los clústeres mientras se ejecutan los trabajos de Spark independientes.
Tareas de Spark con datos almacenados en caché
spark.dynamicAllocation.cachedExecutorIdleTimeout
o quitar de la caché los conjuntos de datos cuando ya no se necesiten. De forma predeterminada, Spark no elimina los ejecutores que tienen datos almacenados en caché, lo que impediría reducir la escala del clúster.
Autoescalado y Spark Streaming
Como Spark Streaming tiene su propia versión de asignación dinámica que usa señales específicas de streaming para añadir y quitar ejecutores, define
spark.streaming.dynamicAllocation.enabled=true
y deshabilita la asignación dinámica de Spark Core definiendospark.dynamicAllocation.enabled=false
.No uses la retirada suave (autoescalado
gracefulDecommissionTimeout
) con tareas de Spark Streaming. En su lugar, para eliminar de forma segura los trabajadores con autoescalado, configura la creación de puntos de control para la tolerancia a fallos.
También puedes usar Spark Streaming sin autoescalado:
- Inhabilita la asignación dinámica de Spark Core (
spark.dynamicAllocation.enabled=false
) y - Define el número de ejecutores (
spark.executor.instances
) de tu trabajo. Consulta Propiedades del clúster.
Autoescalado y Spark Structured Streaming
El autoescalado no es compatible con Spark Structured Streaming, ya que este no admite la asignación dinámica (consulta SPARK-24815: Structured Streaming should support dynamic allocation).
Controlar el autoescalado mediante la creación de particiones y el paralelismo
Aunque el paralelismo suele definirse o determinarse mediante los recursos del clúster (por ejemplo, el número de bloques de HDFS controla el número de tareas), con el autoescalado se aplica lo contrario: el autoescalado define el número de trabajadores en función del paralelismo de las tareas. A continuación, se indican algunas directrices que te ayudarán a definir el paralelismo de los trabajos:
- Aunque Dataproc establece el número predeterminado de tareas de reducción de MapReduce en función del tamaño inicial del clúster, puedes definir
mapreduce.job.reduces
para aumentar el paralelismo de la fase de reducción. - El paralelismo de Spark SQL y DataFrame se determina mediante
spark.sql.shuffle.partitions
, cuyo valor predeterminado es 200. - Las funciones RDD de Spark tienen el valor predeterminado
spark.default.parallelism
, que se asigna al número de núcleos de los nodos de trabajador cuando se inicia la tarea. Sin embargo, todas las funciones de RDD que crean mezclas toman un parámetro para el número de particiones, que anulaspark.default.parallelism
.
Debes asegurarte de que tus datos estén particionados de forma uniforme. Si hay una desviación significativa de las claves, una o varias tareas pueden tardar mucho más que otras, lo que provoca una baja utilización.
Ajustes predeterminados de las propiedades de Spark y Hadoop del autoescalado
Los clústeres con escalado automático tienen valores de propiedad de clúster predeterminados que ayudan a evitar errores en las tareas cuando se eliminan trabajadores principales o se interrumpen trabajadores secundarios. Puede anular estos valores predeterminados al crear un clúster con escalado automático (consulte Propiedades del clúster).
Aumenta de forma predeterminada el número máximo de reintentos de tareas, maestros de aplicaciones y fases:
yarn:yarn.resourcemanager.am.max-attempts=10 mapred:mapreduce.map.maxattempts=10 mapred:mapreduce.reduce.maxattempts=10 spark:spark.task.maxFailures=10 spark:spark.stage.maxConsecutiveAttempts=10
Restablece de forma predeterminada los contadores de reintentos (útil para trabajos de Spark Streaming de larga duración):
spark:spark.yarn.am.attemptFailuresValidityInterval=1h spark:spark.yarn.executor.failuresValidityInterval=1h
De forma predeterminada, el mecanismo de inicio lento de Spark asignación dinámica empieza con un tamaño pequeño:
spark:spark.executor.instances=2
Preguntas frecuentes
En esta sección se incluyen preguntas y respuestas habituales sobre el ajuste automático de escala.
¿Se puede habilitar el autoescalado en clústeres de alta disponibilidad y de un solo nodo?
El autoescalado se puede habilitar en clústeres de alta disponibilidad, pero no en clústeres de un solo nodo (los clústeres de un solo nodo no admiten el cambio de tamaño).
¿Se puede cambiar el tamaño de un clúster de autoescalado manualmente?
Sí. Puedes decidir cambiar el tamaño de un clúster manualmente como medida provisional al ajustar una política de autoescalado. Sin embargo, estos cambios solo tendrán un efecto temporal y el escalado automático acabará reduciendo el tamaño del clúster.
En lugar de cambiar el tamaño de un clúster de autoescalado manualmente, puedes hacer lo siguiente:
Actualizando la política de autoescalado. Los cambios que se hagan en la política de autoescalado afectarán a todos los clústeres que la estén usando en ese momento (consulta Uso de políticas en varios clústeres).
Desvinculando la política y escalando manualmente el clúster al tamaño preferido.
Obtener asistencia de Dataproc
¿En qué se diferencia Dataproc del autoescalado de Dataflow?
Consulta Autoescalado horizontal de Dataflow y Autoescalado vertical de Dataflow Prime.
¿Puede el equipo de desarrollo de Dataproc restablecer el estado de un clúster de ERROR
a RUNNING
?
Por lo general, no. Para ello, es necesario verificar manualmente si es seguro restablecer el estado del clúster. Además, a menudo no se puede restablecer un clúster sin seguir otros pasos manuales, como reiniciar el NameNode de HDFS.
Dataproc asigna el estado ERROR
a un clúster cuando no puede determinar su estado después de que se haya producido un error en una operación. Los clústeres de ERROR
no se escalan automáticamente. Las causas más frecuentes son las siguientes:
Errores devueltos por la API de Compute Engine, a menudo durante las interrupciones de Compute Engine.
HDFS se corrompe debido a errores en la retirada de HDFS.
Errores de la API Dataproc Control, como "Task lease expired" (La asignación de tareas ha caducado).
Elimina y vuelve a crear los clústeres cuyo estado sea ERROR
.
¿Cuándo cancela el autoescalado una operación de reducción?
En el siguiente gráfico se muestra cuándo cancelará el escalado automático una operación de reducción (consulta también Cómo funciona el escalado automático).
Notas:
- El clúster tiene habilitado el autoescalado basado únicamente en las métricas de memoria de YARN (el valor predeterminado).
- T1-T9 representan los intervalos de enfriamiento en los que el escalador automático evalúa el número de trabajadores (se ha simplificado la sincronización de eventos).
- Las barras apiladas representan el número de trabajadores de YARN de clústeres activos, en proceso de retirada y retirados.
- El número de trabajadores recomendado por el escalador automático (línea negra) se basa en las métricas de memoria de YARN, el número de trabajadores activos de YARN y la configuración de la política de escalado automático (consulta Cómo funciona el escalado automático).
- El área de fondo rojo indica el periodo durante el que se ejecuta la operación de reducción.
- El área de fondo amarillo indica el periodo durante el que se cancela la operación de reducción.
- El área de fondo verde indica el periodo de la operación de escalado.
Las siguientes operaciones se realizan en los momentos que se indican:
T1: La herramienta de adaptación dinámica inicia una operación de reducción de escala de retirada gradual para reducir aproximadamente la mitad de los trabajadores del clúster actual.
T2: La herramienta de adaptación dinámica sigue monitorizando las métricas del clúster. No cambia la recomendación de reducción y la operación de reducción continúa. Algunos trabajadores se han retirado y otros se están retirando (Dataproc eliminará los trabajadores retirados).
T3: La herramienta de ajuste de escala automático calcula que el número de trabajadores se puede reducir aún más, posiblemente porque se ha liberado más memoria YARN. Sin embargo, como el número de trabajadores activos más el cambio recomendado en el número de trabajadores no es igual o superior al número de trabajadores activos más los que se van a retirar, no se cumplen los criterios para cancelar la reducción y el escalador automático no cancela la operación.
T4: YARN informa de un aumento de la memoria pendiente. Sin embargo, la herramienta de adaptación dinámica no cambia su recomendación sobre el número de trabajadores. Al igual que en T3, no se cumplen los criterios de cancelación de la reducción y el escalador automático no cancela la operación de reducción.
T5: la memoria pendiente de YARN aumenta y el cambio en el número de trabajadores recomendado por el escalador automático también aumenta. Sin embargo, como el número de trabajadores activos más el cambio recomendado en el número de trabajadores es inferior al número de trabajadores activos más los trabajadores que se van a retirar, no se cumplen los criterios de cancelación y no se cancela la operación de reducción.
T6: la memoria pendiente de YARN aumenta aún más. El número de trabajadores activos más el cambio en el número de trabajadores recomendado por el escalador automático ahora es mayor que el número de trabajadores activos más los que se están retirando. Se cumplen los criterios de cancelación y la herramienta de ajuste dinámico cancela la operación de reducción.
T7: El escalador automático está esperando a que se complete la cancelación de la operación de reducción. El escalador automático no evalúa ni recomienda un cambio en el número de trabajadores durante este intervalo.
T8: Se completa la cancelación de la operación de reducción. Los trabajadores retirados se añaden al clúster y se activan. La herramienta de ajuste automático detecta que se ha completado la cancelación de la operación de reducción de escala y espera al siguiente periodo de evaluación (T9) para calcular el número de trabajadores recomendado.
T9: no hay operaciones activas en el momento T9. Según la política de la herramienta de adaptación dinámica y las métricas de YARN, la herramienta de adaptación dinámica recomienda una operación de escalado vertical.