Las canalizaciones se ejecutan en clústeres de máquinas. Logran una alta capacidad de procesamiento gracias a dividir el trabajo que debe realizarse y, luego, ejecutarlo en paralelos a los múltiples ejecutores distribuidos en el clúster. En general, cuanto mayor sea la cantidad de divisiones (también llamadas particiones), más rápido se puede ejecutar la canalización. El nivel de paralelismo en tu canalización está determinado por entre las fuentes y las etapas de Shuffle de la canalización.
Fuente
Al comienzo de cada ejecución de la canalización, cada fuente de la canalización calcula qué datos se deben leer y cómo se pueden dividir esos datos. Por ejemplo, considera una canalización básica que lee desde Cloud Storage, realiza algunas transformaciones de Wrangler y, luego, vuelve a escribir en Cloud Storage.
Cuando se inicia la canalización, la fuente de Cloud Storage examina los archivos de entrada y los divide en función de los tamaños de los archivos. Por ejemplo, un de un solo gigabyte se puede dividir en 100 divisiones, cada una de 10 MB en de tamaño del ensamble. Cada ejecutor lee los datos de esa división, ejecuta las transformaciones de Wrangler y, luego, escribe el resultado en un archivo part.
Si tu canalización se ejecuta con lentitud, una de las primeras cosas que debes verificar es si tus fuentes crean suficientes divisiones para aprovechar al máximo el paralelismo. Por ejemplo, algunos tipos de compresión hacen que los archivos de texto simple no se puedan dividir. Si lees archivos comprimidos en gzip, es posible que notes que tu canalización se ejecuta mucho más lento que si leyeras archivos sin comprimir o archivos comprimidos con BZIP (que se puede dividir). Del mismo modo, si usas fuente de base de datos y la configuraste para que use una sola división, ejecuta mucho más lento que si lo configuraras para usar más divisiones.
Mezclas
Ciertos tipos de complementos hacen que los datos se mezclen en el clúster. Esta ocurre cuando los registros que procesa un ejecutor deben enviarse a otro ejecutor para realizar el procesamiento. Las Shuffles son operaciones costosas porque involucran mucho I/O. Los complementos que provocan la mezcla de los datos aparecen en en la sección Analytics de Pipeline Studio. Estos incluyen complementos, como Agrupar por, Eliminar duplicados, Distinto y Vincular. Por ejemplo, supongamos que se agrega una etapa Group By a la canalización en el ejemplo anterior.
Supongamos también que los datos que se leen representan compras realizadas en una tienda de comestibles.
Cada registro contiene un campo item
y un campo num_purchased
. En el Grupo
Por etapa, configuramos la canalización para agrupar registros en el campo item
y
calcular la suma del campo num_purchased
.
Cuando se ejecuta la canalización, los archivos de entrada se dividen como se describió anteriormente. Después del que cada registro se mezcle en el clúster, de manera que cada registro con el mismo elemento pertenece al mismo ejecutor.
Como se ilustra en el ejemplo anterior, los registros de compras de manzanas se distribuida originalmente en varios ejecutores. Para realizar la agregación, todos esos registros debían enviarse al mismo ejecutor a través del clúster.
La mayoría de los complementos que requieren un orden aleatorio te permiten especificar la cantidad de particiones que se usarán cuando se mezclen los datos. Esto controla cuántos ejecutores se usan para procesar los datos aleatorios.
En el ejemplo anterior, si la cantidad de particiones se establece en 2
, cada ejecutor calcula los totales de dos elementos en lugar de uno.
Ten en cuenta que es posible disminuir el paralelismo de la canalización si lo haces. etapa. Por ejemplo, considera la vista lógica de la canalización:
Si la fuente divide los datos en 500 particiones, pero la opción Agrupar por redistribuye mediante 200 particiones, el nivel máximo de paralelismo luego de que la agrupación de 500 a 200. En lugar de 500 archivos de partes diferentes escritos en Cloud Storage, solo tienes 200.
Cómo elegir las particiones
Si la cantidad de particiones es demasiado baja, no usarás la capacidad completa de para paralelizar la mayor cantidad de trabajo posible. Configura también las particiones los altos aumentan la sobrecarga innecesaria. En general, es mejor usa demasiadas particiones que pocas. La sobrecarga adicional es algo que debes tener en cuenta si tu canalización tarda unos minutos en ejecutarse y estás tratando de reducir unos minutos. Si tu canalización tarda horas en ejecutarse, por lo general, no debes preocuparte por la sobrecarga.
Una forma útil, pero demasiado simplista, de determinar la cantidad de particiones para
usar es establecerlo en max(cluster CPUs, input records / 500,000)
. En otro
palabras, toma el número de registros de entrada y divídelo por 500,000. Si ese número es
mayor que la cantidad de CPU del clúster, úsala para la cantidad de particiones.
De lo contrario, usa la cantidad de CPU del clúster. Por ejemplo, si tu clúster tiene
100 CPU y se espera que la etapa de mezcla tenga 100 millones de registros de entrada,
usa 200 particiones.
Una respuesta más completa es que los reordenamientos funcionan mejor cuando los datos de reordenamiento intermedios de cada partición pueden caber por completo en la memoria de un ejecutor, de modo que no se deba volcar nada en el disco. Spark reserva poco menos del 30% de la memoria de un ejecutor para contener datos de shuffle. El número exacto es (memoria total - 300 MB) × 30%. Si suponemos que cada ejecutor está configurado para usar 2 GB de memoria, Esto significa que cada partición no debe contener más de (2 GB - 300 MB) * 30% = aproximadamente 500 MB de registros. Si suponemos que cada registro se comprime a 1 KB de tamaño, eso significa que (500 MB/partición)/ (1 KB/registro) = 500,000 registros por partición. Si tus ejecutores usan más memoria o si tus registros son más pequeños, puedes ajustar este número según corresponda.
Sesgo de datos
Ten en cuenta que, en el ejemplo anterior, las compras de varios artículos se distribuyeron de manera uniforme. Es decir, hubo tres compras de manzanas, bananas, zanahorias y huevos. La mezcla en una clave distribuida de forma uniforme es la más eficaz de Shuffle, pero muchos conjuntos de datos no tienen esta propiedad. Continuando con la compras en la tienda de comestibles en el ejemplo anterior, esperarías tener muchas más compras de huevos que de tarjetas de boda. Cuando hay algunas teclas de reproducción aleatoria que son mucho más comunes que otras, significa que tienes datos sesgados. Los datos sesgados pueden tener un rendimiento significativamente peor que los no sesgados porque un una cantidad desproporcionada de trabajo está siendo realizada por un pequeño puñado de ejecutores. Hace que un subconjunto pequeño de particiones sea mucho más grande que todos los demás.
En este ejemplo, hay cinco veces más compras de huevos que compras con tarjeta lo que significa que el agregado de huevos tarda aproximadamente cinco veces más en calcularse. No importa mucho cuando se trata de solo 10 registros, en lugar de dos, pero hace una gran diferencia cuando se trata de cinco mil millones de registros en lugar de uno. Cuando hay sesgo de datos, la cantidad de particiones que se usan en un ordenamiento aleatorio no tiene un gran impacto en el rendimiento de la canalización.
Para reconocer el sesgo de los datos, puedes examinar el grafo en busca de registros de salida a lo largo del tiempo. Si la etapa está generando registros a un ritmo mucho más alto al comienzo del de la canalización y, luego, se ralentiza repentinamente, lo que podría significar que tienes datos sesgados.
También puedes reconocer el sesgo de los datos examinando el uso de memoria del clúster a lo largo del tiempo. Si tu clúster está a plena capacidad durante un tiempo, pero de repente tiene un uso bajo de la memoria durante un período, esto también es un indicador de que estás lidiando con un sesgo de datos.
Los datos sesgados tienen un mayor impacto en el rendimiento cuando se realiza una unión
una tarea. Existen algunas técnicas que se pueden usar para mejorar el rendimiento de las combinaciones sesgadas. Para obtener más información, consulta
Procesamiento paralelo para operaciones de JOIN
.
Ajuste adaptable para la ejecución
Para ajustar la ejecución de forma adaptativa, especifica el rango de particiones que se usarán, no el número exacto de particiones. El número de partición exacto, incluso si se establece en la configuración de la canalización, se ignora cuando se habilita la ejecución adaptativa.
Si usas un clúster efímero de Dataproc, Cloud Data Fusion establece la configuración adecuada automáticamente, pero para los clústeres estáticos de Dataproc o Hadoop, se pueden establecer los siguientes dos parámetros de configuración:
spark.default.parallelism
: Establece la cantidad total de vCores disponibles en el clúster. Esto garantiza que tu clúster no esté sobrecargado y define la límite inferior para la cantidad de particiones.spark.sql.adaptive.coalescePartitions.initialPartitionNum
: Establece el valor en 32 veces la cantidad de vCores disponibles en el clúster. Esto define el límite superior para la cantidad de particiones.Spark.sql.adaptive.enabled
: Para habilitar las optimizaciones, establece este valor comotrue
Dataproc lo configura automáticamente, pero si usas clústeres de Hadoop genéricos, debes asegurarte de que estén habilitados .
Estos parámetros pueden fijarse en la configuración del motor de un o en las propiedades del clúster de un Dataproc estático clúster.
¿Qué sigue?
- Obtén información sobre el procesamiento paralelo para operaciones
JOIN
.