Los flujos de trabajo se ejecutan en clústeres de máquinas. Consiguen un alto rendimiento dividiendo el trabajo que hay que hacer y, a continuación, ejecutándolo en paralelo en los distintos ejecutores distribuidos por todo el clúster. Por lo general, cuanto mayor sea el número de divisiones (también llamadas particiones), más rápido se podrá ejecutar la canalización. El nivel de paralelismo de tu canalización se determina en función de las fuentes y las fases de aleatorización de la canalización.
Fuentes
Al inicio de cada ejecución de la canalización, cada fuente de la canalización calcula qué datos se deben leer y cómo se pueden dividir en divisiones. Por ejemplo, supongamos que tienes un flujo de procesamiento básico que lee datos de Cloud Storage, realiza algunas transformaciones con Wrangler y, a continuación, escribe los datos en Cloud Storage.
Cuando se inicia la canalización, la fuente de Cloud Storage examina los archivos de entrada y los divide en fragmentos en función de su tamaño. Por ejemplo, un archivo de un gigabyte se puede dividir en 100 partes de 10 MB cada una. Cada ejecutor lee los datos de esa división, ejecuta las transformaciones de Wrangler y, a continuación, escribe la salida en un archivo part.
Si tu flujo de procesamiento va lento, una de las primeras cosas que debes comprobar es si tus fuentes están creando suficientes divisiones para aprovechar al máximo el paralelismo. Por ejemplo, algunos tipos de compresión hacen que los archivos de texto sin formato no se puedan dividir. Si lees archivos que se han comprimido con gzip, puede que observes que tu canalización funciona mucho más lento que si leyeras archivos sin comprimir o archivos comprimidos con BZIP (que se pueden dividir). Del mismo modo, si usas la fuente de la base de datos y la has configurado para que use solo una división, funcionará mucho más lento que si la configuras para que use más divisiones.
Aleatorio
Algunos tipos de complementos provocan que los datos se barajen en todo el clúster. Esto ocurre cuando los registros que procesa un ejecutor deben enviarse a otro para realizar el cálculo. Las mezclas son operaciones costosas porque implican muchas operaciones de E/S. Los complementos que provocan que los datos se barajen se muestran en la sección Analytics de Pipeline Studio. Entre ellos se incluyen los complementos Group By, Deduplicate, Distinct y Joiner. Por ejemplo, supongamos que se añade una fase Agrupar por a la canalización del ejemplo anterior.
Supongamos también que los datos que se leen representan las compras realizadas en una tienda de comestibles.
Cada registro contiene un campo item
y un campo num_purchased
. En la fase Agrupar por, configuramos la canalización para agrupar los registros en el campo item
y calcular la suma del campo num_purchased
.
Cuando se ejecuta el proceso, los archivos de entrada se dividen tal como se ha descrito anteriormente. Después, cada registro se baraja en el clúster de forma que todos los registros con el mismo elemento pertenezcan al mismo ejecutor.
Como se muestra en el ejemplo anterior, los registros de las compras de Apple se distribuyeron originalmente en varios ejecutores. Para llevar a cabo la agregación, todos esos registros debían enviarse a través del clúster al mismo ejecutor.
La mayoría de los complementos que requieren una aleatorización te permiten especificar el número de particiones que se van a usar al aleatorizar los datos. Controla cuántos ejecutores se usan para procesar los datos aleatorizados.
En el ejemplo anterior, si el número de particiones es 2
, cada ejecutor calcula las agregaciones de dos elementos en lugar de uno.
Ten en cuenta que es posible reducir el paralelismo de tu canalización después de esa fase. Por ejemplo, considere la vista lógica de la canalización:
Si la fuente divide los datos en 500 particiones, pero la función Group By los reorganiza en 200 particiones, el nivel máximo de paralelismo después de Group By se reduce de 500 a 200. En lugar de 500 archivos de partes diferentes escritos en Cloud Storage, solo tienes 200.
Elegir particiones
Si el número de particiones es demasiado bajo, no utilizará toda la capacidad de su clúster para paralelizar todo el trabajo posible. Si se definen particiones demasiado grandes, se incrementa la cantidad de sobrecarga innecesaria. En general, es mejor usar demasiadas particiones que demasiado pocas. El tiempo de inactividad adicional es algo que debes tener en cuenta si tu canalización tarda unos minutos en ejecutarse y quieres reducirla en un par de minutos. Si tu canalización tarda horas en ejecutarse, el tiempo de inactividad no es algo que te deba preocupar.
Una forma útil, aunque demasiado simplista, de determinar el número de particiones que se van a usar es establecerlo en max(cluster CPUs, input records / 500,000)
. En otras palabras, toma el número de registros de entrada y divídelo entre 500.000. Si ese número es mayor que el número de CPUs del clúster, úsalo como número de particiones.
De lo contrario, usa el número de CPUs del clúster. Por ejemplo, si tu clúster tiene 100 CPUs y se espera que la fase de aleatorización tenga 100 millones de registros de entrada, usa 200 particiones.
Una respuesta más completa es que las mezclas funcionan mejor cuando los datos de mezcla intermedios de cada partición caben completamente en la memoria de un ejecutor, de modo que no es necesario que se escriban en el disco. Spark reserva algo menos del 30% de la memoria de un ejecutor para almacenar datos de aleatorización. El número exacto es (memoria total - 300 MB) * 30%. Si suponemos que cada ejecutor está configurado para usar 2 GB de memoria, significa que cada partición no debe contener más de (2 GB - 300 MB) * 30% = aproximadamente 500 MB de registros. Si suponemos que cada registro se comprime hasta 1 KB, significa que (500 MB por partición) / (1 KB por registro) = 500.000 registros por partición. Si tus ejecutores usan más memoria o tus registros son más pequeños, puedes ajustar este número en consecuencia.
Distribución desigual de los datos
Ten en cuenta que, en el ejemplo anterior, las compras de varios artículos se distribuyeron de forma uniforme. Es decir, se han comprado tres unidades de manzanas, plátanos, zanahorias y huevos. El orden aleatorio con una clave distribuida de forma uniforme es el tipo de orden aleatorio más eficiente, pero muchos conjuntos de datos no tienen esta propiedad. Siguiendo con el ejemplo de la compra en la tienda de comestibles, es de esperar que se compren muchos más huevos que tarjetas de boda. Cuando hay algunas claves de aleatorización que son mucho más comunes que otras, se trata de datos sesgados. Los datos sesgados pueden dar resultados significativamente peores que los datos no sesgados, ya que una cantidad desproporcionada de trabajo la realizan unos pocos ejecutores. Esto provoca que un pequeño subconjunto de particiones sea mucho mayor que el resto.
En este ejemplo, hay cinco veces más compras de huevos que de tarjetas, por lo que el agregado de huevos tarda aproximadamente cinco veces más en calcularse. No importa mucho cuando se trata de 10 registros en lugar de dos, pero supone una gran diferencia cuando se trata de cinco mil millones de registros en lugar de mil millones. Cuando hay sesgo de datos, el número de particiones que se usan en un shuffle no influye mucho en el rendimiento de la canalización.
Puede reconocer la asimetría de los datos examinando el gráfico de registros de salida a lo largo del tiempo. Si la fase genera registros a un ritmo mucho mayor al inicio de la ejecución de la canalización y, de repente, se ralentiza, puede que los datos estén sesgados.
También puede reconocer la asimetría de los datos examinando el uso de la memoria del clúster a lo largo del tiempo. Si tu clúster está a plena capacidad durante un tiempo, pero de repente tiene un uso de memoria bajo durante un periodo, también es un signo de que estás lidiando con una asimetría de datos.
Los datos sesgados afectan al rendimiento de forma más significativa cuando se realiza una unión. Hay algunas técnicas que se pueden usar para mejorar el rendimiento de las combinaciones sesgadas. Para obtener más información, consulta Procesamiento en paralelo de operaciones JOIN
.
Ajuste adaptativo para la ejecución
Para ajustar la ejecución de forma adaptativa, especifica el intervalo de particiones que se va a usar, no el número exacto de la partición. El número de partición exacto, aunque se haya definido en la configuración de la canalización, se ignora cuando la ejecución adaptativa está habilitada.
Si usas un clúster de Dataproc efímero, Cloud Data Fusion define la configuración adecuada automáticamente. Sin embargo, en el caso de los clústeres de Dataproc o Hadoop estáticos, se pueden definir los dos parámetros de configuración siguientes:
spark.default.parallelism
: defínelo como el número total de vCores disponibles en el clúster. De esta forma, te aseguras de que el clúster no esté sobrecargado y defines el límite inferior del número de particiones.spark.sql.adaptive.coalescePartitions.initialPartitionNum
: defínelo en 32 veces el número de vCores disponibles en el clúster. Define el límite superior del número de particiones.Spark.sql.adaptive.enabled
: para habilitar las optimizaciones, asigna el valortrue
. Dataproc lo define automáticamente, pero, si usas clústeres de Hadoop genéricos, debes asegurarte de que esté habilitado .
Estos parámetros se pueden definir en la configuración del motor de una canalización específica o en las propiedades del clúster de un clúster de Dataproc estático.
Siguientes pasos
- Consulta información sobre el procesamiento en paralelo de las operaciones de
JOIN
.