Modo de flexibilidad mejorada de Dataproc

Cuando se quita un nodo de Dataproc, el modo de flexibilidad mejorada (EFM) de Dataproc conserva los datos de nodo con estado, como los datos aleatorios de MapReduce, en HDFS.

Como HDFS solo se ejecuta en trabajadores principales, esta función es especialmente adecuada para clústeres que usan VM interrumpibles o que solo realizan ajustes de escala automático al grupo de trabajadores interrumpible.

Limitaciones:

  • Por el momento, el modo de flexibilidad mejorada solo es compatible con los clústeres de imágenes 1.4.
  • Los trabajos de Apache Hadoop YARN que no admiten la reubicación de AppMaster pueden fallar con más frecuencia en el modo de flexibilidad mejorada (consulta Cuándo esperar a que terminen AppMasters).
  • El modo de flexibilidad mejorada no se recomienda en los siguientes casos:
    • un clúster solo tiene trabajadores principales
    • el ajuste de escala automático del trabajador principal está activado, a menos que EFM esté configurado para apuntar a HDFS de otro clúster.
      Nota: Si los trabajos dependen mucho de HDFS, no se recomienda el ajuste de escala automático de HDFS.

Usa el modo de flexibilidad mejorada

Crea un clúster con el modo de flexibilidad mejorado habilitado:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=hcfs \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --image-version=1.4

Ejemplo de MapReduce de Apache Hadoop

  1. Desde el jar de ejemplos de mapreduce en el clúster del modo de flexibilidad mejorada, ejecuta un pequeño trabajo de teragen para generar datos de entrada en Cloud Storage para un trabajo de terasort posterior.
    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    
  2. Ejecuta un trabajo de terasort en los datos.
    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Ejemplo de Apache Spark

  1. Ejecuta un trabajo de WordCount en texto público de Shakespeare con el jar de ejemplos de Spark en el clúster del modo de flexibilidad mejorada.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Retiro de servicio ordenado de YARN en clústeres del modo de flexibilidad mejorada

retiro de servicio ordenado de YARN se puede usar para quitar nodos rápidamente con un impacto mínimo en las aplicaciones en ejecución. Para los clústeres con ajuste de escala automático, se puede configurar el tiempo de espera del retiro de servicio ordenado en una AutoscalingPolicy que esté conectada al clúster del modo de flexibilidad mejorada.

Mejoras en el modo de flexibilidad mejorada para el retiro de servicio ordenado

  1. Dado que los datos intermedios se almacenan en un sistema de archivos distribuido, los nodos se pueden quitar de un clúster de modo de flexibilidad mejorado tan pronto como todos los contenedores que se ejecutan en esos nodos finalizan. En comparación, los nodos no se quitan en los clústeres estándar de Dataproc hasta que la aplicación finaliza.

  2. La eliminación de nodos no espera a que finalicen las instancias principales de la aplicación. Cuando se elimina el contenedor principal de la app, se reprograma en otro nodo que no se retira del servicio. El progreso del trabajo no se pierde: la nueva instancia principal recupera rápidamente el estado de la instancia principal anterior mediante la lectura del historial de trabajos.

Cómo usar el retiro de servicio ordenado en un clúster de modo de flexibilidad mejorado

Ejemplo:

  1. Crea un clúster en modo de flexibilidad mejorada con la misma cantidad de trabajadores principales y secundarios.
    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.spark.shuffle=hcfs \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --image-version=1.4 \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    
  2. Desde el jar de ejemplo de Mapreduce en el clúster, ejecuta un trabajo de MapReduce que calcula el valor de pi.
    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    
  3. Mientras se ejecuta el trabajo, reduzca la escala del clúster mediante un retiro de servicio ordenado.
    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    

Los nodos se quitarán del clúster con rapidez (antes de que el trabajo finalice) y se minimiza la pérdida del progreso del trabajo. Las caídas temporales en el progreso del trabajo pueden ocurrir debido a los siguientes motivos:

  • Conmutación por error de la instancia principal de la app. Si el progreso del trabajo disminuye al 0% y, luego, se incrementa de inmediato al valor previo a la eliminación, es probable que la instancia principal de la app haya finalizado y que una nueva instancia principal recupere su estado. Esto no debería afectar significativamente el progreso del trabajo, ya que la conmutación por error ocurre rápidamente.
  • interrupción de VM Debido a que HDFS solo conserva los resultados completos, no parciales, de la tarea de asignación, las caídas temporales en el progreso del trabajo pueden ocurrir cuando una VM se interrumpe mientras trabaja en una tarea de asignación.

Configuración avanzada

Configura HDFS

A fin de mejorar el rendimiento de las redistribuciones grandes, puedes aumentar la cantidad de subprocesos de entrega para el nombre del nodo y el nodo de datos si aumentas los valores de dfs.namenode.handler.count y dfs.datanode.handler.count cuando creas el clúster. De forma predeterminada, el nombre del nodo y los nodos de datos tienen 10 subprocesos de entrega. A fin de mejorar aún más el rendimiento del nombre del nodo, puedes adjuntar SSD locales al nodo principal si pasas la marca --num-master-local-ssds al comando clusters create. También puedes usar la marca --num-worker-local-ssds para adjuntar SSD locales a los nodos de datos a fin de mejorar el rendimiento de HDFS.

Partición y paralelismo

Cuando envías un trabajo de MapReduce o Spark, es importante configurar un nivel adecuado de partición. La cantidad de particiones de entrada y salida para una etapa aleatoria intercambiando características diferentes de rendimiento. Es mejor experimentar con valores que funcionen para tus formas de trabajo.

Para los trabajos de MapReduce, la cantidad de particiones de salida se controla mediante mapreduce.job.reduces. Para Spark SQL, la cantidad de particiones de salida se controla mediante spark.sql.shuffle.partitions. Para los trabajos de Spark que usan la API de RDD, puedes especificar la cantidad de particiones. La partición de entrada de MapReduce y Spark se establece de forma implícita por el conjunto de datos de entrada.

Aumenta los reintentos

La cantidad máxima de intentos permitidos para las instancias principales, las tareas y las etapas se puede establecer mediante la configuración de las siguientes propiedades:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Debido a que las instancias principales y las tareas de la app se eliminarán con mayor frecuencia en clústeres que usan muchas VM interrumpibles o en los ajuste de escala automático sin retiro de servicio ordenado, se recomienda aumentar los valores anteriores en esos clústeres.

Proporción de trabajadores interrumpibles

Dado que los trabajadores interrumpibles escriben sus datos aleatorios en HDFS, es importante asegurarte de que tu clúster contenga suficientes trabajadores principales para adaptarse a los datos aleatorios de tu trabajo. Para los clústeres de ajuste de escala automático, puedes controlar la fracción de destino de los trabajadores interrumpibles en tu clúster mediante la asignación de pesos en workerConfig y secondaryWorkerConfig en tu AutocalingPolicy.

Otros sistemas de archivos compatibles con Hadoop

De forma predeterminada, los datos aleatorios se escriben en HDFS, pero puedes usar cualquier sistema de archivos compatible con Hadoop (HCFS). Por ejemplo, puedes decidir escribir datos aleatorios en Cloud Storage o en el HDFS de un clúster diferente. Para especificar un sistema de archivos , puedes apuntar fs.defaultFS al sistema de archivos de destino cuando envíes un trabajo a tu clúster.