En las siguientes secciones se ofrecen consejos para ayudarte a optimizar tus aplicaciones de Spark en Dataproc.
Usar clústeres efímeros
Cuando usas el modelo de clúster efímero de Dataproc, creas un clúster dedicado para cada trabajo y, cuando el trabajo finaliza, eliminas el clúster. Con el modelo efímero, puedes tratar el almacenamiento y la computación por separado, guardar los datos de entrada y salida de las tareas en Cloud Storage o BigQuery, y usar el clúster solo para la computación y el almacenamiento de datos temporales.
Errores comunes en clústeres persistentes
El uso de clústeres efímeros de un solo trabajo evita los siguientes problemas y posibles problemas asociados al uso de clústeres "persistentes" compartidos y de larga duración:
- Puntos únicos de fallo: un estado de error de clúster compartido puede provocar que fallen todos los trabajos, lo que bloquea toda una canalización de datos. Investigar y recuperarse de un error puede llevar horas. Como los clústeres efímeros solo conservan los estados temporales en el clúster, cuando se produce un error, se pueden eliminar y volver a crear rápidamente.
- Dificultad para mantener y migrar los estados de los clústeres en HDFS, MySQL o sistemas de archivos locales
- Conflictos de recursos entre trabajos que afectan negativamente a los SLOs
- Los daemons de servicio que no responden debido a la presión de la memoria
- Acumulación de registros y archivos temporales que pueden superar la capacidad del disco
- No se puede aumentar la escala debido a que no hay stock en la zona del clúster
- No se admiten versiones de imagen de clúster obsoletas.
Ventajas de los clústeres efímeros
Por otro lado, los clústeres efímeros te permiten hacer lo siguiente:
- Configurar diferentes permisos de gestión de identidades y accesos para diferentes tareas con diferentes cuentas de servicio de VM de Dataproc.
- Optimizar las configuraciones de hardware y software de un clúster para cada trabajo y cambiar las configuraciones del clúster según sea necesario.
- Actualiza las versiones de imagen en los clústeres nuevos para obtener los parches de seguridad, las correcciones de errores y las optimizaciones más recientes.
- Soluciona problemas más rápido en un clúster aislado de un solo trabajo.
- Ahorra costes pagando solo por el tiempo de ejecución del clúster efímero, no por el tiempo de inactividad entre tareas en un clúster compartido.
Usar Spark SQL
La API DataFrame de Spark SQL es una optimización significativa de la API RDD. Si interactúas con código que usa RDDs, considera la posibilidad de leer los datos como un DataFrame antes de pasar un RDD en el código. En el código Java o Scala, te recomendamos que uses la API Dataset de Spark SQL como superconjunto de RDDs y DataFrames.
Usar Apache Spark 3
Dataproc 2.0 instala Spark 3, que incluye las siguientes funciones y mejoras de rendimiento:
- Compatibilidad con GPU
- Posibilidad de leer archivos binarios
- Mejoras de rendimiento
- Eliminación de particiones dinámica
- Ejecución de consultas adaptativa: optimiza los trabajos de Spark en tiempo real.
Usar la asignación dinámica
Apache Spark incluye una función de asignación dinámica que escala el número de ejecutores de Spark en los trabajadores de un clúster. Esta función permite que un trabajo use todo el clúster de Dataproc incluso cuando el clúster se amplía. Esta función está habilitada de forma predeterminada en Dataproc (spark.dynamicAllocation.enabled
tiene el valor true
). Consulta más información sobre la asignación dinámica de Spark.
Usar el autoescalado de Dataproc
El autoescalado de Dataproc añade y quita trabajadores de Dataproc de un clúster de forma dinámica para asegurarse de que las tareas de Spark tengan los recursos necesarios para completarse rápidamente.
Es una práctica recomendada configurar la política de autoescalado para que solo escale los trabajadores secundarios.
Usar el modo de flexibilidad mejorado de Dataproc
Los clústeres con VMs de instancia de máquina virtual de uso continuado o una política de escalado automático pueden recibir excepciones FetchFailed cuando los trabajadores se interrumpen o se eliminan antes de que terminen de servir datos de aleatorización a los reductores. Esta excepción puede provocar reintentos de tareas y tiempos de finalización de trabajos más largos.
Recomendación: Usa el modo de flexibilidad mejorado de Dataproc, que no almacena datos de orden aleatorio intermedios en los trabajadores secundarios, de modo que estos se puedan desalojar o reducir de forma segura.
Configurar particiones y aleatorización
Spark almacena datos en particiones temporales del clúster. Si tu aplicación agrupa o combina DataFrames, baraja los datos en particiones nuevas según la agrupación y la configuración de bajo nivel.
El particionado de datos influye significativamente en el rendimiento de las aplicaciones: si hay pocas particiones, se limita el paralelismo de los trabajos y el uso de los recursos del clúster; si hay demasiadas particiones, el trabajo se ralentiza debido al procesamiento y la redistribución de particiones adicionales.
Configurar particiones
Las siguientes propiedades rigen el número y el tamaño de las particiones:
spark.sql.files.maxPartitionBytes
: tamaño máximo de las particiones al leer datos de Cloud Storage. El valor predeterminado es 128 MB, que es lo suficientemente grande para la mayoría de las aplicaciones que procesan menos de 100 TB.spark.sql.shuffle.partitions
: número de particiones después de realizar una mezcla. El valor predeterminado es1000
para los clústeres de versiones de imagen2.2
y posteriores. Recomendación: Asigna a este valor el triple del número de vCPUs de tu clúster.spark.default.parallelism
: número de particiones devueltas después de realizar transformaciones de RDD que requieren mezclas, comojoin
,reduceByKey
yparallelize
. El valor predeterminado es el número total de vCPUs de tu clúster. Cuando se usan RDDs en tareas de Spark, puedes definir este número como 3 veces el número de tus vCPUs.
Limitar el número de archivos
El rendimiento se reduce cuando Spark lee un gran número de archivos pequeños. Almacena datos en archivos de mayor tamaño, por ejemplo, de entre 256 y 512 MB. Del mismo modo, limita el número de archivos de salida (para forzar una aleatorización, consulta Evitar aleatorizaciones innecesarias).
Configurar la ejecución adaptativa de consultas (Spark 3)
La ejecución de consultas adaptativa (habilitada de forma predeterminada en la versión 2.0 de la imagen de Dataproc) mejora el rendimiento de los trabajos de Spark, entre otras cosas:
- Fusión de particiones después de las mezclas
- Convertir combinaciones de ordenación y fusión en combinaciones de difusión
- Optimizaciones de las combinaciones con sesgo.
Aunque los ajustes de configuración predeterminados son adecuados para la mayoría de los casos prácticos, puede ser útil definir spark.sql.adaptive.advisoryPartitionSizeInBytes
en spark.sqlfiles.maxPartitionBytes
(el valor predeterminado es 128 MB).
Evitar mezclas innecesarias
Spark permite a los usuarios activar manualmente una aleatorización para reequilibrar sus datos con la función repartition
. Las mezclas son caras, por lo que se deben usar con precaución. Si se definen las configuraciones de partición de forma adecuada, Spark debería poder particionar los datos automáticamente.
Excepción: Cuando se escriben datos particionados por columnas en Cloud Storage, la repartición en una columna específica evita escribir muchos archivos pequeños para conseguir tiempos de escritura más rápidos.
df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
Almacenar datos en Parquet o Avro
De forma predeterminada, Spark SQL lee y escribe datos en archivos Parquet comprimidos con Snappy. Parquet es un formato de archivo columnar eficiente que permite que Spark solo lea los datos que necesita para ejecutar una aplicación. Esta es una ventaja importante cuando se trabaja con conjuntos de datos grandes. Otros formatos de columnas, como Apache ORC, también ofrecen un buen rendimiento.
En el caso de los datos no tabulares, Apache Avro proporciona un formato de archivo de filas binario eficiente. Aunque suele ser más lento que Parquet, el rendimiento de Avro es mejor que el de los formatos basados en texto,como CSV o JSON.
Optimizar el tamaño del disco
El rendimiento de los discos persistentes se escala según el tamaño del disco, lo que puede afectar al rendimiento de los trabajos de Spark, ya que estos escriben metadatos y barajan datos en el disco. Cuando se usan discos persistentes estándar, el tamaño del disco debe ser de al menos 1 terabyte por trabajador (consulta Rendimiento por tamaño del disco persistente).
Para monitorizar el rendimiento del disco de los trabajadores en la Google Cloud consola, sigue estos pasos:
- Haz clic en el nombre del clúster en la página Clústeres.
- Haz clic en la pestaña INSTANCIAS DE VM.
- Haz clic en el nombre de cualquier trabajador.
- Haga clic en la pestaña MONITORING (MONITORIZACIÓN) y, a continuación, desplácese hacia abajo hasta Disk Throughput (Rendimiento del disco) para ver el rendimiento de los trabajadores.
Consideraciones sobre los discos
Los clústeres de Dataproc efímeros, que no se benefician del almacenamiento persistente, pueden usar SSD locales. Las SSDs locales están conectadas físicamente al clúster y proporcionan un mayor rendimiento que los discos persistentes (consulta la tabla de rendimiento). Los SSD locales están disponibles con un tamaño fijo de 375 gigabytes, pero puedes añadir varios SSD para aumentar el rendimiento.
Los SSD locales no conservan los datos después de que se cierre un clúster. Si necesitas almacenamiento persistente, puedes usar discos persistentes SSD, que ofrecen un mayor rendimiento por su tamaño que los discos persistentes estándar. Los discos persistentes SSD también son una buena opción si el tamaño de la partición es inferior a 8 KB (sin embargo, evita las particiones pequeñas).
Conectar GPUs a un clúster
Spark 3 admite GPUs. Usa GPUs con la acción de inicialización de RAPIDS para acelerar los trabajos de Spark con el acelerador de SQL de RAPIDS. La acción de inicialización del controlador de GPU para configurar un clúster con GPUs.
Fallos habituales en los trabajos y cómo solucionarlos
Sin memoria
Ejemplos:
- "Lost executor" ("Ejecutor perdido")
- "java.lang.OutOfMemoryError: se ha superado el límite de sobrecarga de GC"
- "YARN ha eliminado el contenedor por superar los límites de memoria"
Posibles soluciones:
- Si usas PySpark, aumenta
spark.executor.memoryOverhead
y reducespark.executor.memory
. - Usa tipos de máquinas con alta capacidad de memoria.
- Usa particiones más pequeñas.
Shuffle Fetch Failures
Ejemplos:
- "FetchFailedException" (error de Spark)
- "No se ha podido conectar con..." (Error de Spark)
- "Failed to fetch" (Error de MapReduce)
Suele deberse a la eliminación prematura de trabajadores que aún tienen datos de aleatorización que servir.
Posibles causas y soluciones:
- El autoescalador ha reclamado las VMs de trabajador interrumpibles o ha eliminado las VMs de trabajador no interrumpibles. Solución: usa el modo de flexibilidad mejorado para que los trabajadores secundarios se puedan interrumpir o escalar de forma segura.
- El ejecutor o el mapper se han bloqueado debido a un error OutOfMemory. Solución: Aumenta la memoria del ejecutor o del mapper.
- El servicio de aleatorización de Spark puede estar sobrecargado. Solución: Reduce el número de particiones de la tarea.
Los nodos de YARN están en mal estado
Ejemplos (de registros de YARN):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
A menudo, se relaciona con la falta de espacio en el disco para los datos aleatorios. Diagnosticar problemas viendo los archivos de registro:
- Abre la página Clusters (Clústeres) de tu proyecto en la consola de Google Cloud y, a continuación, haz clic en el nombre del clúster.
- Haz clic en VER REGISTROS.
- Filtrar registros por
hadoop-yarn-nodemanager
. - Busca "UNHEALTHY".
Posibles soluciones:
- La caché de usuario se almacena en el directorio especificado por la propiedad
yarn.nodemanager.local-dirs
enyarn-site.xml file
. Este archivo se encuentra en/etc/hadoop/conf/yarn-site.xml
. Puedes comprobar el espacio libre en la ruta/hadoop/yarn/nm-local-dir
y liberar espacio eliminando la carpeta de caché de usuario/hadoop/yarn/nm-local-dir/usercache
. - Si el registro indica el estado "UNHEALTHY", vuelve a crear el clúster con más espacio en disco, lo que aumentará el límite de rendimiento.
El trabajo falla porque no hay suficiente memoria del controlador
Cuando se ejecutan trabajos en modo de clúster, el trabajo falla si el tamaño de la memoria del nodo de trabajador es inferior al tamaño de la memoria del trabajo.
Ejemplo de los registros del controlador:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
Posibles soluciones:
- Asigna un valor de
spark:spark.driver.memory
inferior ayarn:yarn.scheduler.maximum-allocation-mb
. - Usa el mismo tipo de máquina para los nodos maestros y de trabajador.
Siguientes pasos
- Consulta más información sobre la optimización del rendimiento de Spark.