Procesamiento paralelo

Las canalizaciones se ejecutan en clústeres de máquinas. Para lograr una alta capacidad de procesamiento, dividen el trabajo que se debe realizar y, luego, lo ejecutan en paralelo en los múltiples ejecutores distribuidos en el clúster. En general, cuanto mayor sea la cantidad de divisiones (también llamadas particiones), más rápido podrá ejecutarse la canalización. El nivel de paralelismo en tu canalización está determinado por las fuentes y las etapas de redistribución de la canalización.

Fuentes

Al comienzo de cada ejecución de la canalización, cada fuente de la canalización calcula qué datos se deben leer y cómo se pueden dividir esos datos. Por ejemplo, considera una canalización básica que lee desde Cloud Storage, realiza algunas transformaciones de Wrangler y, luego, vuelve a escribir en Cloud Storage.

Canalización básica que muestra la fuente de Cloud Storage, la transformación de Wrangler y el destino de Cloud Storage

Cuando se inicia la canalización, la fuente de Cloud Storage examina los archivos de entrada y los divide en función de los tamaños de los archivos. Por ejemplo, un archivo de un gigabyte se puede dividir en 100 fragmentos, cada uno de 10 MB de tamaño. Cada ejecutor lee los datos de esa división, ejecuta las transformaciones de Wrangler y, luego, escribe el resultado en un archivo part.

Datos particionados en Cloud Storage en transformaciones Wrangler paralelas en archivos de partes

Si tu canalización se ejecuta con lentitud, una de las primeras cosas que debes verificar es si tus fuentes crean suficientes divisiones para aprovechar al máximo el paralelismo. Por ejemplo, algunos tipos de compresión hacen que los archivos de texto simple no se puedan dividir. Si estás leyendo archivos comprimidos en gzip, es posible que notes que tu canalización se ejecuta mucho más lento que si estuvieras leyendo archivos sin comprimir o archivos comprimidos con BZIP (que se puede dividir). Del mismo modo, si usas la fuente de la base de datos y la configuraste para usar solo una división, se ejecutará mucho más lento que si la configuras para usar más divisiones.

Mezclas

Ciertos tipos de complementos hacen que los datos se mezclen en el clúster. Esto ocurre cuando los registros que procesa un ejecutor deben enviarse a otro para realizar el procesamiento. Los shuffles son operaciones costosas porque implican muchas operaciones de E/S. Los complementos que hacen que los datos se mezclen aparecen en la sección Analytics de Pipeline Studio. Estos incluyen complementos, como Agrupar por, Eliminar duplicados, Distinto y Vincular. Por ejemplo, supongamos que se agrega una etapa Group By a la canalización en el ejemplo anterior.

Supongamos también que los datos que se leen representan compras realizadas en una tienda de comestibles. Cada registro contiene un campo item y un campo num_purchased. En la etapa Agrupar por, configuramos la canalización para agrupar registros en el campo item y calcular la suma del campo num_purchased.

Cuando se ejecuta la canalización, los archivos de entrada se dividen como se describió anteriormente. Después de eso, cada registro se mezcla en el clúster de modo que cada registro con el mismo elemento pertenezca al mismo ejecutor.

Como se ilustra en el ejemplo anterior, los registros de las compras de manzanas se distribuyeron originalmente entre varios ejecutores. Para realizar la agregación, todos esos registros debían enviarse al mismo ejecutor a través del clúster.

La mayoría de los complementos que requieren un orden aleatorio te permiten especificar la cantidad de particiones que se deben usar cuando se ordenan los datos de forma aleatoria. Esto controla cuántos ejecutores se usan para procesar los datos aleatorios.

En el ejemplo anterior, si la cantidad de particiones se establece en 2, cada ejecutor calcula los totales de dos elementos en lugar de uno.

Ten en cuenta que es posible disminuir el paralelismo de tu canalización después de esa etapa. Por ejemplo, considera la vista lógica de la canalización:

Si la fuente divide los datos en 500 particiones, pero la agrupación por intercala con 200 particiones, el nivel máximo de paralelismo después de la agrupación por disminuye de 500 a 200. En lugar de 500 archivos de partes diferentes escritos en Cloud Storage, solo tienes 200.

Cómo elegir las particiones

Si la cantidad de particiones es demasiado baja, no usarás la capacidad completa de tu clúster para paralelizar la mayor cantidad de trabajo posible. Si estableces las particiones demasiado altas, aumenta la cantidad de sobrecarga innecesaria. En general, es mejor usar demasiadas particiones que muy pocas. La sobrecarga adicional es algo que debes tener en cuenta si tu canalización tarda unos minutos en ejecutarse y estás tratando de reducir unos minutos. Si tu canalización tarda horas en ejecutarse, por lo general, no debes preocuparte por la sobrecarga.

Una forma útil, pero demasiado simplista, de determinar la cantidad de particiones que se usarán es configurarla en max(cluster CPUs, input records / 500,000). En otras palabras, toma la cantidad de registros de entrada y divídelos por 500,000. Si esa cantidad es mayor que la cantidad de CPUs del clúster, úsala para la cantidad de particiones. De lo contrario, usa la cantidad de CPUs del clúster. Por ejemplo, si tu clúster tiene 100 CPU y se espera que la etapa de mezcla tenga 100 millones de registros de entrada, usa 200 particiones.

Una respuesta más completa es que los reordenamientos funcionan mejor cuando los datos de reordenamiento intermedios de cada partición pueden caber por completo en la memoria de un ejecutor, de modo que no se deba volcar nada en el disco. Spark reserva poco menos del 30% de la memoria de un ejecutor para contener datos de shuffle. El número exacto es (memoria total - 300 MB) × 30%. Si suponemos que cada ejecutor está configurado para usar 2 GB de memoria, eso significa que cada partición no debe contener más de (2 GB - 300 MB) * 30% = aproximadamente 500 MB de registros. Si suponemos que cada registro se comprime a 1 KB de tamaño, eso significa que (500 MB / partición) / (1 KB/registro) = 500,000 registros por partición. Si tus ejecutores usan más memoria o tus registros son más pequeños, puedes ajustar esta cantidad según corresponda.

Sesgo de datos

Ten en cuenta que, en el ejemplo anterior, las compras de varios artículos se distribuyeron de manera uniforme. Es decir, hubo tres compras de manzanas, bananas, zanahorias y huevos. El ordenamiento aleatorio en una clave distribuida de forma uniforme es el tipo de ordenamiento aleatorio con mejor rendimiento, pero muchos conjuntos de datos no tienen esta propiedad. Siguiendo con la compra en la tienda de comestibles del ejemplo anterior, se esperaría que haya muchas más compras de huevos que de tarjetas de boda. Cuando hay algunas teclas de shuffle que son mucho más comunes que otras, significa que tienes datos sesgados. Los datos sesgados pueden tener un rendimiento mucho peor que los datos no sesgados porque un puñado de ejecutores realiza una cantidad desproporcionada de trabajo. Hace que un subconjunto pequeño de particiones sea mucho más grande que todos los demás.

En este ejemplo, hay cinco veces más compras de huevos que de tarjetas, lo que significa que el agregado de huevos tarda aproximadamente cinco veces más en calcularse. No importa mucho cuando se trata de solo 10 registros, en lugar de dos, pero marca una gran diferencia cuando se trata de cinco mil millones de registros en lugar de uno. Cuando hay sesgo de datos, la cantidad de particiones que se usan en un ordenamiento aleatorio no tiene un gran impacto en el rendimiento de la canalización.

Para reconocer la distorsión de datos, examina el gráfico en busca de registros de salida a lo largo del tiempo. Si la etapa genera registros a un ritmo mucho más alto al comienzo de la ejecución de la canalización y, luego, se ralentiza de repente, es posible que tengas datos sesgados.

También puedes reconocer el sesgo de datos examinando el uso de la memoria del clúster a lo largo del tiempo. Si tu clúster está a plena capacidad durante un tiempo, pero de repente tiene un uso bajo de la memoria durante un período, esto también es un indicador de que estás lidiando con un sesgo de datos.

Los datos sesgados afectan de manera más significativa el rendimiento cuando se realiza una unión. Existen algunas técnicas que se pueden usar para mejorar el rendimiento de las combinaciones sesgadas. Para obtener más información, consulta Procesamiento en paralelo para operaciones JOIN.

Ajuste adaptable para la ejecución

Para ajustar la ejecución de forma adaptativa, especifica el rango de particiones que se usarán, no el número exacto de particiones. El número de partición exacto, incluso si se establece en la configuración de la canalización, se ignora cuando se habilita la ejecución adaptativa.

Si usas un clúster efímero de Dataproc, Cloud Data Fusion establece la configuración adecuada automáticamente, pero para los clústeres estáticos de Dataproc o Hadoop, se pueden establecer los siguientes dos parámetros de configuración:

  • spark.default.parallelism: Establece la cantidad total de vCores disponibles en el clúster. Esto garantiza que tu clúster no tenga una carga insuficiente y define el límite inferior para la cantidad de particiones.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: Establece el valor en 32 veces la cantidad de vCores disponibles en el clúster. Esto define el límite superior para la cantidad de particiones.
  • Spark.sql.adaptive.enabled: Para habilitar las optimizaciones, establece este valor en true. Dataproc lo establece automáticamente, pero si usas clústeres genéricos de Hadoop, debes asegurarte de que esté habilitado .

Estos parámetros se pueden establecer en la configuración del motor de una canalización específica o en las propiedades del clúster de un clúster de Dataproc estático.

¿Qué sigue?