Modo de flexibilidad mejorada de Dataproc

El modo de flexibilidad mejorada (EFM) de Dataproc administra los datos aleatorios para minimizar los retrasos en el progreso de los trabajos, provocados por la eliminación de nodos de un clúster en ejecución. EFM escribe datos de Spark Shuffle en los trabajadores principales. Los trabajadores extraen de esos nodos remotos durante la fase de reducción.

Dado que EFM no almacena datos aleatorios intermedios en trabajadores secundarios, es adecuado para su uso en clústeres que usan VMs interrumpibles o solo ajustan la escala automáticamente del grupo de trabajadores secundarios.

Limitaciones:

  • Los trabajos de Apache Hadoop YARN que no admiten la reubicación de AppMaster pueden fallar en el modo de flexibilidad mejorada (consulta Cuándo esperar a que finalice AppMasters).
  • No se recomienda usar el modo de flexibilidad mejorada:
    • En un clúster que solo tiene trabajadores principales.
  • No se admite el modo de flexibilidad mejorada:
    • Cuando el ajuste de escala automático del trabajador principal está habilitado. En la mayoría de los casos, los trabajadores principales seguirán almacenando datos aleatorios que no se migren de manera automática. El escalamiento descendente del grupo de trabajadores principales anula los beneficios de EFM.
    • cuando los trabajos de Spark se ejecutan en un clúster con el retiro de servicio ordenado habilitado. El retiro de servicio ordenado y EFM pueden funcionar con fines cruzados, ya que el mecanismo de retiro de servicio ordenado de YARN mantiene los nodos DE DESCOMISIÓN hasta que se completen todas las aplicaciones involucradas.

Usa el modo de flexibilidad mejorada

El modo de flexibilidad mejorada se configura por motor de ejecución, y debe configurarse durante la creación del clúster. La implementación de Spark EFM se configura con la propiedad del clúster dataproc:efm.spark.shuffle=primary-worker.

Ejemplo: Crea un clúster con Shuffle del trabajador principal para Spark:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Ejemplo de Apache Spark

  1. Ejecutar un trabajo de WordCount con un texto público de Shakespeare mediante el jar de ejemplos de Spark en el clúster de EFM
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Configura SSD locales para Shuffle de trabajadores principales

Las implementaciones de combinaciones aleatorias de trabajadores principales-y HDFS escriben datos aleatorios intermedios en discos conectados a VM y se benefician de la capacidad de procesamiento adicional y las IOPS que ofrecen las SSD locales. Para facilitar la asignación de recursos, selecciona un objetivo de alrededor de 1 partición del SSD local por 4 CPU virtuales cuando configures las máquinas de trabajador principales.

Para adjuntar las SSD locales, pasa la marca --num-worker-local-ssds al comando gcloud dataproc clusters create.

Proporción de trabajadores secundarios

Dado que los trabajadores secundarios escriben sus datos aleatorios en los trabajadores principales, el clúster debe contener una cantidad suficiente de trabajadores principales con recursos de CPU, memoria y disco suficientes para adaptarse a la carga aleatoria de tu trabajo. Para evitar que el grupo principal escale y genere un comportamiento no deseado, configura minInstances en el valor maxInstances en la política de ajuste de escala automático del grupo de trabajadores principales, a fin de evitar el ajuste de escala automático de los clústeres.

Si tienes una proporción alta de trabajadores principal a secundaria (por ejemplo, 10:1), supervisa el uso de CPU, red y disco de los trabajadores principales para determinar si están sobrecargados. Para ello, siga estos pasos:

  1. Ve a la página Instancias de VM en la consola de Google Cloud.

  2. Haz clic en la casilla de verificación que se encuentra al lado izquierdo del trabajador principal.

  3. Haz clic en la pestaña MONITORING para ver el uso de CPU del trabajador principal, IOPS de disco, bytes de red y otras métricas.

Si los trabajadores principales están sobrecargados, considera escalar verticalmente los trabajadores principales de forma manual.

Cambia el tamaño del grupo de trabajadores principales

El grupo de trabajadores principales se puede escalar verticalmente de forma segura, pero reducir el grupo de trabajadores principales puede afectar el progreso del trabajo de manera negativa. Las operaciones que reducen la escala del grupo de trabajadores principales deben usar el retiro de servicio ordenado, que se habilita mediante la configuración de la marca --graceful-decommission-timeout.

Clústeres con ajuste de escala automático: El escalamiento del grupo de trabajadores principales está inhabilitado en los clústeres con EFM con políticas de ajuste de escala automático. Para cambiar el tamaño del grupo de trabajadores principal en un clúster con ajuste de escala automático, haz lo siguiente:

  1. Inhabilita el ajuste de escala automático.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Escala el grupo principal.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Vuelve a habilitar el ajuste de escala automático:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Supervisa el uso de disco de trabajadores principales

Los trabajadores principales deben tener suficiente espacio en el disco para los datos aleatorios del clúster. Puedes supervisar esto de manera indirecta a través de la métrica remaining HDFS capacity. A medida que se llena el disco local, el espacio deja de estar disponible para HDFS y la capacidad restante disminuye.

De forma predeterminada, cuando el uso del disco local de un trabajador principal supera el 90% de la capacidad, el nodo se marcará como EN MAL ESTADO en la IU del nodo YARN. Si tienes problemas de capacidad de disco, puedes borrar los datos que no se usen de HDFS o escalar verticalmente el grupo de trabajadores principales.

Ten en cuenta que, por lo general, los datos aleatorios intermedios no se limpian hasta el final de un trabajo. Cuando se usa la redistribución del trabajador principal con Spark, esto puede demorar hasta 30 minutos después de que se completa un trabajo.

Configuración avanzada

Partición y paralelismo

Cuando envíes un trabajo de MapReduce o Spark, configura un nivel de partición adecuado. Decidir la cantidad de particiones de entrada y salida para una etapa de redistribución implica una compensación entre las diferentes características de rendimiento. Es mejor experimentar con valores que funcionen para tus formas de trabajo.

Particiones de entrada

La partición de entrada de MapReduce y Spark está determinada por el conjunto de datos de entrada. Cuando se leen archivos desde Cloud Storage, cada tarea procesa aproximadamente un valor de “tamaño de bloque” de datos.

  • En los trabajos de Spark SQL, spark.sql.files.maxPartitionBytes controla el tamaño máximo de la partición. Considera aumentarlo a 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • En los trabajos de MapReduce y los RDD de Spark, el tamaño de la partición se suele controlar con fs.gs.block.size, que se configura de forma predeterminada en 128 MB. Considera aumentarlo a 1 GB. También puedes establecer propiedades específicas de InputFormat, como mapreduce.input.fileinputformat.split.minsize y mapreduce.input.fileinputformat.split.maxsize.

    • Para trabajos de MapReduce, usa este comando: --properties fs.gs.block.size=1073741824
    • Para los RDD de Spark: --properties spark.hadoop.fs.gs.block.size=1073741824

Particiones de salida

Varias propiedades controlan la cantidad de tareas en etapas posteriores. En trabajos más grandes que procesan más de 1 TB, considera tener al menos 1 GB por partición.

  • Para los trabajos de MapReduce, mapreduce.job.reduces controla la cantidad de particiones de salida.

  • En Spark SQL, la cantidad de particiones de salida está controlada por spark.sql.shuffle.partitions.

  • Para los trabajos de Spark con la API de RDD, puedes especificar la cantidad de particiones de salida o configurar spark.default.parallelism.

Ajuste de Shuffle para trabajadores principales

La propiedad más significativa es --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Ten en cuenta que esta es una propiedad YARN a nivel de clúster porque el servidor de redistribución de Spark se ejecuta como parte de Node Manager. El valor predeterminado es el doble (2x) de núcleos en la máquina (por ejemplo, 16 subprocesos en un n1-highmem-8). Si “Tiempo de lectura bloqueado de Shuffle” es mayor que 1 segundo y los trabajadores principales no alcanzaron los límites de red, CPU o disco, considera aumentar la cantidad de subprocesos del servidor de redistribución.

En tipos de máquinas más grandes, considera aumentar spark.shuffle.io.numConnectionsPerPeer, que se establece de forma predeterminada en 1. (Por ejemplo, configúralo en 5 conexiones por par de hosts).

Aumenta los reintentos

La cantidad máxima de intentos permitidos para las instancias principales, las tareas y las etapas se puede establecer mediante la configuración de las siguientes propiedades:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Dado que las instancias principales y las tareas de la app se finalizan con más frecuencia en clústeres que usan muchas VM interrumpibles o ajuste de escala automático sin un retiro de servicio ordenado, aumentar los valores de las propiedades anteriores en esos clústeres puede ser útil (ten en cuenta que no se admite el uso de EFM con Spark y el retiro de servicio ordenado).

Retiro de servicio ordenado de YARN en clústeres de EFM

El retiro de servicio ordenado de YARN se puede usar para quitar nodos rápidamente con un impacto mínimo en las aplicaciones en ejecución. Para los clústeres con ajuste de escala automático, se puede configurar el tiempo de espera del retiro de servicio ordenado en una AutoscalingPolicy que esté conectada al clúster de EFM.

Mejoras de EFM para el retiro de servicio ordenado

  1. Debido a que los datos intermedios se almacenan en un sistema de archivos distribuido, los nodos se pueden quitar de un clúster de EFM apenas finalicen todos los contenedores que se ejecutan en esos nodos. En comparación, los nodos no se quitan en los clústeres estándar de Dataproc hasta que finaliza la aplicación.

  2. La eliminación de nodos no espera a que finalicen las instancias principales de las apps que se ejecutan en un nodo. Cuando se finaliza el contenedor de la instancia principal de la app, se reprograma en otro nodo que no se retira de servicio. El progreso del trabajo no se pierde: la nueva instancia principal de la app recupera rápidamente el estado de la instancia principal anterior mediante la lectura del historial de trabajos.