Ajuste de escala automático de los clústeres de Dataproc

¿Qué es el ajuste de escala automático?

Es difícil calcular la cantidad “adecuada” trabajadores (nodos) del clúster de una carga de trabajo, un tamaño único del clúster para toda la canalización no suele ser lo ideal. El Escalamiento de clústeres iniciado por parte del usuario aborda de forma parcial este desafío, pero requiere la supervisión del uso y la intervención manual del clúster.

La API de AutoscalingPolicies de Dataproc proporciona un mecanismo para automatizar la administración de recursos del clúster y habilitar el ajuste de escala automático de la VM de trabajador del clúster. Una Autoscaling Policy es una configuración reutilizable que describe cómo se deben escalar los trabajadores del clúster que usan la política de ajuste de escala automático. Define los límites de escalamiento, frecuencia y agresividad para proporcionar un control detallado sobre los recursos de los clústeres a lo largo de su ciclo de vida.

Cuándo usar el ajuste de escala automático

Usa el ajuste de escala automático:

en clústeres que almacenan datos en servicios externos, como Cloud Storage o BigQuery

en clústeres que procesan muchos trabajos

para escalar clústeres de un solo trabajo

con el modo de flexibilidad mejorada para los trabajos por lotes de Spark

El ajuste de escala automático no se recomienda para o con:

  • HDFS: El ajuste de escala automático no se diseñó para escalar HDFS en el clúster:

    1. El uso de HDFS no es un indicador para el ajuste de escala automático.
    2. Los datos HDFS solo se alojan en trabajadores principales. La cantidad de trabajadores principales debe ser suficiente para alojar todos los datos de HDFS.
    3. El retiro de DataNodes de HDFS puede retrasar la eliminación de trabajadores. DataNodes copian bloques de HDFS a otros DataNodes antes de que se quite un trabajador. Según el tamaño de los datos y el factor de replicación, este proceso puede tomar horas.
  • Etiquetas de nodos de YARN: el ajuste de escala automático no es compatible con etiquetas de nodo de YARN, ni la propiedad dataproc:am.primary_only debido a YARN-9088. YARN informa de manera incorrecta las métricas del clúster cuando se usan etiquetas de nodo.

  • Spark Structured Streaming: El ajuste de escala automático no es compatible con la Spark Structured Streaming (consulta Ajuste de escala automático y Spark Structured Streaming).

  • Clústeres inactivos: No se recomienda el ajuste de escala automático para reducir el tamaño de un clúster al tamaño mínimo cuando este está inactivo. Dado que la creación de un clúster nuevo es tan rápida como cambiar el tamaño de uno, considera borrar los clústeres inactivos y volver a crearlos. Las siguientes herramientas son compatibles con este modelo “efímero”:

    Usa los flujos de trabajo de Dataproc para programar un conjunto de trabajos en un clúster dedicado y, luego, borra el clúster cuando finalicen los trabajos. Para una organización más avanzada, usa Cloud Composer, que se basa en Apache Airflow.

    Para clústeres que procesan consultas ad hoc o cargas de trabajo programadas externamente, usa la eliminación programada de clústeres para borrar el clúster después de un período o duración de inactividad que se especificó, o en un momento en particular.

  • Cargas de trabajo de diferentes tamaños: Cuando los trabajos pequeños y grandes se ejecutan en un clúster, la reducción de escala vertical con retiro de servicio ordenado esperará a que finalicen los trabajos grandes. El resultado es que un trabajo de larga duración retrasará el ajuste de escala automático de los recursos de los trabajos más pequeños que se ejecutan en el clúster hasta que finalice el trabajo de larga duración. Para evitar este resultado, agrupa trabajos más pequeños de tamaño similar en un clúster y aísla cada trabajo de larga duración en un clúster independiente.

Habilita el ajuste de escala automático

Para habilitar el ajuste de escala automático en un clúster, haz lo siguiente:

  1. Crea una política de ajuste de escala automático

  2. Realiza una de las siguientes acciones:

    1. Crea un clúster de ajuste de escala automático
    2. Habilita el ajuste de escala automático en un clúster existente.

Crea una política de ajuste de escala automático

Comando de gcloud

Puedes usar el comando gcloud dataproc autoscaling-policies import para crear una política de ajuste de escala automático. Lee un archivo local YAML que define una política de ajuste de escala automático. El formato y el contenido del archivo deben coincidir con los objetos de configuración y los campos que define la API de REST autoscalingPolicies.

El siguiente ejemplo de YAML define una política que especifica todos los campos obligatorios. También proporciona laminInstances ymaxInstances los valores de los trabajadores principales,maxInstances valor para los trabajadores secundarios (interrumpibles) y especifica un plazo de 4 minutoscooldownPeriod (el valor predeterminado es de 2 minutos). workerConfig configura los trabajadores principales. En este ejemplo, minInstances y maxInstances se configuran con el mismo valor para evitar escalar los trabajadores principales.

workerConfig:
  minInstances: 10
  maxInstances: 10
secondaryWorkerConfig:
  maxInstances: 50
basicAlgorithm:
  cooldownPeriod: 4m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    gracefulDecommissionTimeout: 1h

Este es otro ejemplo de YAML en el que se especifican todos los campos opcionales y obligatorios de la política de ajuste de escala automático.

workerConfig:
  minInstances: 10
  maxInstances: 10
  weight: 1
secondaryWorkerConfig:
  minInstances: 0
  maxInstances: 100
  weight: 1
basicAlgorithm:
  cooldownPeriod: 2m
  yarnConfig:
    scaleUpFactor: 0.05
    scaleDownFactor: 1.0
    scaleUpMinWorkerFraction: 0.0
    scaleDownMinWorkerFraction: 0.0
    gracefulDecommissionTimeout: 1h

Ejecuta el siguiente comando de gcloud desde una terminal local o en Cloud Shell para crear la política de ajuste de escala automático. Proporciona un nombre para la política. Este nombre se convertirá en el id de la política, que puedes usar en comandos de gcloud posteriores para hacer referencia a la política. Usa la marca --source a fin de especificar la ruta de acceso del archivo local y el nombre de archivo del archivo YAML de la política de ajuste de escala automático para importar.

gcloud dataproc autoscaling-policies import policy-name \
    --source=filepath/filename.yaml \
    --region=region

API de REST

Para crear una política de ajuste de escala automático, define una AutoscalingPolicy como parte de una solicitud autoscalingPolicies.create.

Console

Para crear una política de ajuste de escala automático, selecciona CREAR POLÍTICA en la página Políticas de ajuste de escala automático de Dataproc mediante la consola de Google Cloud. En la página Crear política, puedes seleccionar un panel de recomendación de política a fin de propagar los campos de la política de ajuste de escala automático para un tipo de trabajo o un objetivo de escalamiento específicos.

Crea un clúster de ajuste de escala automático

Después de crear una política de ajuste de escala automático, crea un clúster que use esta política. El clúster debe estar en la misma región que la política de ajuste de escala automático.

Comando de gcloud

Ejecuta el siguiente comando de gcloud desde una terminal local o en Cloud Shell para crear un clúster de ajuste de escala automático. Proporciona un nombre para el clúster y usa la marca --autoscaling-policy a fin de especificar el policy id (el nombre de la política que especificaste cuando creaste la política) o la política resource URI (resource name) (consulta los campos id y name de AutoscalingPolicy ).

gcloud dataproc clusters create cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

API de REST

Crea un clúster de ajuste de escala automático mediante la inclusión de AutoscalingConfig como parte de una solicitud clusters.create.

Console

Puedes seleccionar una política de ajuste de escala automático existente para aplicarla a un clúster nuevo en la sección Política de ajuste de escala automático del panel Configurar clústeres en la página Crea un clúster de Dataproc en la consola de Google Cloud.

Habilita el ajuste de escala automático en un clúster existente

Después de crear una política de ajuste de escala automático, puedes habilitarla en un clúster existente en la misma región.

Comando de gcloud

Ejecuta el siguiente comando de gcloud desde una terminal local o en Cloud Shell a fin de habilitar una política de ajuste de escala automático en un clúster existente. Proporciona el nombre del clúster y usa la marca --autoscaling-policy para especificar el policy id (el nombre de la política que especificaste cuando creaste la política) o la política resource URI (resource name) (consulta los campos id y name de AutoscalingPolicy).

gcloud dataproc clusters update cluster-name \
    --autoscaling-policy=policy id or resource URI \
    --region=region

API de REST

Para habilitar una política de ajuste de escala automático en un clúster existente, configura el AutoscalingConfig.policyUri de la política en el updateMask de una solicitud clusters.patch.

Console

En la actualidad, la consola de Google Cloud no admite habilitar una política de ajuste de escala automático en un clúster existente.

Uso de la política en varios clústeres

  • Una política de ajuste de escala automático define el comportamiento de escalamiento que se puede aplicar a varios clústeres. Una política de ajuste de escala automático se aplica mejor en varios clústeres cuando estos compartan cargas de trabajo similares o ejecuten trabajos con patrones de uso de recursos similares.

  • Puedes actualizar una política que varios clústeres usan. Las actualizaciones afectan de inmediato el comportamiento del ajuste de escala automático de todos los clústeres que usan la política (consulta autoscalingPolicies.update). Si no deseas que una actualización de la política se aplique a un clúster que usa la política, inhabilita el ajuste de escala automático en el clúster antes de actualizarla.

Comando de gcloud

Ejecuta el siguiente comando de gcloud desde una terminal local o en Cloud Shell para inhabilitar el ajuste de escala automático en un clúster.

gcloud dataproc clusters update cluster-name --disable-autoscaling \
    --region=region

API de REST

Para inhabilitar el ajuste de escala automático en un clúster, establece AutoscalingConfig.policyUri en la string vacía y establece update_mask=config.autoscaling_config.policy_uri en una solicitud clusters.patch.

Console

Por el momento, la consola de Google Cloud no admite la inhabilitación del ajuste de escala automático en un clúster.

Cómo funciona el ajuste de escala automático

El ajuste de escala automático verifica las métricas de Hadoop YARN del clúster a medida que transcurre el período de enfriamiento para determinar si se debe escalar el clúster y, si es así, la magnitud de la actualización.

  1. El valor de la métrica de recursos pendientes de YARN (Memoria pendiente o Núcleos pendientes) determina si se aumenta o se reduce la escala verticalmente. Un valor superior a 0 indica que los trabajos YARN están esperando recursos y que puede ser necesario escalar verticalmente. Un valor 0 indica que YARN tiene recursos suficientes para que no sea necesario reducir la escala o realizar otros cambios.

    Si el recurso pendiente es > 0:

    $estimated\_worker\_count =$

    \[ \Biggl \lceil AVERAGE\ durante\ período\ de inactividad\Big(\frac{Pendiente + Disponible + Asignado + Reservado}{Recurso\ por\ trabajador}\Big)\BigGrceil \]

    Si el recurso pendiente es 0:

    $estimated\_worker\_count =$

    \[ \Biggl \lceil AVERAGE\ durante\ período\ de enfriamiento\Big(\frac{Asignado + Reservado}{Recurso\ por\ trabajador}\Grand)\Biggr \rceil \]

    De forma predeterminada, el escalador automático supervisa el recurso de memoria YARN. Si habilitas el ajuste de escala automático basado en núcleos, se supervisan la memoria y los núcleos YARN: estimated_worker_count se evalúa por separado para la memoria y los núcleos, y se selecciona el recuento de trabajadores más grande resultante.

    $estimated\_worker\_count =$

    \[max(estimated\_worker\_count\_by\_memory,\Estimated\_worker\_count\_by\_cores) \]

    \[ estimated\ \Delta worker = estimated\_worker\_count - current\_worker\_count \]

  2. Dado el cambio estimado necesario para la cantidad de trabajadores, el ajuste de escala automático usa una scaleUpFactor o scaleDownFactor para calcular el cambio real en la cantidad de trabajadores:

    if estimated Δworkers > 0:
      actual Δworkers = ROUND_UP(estimated Δworkers * scaleUpFactor)
      # examples:
      # ROUND_UP(estimated Δworkers=5 * scaleUpFactor=0.5) = 3
      # ROUND_UP(estimated Δworkers=0.8 * scaleUpFactor=0.5) = 1
    else:
      actual Δworkers = ROUND_DOWN(estimated Δworkers * scaleDownFactor)
      # examples:
      # ROUND_DOWN(estimated Δworkers=-5 * scaleDownFactor=0.5) = -2
      # ROUND_DOWN(estimated Δworkers=-0.8 * scaleDownFactor=0.5) = 0
      # ROUND_DOWN(estimated Δworkers=-1.5 * scaleDownFactor=0.5) = 0
    
    Un scaleUpFactor o scaleDownFactor de 1.0 significa que el ajuste de escala automático escalará para que el recurso pendiente o disponible sea 0 (uso perfecto).

  3. Una vez que se calcula el cambio en la cantidad de trabajadores, scaleUpMinWorkerFraction y scaleDownMinWorkerFraction actúan como un umbral para determinar si el ajuste de escala automático escalará el clúster. Una fracción pequeña significa que el ajuste de escala automático debe escalar incluso si el Δworkers es pequeño. Una fracción mayor significa que el escalamiento solo debe suceder cuando el Δworkers es grande.

    IF (Δworkers >  scaleUpMinWorkerFraction * current_worker_count) then scale up
    
    O
    IF (abs(Δworkers) >  scaleDownMinWorkerFraction * current_worker_count),
    THEN scale down.
    

  4. Si la cantidad de trabajadores a fin de escalar es bastante grande para activar el escalamiento, el ajuste de escala automático usa los límitesminInstances y maxInstances de workerConfig y secondaryWorkerConfig y weight (proporción de trabajadores principales a secundarios) a fin de determinar cómo dividir la cantidad de trabajadores en los grupos de instancias de trabajadores principales y secundarios. El resultado de estos cálculos es el cambio de ajuste de escala automático final en el clúster para el período de escalamiento.

  5. Las solicitudes de reducción de escala automático se cancelarán en clústeres creados con versiones de imagen posteriores a 2.0.57 y 2.1.5 si se cumplen las siguientes condiciones:

    1. hay una reducción de escala en curso con un valor de tiempo de espera de retiro de servicio ordenado distinto de cero y
    2. la cantidad de trabajadores YARN ACTIVOS (“trabajadores activos”) más el cambio en la cantidad total de trabajadores que recomienda el escalador automático (Δworkers) es igual o mayor que DECOMMISSIONING trabajadores YARN (“trabajadores activos”), como se muestra en la siguiente fórmula:

      IF (active workers + Δworkers ≥ active workers + decommissioning workers)
      THEN cancel the scaledown operation
      

    Para ver un ejemplo de cancelación de reducción de escala, consulta ¿Cuándo se cancela una operación para reducir la escala de forma automática el ajuste de escala automático?.

Recomendaciones de configuración del ajuste de escala automático

Evita escalar trabajadores principales

Los trabajadores principales ejecutan Datanodes de HDFS, mientras que los trabajadores secundarios son de solo procesamiento. El uso de trabajadores secundarios te permite escalar de manera eficiente los recursos de procesamiento sin la necesidad de aprovisionar almacenamiento, lo que da como resultado capacidades de escalamiento más rápidas. Los NameNodes de HDFS pueden tener varias condiciones de carrera que hacen que HDFS se dañe, de modo que el retiro de servicio se detenga de forma indefinida. Para evitar este problema, evita escalar trabajadores principales. Por ejemplo: workerConfig: minInstances: 10 maxInstances: 10 secondaryWorkerConfig: minInstances: 0 maxInstances: 100

Hay algunas modificaciones que se deben realizar en el comando de creación del clúster:

  1. Configura --num-workers=10 para que coincida con el tamaño del grupo de trabajadores principales de la política de ajuste de escala automático.
  2. Configura --secondary-worker-type=non-preemptible para configurar los trabajadores secundarios a fin de que no sean interrumpibles. A menos que se deseen las VM interrumpibles.
  3. Copia la configuración de hardware de los trabajadores principales a los secundarios. Por ejemplo, configura --secondary-worker-boot-disk-size=1000GB para que coincida con --worker-boot-disk-size=1000GB.

Usa el modo de flexibilidad mejorada para los trabajos por lotes de Spark

Usa el Modo de flexibilidad mejorada (EFM) con el ajuste de escala automático para hacer lo siguiente:

habilita una reducción de escala más rápida del clúster mientras se ejecutan los trabajos.

evitan interrupciones en los trabajos en ejecución debido a la reducción de escala del clúster

minimizan las interrupciones en los trabajos en ejecución debido a la interrupción de trabajadores secundarios interrumpibles.

Con EFM habilitado, el tiempo de espera del retiro de servicio ordenado de una política de ajuste de escala automático debe establecerse en 0s. La política de ajuste de escala automático solo debe realizar un ajuste de escala automático de trabajadores secundarios.

Elige un tiempo de espera de retiro de servicio ordenado

El ajuste de escala automático admite el retiro de servicio ordenado de YARN cuando se quitan nodos de un clúster. El retiro de servicio ordenado permite que las aplicaciones terminen de reproducir aleatoriamente datos entre etapas para evitar volver a establecer la configuración el progreso del trabajo. El ⁠tiempo de espera de retiro de servicio ordenado que se proporciona en una política de ajuste de escala automático es el límite superior de duración que esperará YARN para aplicaciones en ejecución (aplicaciones que se ejecutaban cuando se inició el retiro) antes de quitar los nodos.

Cuando un proceso no se completa dentro del tiempo de espera del retiro de servicio ordenado especificado, el nodo trabajador se cierra de manera forzosa, lo que podría causar la pérdida de datos o la interrupción del servicio. Para evitar esta posibilidad, establece el tiempo de espera del retiro de servicio ordenado en un valor mayor que el trabajo más largo que procesará el clúster. Por ejemplo, si esperas que tu trabajo más largo se ejecute durante una hora, establece el tiempo de espera en, al menos, una hora (1h).

Considera migrar trabajos que tarden más de 1 hora en sus propios clústeres efímeros para evitar bloquear el retiro de servicio ordenado.

Configura scaleUpFactor

scaleUpFactor controla la agresividad con la que el escalador automático escala verticalmente un clúster. Especifica un número entre 0.0 y 1.0 para establecer el valor fraccionario del recurso pendiente de YARN que provoca la adición de nodos.

Por ejemplo, si hay 100 contenedores pendientes que solicitan 512 MB cada uno, hay 50 GB de memoria YARN pendiente. Si scaleUpFactor es 0.5, el escalador automático agregará suficientes nodos para agregar 25 GB de memoria YARN. De forma similar, si es 0.1, el escalador automático agregará suficientes nodos por 5 GB. Ten en cuenta que estos valores corresponden a la memoria YARN, no a la memoria total disponible en una VM.

Un buen punto de partida es 0.05 para los trabajos de MapReduce y los trabajos de Spark con la asignación dinámica habilitada. Para los trabajos de Spark con un recuento de ejecutor fijo y trabajos de Tez, usa 1.0. Un scaleUpFactor de 1.0 significa que el ajuste de escala automático escalará para que el recurso pendiente o disponible sea 0 (uso perfecto).

Configura scaleDownFactor

scaleDownFactor controla la agresividad con la que el escalador automático disminuye un clúster. Especifica un número entre 0.0 y 1.0 para establecer el valor fraccionario del recurso YARN disponible que provoca la eliminación del nodo.

Deja este valor en 1.0 para la mayoría de los clústeres de varios trabajos que necesitan escalar verticalmente con frecuencia. Como resultado del retiro de servicio ordenado, las operaciones de reducción de escala son mucho más lentas que las operaciones de escalamiento vertical. Cuando se configura scaleDownFactor=1.0, se establece una tasa de reducción de escala agresiva, lo que minimiza la cantidad de operaciones de escalamiento descendente necesarias para lograr el tamaño de clúster adecuado.

Para los clústeres que necesitan más estabilidad, establece una scaleDownFactor más baja para una tasa de reducción de escala más lenta.

Establece este valor en 0.0 para evitar reducir la escala del clúster, por ejemplo, cuando usas clústeres efímeros o de un solo trabajo.

Configura scaleUpMinWorkerFraction y scaleDownMinWorkerFraction

scaleUpMinWorkerFraction y scaleDownMinWorkerFraction se usan con scaleUpFactor o scaleDownFactor, y tienen valores predeterminados de 0.0. Representan los umbrales en los que el escalador automático aumenta o reduce la escala del clúster: el aumento o la disminución mínimos del valor fraccionario en el tamaño del clúster necesario para emitir solicitudes de escalamiento vertical o vertical.

Ejemplos: El escalador automático no emitirá una solicitud de actualización para agregar 5 trabajadores a un clúster de 100 nodos, a menos que scaleUpMinWorkerFraction sea menor o igual que 0.05 (5%). Si se configura como 0.1, el escalador automático no emitirá la solicitud para escalar verticalmente el clúster. De manera similar, si scaleDownMinWorkerFraction es 0.05, el escalador automático no emitirá una solicitud de actualización para quitar nodos de un clúster de 100 nodos, a menos que se quiten al menos 5 nodos.

El valor predeterminado de 0.0 significa que no hay umbral.

Se recomienda configurar una scaleDownMinWorkerFractionthresholds mayor en clústeres grandes (más de 100 nodos) para evitar operaciones de escalamiento pequeñas e innecesarias.

Elige un período de enfriamiento

cooldownPeriod establece un período durante el cual el escalador automático no emitirá solicitudes para cambiar el tamaño del clúster. Puedes usarlo para limitar la frecuencia de los cambios del escalador automático al tamaño del clúster.

El cooldownPeriod mínimo y predeterminado es de dos minutos. Si se configura un cooldownPeriod más corto en una política, los cambios en la carga de trabajo afectarán más rápido el tamaño del clúster, pero los clústeres podrían escalarse verticalmente o reducirse de forma innecesaria. Se recomienda configurar el scaleUpMinWorkerFraction y scaleDownMinWorkerFraction de una política en un valor distinto de cero cuando uses un cooldownPeriod más corto. Esto garantiza que el clúster solo escale verticalmente o se reduzca cuando el cambio en el uso de recursos sea suficiente para garantizar una actualización del clúster.

Si tu carga de trabajo es sensible a los cambios en el tamaño del clúster, puedes aumentar el período de inactividad. Por ejemplo, si ejecutas un trabajo de procesamiento por lotes, puedes establecer el período de inactividad en 10 minutos o más. Experimenta con diferentes períodos de inactividad a fin de encontrar el valor que funcione mejor para tu carga de trabajo.

Límites de recuento de trabajadores y pesos de grupo

Cada grupo de trabajadores tiene minInstances y maxInstances que configuran un límite estricto en cuanto al tamaño de cada grupo.

Cada grupo también tiene un parámetro llamado weight que configura el balanceo de destino entre los dos grupos. Ten en cuenta que este parámetro es solo una sugerencia y, si un grupo alcanza su tamaño mínimo o máximo, los nodos solo se agregarán o quitarán del otro grupo. Por lo tanto, weight siempre se puede dejar en el valor predeterminado 1.

Habilita el ajuste de escala automático basado en núcleos

De forma predeterminada, YARN usa métricas de memoria para la asignación de recursos. En aplicaciones con uso intensivo de CPU, se recomienda configurar YARN para usar la Calculadora de recursos dominantes. Para ello, configura la siguiente propiedad cuando crees un clúster:

capacity-scheduler:yarn.scheduler.capacity.resource-calculator=org.apache.hadoop.yarn.util.resource.DominantResourceCalculator

Ajuste de escala automático de métricas y registros

Los siguientes recursos y herramientas pueden ayudarte a supervisar las operaciones de ajuste de escala automático y su efecto en tu clúster y sus trabajos.

Cloud Monitoring

Usa Cloud Monitoring para hacer lo siguiente:

  • Visualizar las métricas que usa el ajuste de escala automático
  • ver la cantidad de administradores de nodos en tu clúster
  • comprender por qué el ajuste de escala automático escaló o no tu clúster autoscaling-stackdriver1 autoscaling-stackdriver2 autoscaling-stackdriver3

Cloud Logging

Usa Cloud Logging para ver los registros del escalador automático de Cloud Dataproc.

1) Busca registros para tu clúster.

autoscaling-logs-for-cluster

2) Selecciona dataproc.googleapis.com/autoscaler.

autoscaling-log-file

3) Expande los mensajes del registro para ver el campo status. Los registros están en JSON, un formato procesable.

autoscaling-three-logs autoscaling-update-operation

4) Expande el mensaje de registro para ver las recomendaciones de escalamiento, las métricas utilizadas a fin de tomar decisiones de escalamiento, el tamaño original del clúster y el tamaño nuevo del clúster de destino.

autoscaling-recommendation-message

Segundo plano: ajuste de escala automático con Apache Hadoop y Apache Spark

En las secciones a continuación, se analiza cómo interopera el ajuste de escala automático (o no) con Hadoop YARN y Hadoop Mapreduce, y con Apache Spark, Spark Streaming y Spark Structured Streaming.

Métricas de Hadoop YARN

El ajuste de escala automático se centra en las siguientes métricas de YARN de Hadoop:

  1. Allocated resource se refiere al recurso total de YARN que ocupa la ejecución de contenedores en todo el clúster. Si hay 6 contenedores en ejecución que pueden usar hasta 1 unidad de recurso, hay 6 recursos asignados.

  2. Available resource es un recurso YARN en el clúster que no usan los contenedores asignados. Si hay 10 unidades de recursos en todos los administradores de nodos y 6 de ellos están asignados, hay 4 recursos disponibles. Si hay recursos disponibles (sin usar) en el clúster, el ajuste de escala automático puede quitar trabajadores del clúster.

  3. Pending resource es la suma de las solicitudes de recursos de YARN para los contenedores pendientes. Los contenedores pendientes esperan a que se ejecute el espacio en YARN. El recurso pendiente es distinto de cero solo si el recurso disponible es cero o demasiado pequeño para asignarlo al siguiente contenedor. Si hay contenedores pendientes, el ajuste de escala automático puede agregar trabajadores al clúster.

Puedes ver estas métricas en Cloud Monitoring. Como configuración predeterminada, la memoria YARN será 0.8 * memoria total en el clúster, con memoria restante reservada para otros daemons y uso del sistema operativo, como la caché de la página. Puedes anular el valor predeterminado con la configuración de YARN “yarn.nodemanager.resource.memory-mb” (consulta Apache Hadoop YARN, HDFS, Spark y propiedades relacionadas).

Ajuste de escala automático y MapReduce de Hadoop

MapReduce ejecuta cada asignación y reduce la tarea como un contenedor de YARN independiente. Cuando se inicia un trabajo, MapReduce envía solicitudes de contenedor a cada tarea de asignación, lo que da como resultado un gran aumento en la memoria YARN. A medida que finalizan las tareas de asignación, la memoria pendiente disminuye.

Cuando mapreduce.job.reduce.slowstart.completedmaps se completa (el 95% de forma predeterminada en Dataproc), MapReduce agrega a la cola las solicitudes de contenedor para todos los reductores, lo que da como resultado otro aumento en la memoria pendiente.

A menos que las tareas de reducción y el mapeo tarden varios minutos o más, no establezcas un valor alto para el ajuste de escala automático scaleUpFactor. Agregar trabajadores al clúster tarda al menos 1.5 minutos, así que asegúrate de que haya suficiente trabajo pendiente para usar uno nuevo durante varios minutos. Un buen punto de partida es establecer scaleUpFactor en 0.05 (5%) o 0.1 (10%) de memoria pendiente.

Ajuste de escala automático y Spark

Spark agrega una capa adicional de programación sobre YARN. En particular, la asignación dinámica de Spark Core realiza solicitudes a YARN para contenedores que ejecuten los ejecutores de Spark y, luego, programa las tareas de Spark en los subprocesos de esos ejecutores. Los clústeres de Dataproc habilitan la asignación dinámica de forma predeterminada, por lo que los ejecutores se agregan y quitan según sea necesario.

Spark siempre solicita YARN para contenedores, pero sin la asignación dinámica, solo requiere contenedores al comienzo del trabajo. Con la asignación dinámica, quitará contenedores o solicitará nuevos, según sea necesario.

Spark comienza desde una pequeña cantidad de ejecutores (2 en clústeres de ajuste de escala automático) y continúa duplicando la cantidad de ejecutores mientras hay tareas pendientes. Esto reduce la memoria pendiente (menos aumentos repentinos de memoria pendiente). Se recomienda configurar el ajuste de escala automático scaleUpFactor en un número mayor, como 1.0 (100%), para los trabajos de Spark.

Inhabilita la asignación dinámica de Spark

Si ejecutas trabajos de Spark independientes que no se benefician de la asignación dinámica de Spark, puedes inhabilitar spark.dynamicAllocation.enabled=false y configurar spark.executor.instances. Aún puedes usar el ajuste de escala automático para aumentar o reducir el escalamiento de los clústeres mientras se ejecutan los trabajos de Spark independientes.

Trabajos de Spark con datos almacenados en caché

Configura spark.dynamicAllocation.cachedExecutorIdleTimeout o los conjuntos de datos que no están almacenados en caché cuando ya no sean necesarios. De forma predeterminada, Spark no quita los ejecutores que tienen datos almacenados en caché, lo que evitaría disminuir la escala del clúster.

Ajuste de escala automático y transmisión de Spark

  1. Dado que Spark Streaming tiene su propia versión de asignación dinámica que usa señales específicas de transmisión para agregar y quitar ejecutores, debes configurar spark.streaming.dynamicAllocation.enabled=true e inhabilitar la asignación dinámica de Spark Core mediante el establecimiento de spark.dynamicAllocation.enabled=false.

  2. No uses el retiro de servicio ordenado (ajuste de escala automático de gracefulDecommissionTimeout) con trabajos de Spark Streaming. En su lugar, para quitar trabajadores de forma segura con el ajuste de escala automático, configura el punto de control para obtener la tolerancia a errores.

Como alternativa, para usar Spark Streaming sin ajuste de escala automático, haz lo siguiente:

  1. Inhabilita la asignación dinámica de Spark Core (spark.dynamicAllocation.enabled=false)
  2. Configura la cantidad de ejecutores (spark.executor.instances) de tu trabajo. Consulta Propiedades del clúster.

Ajuste de escala automático y Spark Structured Streaming

El ajuste de escala automático no es compatible con Spark Structured Streaming porque este no admite la asignación dinámica (consulta SPARK-24815: Structured Streaming debe admitir la asignación dinámica).

Controla el ajuste de escala automático a través de particiones y paralelismo

Si bien el paralelismo suele establecerse o determinarse en función de los recursos del clúster (por ejemplo, la cantidad de bloques de HDFS controla la cantidad de tareas), con el ajuste de escala automático sucede lo contrario: el ajuste de escala automático establece la cantidad de trabajadores según el paralelismo de trabajo. A continuación, se incluyen lineamientos que te ayudarán a configurar el paralelismo de trabajos:

  • Si bien Dataproc establece la cantidad predeterminada de MapReduce, reduce las tareas según el tamaño inicial del clúster de tu clúster, puedes configurar mapreduce.job.reduces para aumentar el paralelismo de la fase de reducción.
  • El paralelismo de Spark SQL y Dataframe está determinado por spark.sql.shuffle.partitions, que se establece de forma predeterminada en 200.
  • El valor predeterminado de las funciones RDD de Spark es spark.default.parallelism, que se establece en la cantidad de núcleos en los nodos trabajadores cuando se inicia el trabajo. Sin embargo, todas las funciones de RDD que crean reproducciones aleatorias toman un parámetro para la cantidad de particiones, que anula spark.default.parallelism.

Debes asegurarte de que tus datos estén particionados de forma uniforme. Si existe una desviación significativa de la clave, una o más tareas pueden tardar mucho más tiempo que otras, lo que genera un uso bajo.

Ajuste de escala automático de la configuración predeterminada de las propiedades de Spark y Hadoop

Los clústeres de ajuste de escala automático tienen valores de propiedad de clúster predeterminados que ayudan a evitar fallas de trabajo cuando se quitan los trabajadores principales o se interrumpen los trabajadores secundarios. Puedes anular estos valores predeterminados cuando creas un clúster con ajuste de escala automático (consulta Propiedades del clúster).

La configuración predeterminada aumenta la cantidad máxima de reintentos para tareas, instancias principales de aplicaciones y etapas:

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

La configuración predeterminada restablece los contadores de reintentos (útil para trabajos de transmisión de Spark de larga duración):

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

Configuración predeterminada para que el mecanismo de asignación dinámica de inicio lento de Spark comience desde un tamaño pequeño:

spark:spark.executor.instances=2

Preguntas frecuentes

¿Se puede habilitar el ajuste de escala automático en clústeres de alta disponibilidad y de un solo nodo?

El ajuste de escala automático se puede habilitar en clústeres de alta disponibilidad, pero no en clústeres de un solo nodo (los clústeres de un solo nodo no admiten el cambio de tamaño).

¿Puedes cambiar el tamaño de un clúster de ajuste de escala automático de forma manual?

Sí. Puedes decidir cambiar el tamaño de un clúster de forma manual como una medida de detención cuando se modifica una política de ajuste de escala automático. Sin embargo, estos cambios solo tendrán un efecto temporal, y el ajuste de escala automático escalará el clúster luego.

En lugar de cambiar el tamaño de un clúster de ajuste de escala automático de forma manual, considera lo siguiente:

Actualiza la política de ajuste de escala automático Cualquier cambio que se realice en la política de ajuste de escala automático afectará a todos los clústeres que usen la política en este momento (consulta Uso de la política en varios clústeres).

Desconecta la política y escala de forma manual el clúster hasta alcanzar el tamaño preferido.

Obtén asistencia de Dataproc.

¿En qué se diferencia Dataproc del ajuste de escala automático de Dataflow?

Consulta Ajuste de escala automático horizontal de Dataflow y Ajuste de escala automático vertical de Dataflow Prime.

¿El equipo de desarrollo de Dataproc puede restablecer el estado de un clúster de ERROR a RUNNING?

En general, no. Esto requiere un esfuerzo manual para verificar si es seguro restablecer el estado del clúster y, a menudo, un clúster no se puede restablecer sin otros pasos manuales, como reiniciar el Namenode de HDFS.

Dataproc establece el estado de un clúster en ERROR cuando no puede determinar el estado de un clúster después de una operación con errores. Los clústeres en ERROR no tienen ajuste de escala automático ni ejecutan trabajos. Entre las causas comunes, se incluyen las siguientes:

  1. Errores que se muestran desde la API de Compute Engine, a menudo durante las interrupciones de Compute Engine.

  2. HDFS entra en un estado dañado debido a errores en el retiro de servicio de HDFS.

  3. Errores de la API de control de Dataproc, como “Venció la asignación de tiempo de tareas”

Borra y vuelve a crear los clústeres cuyo estado sea ERROR.

¿Cuándo se cancela el ajuste de escala automático una operación para reducir la escala verticalmente?

El siguiente gráfico es una ilustración que muestra cuándo el ajuste de escala automático cancelará una operación para reducir la escala verticalmente (también consulta Cómo funciona el ajuste de escala automático).

dataproc-autoscaling-cancellation-example

Notas:

  • El clúster tiene el ajuste de escala automático habilitado solo en función de las métricas de memoria de YARN (configuración predeterminada).
  • T1-T9 representa los intervalos de inactividad cuando el escalador automático evalúa la cantidad de trabajadores (se simplificó el tiempo de los eventos).
  • Las barras apiladas representan los recuentos de trabajadores de YARN de clústeres activos, con retiro de servicio y con retiro de servicio.
  • La cantidad recomendada de trabajadores del escalador automático (línea negra) se basa en las métricas de memoria de YARN, el recuento de trabajadores activos de YARN y la configuración de la política de ajuste de escala automático (consulta la sección Cómo funciona el ajuste de escala automático).
  • El área de fondo rojo indica el período en el que se ejecuta la operación de reducción de escala.
  • El área de fondo amarillo indica el período en el que se cancelará la operación de reducción de escala.
  • El área de fondo verde indica el período de la operación de escalamiento vertical.

Las siguientes operaciones ocurren en los siguientes momentos:

  • T1: El escalador automático inicia una operación de retiro de servicio ordenado para reducir la escala de alrededor de la mitad de los trabajadores del clúster actuales.

  • T2: El escalador automático continúa supervisando las métricas del clúster. No cambia su recomendación de reducción de escala, y la operación de reducción de escala continúa. Algunos trabajadores se retiraron de servicio y otros se están retirando (Dataproc borrará los trabajadores que se retiraron de servicio).

  • T3: El escalador automático calcula que la cantidad de trabajadores puede reducirse aún más, posiblemente debido a la disponibilidad de memoria YARN adicional. Sin embargo, como la cantidad de trabajadores activos más el cambio recomendado en la cantidad de trabajadores no es igual o mayor que la cantidad de trabajadores activos más los del retiro de servicio, no se cumplen los criterios para la cancelación de reducción de escala vertical, y el escalador automático no cancela la operación de reducción de escala.

  • T4: YARN informa un aumento en la memoria pendiente. Sin embargo, el escalador automático no cambia su recomendación del recuento de trabajadores. Al igual que en T3, los criterios de cancelación de reducción de escala vertical no se cumplen y el escalador automático no cancela la operación.

  • T5: Aumenta la memoria pendiente de YARN y el cambio en la cantidad de trabajadores que recomienda el escalador automático. Sin embargo, como la cantidad de trabajadores activos más el cambio recomendado en la cantidad de trabajadores es menor que la cantidad de trabajadores activos más los que están fuera de servicio, no se cumplen los criterios de cancelación y la operación de reducción de escala no se cancela.

  • T6: La memoria pendiente de YARN aumenta aún más. La cantidad de trabajadores activos más el cambio en la cantidad de trabajadores que recomienda el escalador automático ahora es mayor que la cantidad de trabajadores activos más los que están retirando de servicio. Se cumplen los criterios de cancelación, y el escalador automático cancela la operación para reducir la escala verticalmente.

  • T7: El escalador automático está esperando que se cancele la operación de reducción de escala. El escalador automático no evalúa ni recomienda un cambio en la cantidad de trabajadores durante este intervalo.

  • T8: Se completa la cancelación de la operación de reducción de escala. Los trabajadores de retiro de servicio se agregan al clúster y se activan. El escalador automático detecta la finalización de la cancelación de la operación de reducción de escala y espera al siguiente período de evaluación (T9) para calcular la cantidad recomendada de trabajadores.

  • T9: No hay operaciones activas en el momento T9. Según la política del escalador automático y las métricas de YARN, este recomienda una operación de escalamiento vertical.