Procesamiento paralelo

Las canalizaciones se ejecutan en clústeres de máquinas. Logran una capacidad de procesamiento alta mediante la división del trabajo que se debe realizar y, luego, ejecutando el trabajo en paralelo en los múltiples ejecutores distribuidos en el clúster. En general, cuanto mayor sea la cantidad de divisiones (también llamadas particiones), más rápido se podrá ejecutar la canalización. El nivel de paralelismo en tu canalización se determina según las fuentes y las etapas de Shuffle de la canalización.

Fuentes

Al comienzo de cada ejecución de canalización, cada fuente de la canalización calcula qué datos se deben leer y cómo se pueden dividir esos datos. Por ejemplo, considera una canalización básica que lee desde Cloud Storage, realiza algunas transformaciones de Wrangler y, luego, escribe de nuevo en Cloud Storage.

Canalización básica que muestra la fuente de Cloud Storage, la transformación de Wrangler y el receptor de Cloud Storage

Cuando se inicia la canalización, la fuente de Cloud Storage examina los archivos de entrada y los divide en divisiones según los tamaños de los archivos. Por ejemplo, un archivo de un solo gigabyte se puede dividir en 100 divisiones de 10 MB cada una. Cada ejecutor lee los datos para esa división, ejecuta las transformaciones de Wrangler y, luego, escribe el resultado en un archivo part.

Datos particionados en Cloud Storage en transformaciones de Wrangler paralelas en archivos de partes

Si la canalización se ejecuta con lentitud, una de las primeras tareas que debes verificar es si tus fuentes crean divisiones suficientes para aprovechar al máximo el paralelismo. Por ejemplo, algunos tipos de compresión hacen que los archivos de texto simple no se puedan dividir. Si lees archivos que se comprimieron con gzip, es posible que notes que la canalización se ejecuta mucho más lento que si lees archivos sin comprimir o archivos comprimidos con BZIP (que se puede dividir). De manera similar, si usas la fuente de la base de datos y la configuraste para usar una sola división, se ejecutará mucho más lento que si la configuraras para que use más divisiones.

Reproducción aleatoria

Ciertos tipos de complementos hacen que los datos se distribuyan en el clúster. Esto sucede cuando los registros que procesa un ejecutor deben enviarse a otro ejecutor para realizar el procesamiento. Los Shuffles son operaciones costosas porque implican mucha E/S. Los complementos que provocan la redistribución de los datos se muestran en la sección Analytics de Pipeline Studio. Estos incluyen complementos, como Group By, DeDuplicate, Distinct y Joiner. Por ejemplo, supongamos que se agrega una etapa de Agrupar por a la canalización en el ejemplo anterior.

También supongamos que los datos que se leen representan compras realizadas en un supermercado. Cada registro contiene un campo item y un campo num_purchased. En la etapa Agrupar por, configuramos la canalización para agrupar registros en el campo item y calcular la suma del campo num_purchased.

Cuando se ejecuta la canalización, los archivos de entrada se dividen como se describió antes. Después de eso, cada registro se mezcla en el clúster, de modo que cada registro con el mismo elemento pertenezca al mismo ejecutor.

Como se ilustra en el ejemplo anterior, los registros de compras de manzanas se repartieron originalmente en varios ejecutores. Para realizar la agregación, todos esos registros debían enviarse a través del clúster al mismo ejecutor.

La mayoría de los complementos que requieren una redistribución te permiten especificar la cantidad de particiones que se usarán cuando se redistribuyan los datos. Esto controla cuántos ejecutores se usan para procesar los datos aleatorios.

En el ejemplo anterior, si la cantidad de particiones se establece en 2, cada ejecutor calcula las agregaciones de dos elementos en lugar de uno.

Ten en cuenta que es posible disminuir el paralelismo de la canalización después de esa etapa. Por ejemplo, considera la vista lógica de la canalización:

Si la fuente divide los datos en 500 particiones, pero Agrupar por redistribuye con 200 particiones, el nivel máximo de paralelismo después de Agrupar por disminuye de 500 a 200. En lugar de 500 archivos de partes diferentes escritos en Cloud Storage, solo tienes 200.

Elige particiones

Si la cantidad de particiones es demasiado baja, no usarás la capacidad completa de tu clúster para paralelizar tanto trabajo como puedas. Configurar las particiones en un nivel demasiado alto aumenta la cantidad de sobrecarga innecesaria. En general, es mejor usar demasiadas particiones que pocas. La sobrecarga adicional es algo de lo que debes preocuparte si la canalización tarda unos minutos en ejecutarse y intentas quitar unos minutos. Si tu canalización tarda horas en ejecutarse, por lo general, no debes preocuparte por la sobrecarga.

Una forma útil, pero demasiado simplista de determinar la cantidad de particiones que se usarán es establecerla en max(cluster CPUs, input records / 500,000). En otras palabras, toma el número de registros de entrada y divídelo por 500,000. Si esa cantidad es mayor que la cantidad de CPU del clúster, úsala para la cantidad de particiones. De lo contrario, usa la cantidad de CPU del clúster. Por ejemplo, si tu clúster tiene 100 CPU y se espera que la etapa de redistribución tenga 100 millones de registros de entrada, usa 200 particiones.

Una respuesta más completa es que las redistribuciones funcionan mejor cuando los datos intermedios de Shuffle de cada partición pueden caber por completo en la memoria de un ejecutor, de modo que no sea necesario derramar nada en el disco. Spark reserva un poco menos del 30% de la memoria de un ejecutivo para almacenar datos de Shuffle. El número exacto es (memoria total - 300 MB) * 30%. Si suponemos que cada ejecutor está configurado para usar 2 GB de memoria, eso significa que cada partición no debe contener más de (2 GB a 300 MB) × 30% = aproximadamente 500 MB de registros. Si suponemos que cada registro se comprime hasta 1 KB de tamaño, eso significa (500 MB / partición) / (1 KB/registro) = 500,000 registros por partición. Si tus ejecutores usan más memoria o tus registros son más pequeños, puedes ajustar este número según corresponda.

Sesgo de datos

Ten en cuenta que, en el ejemplo anterior, las compras de varios artículos se distribuyeron de manera uniforme. Es decir, hubo tres compras, cada una de manzanas, bananas, zanahorias y huevos. La mezcla en una clave distribuida de manera uniforme es el tipo de redistribución con mejor rendimiento, pero muchos conjuntos de datos no tienen esta propiedad. Siguiendo con la compra de una tienda de comestibles en el ejemplo anterior, es de esperar que haya muchas más compras de huevos que de tarjetas de boda. Cuando hay algunas claves de Shuffle que son mucho más comunes que otras claves, se trata de datos sesgados. Los datos sesgados pueden tener un rendimiento mucho peor que los no sesgados, ya que un pequeño puñado de ejecutivos realiza una cantidad desproporcionada de trabajo. Hace que un pequeño subconjunto de particiones sea mucho más grande que todos los demás.

En este ejemplo, hay cinco veces más compras de huevos que compras con tarjeta, lo que significa que el agregado de huevos tarda aproximadamente cinco veces más en calcularse. No importa mucho cuando trabajas con solo 10 registros, en lugar de dos, pero marca una gran diferencia cuando se trata de cinco mil millones de registros en lugar de mil. Cuando tienes un sesgo de datos, la cantidad de particiones usadas en una redistribución no tiene un gran impacto en el rendimiento de la canalización.

Para reconocer el sesgo de los datos, puedes examinar el grafo en busca de registros de salida a lo largo del tiempo. Si la etapa genera registros a un ritmo mucho más alto al comienzo de la ejecución de la canalización y, de repente, se ralentiza, puede significar que los datos están sesgados.

También puedes reconocer el sesgo de los datos examinando el uso de memoria del clúster a lo largo del tiempo. Si tu clúster alcanzó su capacidad máxima durante un tiempo, pero de repente tiene un uso bajo de memoria durante un período, esto también es una señal de que estás lidiando con un sesgo de datos.

Los datos sesgados afectan de forma más significativa el rendimiento cuando se realiza una unión. Existen algunas técnicas que se pueden usar para mejorar el rendimiento de las uniones sesgadas. Si deseas obtener más información, consulta Procesamiento paralelo para operaciones de JOIN.

Ajuste adaptable para la ejecución

Especifica el rango de particiones que se usará para ajustar la ejecución de forma adaptable, no el número exacto. El número exacto de partición, incluso si se establece en la configuración de la canalización, se ignora cuando la ejecución adaptable está habilitada.

Si usas un clúster de Dataproc efímero, Cloud Data Fusion establece la configuración adecuada de forma automática, pero para clústeres estáticos de Dataproc o Hadoop, se pueden establecer los siguientes dos parámetros de configuración:

  • spark.default.parallelism: Configúralo en la cantidad total de vCore disponibles en el clúster. Esto garantiza que tu clúster no esté sobrecargado y define el límite inferior para la cantidad de particiones.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: Configúralo en 32 veces la cantidad de vCore disponibles en el clúster. Esto define el límite superior de la cantidad de particiones.
  • Spark.sql.adaptive.enabled: Para habilitar las optimizaciones, establece este valor en true. Dataproc lo configura automáticamente, pero si usas clústeres de Hadoop genéricos, debes asegurarte de que esté habilitado .

Estos parámetros se pueden establecer en la configuración del motor de una canalización específica o en las propiedades del clúster de un clúster estático de Dataproc.

¿Qué sigue?