Modo de flexibilidad mejorada de Dataproc

Cuando se quita un nodo de Dataproc, el modo de flexibilidad mejorada (EFM) de Dataproc conserva los datos de nodo con estado, como los datos aleatorios de MapReduce, en HDFS.

Como HDFS solo se ejecuta en trabajadores principales, esta función es especialmente adecuada para clústeres que usan VM interrumpibles o que solo realizan ajustes de escala automático al grupo de trabajadores interrumpible.

Limitaciones:

  • Por el momento, el modo de flexibilidad mejorada solo es compatible con los clústeres de imágenes 1.4.
  • Los trabajos de Apache Hadoop YARN que no admiten la reubicación de AppMaster pueden fallar con más frecuencia en el modo de flexibilidad mejorada (consulta Cuándo esperar a que terminen AppMasters).
  • El modo de flexibilidad mejorada no se recomienda en los siguientes casos:
    • un clúster solo tiene trabajadores primarios
    • el ajuste de escala automático del trabajador principal está activado, a menos que EFM esté configurado para apuntar a HDFS de otro clúster.
      Nota: Si los trabajos dependen mucho de HDFS, no se recomienda el ajuste de escala automático de HDFS.

Uso del modo de flexibilidad mejorada

Crea un clúster con el modo de flexibilidad mejorada habilitado:

gcloud dataproc clusters create cluster-name \
    --properties dataproc:efm.spark.shuffle=hcfs \
    --properties dataproc:efm.mapreduce.shuffle=hcfs \
    --image-version 1.4 \
    --region cluster-region (e.g., us-central1)

Ejemplo de MapReduce de Apache Hadoop

  1. Desde el jar de ejemplos de mapreduce en el clúster del modo de flexibilidad mejorada, ejecuta un pequeño trabajo de teragen para generar datos de entrada en Cloud Storage para un trabajo de terasort posterior.
    gcloud dataproc jobs submit hadoop \
        --cluster cluster-name \
        --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (e.g., gs://terasort/input)
    
  2. Ejecuta un trabajo de terasort en los datos.
    gcloud dataproc jobs submit hadoop \
        --cluster cluster-name \
        --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Ejemplo de Apache Spark

  1. Ejecuta un trabajo de WordCount en texto público de Shakespeare con el jar de ejemplos de Spark en el clúster del modo de flexibilidad mejorada.
    gcloud dataproc jobs submit spark \
        --cluster cluster-name \
        --jars file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Retiro de servicio ordenado de YARN en clústeres del modo de flexibilidad mejorada

retiro de servicio ordenado de YARN se puede usar para quitar nodos rápidamente con un impacto mínimo en las aplicaciones en ejecución. Para los clústeres con ajuste de escala automático, se puede configurar el tiempo de espera del retiro de servicio ordenado en una AutoscalingPolicy que esté conectada al clúster del modo de flexibilidad mejorada.

Mejoras en el modo de flexibilidad mejorada para el retiro de servicio ordenado

  1. Dado que los datos intermedios se almacenan en un sistema de archivos distribuido, los nodos se pueden quitar de un clúster del modo de flexibilidad mejorada tan pronto como terminan todos los contenedores que se ejecutan en esos nodos. En comparación, los nodos no se quitan en los clústeres estándar de Dataproc hasta que la aplicación termina.

  2. La eliminación de nodos no espera a que finalicen las instancias principales de la aplicación que se ejecutan en un nodo. Cuando se elimina el contenedor principal de la app, se reprograma en otro nodo que no se retira del servicio. El progreso del trabajo no se pierde: la nueva instancia principal de la app recupera rápidamente el estado de la instancia principal de la aplicación anterior mediante la lectura del historial de trabajos.

Cómo usar el retiro de servicio ordenado en un clúster de modo de flexibilidad mejorado

Ejemplo:

  1. Crea un clúster de modo de flexibilidad mejorado con la misma cantidad de trabajadores principales y secundarios.
    gcloud dataproc clusters create cluster-name \
        --properties dataproc:efm.spark.shuffle=hcfs \
        --properties dataproc:efm.mapreduce.shuffle=hcfs \
        --image-version 1.4 \
        --region cluster-region (e.g., us-central1) \
        --num-workers 5 \
        --num-secondary-workers 5
    
  2. Desde el jar de ejemplos de mapreduce en el clúster, ejecuta un trabajo de mapreduce que calcula el valor de pi.
    gcloud dataproc jobs submit hadoop \
        --cluster cluster-name \
        --jar file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    
  3. Mientras se ejecuta el trabajo, reduzca la escala del clúster mediante un retiro de servicio ordenado.
    gcloud dataproc clusters update cluster-name \
        --num-secondary-workers 0 \
        --graceful-decommission-timeout 1h
    

Los nodos se quitarán del clúster rápidamente (antes de que finalice el trabajo) mientras se minimiza la pérdida de progreso del trabajo. Las caídas temporales en el progreso de los trabajos pueden ocurrir debido a los siguientes motivos:

  • Conmutación por error de la instancia principal de la aplicación. Si el progreso del trabajo disminuye al 0% y, luego, se incrementa de inmediato al valor previo a la eliminación, es probable que la instancia principal haya finalizado y que una nueva instancia principal haya recuperado su estado. Esto no debería afectar de manera significativa el progreso del trabajo, ya que la conmutación por error ocurre rápidamente.
  • interrupción de VM Debido a que HDFS solo conserva los resultados completos, no parciales, de la tarea de asignación, las caídas temporales en el progreso del trabajo pueden ocurrir cuando una VM se interrumpe mientras trabaja en una tarea de asignación.

Configuración avanzada

Configura HDFS

A fin de mejorar el rendimiento de las combinaciones grandes, puedes aumentar la cantidad de subprocesos de entrega para el nodo de nombre y el nodo de datos si aumentas los valores de dfs.namenode.handler.count y dfs.datanode.handler.count cuando creas el clúster. De forma predeterminada, los nodos de nombres y de datos tienen 10 subprocesos de entrega. A fin de mejorar aún más el rendimiento del nodo de nombre, puedes conectar SSD locales al nodo principal si pasas la marca --num-master-local-ssds al comando clusters create. También puedes usar la marca --num-worker-local-ssds para adjuntar SSD locales a los nodos de datos a fin de mejorar el rendimiento de HDFS.

Partición y paralelismo

Cuando se envía un trabajo de MapReduce o Spark, es importante configurar un nivel adecuado de partición. La cantidad de particiones de entrada y salida para una etapa aleatoria determinada intercambia características de rendimiento diferentes. Es mejor experimentar con valores que funcionen para tus formas de trabajo.

Para los trabajos de MapReduce, la cantidad de particiones de salida está controlada por mapreduce.job.reduces. Para Spark SQL, la cantidad de particiones de salida se controla mediante spark.sql.shuffle.partitions. Para los trabajos de Spark que usan la API de RDD, puedes especificar la cantidad de particiones. La partición de entrada de MapReduce y Spark se establece de forma implícita por el conjunto de datos de entrada.

Aumento de reintentos

La cantidad máxima de intentos permitidos para las instancias principales, las tareas y las etapas se puede establecer mediante la configuración de las siguientes propiedades:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Debido a que las instancias principales y las tareas de la app se eliminarán con mayor frecuencia en clústeres que usan muchas VM interrumpibles o en los ajuste de escala automático sin retiro de servicio ordenado, se recomienda aumentar los valores anteriores en esos clústeres.

Proporción de trabajadores interrumpibles

Dado que los trabajadores interrumpibles escriben sus datos aleatorios en HDFS, es importante asegurarse de que su clúster contenga suficientes trabajadores primarios para admitir los datos aleatorios de su trabajo. Para los clústeres de ajuste de escala automático, puedes controlar la fracción de destino de los trabajadores interrumpibles en tu clúster mediante la asignación de pesos en workerConfig y secondaryWorkerConfig en tu AutocalingPolicy.

Otros sistemas de archivos compatibles con Hadoop

Según la configuración predeterminada, los datos aleatorios se escriben en HDFS, pero puedes usar cualquier sistema de archivos compatible con Hadoop (HCFS). Por ejemplo, puedes decidir escribir de manera aleatoria en Cloud Storage o en el HDFS de un clúster diferente. Para especificar un sistema de archivos , puedes apuntar fs.defaultFS al sistema de archivos de destino cuando envíes un trabajo a tu clúster.