Processamento paralelo

Os pipelines são executados em clusters de máquinas. Alcançam um elevado débito dividindo o trabalho que tem de ser feito e, em seguida, executando o trabalho em paralelo nos vários executores distribuídos pelo cluster. Em geral, quanto maior for o número de divisões (também denominadas partições), mais rapidamente a pipeline pode ser executada. O nível de paralelismo no pipeline é determinado pelas origens e pelas fases de aleatorização no pipeline.

Fontes

No início de cada execução do pipeline, cada origem no pipeline calcula os dados que têm de ser lidos e como esses dados podem ser divididos em divisões. Por exemplo, considere um pipeline básico que lê a partir do Cloud Storage, faz algumas transformações do Wrangler e, em seguida, escreve novamente no Cloud Storage.

Pipeline básico que mostra a origem do Cloud Storage, a transformação do Wrangler e o destino do Cloud Storage

Quando o pipeline é iniciado, a origem do Cloud Storage examina os ficheiros de entrada e divide-os em divisões com base nos tamanhos dos ficheiros. Por exemplo, um ficheiro de um gigabyte pode ser dividido em 100 partes, cada uma com 10 MB. Cada executor lê os dados dessa divisão, executa as transformações do Wrangler e, em seguida, escreve o resultado num ficheiro part.

Dados particionados no Cloud Storage em transformações paralelas do Wrangler em ficheiros de partes

Se o pipeline estiver a ser executado lentamente, uma das primeiras coisas a verificar é se as suas origens estão a criar divisões suficientes para tirar total partido do paralelismo. Por exemplo, alguns tipos de compressão tornam os ficheiros de texto simples indivisíveis. Se estiver a ler ficheiros que foram comprimidos com gzip, pode reparar que o seu pipeline é executado muito mais lentamente do que se estivesse a ler ficheiros não comprimidos ou ficheiros comprimidos com BZIP (que é divisível). Da mesma forma, se estiver a usar a origem da base de dados e a tiver configurado para usar apenas uma divisão, é executada muito mais lentamente do que se a configurar para usar mais divisões.

Modo aleatório

Determinados tipos de plug-ins fazem com que os dados sejam baralhados no cluster. Isto acontece quando os registos processados por um executor têm de ser enviados para outro executor para realizar o cálculo. As misturas são operações dispendiosas porque envolvem muitas operações de I/O. Os plug-ins que fazem com que os dados sejam baralhados aparecem todos na secção Analytics do Pipeline Studio. Estes incluem plug-ins, como Group By, Deduplicate, Distinct e Joiner. Por exemplo, suponhamos que é adicionada uma fase Agrupar por ao pipeline no exemplo anterior.

Suponhamos também que os dados lidos representam compras feitas num supermercado. Cada registo contém um campo item e um campo num_purchased. Na fase Agrupar por, configuramos o pipeline para agrupar registos no campo item e calcular a soma do campo num_purchased.

Quando o pipeline é executado, os ficheiros de entrada são divididos conforme descrito anteriormente. Depois disso, cada registo é baralhado no cluster de forma que todos os registos com o mesmo item pertençam ao mesmo executor.

Conforme ilustrado no exemplo anterior, os registos de compras de maçãs foram originalmente distribuídos por vários executores. Para realizar a agregação, todos esses registos tinham de ser enviados através do cluster para o mesmo executor.

A maioria dos plug-ins que requer uma mistura permite-lhe especificar o número de partições a usar quando mistura os dados. Este parâmetro controla o número de executores usados para processar os dados aleatorizados.

No exemplo anterior, se o número de partições estiver definido como 2, cada executor calcula os dados agregados para dois itens em vez de um.

Tenha em atenção que é possível diminuir o paralelismo do pipeline após essa fase. Por exemplo, considere a vista lógica do pipeline:

Se a origem dividir os dados em 500 partições, mas o Group By baralhar os dados com 200 partições, o nível máximo de paralelismo após o Group By desce de 500 para 200. Em vez de 500 ficheiros de partes diferentes escritos no Cloud Storage, só tem 200.

Escolher partições

Se o número de partições for demasiado baixo, não vai estar a usar a capacidade total do cluster para paralelizar o máximo de trabalho possível. A definição de partições demasiado elevada aumenta a quantidade de sobrecarga desnecessária. Em geral, é melhor usar demasiadas partições do que poucas. A sobrecarga adicional é algo com que se deve preocupar se a sua pipeline demorar alguns minutos a ser executada e estiver a tentar reduzir alguns minutos. Se o seu pipeline demorar horas a ser executado, geralmente, não tem de se preocupar com a sobrecarga.

Uma forma útil, mas demasiado simplista, de determinar o número de partições a usar é defini-lo como max(cluster CPUs, input records / 500,000). Por outras palavras, pegue no número de registos de entrada e divida-o por 500 000. Se esse número for superior ao número de CPUs do cluster, use-o para o número de partições. Caso contrário, use o número de CPUs do cluster. Por exemplo, se o cluster tiver 100 CPUs e a fase de mistura tiver 100 milhões de registos de entrada, use 200 partições.

Uma resposta mais completa é que as misturas têm o melhor desempenho quando os dados de mistura intermédios para cada partição cabem completamente na memória de um executor, para que nada precise de ser derramado no disco. O Spark reserva pouco menos de 30% da memória de um executor para armazenar dados de mistura. O número exato é (memória total - 300 MB) * 30%. Se assumirmos que cada executor está definido para usar 2 GB de memória, isso significa que cada partição não deve conter mais de (2 GB - 300 MB) * 30% = aproximadamente 500 MB de registos. Se assumirmos que cada registo é comprimido para 1 KB, isso significa que (500 MB / partição) / (1 KB/registo) = 500 000 registos por partição. Se os executores estiverem a usar mais memória ou os registos forem mais pequenos, pode ajustar este número em conformidade.

Desvio de dados

Tenha em atenção que, no exemplo anterior, as compras de vários artigos foram distribuídas uniformemente. Ou seja, houve três compras de maçãs, bananas, cenouras e ovos. A ordenação aleatória numa chave distribuída uniformemente é o tipo de ordenação aleatória com melhor desempenho, mas muitos conjuntos de dados não têm esta propriedade. Continuando com o exemplo da compra de uma loja de mercearia, seria de esperar que tivesse muito mais compras de ovos do que de cartões de casamento. Quando existem algumas chaves de aleatorização muito mais comuns do que outras chaves, está a lidar com dados enviesados. Os dados distorcidos podem ter um desempenho significativamente pior do que os dados não distorcidos, porque um número desproporcionado de tarefas está a ser realizado por um pequeno número de executores. Faz com que um pequeno subconjunto de partições seja muito maior do que todas as outras.

Neste exemplo, existem cinco vezes mais compras de ovos do que compras de cartões, o que significa que a agregação de ovos demora aproximadamente cinco vezes mais a calcular. Não faz muita diferença quando trabalha apenas com 10 registos, em vez de dois, mas faz uma grande diferença quando trabalha com cinco mil milhões de registos, em vez de mil milhões. Quando tem uma distorção de dados, o número de partições usadas numa ordenação aleatória não tem um grande impacto no desempenho do pipeline.

Pode reconhecer a distorção de dados examinando o gráfico de registos de saída ao longo do tempo. Se a fase estiver a gerar registos a um ritmo muito mais elevado no início da execução do pipeline e, de repente, abrandar, isto pode significar que tem dados distorcidos.

Também pode reconhecer a distorção de dados examinando a utilização da memória do cluster ao longo do tempo. Se o cluster estiver no limite da capacidade durante algum tempo, mas, de repente, tiver uma utilização de memória baixa durante um período, isto também é um sinal de que está a lidar com a distorção de dados.

Os dados distorcidos afetam o desempenho de forma mais significativa quando é feita uma junção. Existem algumas técnicas que podem ser usadas para melhorar o desempenho para junções enviesadas. Para mais informações, consulte o artigo Processamento paralelo para operações JOIN.

Ajuste adaptável para execução

Para ajustar a execução de forma adaptativa, especifique o intervalo de partições a usar e não o número exato da partição. O número exato da partição, mesmo que definido na configuração do pipeline, é ignorado quando a execução adaptativa está ativada.

Se estiver a usar um cluster do Dataproc efémero, o Cloud Data Fusion define automaticamente a configuração adequada, mas para clusters estáticos do Dataproc ou Hadoop, é possível definir os dois parâmetros de configuração seguintes:

  • spark.default.parallelism: defina-o como o número total de vCores disponíveis no cluster. Isto garante que o cluster não está com carga insuficiente e define o limite inferior para o número de partições.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: defina-o como 32 vezes o número de vCores disponíveis no cluster. Isto define o limite superior para o número de partições.
  • Spark.sql.adaptive.enabled: para ativar as otimizações, defina este valor como true. O Dataproc define-o automaticamente, mas se estiver a usar clusters Hadoop genéricos, tem de garantir que está ativado .

Estes parâmetros podem ser definidos na configuração do motor de um pipeline específico ou nas propriedades do cluster de um cluster do Dataproc estático.

O que se segue?