En esta página se explica cómo optimizar el rendimiento de las operaciones de JOIN
en Cloud Data Fusion.
Las operaciones JOIN
pueden ser la parte más cara de una canalización. Al igual que el resto de los elementos de una canalización, las operaciones se ejecutan en paralelo. El primer paso de un JOIN
es redistribuir los datos para que cada registro con la misma clave de JOIN
se envíe al mismo ejecutor. Una vez que se han mezclado todos los datos, se unen y la salida continúa por la canalización.
Ejemplo de procesamiento en paralelo en operaciones de JOIN
Por ejemplo, supongamos que realiza una operación JOIN
en los conjuntos de datos Purchases
y Items
. Cada registro de compra contiene el nombre y el número de artículos comprados. Cada registro de artículo contiene el nombre del artículo y su precio. Se realiza una A
JOIN
en el nombre del artículo para calcular el precio total de cada compra. Cuando se combinan los datos, estos se barajan en todo el clúster de forma que los registros con el mismo ID acaben en el mismo ejecutor.
Cuando las claves JOIN
están distribuidas de forma más o menos uniforme, las operaciones JOIN
funcionan bien porque se pueden ejecutar en paralelo.
Al igual que cualquier otro tipo de aleatorización, el desequilibrio de datos afecta negativamente al rendimiento. En el ejemplo anterior, los huevos se compran con mucha más frecuencia que el pollo o la leche, lo que significa que el ejecutor que se une a las compras de huevos hace más trabajo que los otros ejecutores. Si observas que un JOIN
está sesgado, hay dos formas de mejorar el rendimiento.
Dividir automáticamente las particiones sesgadas
Con la ejecución adaptativa de consultas, las desviaciones muy grandes se gestionarán automáticamente.
En cuanto un JOIN
produce algunas particiones mucho más grandes que otras, se dividen en particiones más pequeñas. Para confirmar que tienes habilitada la ejecución adaptativa de consultas, consulta Ajuste automático.
Usar una JOIN
en memoria
Se puede realizar una JOIN
en memoria si uno de los lados de la JOIN
es lo suficientemente pequeño como para caber en la memoria. En esta situación, el conjunto de datos pequeño se carga en la memoria y, a continuación, se difunde a todos los ejecutores. El conjunto de datos grande no se aleatoriza en absoluto, lo que elimina las particiones desiguales que se generan al aleatorizar en la clave JOIN
.
En el ejemplo anterior, el conjunto de datos de elementos se carga primero en la memoria del controlador de Spark. A continuación, se emite a cada ejecutor. Ahora, los ejecutores pueden combinar los datos sin aleatorizar el conjunto de datos de compra.
Con este enfoque, debes asignar suficiente memoria al controlador y a los ejecutores de Spark para que puedan almacenar el conjunto de datos de difusión en la memoria. De forma predeterminada, Spark reserva algo menos del 30% de su memoria para almacenar este tipo de datos. Cuando se usan JOIN
s en memoria, multiplica el tamaño del conjunto de datos por cuatro y
establece ese valor como memoria del ejecutor y del controlador. Por ejemplo, si el tamaño del conjunto de datos de elementos fuera de 1 GB, tendríamos que asignar al menos 4 GB a la memoria del ejecutor y del controlador. Los conjuntos de datos de más de 8 GB no se pueden cargar en la memoria.
Distribución de claves
Cuando ambos lados de JOIN
son demasiado grandes para caber en la memoria, se puede usar otra técnica para dividir cada clave JOIN
en varias claves y aumentar el nivel de paralelismo. Esta técnica se puede aplicar a las operaciones INNER JOIN
y LEFT OUTER JOIN
. No se puede usar para operaciones de FULL OUTER JOIN
.
Con este método, el lado sesgado se cifra con una nueva columna de números enteros con un número aleatorio del 1 al N. El lado sin sesgar se descompone, y cada fila genera N
filas nuevas. Se añade una nueva columna al lado desglosado, que se rellena con cada número del 1 al N. A continuación, se realiza una JOIN
normal, excepto que la nueva columna se añade como parte de la clave JOIN
. De esta forma, todos los datos que antes se enviaban a una sola partición ahora se distribuyen en hasta N
particiones diferentes.
En el ejemplo anterior, el factor de distribución N
se ha definido como 3
. Los conjuntos de datos originales se muestran a la izquierda. Las versiones saladas y explotadas del conjunto de datos se muestran en el centro. Los datos aleatorios se muestran a la derecha, con tres ejecutores diferentes que se unen a las compras de huevos en lugar de uno.
Para conseguir un mayor paralelismo, se aumentan las distribuciones. Sin embargo, esto tiene el coste de que se expanda un lado de la JOIN
, lo que provoca que se barajen más datos en el clúster. Por este motivo, la ventaja disminuye a medida que aumenta la distribución. En la mayoría de los casos, debe ser 20 o menos.
Siguientes pasos
- Consulta más información sobre el procesamiento en paralelo en Cloud Data Fusion.