Sugerencias de ajuste para trabajos de Spark

En las siguientes secciones, encontrarás sugerencias para ajustar tu Dataproc las aplicaciones Spark.

Usar clústeres efímeros

Cuando usas el entorno “efímero” de Dataproc de clúster, creas un clúster dedicado para cada trabajo y, cuando este finalice, borrarás el clúster. Con el modelo efímero, se puede tratar el almacenamiento y el procesamiento por separado guardar datos de entrada y salida de los trabajos en Cloud Storage o BigQuery, con el clúster solo para procesamiento y almacenamiento de datos temporales.

Errores comunes en los clústeres persistentes

Usar clústeres efímeros de un trabajo evita los siguientes errores y posibles problemas problemas asociados con el uso de aplicaciones “persistentes” clústeres:

  • Puntos únicos de fallo: un estado de error de clúster compartido puede provocar que todos los trabajos fallen. bloquear toda una canalización de datos. Investiga y recupérate de un error puede tardar horas. Dado que los clústeres efímeros mantienen solo los estados temporales dentro del clúster, cuando se produce un error, se pueden borrar y recrear rápidamente.
  • Dificultad para mantener y migrar estados de clústeres en HDFS, MySQL o sistemas de archivos locales
  • Contenciones de recursos entre trabajos que afectan negativamente los SLO
  • Daemons de servicio que no responden debido a la presión de la memoria
  • Acumulación de registros y archivos temporales que pueden exceder la capacidad del disco
  • Falla de aumento de la escala debido al agotamiento de la zona del clúster
  • Falta de compatibilidad para versiones desactualizadas de imágenes de clústeres.

Beneficios del cúmulo efímero

Como aspecto positivo, los clústeres efímeros te permiten hacer lo siguiente:

  • Configurar diferentes permisos de IAM para distintos trabajos con diferentes Cuentas de servicio de la VM de Dataproc.
  • Optimizar la configuración de hardware y software de un clúster para cada y cambiar los parámetros de configuración del clúster según sea necesario.
  • Actualiza las versiones de imágenes en clústeres nuevos para obtener la seguridad más reciente parches, corrección de errores y optimizaciones.
  • Soluciona problemas más rápido en un clúster aislado de un solo trabajo.
  • Ahorra costos pagando solo por el tiempo de ejecución de los clústeres efímeros, no para el tiempo de inactividad entre trabajos en un clúster compartido.

Usa Spark SQL

La API de DataFrame de Spark SQL es una optimización significativa de la API de RDD. Si interactúas con un código que usa RDD, considera leer datos como un DataFrame antes de pasar un RDD en el código. En código Java o Scala, considera usar la API de Dataset de Spark SQL como un superconjunto de RDD y DataFrames.

Usa Apache Spark 3

Dataproc 2.0 instala Spark 3, que incluye las siguientes funciones y mejoras de rendimiento:

  • Asistencia de GPU
  • Habilidad de leer archivos binarios
  • Mejoras en el rendimiento
  • Reducción dinámica de particiones
  • Ejecución de consultas adaptables, que optimiza los trabajos de Spark en tiempo real

Usa la asignación dinámica

Apache Spark incluye una función de asignación dinámica que escala la cantidad de ejecutores de Spark en los trabajadores dentro de un clúster. Esta característica permite que un trabajo use el clúster completo de Dataproc incluso cuando el clúster escala verticalmente. Esta función está habilitada de forma predeterminada en Dataproc (spark.dynamicAllocation.enabled se establece en true). Consulta Asignación dinámica de Spark para obtener más información.

Usa el ajuste de escala automático de Dataproc

El ajuste de escala automático de Dataproc agrega y quita de forma dinámica los trabajadores de Dataproc de un clúster a fin de garantizar que los trabajos de Spark tengan los recursos necesarios para completarse con rapidez.

Se recomienda configurar la política de ajuste de escala automático para escalar solo los trabajadores secundarios.

Usa el modo de flexibilidad mejorada de Dataproc

Los clústeres con VM interrumpibles o una política de ajuste de escala automático pueden recibir excepciones FetchFailed cuando los trabajadores se interrumpen o se quitan antes de que terminen de entregar datos aleatorios a los reductores. Esta excepción puede provocar reintentos de tareas y tiempos de finalización del trabajo más largos.

Recomendación: Usa el Modo de flexibilidad mejorado de Dataproc, que no almacena datos aleatorios intermedios en trabajadores secundarios, para que estos se puedan interrumpir o reducir la escala verticalmente de forma segura.

Configura la partición y las combinaciones aleatorias

Spark almacena datos en particiones temporales en el clúster. Si tu aplicación agrupa o une DataFrames, combina los datos en particiones nuevas según la agrupación y la configuración de bajo nivel.

La partición de datos afecta significativamente el rendimiento de la aplicación: muy pocas particiones limitan el paralelismo de trabajos y el uso de recursos de clúster; demasiadas particiones ralentizan el trabajo debido al procesamiento y la recombinación aleatoria adicional de particiones.

Configura particiones

Las siguientes propiedades rigen la cantidad y el tamaño de las particiones:

  • spark.sql.files.maxPartitionBytes: Es el tamaño máximo de las particiones cuando lees datos de Cloud Storage. El valor predeterminado es 128 MB, que es lo suficientemente grande para la mayoría de las aplicaciones que procesan menos de 100 TB.

  • spark.sql.shuffle.partitions: Es la cantidad de particiones después de realizar una redistribución. El valor predeterminado es 200, lo que es apropiado para clústeres con menos de 100 CPU virtuales en total. Recomendación: Establece esto en 3 veces la cantidad de CPU virtuales en el clúster.

  • spark.default.parallelism: Es la cantidad de particiones que se muestran después de realizar transformaciones de RDD que requieren reproducciones aleatorias, como join, reduceByKey y parallelize. El valor predeterminado es la cantidad total de CPU virtuales en el clúster. Cuando usas RDD en trabajos de Spark, puedes establecer este número en 3 veces tus CPU virtuales.

Limita la cantidad de archivos.

Se produce pérdida de rendimiento cuando Spark lee una gran cantidad de archivos pequeños. Almacena los datos en archivos de mayor tamaño, por ejemplo, tamaños de archivo en el rango de 256 MB a 512 MB. Del mismo modo, limita la cantidad de archivos de salida (para forzar una redistribución, consulta Evita las redistribuciones innecesarias).

Configura la ejecución de consultas adaptables (Spark 3)

La ejecución de consultas adaptables (habilitada de forma predeterminada en la versión 2.0 de la imagen de Dataproc) proporciona mejoras en el rendimiento del trabajo de Spark, que incluyen lo siguiente:

Aunque la configuración predeterminada es segura para la mayoría de los casos de uso, establecer spark.sql.adaptive.advisoryPartitionSizeInBytes en spark.sqlfiles.maxPartitionBytes (predeterminado de 128 MB) puede ser beneficioso.

Evita las combinaciones aleatorias innecesarias

Spark permite a los usuarios activar de forma manual una combinación aleatoria para volver a balancear sus datos con la función repartition. Las combinaciones aleatorias son costosas, por lo que la recombinación aleatoria debe usarse con precaución. Establecer las configuraciones de partición de forma adecuada debería ser suficiente para permitir que Spark particione tus datos de forma automática.

Excepción: Cuando se escriben datos particionados por columnas en Cloud Storage, la repartición en una columna específica evita en muchos archivos pequeños para lograr tiempos de escritura más rápidos.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Almacena datos en Parquet o Avro

De forma predeterminada, Spark SQL lee y escribe datos en archivos Parquet comprimidos de Snappy. Parquet tiene un formato de archivo en columna eficiente que permite a Spark leer solo los datos que necesita para ejecutar una aplicación. Esta es una ventaja importante cuando se trabaja con conjuntos de datos grandes. Otros formatos de columna, como Apache ORC, también tienen un buen rendimiento.

Para los datos no columnas, Apache Avro proporciona un formato de archivo eficiente de fila binaria. Aunque suele ser más lento que Parquet, el rendimiento de Avro es mejor que los formatos basados en texto, como CSV o JSON.

Optimiza el tamaño del disco

La capacidad de procesamiento de los discos persistentes escala con el tamaño del disco, lo que puede afectar el rendimiento del trabajo de Spark porque los trabajos escriben metadatos y se combinan aleatoriamente los datos en el disco. Cuando se usan discos persistentes estándar, el tamaño del disco debe ser de al menos 1 terabyte por trabajador (consulta Rendimiento por tamaño de disco persistente).

Para supervisar la capacidad de procesamiento del disco de los trabajadores en la Consola de Google Cloud:

  1. Haz clic en el nombre del clúster en la página Clústeres.
  2. Haz clic en la pestaña INSTANCIAS DE VM.
  3. Haz clic en cualquier nombre de trabajador.
  4. Haga clic en la pestaña SUPERVISIÓN y, luego, desplácese hacia abajo hasta Capacidad de procesamiento del disco para ver la capacidad de procesamiento del trabajador.

Consideraciones sobre discos

Clústeres efímeros de Dataproc, que no del almacenamiento persistente. pueden usar SSD locales. Las SSD locales están conectadas a nivel físico al clúster y proporcionan una capacidad de procesamiento mayor que los discos persistentes (consulta la tabla Rendimiento). Las SSD locales están disponibles en un tamaño fijo de 375 gigabytes, pero puedes agregar varios SSD para aumentar el rendimiento.

Los SSD locales no conservan los datos después de que se cierra un clúster. Si necesitas discos persistentes de procesamiento, puedes usar discos persistentes SSD, Proporcionan mayor capacidad de procesamiento. que los discos persistentes estándar. Los discos persistentes SSD también son una buena opción si el tamaño de la partición es inferior a 8 KB (sin embargo, evita las pariciones pequeñas).

Adjunta las GPU a tu clúster

Spark 3 admite GPU. Usa GPU con Acción de inicialización de RAPIDS para acelerar los trabajos de Spark con Acelerador de SQL de RAPIDS. El Acción de inicialización del controlador de GPU para configurar un clúster con GPU.

Errores y correcciones de trabajos comunes

No hay memoria suficiente

Ejemplos:

  • “Ejecutor perdido”
  • "java.lang.OutOfMemoryError: Se excedió el límite de sobrecarga de GC"
  • “Contenedor que elimina YARN por exceder los límites de memoria”

Posibles soluciones:

Errores de recuperación de combinaciones aleatorias

Ejemplos:

  • “FetchFailedException” (error de Spark)
  • “No se pudo establecer la conexión con…”. (Error de Spark)
  • “No se pudo recuperar” (error de MapReduce)

Por lo general, se debe a la eliminación prematura de los trabajadores que aún tienen datos aleatorios para entregar.

Causas posibles y correcciones:

  • El escalador automático quitó las VM de trabajadores interrumpibles o no interrumpibles. Solución: Usa el Modo de flexibilidad mejorada para que los trabajadores secundarios sean interrumpibles o escalables de forma segura.
  • El ejecutor o asignador falló debido a un error OutOfMemory. Solución: Aumenta la memoria del ejecutor o el asignador.
  • El servicio de reproducción aleatoria de Spark puede estar sobrecargado. Solución: Disminuye la cantidad de particiones del trabajo.

Los nodos YARN están en MAL ESTADO

Ejemplos (de registros de YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

A menudo, se relaciona con el espacio insuficiente en el disco para los datos aleatorios. Para diagnosticar mediante la visualización de archivos de registro:

  • Abre el archivo Clústeres de la consola de Google Cloud y, luego, haz clic en el nombre del clúster.
  • Haz clic en VER REGISTROS.
  • Filtra los registros por hadoop-yarn-nodemanager.
  • Busca por “MAL ESTADO”.

Correcciones posibles:

  • La caché del usuario se almacena en el directorio especificado por el propiedad yarn.nodemanager.local-dirs en yarn-site.xml file Este archivo se encuentra en /etc/hadoop/conf/yarn-site.xml Puedes consultar el espacio libre en la ruta de acceso /hadoop/yarn/nm-local-dir y liberar espacio Se está borrando la carpeta de caché del usuario /hadoop/yarn/nm-local-dir/usercache.
  • Si el registro informa "UNHEALTHY" recrea tu clúster con más espacio en el disco, lo que aumentar el límite de la capacidad de procesamiento

El trabajo falla debido a que no hay suficiente memoria del controlador

Cuando se ejecutan trabajos en modo de clúster, el trabajo falla si el tamaño de memoria de la instancia principal es significativamente más grande que el tamaño de la memoria del nodo trabajador.

Ejemplo de los registros del controlador:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Correcciones posibles:

  • Establece un valor inferior a yarn:yarn.scheduler.maximum-allocation-mb en spark:spark.driver.memory.
  • Usa el mismo tipo de máquina para los nodos principales y trabajadores.

Más información