Os pipelines são executados em clusters de máquinas. Eles atingem uma alta capacidade de processamento dividir o trabalho que precisa ser feito e executá-lo em em paralelo nos vários executores espalhados pelo cluster. Em geral, quanto maior o número de divisões (também chamadas de partições), mais rápido pipeline pode ser executado. O nível de paralelismo do pipeline é determinado pela as origens e os estágios de embaralhamento no pipeline.
Fontes
No início de cada execução de pipeline, todas as origens no pipeline calculam quais dados precisam ser lidos e como eles podem ser divididos. Por exemplo, considere um pipeline básico que lê do Cloud Storage, realiza algumas transformações do Wrangler e grava de volta no Cloud Storage.
Quando o pipeline é iniciado, a origem do Cloud Storage examina a entrada arquivos e os divide em divisões com base no tamanho do arquivo. Por exemplo, um arquivo de um gigabyte pode ser dividido em 100 divisões de 10 MB. Cada executor lê os dados da divisão, executa as transformações do Wrangler e grava a saída em um arquivo part.
Se o pipeline estiver sendo executado lentamente, uma das primeiras coisas a verificar é se suas origens estão criando divisões suficientes para aproveitar ao máximo o paralelismo. Por exemplo, alguns tipos de compactação tornam os arquivos de texto simples indivisíveis. Se você estiver lendo arquivos compactados com gzip, seu pipeline vai ser executado muito mais lentamente do que se você estivesse lendo arquivos descompactados ou compactados com BZIP (que é divisível). Da mesma forma, se você estiver usando a fonte do banco de dados e tiver configurado para usar apenas uma divisão, ela será executada muito mais lentamente do que se você configurar para usar mais divisões.
Embaralhamentos
Alguns tipos de plug-ins fazem com que os dados sejam misturados no cluster. Isso acontece quando os registros processados por um executor precisam ser enviados a outro executor para executar a computação. As ordenações aleatórias são operações caras porque envolvem muitas E/S. Os plug-ins que causam o embaralhamento de dados aparecem em a seção Analytics do Pipeline Studio. Isso inclui plug-ins como "Group By", "Deduplicate", "Distinct" e "Joiner". Por exemplo, suponha que uma opção Group By é adicionado ao pipeline no exemplo anterior.
Suponha também que os dados lidos representem compras feitas em um mercado.
Cada registro contém um campo item
e um campo num_purchased
. No estágio Agrupar por, configuramos o pipeline para agrupar registros no campo item
e calcular a soma do campo num_purchased
.
Quando o pipeline é executado, os arquivos de entrada são divididos conforme descrito anteriormente. Depois Dessa forma, cada registro é embaralhado no cluster de modo que todos os registros com a mesmo item pertence ao mesmo executor.
Conforme ilustrado no exemplo anterior, os registros de compras de maçã foram originalmente distribuído entre vários executores. Para realizar a agregação, desses registros precisavam ser enviados no cluster para o mesmo executor.
A maioria dos plug-ins que exigem uma ordem aleatória permite especificar o número de partições. para usar no embaralhamento de dados. Isso controla quantos executores são usados para processar os dados embaralhados.
No exemplo anterior, se o número de partições for definido como 2
, cada executor vai calcular agregações para dois itens, em vez de um.
É possível diminuir o paralelismo do pipeline depois disso fase de Por exemplo, considere a visualização lógica do pipeline:
Se a fonte dividir os dados em 500 partições, mas o agrupamento por vai usar 200 partições, o nível máximo de paralelismo após o agrupamento por vai cair de 500 para 200. Em vez de 500 arquivos de partes diferentes gravados em Cloud Storage, você só terá 200.
Como escolher as partições
Se o número de partições for muito baixo, você não usará a capacidade total seu cluster para carregar o máximo de trabalho em paralelo. Definir as partições muito altas aumenta a quantidade de sobrecarga desnecessária. Em geral, é melhor usar muitas partições do que poucas. A sobrecarga extra é algo com que se preocupar se o seu pipeline demorar alguns minutos para ser executado e você estiver tentando eliminar um alguns minutos. Se o pipeline levar horas para ser executado, a sobrecarga geralmente não é algo com o qual você precisa se preocupar.
Uma forma útil, mas simplista demais de determinar o número de partições para
usado é defini-lo como max(cluster CPUs, input records / 500,000)
. Em outras palavras, divida o número de registros de entrada por 500.000. Se esse número for
que seja maior que o número de CPUs do cluster, use-o para o número de partições.
Caso contrário, use o número de CPUs do cluster. Por exemplo, se o cluster tiver
100 CPUs e a etapa de embaralhamento tiver 100 milhões de registros
de entrada, use 200 partições.
Uma resposta mais completa é que os embaralhamentos têm melhor desempenho quando os dados de embaralhamento de cada partição podem caber completamente na memória do executor. para que nada precise ser derramado no disco. O Spark reserva pouco menos de 30% do memória do executor para armazenar dados de embaralhamento. O número exato é (memória total - 300 MB) * 30%. Se presumirmos que cada executor está configurado para usar 2 GB de memória, Isso significa que cada partição não pode conter mais de (2 GB - 300 MB) * 30% = aproximadamente 500 MB de registros. Se presumimos que cada registro é compactado para 1 KB, então (500 MB / partição) / (1 KB / registro) = 500.000 registros por partição. Se os executores estiverem usando mais memória ou se os registros forem menores, ajuste esse número de acordo.
Desvio de dados
No exemplo anterior, as compras de vários itens foram feitas de maneira uniforme distribuídos. Ou seja, houve três compras de maçãs, bananas cenouras e ovos. A ordenação aleatória em uma chave distribuída de maneira uniforme é o tipo de ordenação aleatória com melhor desempenho, mas muitos conjuntos de dados não têm essa propriedade. Continuando o compra de supermercado no exemplo anterior, você esperaria ter muitos mais compras de ovos do que cartões de casamento. Quando há algumas mudanças que são muito mais comuns do que outras, você está lidando com chaves dados. Os dados distorcidos podem ter um desempenho significativamente pior do que os dados não distorcidos, porque uma quantidade desproporcional de trabalho está sendo realizada por um pequeno número de executores. Isso faz com que um pequeno subconjunto de partições seja muito maior do que todas as demais.
Nesse exemplo, há cinco vezes mais compras de ovos do que de cartões, o que significa que o agregado de ovos leva cerca de cinco vezes mais tempo para ser calculado. Ela não importa muito ao lidar com apenas 10 registros em vez de dois, mas faz uma grande diferença ao lidar com cinco bilhões de registros em vez de um bilhão. Quando há distorção de dados, o número de partições usadas em uma mistura não tem um grande impacto no desempenho do pipeline.
É possível reconhecer a distorção dos dados examinando o gráfico de registros de saída ao longo do tempo. Se o estágio estiver gerando registros em um ritmo muito mais elevado no início do ao executar um pipeline e, de repente, desacelera. Isso pode significar que os dados estão distorcidos.
Também é possível reconhecer o desvio de dados examinando o uso da memória do cluster ao longo do tempo. Se seu cluster atingiu a capacidade por algum tempo, mas de repente começou a usar pouca memória para um período, isso também é um sinal de que você está lidando com desvios de dados.
Os dados desviados afetam mais o desempenho quando uma mesclagem está sendo feita.
realizada. Algumas técnicas podem ser usadas para melhorar o desempenho
para junções distorcidas. Para mais informações, consulte
Processamento paralelo para operações JOIN
.
Ajuste adaptável para execução
Para ajustar a execução de maneira adaptável, especifique o intervalo de partições a serem usadas, não o e o número exato da partição. O número exato da partição, mesmo que definido na configuração do pipeline, é ignorado quando a execução adaptativa é ativada.
Se você estiver usando um cluster temporário do Dataproc, O Cloud Data Fusion define a configuração adequada automaticamente, mas para do Dataproc ou do Hadoop, as próximas duas configurações podem ser definidos:
spark.default.parallelism
: defina como o número total de vCores disponíveis no cluster. Isso garante que o cluster não seja descarregado e define o limite inferior para o número de partições.spark.sql.adaptive.coalescePartitions.initialPartitionNum
: definir como 32x do número de vCores disponíveis no cluster. Isso define a parte superior é vinculado ao número de partições.Spark.sql.adaptive.enabled
: para ativar as otimizações, defina esse valor comotrue
. O Dataproc a define automaticamente, mas se você estiver usando clusters genéricos do Hadoop, verifique se ele está ativado .
Esses parâmetros podem ser definidos na configuração do mecanismo de um pipeline ou nas propriedades do cluster de um balanceador de carga do Dataproc cluster.
A seguir
- Saiba mais sobre o processamento paralelo para operações
JOIN
.