Esta página explica o ajuste do desempenho para operações JOIN
no Cloud Data Fusion.
As operações de JOIN
podem ser a parte mais cara de um pipeline. Tal como tudo o resto num pipeline, as operações são executadas em paralelo. O primeiro passo de uma
JOIN
é atribuir os dados aleatoriamente para que todos os registos com a mesma chave JOIN
sejam enviados
para o mesmo executor. Depois de todos os dados serem baralhados, são unidos e o resultado continua através do pipeline.
Exemplo de processamento paralelo em operações JOIN
Por exemplo, suponhamos que executa uma operação JOIN
em conjuntos de dados denominados Purchases
e Items
. Cada registo de compra contém o nome e o número de um item comprado. Cada registo de artigo contém o nome do artigo e o preço desse artigo. É realizada uma
JOIN
no nome do artigo para calcular o preço total de cada compra. Quando os dados são unidos, os dados são atribuídos aleatoriamente no cluster de forma que os registos com o mesmo ID acabem no mesmo executor.
Quando as chaves JOIN
estão distribuídas de forma relativamente uniforme, as operações JOIN
têm um bom desempenho porque podem ser executadas em paralelo.
Tal como qualquer aleatorização, a distorção de dados afeta negativamente o desempenho. No exemplo anterior, os ovos são comprados com muito mais frequência do que o frango ou o leite, o que significa que o executor que se junta às compras de ovos faz mais trabalho do que os outros executores. Se notar que um JOIN
está distorcido, existem duas formas de melhorar o desempenho.
Divida automaticamente partições enviesadas
Com a execução de consultas adaptativa, as distorções realmente pesadas são processadas automaticamente.
Assim que um JOIN
produz algumas partições muito maiores do que outras, estas são divididas em partições mais pequenas. Para confirmar que tem a execução de consultas adaptativa ativada,
consulte a secção Ajuste automático.
Use um JOIN
na memória
Pode ser realizada uma JOIN
na memória se um dos lados da JOIN
for suficientemente pequeno para caber na memória. Nesta situação, o pequeno conjunto de dados é carregado na memória e, em seguida, é transmitido para todos os executores. O conjunto de dados grande não é baralhado de todo, o que remove as partições desiguais geradas quando o baralhamento é feito na chave JOIN
.
No exemplo anterior, o conjunto de dados de artigos é carregado primeiro na memória do controlador do Spark. Em seguida, é transmitida a cada executor. Os executores podem agora juntar-se aos dados sem baralhar nenhum dos conjuntos de dados de compras.
Esta abordagem requer que atribua memória suficiente ao controlador do Spark e aos executores para lhes permitir armazenar o conjunto de dados de transmissão na memória. Por predefinição, o Spark reserva ligeiramente menos de 30% da respetiva memória para armazenar este tipo de dados. Quando usar JOIN
s na memória, multiplique o tamanho do conjunto de dados por quatro e
defina esse valor como a memória do executor e do controlador. Por exemplo, se o conjunto de dados de artigos tiver um tamanho de 1 GB, temos de definir a memória do executor e do controlador para, pelo menos, 4 GB. Não é possível carregar para a memória conjuntos de dados com mais de 8 GB.
Distribuição de chaves
Quando ambos os lados do JOIN
são demasiado grandes para caber na memória, pode usar-se uma técnica diferente para dividir cada chave JOIN
em várias chaves para aumentar o nível de paralelismo. Esta técnica pode ser aplicada a operações INNER JOIN
e LEFT OUTER JOIN
. Não pode ser usado para operações do FULL OUTER JOIN
.
Nesta abordagem, o lado enviesado é salgado com uma nova coluna de números inteiros com um número aleatório de 1 a N. O lado não distorcido é expandido, com cada linha existente a gerar N
novas linhas. É adicionada uma nova coluna ao lado expandido, preenchida com cada número de 1 a N. Em seguida, é realizada uma JOIN
normal, exceto que a nova coluna é adicionada como parte da chave JOIN
. Desta forma, todos os dados que eram enviados para uma única partição são agora distribuídos por até N
partições diferentes.
No exemplo anterior, o fator de distribuição N
está definido como 3
. Os conjuntos de dados originais são apresentados à esquerda. As versões com sal e expandidas do conjunto de dados são apresentadas no centro. Os dados aleatórios são apresentados à direita, com três executores diferentes a juntarem-se às compras de ovos, em vez de um.
O paralelismo é maior quando se aumentam as distribuições. No entanto, isto tem o custo de expandir um dos lados da JOIN
, o que resulta numa maior mistura de dados no cluster. Por este motivo, a vantagem diminui à medida que a distribuição aumenta. Na maioria das situações, defina este valor como 20 ou menos.
O que se segue?
- Saiba mais sobre o processamento paralelo no Cloud Data Fusion.