Procesamiento paralelo para operaciones JOIN

En esta página, se explica el ajuste de rendimiento para operaciones de JOIN en Cloud Data Fusion.

Las operaciones JOIN pueden ser la parte más costosa de una canalización. Al igual que todo lo demás en una canalización, las operaciones se ejecutan en paralelo. El primer paso de una JOIN es redistribuir los datos para que cada registro con la misma clave JOIN se envíe al mismo ejecutor. Después de que todos los datos se mezclan, se unen y el resultado continúa a través de la canalización.

Ejemplo de procesamiento paralelo en operaciones JOIN

Por ejemplo, supongamos que realizas una operación JOIN en conjuntos de datos llamada Purchases y Items. Cada registro de compra contiene un nombre de artículo y un número de compras. Cada registro de artículo contiene el nombre y el precio del artículo. Se ejecuta una JOIN en el nombre del artículo para calcular el precio total de cada compra. Cuando los datos se unen, los datos se mezclan en el clúster, de modo que los registros con el mismo ID terminan en el mismo ejecutor.

Cuando las claves JOIN están distribuidas de manera bastante uniforme, las operaciones JOIN tienen un buen rendimiento porque se pueden ejecutar en paralelo.

Al igual que cualquier aleatorización, el sesgo de los datos tiene un impacto negativo en el rendimiento. En el ejemplo anterior, los huevos se compran con mucha más frecuencia que el pollo o la leche, lo que significa que el ejecutor que se une a las compras de huevos hace más trabajo que los otros ejecutores. Si notas que un JOIN está sesgado, hay dos formas de mejorar el rendimiento.

Divide automáticamente las particiones sesgadas

Con la ejecución de consultas adaptables, los sesgos muy pesados se controlarán automáticamente. En cuanto un JOIN produce algunas particiones mucho más grandes que otras, se dividen en otras más pequeñas. Para confirmar que tienes habilitada la ejecución de consultas adaptables, consulta Ajuste automático.

Cómo usar un JOIN en la memoria

Se puede realizar una JOIN en la memoria si un lado del JOIN es lo suficientemente pequeño como para caber en la memoria. En esta situación, el conjunto de datos pequeño se carga en la memoria y, luego, se transmite a cada ejecutor. El conjunto de datos grande no se redistribuye en absoluto, lo que quita las particiones desiguales que se generan cuando se redistribuye en la clave JOIN.

En el ejemplo anterior, el conjunto de datos de elementos se carga primero en la memoria del controlador de Spark. Luego, se transmite a cada ejecutor. Los ejecutores ahora pueden unir los datos sin mezclar el conjunto de datos de compra.

Este enfoque requiere que otorgues suficiente memoria al controlador y a los ejecutores de Spark para permitirles almacenar el conjunto de datos de transmisión en la memoria. De forma predeterminada, Spark reserva un poco menos del 30% de su memoria para almacenar este tipo de datos. Cuando uses JOIN en la memoria, multiplica el tamaño del conjunto de datos por cuatro y configúralo como la memoria del ejecutor y del controlador. Por ejemplo, si el conjunto de datos de los elementos tenía un tamaño de 1 GB, tendríamos que establecer la memoria del ejecutor y del controlador en al menos 4 GB. Los conjuntos de datos de más de 8 GB no se pueden cargar en la memoria.

Distribución de claves

Cuando ambos lados de JOIN son demasiado grandes para caber en la memoria, se puede usar una técnica diferente para dividir cada clave JOIN en varias claves y aumentar el nivel de paralelismo. Esta técnica se puede aplicar a las operaciones INNER JOIN y LEFT OUTER JOIN. No se puede usar para operaciones FULL OUTER JOIN.

En este enfoque, el lado sesgado se sala con una columna de número entero nueva con un número al azar del 1 al N. El lado no sesgado se expande y cada fila existente genera N filas nuevas. Se agrega una nueva columna en el lado desglosado, propagado con cada número del 1 al N. Luego, se realiza una JOIN normal, excepto que la columna nueva se agrega como parte de la clave JOIN. De esta manera, todos los datos que solían ir a una sola partición ahora se distribuyen hasta N particiones diferentes.

En el ejemplo anterior, el factor de distribución N se establece en 3. Los conjuntos de datos originales se muestran a la izquierda. Las versiones con sal y expandidas del conjunto de datos se muestran en el medio. Los datos aleatorios se muestran a la derecha, con tres ejecutores diferentes que unen las compras de huevos, en lugar de uno.

Se logra un mayor paralelismo mediante el aumento de las distribuciones. Sin embargo, esto tiene el costo de expandir un lado de JOIN, lo que da como resultado más datos redistribuidos en todo el clúster. Debido a esto, el beneficio disminuye a medida que aumenta la distribución. En la mayoría de los casos, configúralo en 20 o menos.

¿Qué sigue?