Sugerencias de ajuste para trabajos de Spark

En las siguientes secciones, se proporcionan sugerencias para ayudarte a ajustar tus aplicaciones Spark de Dataproc.

Usa clústeres efímeros

Cuando usas el modelo de clúster “efímero” de Dataproc, creas un clúster dedicado para cada trabajo y, cuando el trabajo finaliza, borras el clúster. Con el modelo efímero, puedes tratar el almacenamiento y el procesamiento por separado, y guardar los datos de entrada y salida del trabajo en Cloud Storage o BigQuery, y usar el clúster solo para el procesamiento y el almacenamiento de datos temporales.

Errores de los clústeres persistentes

Usar clústeres efímeros de un trabajo evita las siguientes dificultades y los posibles problemas asociados con el uso de clústeres “persistentes” compartidos y de larga duración:

  • Puntos únicos de falla: un estado de error de clúster compartido puede hacer que todos los trabajos fallen, lo que bloquea una canalización de datos completa. Investigar y recuperarse de un error puede llevar horas. Dado que los clústeres efímeros solo conservan estados temporales del clúster, cuando se produce un error, se pueden borrar y volver a crear con rapidez.
  • Dificultad para mantener y migrar estados de clústeres en HDFS, MySQL o sistemas de archivos locales
  • Contenciones de recursos entre trabajos que afectan de forma negativa los SLO
  • Daemones de servicio que no responden debido a la presión de la memoria
  • Acumulación de registros y archivos temporales que pueden exceder la capacidad del disco
  • Falla del escalamiento vertical debido al agotamiento de la zona del clúster
  • Falta de compatibilidad para las versiones de imagen de clúster desactualizadas.

Beneficios del clúster efímero

En el lado positivo, los clústeres efímeros te permiten hacer lo siguiente:

  • Configura diferentes permisos de IAM para trabajos distintos con diferentes cuentas de servicio de VM de Dataproc.
  • Optimizar la configuración de hardware y software de un clúster para cada trabajo y cambiar la configuración del clúster según sea necesario
  • Actualiza las versiones de imágenes en clústeres nuevos para obtener los parches de seguridad, las correcciones de errores y las optimizaciones más recientes.
  • Soluciona problemas con mayor rapidez en un clúster aislado de un solo trabajo.
  • Ahorra costos pagando solo por el tiempo de ejecución del clúster efímero y no por el tiempo de inactividad entre trabajos en un clúster compartido.

Usa Spark SQL

La API de DataFrame de Spark SQL es una optimización significativa de la API de RDD. Si interactúas con un código que usa RDD, considera leer datos como un DataFrame antes de pasar un RDD en el código. En código Java o Scala, considera usar la API de Dataset de Spark SQL como un superconjunto de RDD y DataFrames.

Usa Apache Spark 3

Dataproc 2.0 instala Spark 3, que incluye las siguientes funciones y mejoras de rendimiento:

  • Asistencia de GPU
  • Habilidad de leer archivos binarios
  • Mejoras en el rendimiento
  • Reducción dinámica de particiones
  • Ejecución de consultas adaptables, que optimiza los trabajos de Spark en tiempo real

Usa la asignación dinámica

Apache Spark incluye una función de asignación dinámica que escala la cantidad de ejecutores de Spark en los trabajadores dentro de un clúster. Esta característica permite que un trabajo use el clúster completo de Dataproc incluso cuando el clúster escala verticalmente. Esta función está habilitada de forma predeterminada en Dataproc (spark.dynamicAllocation.enabled se establece en true). Consulta Asignación dinámica de Spark para obtener más información.

Usa el ajuste de escala automático de Dataproc

El ajuste de escala automático de Dataproc agrega y quita de forma dinámica los trabajadores de Dataproc de un clúster a fin de garantizar que los trabajos de Spark tengan los recursos necesarios para completarse con rapidez.

Se recomienda configurar la política de ajuste de escala automático para escalar solo los trabajadores secundarios.

Usa el modo de flexibilidad mejorada de Dataproc

Los clústeres con VM interrumpibles o una política de ajuste de escala automático pueden recibir excepciones FetchFailed cuando los trabajadores se interrumpen o se quitan antes de que terminen de entregar datos aleatorios a los reductores. Esta excepción puede provocar reintentos de tareas y tiempos de finalización del trabajo más largos.

Recomendación: Usa el Modo de flexibilidad mejorado de Dataproc, que no almacena datos aleatorios intermedios en trabajadores secundarios, para que estos se puedan interrumpir o reducir la escala verticalmente de forma segura.

Configura la partición y las combinaciones aleatorias

Spark almacena datos en particiones temporales en el clúster. Si tu aplicación agrupa o une DataFrames, combina los datos en particiones nuevas según la agrupación y la configuración de bajo nivel.

La partición de datos afecta significativamente el rendimiento de la aplicación: muy pocas particiones limitan el paralelismo de trabajos y el uso de recursos de clúster; demasiadas particiones ralentizan el trabajo debido al procesamiento y la recombinación aleatoria adicional de particiones.

Configura particiones

Las siguientes propiedades rigen la cantidad y el tamaño de las particiones:

  • spark.sql.files.maxPartitionBytes: Es el tamaño máximo de las particiones cuando lees datos de Cloud Storage. El valor predeterminado es 128 MB, que es lo suficientemente grande para la mayoría de las aplicaciones que procesan menos de 100 TB.

  • spark.sql.shuffle.partitions: Es la cantidad de particiones después de realizar una redistribución. El valor predeterminado es 200, lo que es apropiado para clústeres con menos de 100 CPU virtuales en total. Recomendación: Establece esto en 3 veces la cantidad de CPU virtuales en el clúster.

  • spark.default.parallelism: Es la cantidad de particiones que se muestran después de realizar transformaciones de RDD que requieren reproducciones aleatorias, como join, reduceByKey y parallelize. El valor predeterminado es la cantidad total de CPU virtuales en el clúster. Cuando usas RDD en trabajos de Spark, puedes establecer este número en 3 veces tus CPU virtuales.

Limita la cantidad de archivos.

Se produce pérdida de rendimiento cuando Spark lee una gran cantidad de archivos pequeños. Almacena los datos en archivos de mayor tamaño, por ejemplo, en el rango de 256 MB a 512 MB. Del mismo modo, limita la cantidad de archivos de salida (para forzar una redistribución, consulta Cómo evitar redistribuciones innecesarias).

Configura la ejecución de consultas adaptables (Spark 3)

La ejecución de consultas adaptables (habilitada de forma predeterminada en la versión 2.0 de la imagen de Dataproc) proporciona mejoras en el rendimiento del trabajo de Spark, que incluyen lo siguiente:

Aunque la configuración predeterminada es segura para la mayoría de los casos de uso, establecer spark.sql.adaptive.advisoryPartitionSizeInBytes en spark.sqlfiles.maxPartitionBytes (predeterminado de 128 MB) puede ser beneficioso.

Evita las combinaciones aleatorias innecesarias

Spark permite a los usuarios activar de forma manual una combinación aleatoria para volver a balancear sus datos con la función repartition. Las combinaciones aleatorias son costosas, por lo que la recombinación aleatoria debe usarse con precaución. Establecer las configuraciones de partición de forma adecuada debería ser suficiente para permitir que Spark particione tus datos de forma automática.

Excepción: Cuando escribes datos particionados en columnas en Cloud Storage, volver a realizar la partición en una columna específica evita escribir muchos archivos pequeños para lograr tiempos de escritura más rápidos.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Almacena datos en Parquet o Avro

De forma predeterminada, Spark SQL lee y escribe datos en archivos Parquet comprimidos de Snappy. Parquet tiene un formato de archivo en columna eficiente que permite a Spark leer solo los datos que necesita para ejecutar una aplicación. Esta es una ventaja importante cuando se trabaja con conjuntos de datos grandes. Otros formatos de columna, como Apache ORC, también tienen un buen rendimiento.

Para los datos no columnas, Apache Avro proporciona un formato de archivo eficiente de fila binaria. Aunque suele ser más lento que Parquet, el rendimiento de Avro es mejor que los formatos basados en texto, como CSV o JSON.

Optimiza el tamaño del disco

La capacidad de procesamiento de los discos persistentes escala con el tamaño del disco, lo que puede afectar el rendimiento del trabajo de Spark porque los trabajos escriben metadatos y se combinan aleatoriamente los datos en el disco. Cuando se usan discos persistentes estándar, el tamaño del disco debe ser de al menos 1 terabyte por trabajador (consulta Rendimiento por tamaño de disco persistente).

Para supervisar la capacidad de procesamiento del disco de los trabajadores en la consola de Google Cloud, sigue estos pasos:

  1. Haz clic en el nombre del clúster en la página Clústeres.
  2. Haz clic en la pestaña INSTANCIAS DE VM.
  3. Haz clic en cualquier nombre de trabajador.
  4. Haga clic en la pestaña SUPERVISIÓN y, luego, desplácese hacia abajo hasta Capacidad de procesamiento del disco para ver la capacidad de procesamiento del trabajador.

Consideraciones de disco

Los clústeres efímeros de Dataproc, que no se benefician del almacenamiento persistente, pueden usar SSD locales. Las SSD locales están conectadas a nivel físico al clúster y proporcionan una capacidad de procesamiento mayor que los discos persistentes (consulta la tabla Rendimiento). Las SSD locales están disponibles en un tamaño fijo de 375 gigabytes, pero puedes agregar varios SSD para aumentar el rendimiento.

Los SSD locales no conservan datos después del cierre de un clúster. Si necesitas almacenamiento persistente, puedes usar discos persistentes SSD, que proporcionan una mayor capacidad de procesamiento según su tamaño que los discos persistentes estándar. Los discos persistentes SSD también son una buena opción si el tamaño de la partición es inferior a 8 KB (sin embargo, evita las pariciones pequeñas).

Adjunta las GPU a tu clúster

Spark 3 admite GPU. Usa las GPU con la acción de inicialización de RAPIDS para acelerar los trabajos de Spark con el acelerador de SQL de RAPIDS. La acción de inicialización del controlador de GPU para configurar un clúster con las GPU.

Errores y correcciones de trabajos comunes

No hay memoria suficiente

Ejemplos:

  • “Ejecutor perdido”
  • "java.lang.OutOfMemoryError: Se excedió el límite de sobrecarga de GC"
  • “Contenedor que elimina YARN por exceder los límites de memoria”

Posibles soluciones:

Errores de recuperación de combinaciones aleatorias

Ejemplos:

  • “FetchFailedException” (error de Spark)
  • “No se pudo establecer la conexión con…”. (Error de Spark)
  • “No se pudo recuperar” (error de MapReduce)

Por lo general, se debe a la eliminación prematura de los trabajadores que aún tienen datos aleatorios para entregar.

Causas posibles y correcciones:

  • El escalador automático quitó las VM de trabajadores interrumpibles o no interrumpibles. Solución: Usa el Modo de flexibilidad mejorada para que los trabajadores secundarios sean interrumpibles o escalables de forma segura.
  • El ejecutor o asignador falló debido a un error OutOfMemory. Solución: Aumenta la memoria del ejecutor o el asignador.
  • El servicio de reproducción aleatoria de Spark puede estar sobrecargado. Solución: Disminuye la cantidad de particiones del trabajo.

Los nodos YARN están en MAL ESTADO

Ejemplos (de registros YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

A menudo, se relaciona con el espacio insuficiente en el disco para los datos aleatorios. Para diagnosticar mediante la visualización de archivos de registro:

  • Abre la página Clústeres de tu proyecto en la consola de Google Cloud y, luego, haz clic en el nombre del clúster.
  • Haz clic en VER REGISTROS.
  • Filtra los registros por hadoop-yarn-nodemanager.
  • Busca por “MAL ESTADO”.

Correcciones posibles:

  • La caché del usuario se almacena en el directorio que especifica la propiedad yarn.nodemanager.local-dirs en yarn-site.xml file. Este archivo se encuentra en /etc/hadoop/conf/yarn-site.xml. Puedes verificar el espacio libre en la ruta de acceso /hadoop/yarn/nm-local-dir y liberar espacio borrando la carpeta de caché del usuario /hadoop/yarn/nm-local-dir/usercache.
  • Si el registro informa el estado “UNHEALTHY”, vuelve a crear el clúster con más espacio en el disco, lo que aumentará el límite de capacidad de procesamiento.

El trabajo falla debido a que la memoria del controlador es insuficiente

Cuando se ejecutan trabajos en modo de clúster, el trabajo falla si el tamaño de la memoria del nodo principal es mucho mayor que el tamaño de la memoria del nodo trabajador.

Ejemplo de registros del controlador:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Correcciones posibles:

  • El valor de spark:spark.driver.memory debe ser inferior a yarn:yarn.scheduler.maximum-allocation-mb.
  • Usa el mismo tipo de máquina para los nodos principales y trabajadores.

Para más información