El modo de flexibilidad mejorada (EFM) de Dataproc administra los datos aleatorios para minimizar los retrasos en el progreso de los trabajos, provocados por la eliminación de nodos de un clúster en ejecución. EFM descarga los datos aleatorios en uno de los dos modos seleccionables por el usuario:
Shuffle de trabajador principal. Los asignadores escriben datos en los trabajadores principales. Los trabajadores extraen datos de esos nodos remotos durante la fase de reducción. Este modo solo está disponible para los trabajos de Spark y se recomienda para ellos.
Shuffle de HCFS (Sistema de archivos compatible con Hadoop). Los asignadores escriben datos en una implementación de HCFS (HDFS de forma predeterminada). Al igual que con el modo de trabajador principal, solo los trabajadores principales participan en las implementaciones de HDFS y HCFS (si Shuffle de HCFS usa el Conector de Cloud Storage, los datos se almacenan fuera del clúster). Este modo puede beneficiar a los trabajos con pequeñas cantidades de datos, pero no se recomienda para trabajos más grandes debido a las limitaciones de escalamiento.
Debido a que ambos modos EFM no almacenan datos aleatorios intermedios en trabajadores secundarios, EFM es adecuado en clústeres que usan VM interrumpibles o solo realizan ajustes la escala automática al grupo de trabajadores secundarios.
- Los trabajos de Apache Hadoop YARN que no admiten la reubicación de AppMaster pueden fallar en el modo de flexibilidad mejorada (consulta Cuándo esperar a que finalicen AppMasters).
- El modo de flexibilidad mejorada no se recomienda en los siguientes casos:
- En un clúster que solo tiene trabajadores principales
- en los trabajos de transmisión, ya que puede tardar hasta 30 minutos después de que se complete el trabajo para limpiar los datos aleatorios intermedios.
- El modo de flexibilidad mejorada no es compatible:
- Cuando el ajuste de escala automático del trabajador principal está habilitado. En la mayoría de los casos, los trabajadores principales seguirán almacenando datos aleatorios que no se migren de manera automática. El escalamiento descendente del grupo de trabajadores principales anula los beneficios de EFM.
- Cuando los trabajos de Spark se ejecutan en un clúster con retiro de servicio ordenado habilitado. El retiro de servicio ordenado y el EFM pueden funcionar de forma cruzada, ya que el mecanismo de retiro de servicio ordenado de YARN mantiene los nodos DECOMMISSIONING hasta que se completen todas las aplicaciones involucradas.
Usa el modo de flexibilidad mejorada
El modo de flexibilidad mejorada se configura por motor de ejecución, y debe configurarse durante la creación del clúster.
La implementación del EFM de Spark se configura con la propiedad de clúster
dataproc:efm.spark.shuffle
. Valores de propiedad válidos:primary-worker
para la combinación aleatoria de trabajadores principales (recomendado)hcfs
para Shuffle basado en HCFS. Este modo está obsoleto y solo está disponible en clústeres que ejecutan la versión de imagen 1.5. No se recomienda para flujos de trabajo nuevos.
La implementación de MapReduce de Hadoop se configura con la propiedad de clúster
dataproc:efm.mapreduce.shuffle
. Valores de propiedad válidos:hcfs
Ejemplo: Crea un clúster con combinación aleatoria de trabajadores principales para Spark y combinación aleatoria de HCFS para MapReduce:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --worker-machine-type=n1-highmem-8 \ --num-workers=25 \ --num-worker-local-ssds=2 \ --secondary-worker-type=preemptible \ --secondary-worker-boot-disk-size=500GB \ --num-secondary-workers=25
Ejemplo de Apache Spark
- Ejecuta un trabajo de WordCount con texto público de Shakespeare mediante el jar de ejemplos de Spark en el clúster EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Ejemplo de MapReduce de Apache Hadoop
Ejecuta un trabajo de teragen pequeño para generar datos de entrada en Cloud Storage para un trabajo de terasort más tarde mediante el jar de ejemplos de MapReduce en el clúster de EFM.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
Ejecuta un trabajo de terasort con los datos
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- terasort gs://terasort/input gs://terasort/output
Configura SSD locales para Shuffle de trabajadores principales
Las implementaciones de combinaciones aleatorias de trabajadores principales-y HDFS escriben datos aleatorios intermedios en discos conectados a VM y se benefician de la capacidad de procesamiento adicional y las IOPS que ofrecen las SSD locales. Para facilitar la asignación de recursos, selecciona un objetivo de alrededor de 1 partición del SSD local por 4 CPU virtuales cuando configures las máquinas de trabajador principales.
Para adjuntar las SSD locales, pasa la marca --num-worker-local-ssds
al comando gcloud dataproc clusters create.
Por lo general, no necesitarás SSD locales en trabajadores secundarios.
Agregar SSD locales a los trabajadores secundarios de un clúster (con la marca --num-secondary-worker-local-ssds
) suele ser de menor importancia porque los trabajadores secundarios no escriben datos de forma aleatoria.
Sin embargo, como los SSD locales mejoran el rendimiento del disco local, puedes decidir agregar
SSD locales a los trabajadores secundarios si esperas
que las tareas estén limitadas por la E/S
debido al uso del disco local: tu tarea usa un disco local significativo para
el espacio en blanco o tus particiones son
demasiado grandes para caber en la memoria y se volcarán en el disco.
Proporción de trabajadores secundarios
Dado que los trabajadores secundarios escriben sus datos aleatorios en los trabajadores principales, el clúster debe contener una cantidad suficiente de trabajadores principales con recursos de CPU, memoria y disco suficientes para adaptarse a la carga aleatoria del trabajo. Para los clústeres con ajuste de escala automático, a fin de evitar que el grupo principal escale y provoque un comportamiento no deseado, configura minInstances
en el valor maxInstances
en la política de ajuste de escala automático para el grupo de trabajadores principal.
Si tienes una proporción alta de trabajadores principal a secundaria (por ejemplo, 10:1), supervisa el uso de CPU, red y disco de los trabajadores principales para determinar si están sobrecargados. Para ello, sigue estos pasos:
Ve a la página Instancias de VM en la consola deGoogle Cloud .
Haz clic en la casilla de verificación que se encuentra al lado izquierdo del trabajador principal.
Haz clic en la pestaña MONITORING para ver el uso de CPU del trabajador principal, IOPS de disco, bytes de red y otras métricas.
Si los trabajadores principales están sobrecargados, considera escalar verticalmente los trabajadores principales de forma manual.
Cambia el tamaño del grupo de trabajadores principales
El grupo de trabajadores principales se puede escalar verticalmente de forma segura, pero reducir el grupo de trabajadores principales puede afectar el progreso del trabajo de manera negativa. Las operaciones que reducen la escala del grupo de trabajadores principal deben usar el retiro de servicio ordenado, que se habilita configurando la marca --graceful-decommission-timeout
.
Clústeres con ajuste de escala automático: El escalamiento del grupo de trabajadores principales está inhabilitado en los clústeres con EFM con políticas de ajuste de escala automático. Para cambiar el tamaño del grupo de trabajadores principal en un clúster con ajuste de escala automático, haz lo siguiente:
Inhabilita el ajuste de escala automático.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Escala el grupo principal.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Vuelve a habilitar el ajuste de escala automático:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Supervisa el uso de disco de trabajadores principales
Los trabajadores principales deben tener suficiente espacio en el disco para los datos aleatorios del clúster.
Puedes supervisar esto de manera indirecta a través de la métrica remaining HDFS capacity
.
A medida que se llena el disco local, el espacio deja de estar disponible para HDFS y la capacidad restante disminuye.
De forma predeterminada, cuando el uso del disco local de un trabajador principal supera el 90% de la capacidad, el nodo se marcará como EN MAL ESTADO en la IU del nodo YARN. Si tienes problemas de capacidad de disco, puedes borrar los datos que no se usen de HDFS o escalar verticalmente el grupo de trabajadores principales.
Configuración avanzada
Partición y paralelismo
Cuando envíes un trabajo de MapReduce o Spark, configura un nivel de partición adecuado. Decidir la cantidad de particiones de entrada y salida para una etapa de redistribución implica una compensación entre las diferentes características de rendimiento. Es mejor experimentar con valores que funcionen para tus formas de trabajo.
Particiones de entrada
La partición de entrada de MapReduce y Spark está determinada por el conjunto de datos de entrada. Cuando se leen archivos desde Cloud Storage, cada tarea procesa aproximadamente un valor de “tamaño de bloque” de datos.
En el caso de los trabajos de Spark SQL,
spark.sql.files.maxPartitionBytes
controla el tamaño máximo de la partición. Considera aumentarlo a 1 GB:spark.sql.files.maxPartitionBytes=1073741824
.En los trabajos de MapReduce y los RDD de Spark, el tamaño de la partición se suele controlar con
fs.gs.block.size
, que se configura de forma predeterminada en 128 MB. Considera aumentarlo a 1 GB. También puedes establecer propiedades específicas deInputFormat
, comomapreduce.input.fileinputformat.split.minsize
ymapreduce.input.fileinputformat.split.maxsize
.- Para trabajos de MapReduce, usa este comando:
--properties fs.gs.block.size=1073741824
- Para los RDD de Spark:
--properties spark.hadoop.fs.gs.block.size=1073741824
- Para trabajos de MapReduce, usa este comando:
Particiones de salida
Varias propiedades controlan la cantidad de tareas en etapas posteriores. En trabajos más grandes que procesan más de 1 TB, considera tener al menos 1 GB por partición.
En el caso de los trabajos de MapReduce,
mapreduce.job.reduces
controla la cantidad de particiones de salida.En Spark SQL, la cantidad de particiones de salida se controla con
spark.sql.shuffle.partitions
.Para los trabajos de Spark con la API de RDD, puedes especificar la cantidad de particiones de salida o configurar
spark.default.parallelism
.
Ajuste de Shuffle para trabajadores principales
La propiedad más significativa es --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
.
Ten en cuenta que esta es una propiedad YARN a nivel de clúster porque el servidor de redistribución de Spark se ejecuta como parte de Node Manager. El valor predeterminado es el doble (2x) de núcleos en la máquina (por ejemplo, 16 subprocesos en un n1-highmem-8). Si “Tiempo de lectura bloqueado de Shuffle” es mayor que 1 segundo y los trabajadores principales no alcanzaron los límites de red, CPU o disco, considera aumentar la cantidad de subprocesos del servidor de redistribución.
En tipos de máquinas más grandes, considera aumentar spark.shuffle.io.numConnectionsPerPeer
, que se establece de forma predeterminada en 1. (Por ejemplo, configúralo en 5 conexiones por par de hosts).
Aumenta los reintentos
La cantidad máxima de intentos permitidos para las instancias principales, las tareas y las etapas se puede establecer mediante la configuración de las siguientes propiedades:
yarn:yarn.resourcemanager.am.max-attempts mapred:mapreduce.map.maxattempts mapred:mapreduce.reduce.maxattempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Debido a que las instancias principales y las tareas de la aplicaciones terminan con mayor frecuencia en clústeres que usan muchas VM interrumpibles o en los ajuste de escala automático sin retiro de servicio ordenado, aumentar los valores de las propiedades anteriores en esos clústeres puede ser útil (ten en cuenta que no se admite el uso de EFM con Spark y el retiro de servicio ordenado).
Configura HDFS para la suffle HCFS
Para mejorar el rendimiento de los shuffles grandes, puedes disminuir la contención de bloqueo en el NameNode configurando dfs.namenode.fslock.fair=false
. Ten en cuenta que esto corre el riesgo de quitar las solicitudes individuales, pero puede mejorar la capacidad de procesamiento de todo el clúster.
Para mejorar aún más el rendimiento de NameNode, puedes configurar --num-master-local-ssds
para conectar SSD locales al
nodo principal. También puedes agregar SSD locales a los trabajadores principales para mejorar el rendimiento de DataNode configurando --num-worker-local-ssds
.
Otros sistemas de archivos compatibles con Hadoop para el Shuffle HCFS
De forma predeterminada, los datos de Shuffle de HCFS de EFM se escriben en HDFS, pero puedes usar cualquier sistema de archivos compatible con Hadoop (HCFS).
Por ejemplo, puedes decidir escribir Shuffle en Cloud Storage o en el HDFS de un clúster diferente. Para especificar un sistema de archivos, puedes apuntar fs.defaultFS
al sistema de archivos de destino cuando envíes un trabajo a tu clúster.
Retiro de servicio ordenado de YARN en clústeres de EFM
El retiro de servicio ordenado de YARN se puede usar para quitar nodos rápidamente con un impacto mínimo en las aplicaciones en ejecución. Para los clústeres con ajuste de escala automático, se puede configurar el tiempo de espera del retiro de servicio ordenado en una AutoscalingPolicy que esté conectada al clúster de EFM.
Mejoras en el EFM de MapReduce para un retiro de servicio ordenado
Debido a que los datos intermedios se almacenan en un sistema de archivos distribuido, los nodos se pueden quitar de un clúster de EFM apenas finalicen todos los contenedores que se ejecutan en esos nodos. En comparación, los nodos no se quitan en los clústeres estándar de Dataproc hasta que finaliza la aplicación.
La eliminación de nodos no espera a que finalicen las instancias principales de las apps que se ejecutan en un nodo. Cuando se finaliza el contenedor de la instancia principal de la app, se reprograma en otro nodo que no se retira de servicio. No se pierde el progreso del trabajo: la nueva instancia principal de la app recupera rápidamente el estado de la instancia principal anterior leyendo el historial de trabajos.
Usa retiro de servicio ordenado en un clúster de EFM con MapReduce
Crea un clúster de EFM con una cantidad similar de trabajadores primarios y secundarios.
gcloud dataproc clusters create cluster-name \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --region=region \ --num-workers=5 \ --num-secondary-workers=5
Ejecuta un trabajo de MapReduce que calcule el valor de pi con ejemplos jar de mapreduce en el clúster.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- pi 1000 10000000
Mientras se ejecuta el trabajo, reduzca la escala del clúster mediante un retiro de servicio ordenado.
Los nodos se quitarán del clúster con rapidez antes de que finalice el trabajo, a la vez que se minimiza la pérdida del progreso del trabajo. Se pueden producir pausas temporales en el progreso del trabajo, debido a los siguientes motivos:gcloud dataproc clusters update cluster-name \ --region=region \ --num-secondary-workers=0 \ --graceful-decommission-timeout=1h
- Conmutación por error de la instancia principal de la app. Si el progreso del trabajo disminuye al 0% y, luego, llega de inmediato al valor previo al descarte, es posible que la instancia principal de la app finalice y que una nueva recupere el estado. Esto no debería afectar significativamente el progreso del trabajo, puesto que la conmutación por error se produce con rapidez.
- interrupción de VM Debido a que HDFS solo conserva los resultados completos, no parciales, de la tarea de asignación, las caídas temporales en el progreso del trabajo pueden ocurrir cuando una VM se interrumpe mientras trabaja en una tarea de asignación.
Para acelerar la eliminación de nodos, puedes reducir la escala del clúster sin un retiro de servicio ordenado si omites la marca --graceful-decommission-timeout
en el ejemplo de comando gcloud
anterior. Se conservará el progreso del trabajo de las tareas de asignación que se hayan completado, pero se perderá el resultado de la tarea de asignación parcialmente completada (se volverán a ejecutar las tareas de asignación).