Se usó la API de Cloud Translation para traducir esta página.
Switch to English

Sugerencias de ajuste para trabajos de Spark

Usar Spark SQL

La API de DataFrame de Spark SQL es una optimización significativa de la API de RDD. Si interactúas con código que usa RDD, considera leer los datos como un DataFrame antes de pasar un RDD en el código. En código Java o Scala, considera usar la API de conjunto de datos de Spark SQL como un superconjunto de RDD y DataFrames.

Usa Apache Spark 3

Dataproc 2.0 instala Spark 3, que incluye las siguientes funciones y mejoras de rendimiento:

  • Asistencia de GPU
  • Permite leer archivos binarios.
  • Mejoras de rendimiento
  • Reducción en particiones dinámicas
  • Ejecución de consultas adaptable, que optimiza los trabajos de Spark en tiempo real

Use la asignación dinámica

Apache Spark incluye una función de asignación dinámica que escala la cantidad de ejecutores de Spark en un clúster. Esta característica permite que un trabajo use el clúster completo de Dataproc, incluso cuando se escala verticalmente. Esta función está habilitada de forma predeterminada en Dataproc (spark.dynamicAllocation.enabled se configura como true). Consulta Asignación dinámica de Spark para obtener más información.

Usar el ajuste de escala automático de Dataproc

El ajuste de escala automático de Dataproc agrega y quita de forma dinámica a los trabajadores de Dataproc de un clúster para garantizar que los trabajos de Spark tengan los recursos necesarios para completarse con rapidez.

Es unpráctica recomendada para configurar la política de ajuste de escala automáticotrabajadores secundarios las rutas "a GCP".

Usar el modo de flexibilidad mejorada de Dataproc

Los clústeres con VM interrumpibles o una política de ajuste de escala automático pueden recibir excepciones FetchFailed cuando los trabajadores se interrumpen o se quitan antes de terminar de entregar datos aleatorios a los reductores. Esta excepción puede causar reintentos de tarea y tiempos de finalización de trabajo más largos.

Recomendación: Usa el Modo de flexibilidad mejorada de Dataproc, que no almacena datos aleatorios en los trabajadores secundarios, de modo que los trabajadores secundarios puedan interrumpirse o reducirse de forma segura.

Configura la partición y la redistribución

Spark almacena datos en particiones temporales en el clúster. Si tu aplicación agrupa o convierte a DataFrames, los datos se redistribuyen en particiones nuevas según la agrupación y la configuración de bajo nivel.

La partición de datos tiene un impacto significativo en el rendimiento de la aplicación: demasiadas particiones limitan el paralelismo de trabajos y el uso de recursos del clúster. demasiadas particiones ralentizan el trabajo debido a un procesamiento y una redistribución adicionales de particiones adicionales.

Configura particiones

Las siguientes propiedades rigen el número y tamaño de tus particiones:

  • spark.sqlfiles.maxPartitionBytes: el tamaño máximo de las particiones cuando lees datos de Cloud Storage. El valor predeterminado es 128 MB, lo que es lo suficientemente grande para la mayoría de las aplicaciones que procesan menos de 100 TB.

  • spark.sql.shuffle.partitions: la cantidad de particiones después de realizar una reproducción aleatoria. El valor predeterminado es 200, que es adecuado para clústeres con menos de 100 CPU virtuales en total. Recomendación: Configura esto en 3 veces la cantidad de CPU virtuales en tu clúster.

  • spark.default.parallelism: La cantidad de particiones que se muestran después de realizar transformaciones RDD que requieren Shuffle, como join, reduceByKey y parallelize. El valor predeterminado es la cantidad total de CPU virtuales en tu clúster. Cuando usas RDD en trabajos de Spark, puedes establecer este número en 3 veces tus CPU virtuales

Limita la cantidad de archivos

Existe una pérdida de rendimiento cuando Spark lee una gran cantidad de archivos pequeños. Almacena datos en tamaños de archivo de más de 100 MB. Del mismo modo, limita la cantidad de archivos de salida (para forzar una reproducción aleatoria, consulta Evita las remezclas innecesarias).

Cómo configurar la ejecución de una consulta adaptable (Spark 3)

La ejecución de consultas adaptable (habilitada de forma predeterminada en la versión 2.0 de la imagen de Dataproc) proporciona mejoras en el rendimiento de los trabajos de Spark, incluidas las siguientes:

Aunque la configuración predeterminada es sonido para la mayoría de los casos prácticos, establecer spark.sql.adaptive.advisoryPartitionSizeInBytes en spark.sqlfiles.maxPartitionBytes (predeterminado de 128 MB) puede ser beneficioso.

Evita las repeticiones aleatorias innecesarias

Spark permite a los usuarios activar de forma manual un shuffle para volver a balancear sus datos con la función repartition. Las Shuffle son costosas, por lo que la readaptación de datos debe usarse con precaución. Establecer las configuraciones de partición correctamente debería ser suficiente para permitir que Spark haga una partición automática de tus datos.

Excepción: Cuando se escriben datos particionados en columnas en Cloud Storage, volver a particionar en una columna específica evita escribir muchos archivos pequeños para lograr tiempos de escritura más rápidos.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Almacena datos en Parquet o Avro

Spark SQL usa de forma predeterminada para leer y escribir datos en archivos Parquet comprimidos en Snappy. Parquet está en un formato de archivo de columnas eficiente que permite que Spark solo lea los datos que necesita para ejecutar una aplicación. Esta es una ventaja importante cuando se trabaja con conjuntos de datos grandes. Otros formatos de columna, como Apache ORC, también tienen un buen rendimiento.

Para los datos sin columnas, Apache Avro proporciona un formato de archivo eficiente de fila binaria. Aunque, por lo general, es más lento que Parquet, el rendimiento de Avro es mejor que los formatos basados en texto, como CSV o JSON.

Optimiza el tamaño del disco

La capacidad de procesamiento de los discos persistentes escala con el tamaño del disco, lo que puede afectar el rendimiento del trabajo de Spark, ya que los trabajos escriben metadatos y redistribuyen datos en el disco. Cuando usas discos persistentes estándar, el tamaño del disco debe ser al menos 1 terabyte por trabajador (consulta Rendimiento por tamaño de disco persistente).

Para supervisar la capacidad de procesamiento del disco de trabajador en Google Cloud Console, sigue estos pasos:

  1. Haz clic en el nombre del clúster en la página Clústeres.
  2. Haz clic en la pestaña INSTANCIAS DE VM.
  3. Haz clic en cualquier nombre de trabajador.
  4. Haz clic en la pestaña MONITORING y, luego, desplázate hacia abajo hasta la capacidad de procesamiento del disco para ver la capacidad de procesamiento de los trabajadores.

Consideraciones de disco

Los clústeres efímeros de Dataproc, que no se benefician del almacenamiento continuo. puede usar SSD locales. Los SSD locales están conectados de manera física al clúster y proporcionan mayor capacidad de procesamiento que los discos persistentes (consulta la tabla de rendimiento). Los SSD locales están disponibles en un tamaño fijo de 375 gigabytes, pero puedes agregar varios SSD para aumentar el rendimiento.

Los SSD locales no conservan los datos después de que se cierra un clúster. Si deseas el almacenamiento continuo, puedes usar discos persistentes SSD, que proporcionan una mayor capacidad de procesamiento para su tamaño que los discos persistentes estándar. Los discos persistentes SSD también son una buena opción si el tamaño de la partición será menor que 8 KB (sin embargo, Evita los pares pequeños).

Adjunta GPU a tu clúster

Spark 3 es compatible con las GPU. Usa las GPU con la acción de inicialización de RAPIDS para acelerar los trabajos de Spark mediante el Acelerador de SQL de RAPIDS. La acción de inicialización del controlador de GPU para configurar un clúster con GPU.

Correcciones de errores y fallas del trabajo comunes

No hay memoria suficiente

Ejemplos:

  • "Se perdió el ejecutor"
  • “java.lang.OutOfMemoryError: Se excedió el límite de sobrecarga de GC”
  • "Contenedor eliminado por YARN por exceder los límites de memoria"

Posibles correcciones:

Errores de recuperación de Shuffle

Ejemplos:

  • "FetchFailedException" (error de Spark)
  • "Error al conectarse a..." (Error de Spark)
  • "Error al recuperar" (error MapMap)

Por lo general, se debe a la eliminación prematura de los trabajadores que aún tienen datos aleatorios para entregar.

Posibles causas y correcciones:

  • El escalador automático quitó o quitó VM de trabajadores interrumpibles o las VM de trabajadores no interrumpibles. Solución: Usa el Modo de flexibilidad mejorada para que los trabajadores secundarios sean interrumpibles o de manera segura.
  • Se produjo un error en Executor o mapper debido a un error de OutOfMemory. Solución: Aumenta la memoria del ejecutor o el asignador.
  • El servicio de Shuffle de Spark puede estar sobrecargado. Solución: disminuye la cantidad de particiones de trabajo.

Los nodos YARN son UNHEALTHY

Ejemplos (de registros de YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

A menudo, se relaciona con un espacio de disco insuficiente para los datos de redistribución. Para diagnosticar, visualiza los archivos de registro:

  • Abre la página Clústeres de tu proyecto en Cloud Console y, luego, haz clic en el nombre del clúster.
  • Haga clic en VER REGISTROS.
  • Filtra los registros por hadoop-yarn-nodemanager.
  • Busca "UNHEALTHY".

Corrección: Si el registro informa el estado "UNHEALTHY", vuelve a crear tu clúster con más espacio en disco, que aumentará el límite de capacidad de procesamiento.

Más información