Processamento paralelo

Os pipelines são executados em clusters de máquinas. Elas atingem uma alta capacidade de processamento dividir o trabalho que precisa ser feito e executá-lo em em paralelo nos vários executores espalhados pelo cluster. Em geral, quanto maior o número de divisões (também chamadas de partições), mais rápido pipeline pode ser executado. O nível de paralelismo do pipeline é determinado pela as origens e os estágios de embaralhamento no pipeline.

Fontes

No início de cada execução de pipeline, cada origem no pipeline calcula o que dados precisam ser lidos e como esses dados podem ser divididos. Para exemplo, considere um pipeline básico que faz leituras do Cloud Storage, executa algumas transformações do Wrangler e, em seguida, grava de volta no Cloud Storage.

Pipeline básico mostrando a origem do Cloud Storage, a transformação do Wrangler e o coletor do Cloud Storage

Quando o pipeline é iniciado, a origem do Cloud Storage examina a entrada arquivos e os divide em divisões com base no tamanho do arquivo. Por exemplo, um arquivo de 1 Gbps pode ser dividido em 100 divisões, cada uma com 10 MB tamanho. Cada executor lê os dados dessa divisão e executa o Wrangler. transformações e grava a saída em um arquivo part.

Dados particionados no Cloud Storage em transformações paralelas do Wrangler em arquivos partes

Se o pipeline estiver sendo executado lentamente, uma das primeiras coisas a verificar é se suas origens estão criando divisões suficientes para aproveitar ao máximo o paralelismo. Por exemplo, alguns tipos de compactação tornam os arquivos de texto simples indivisíveis. Se você ler arquivos que foram compactados em gzip, talvez você perceba que o pipeline é muito mais lento do que se estivesse lendo arquivos não compactados compactado com BZIP (que é divisível). Da mesma forma, se você estiver usando o fonte de banco de dados e o configurou para usar apenas uma divisão, ele executa muito mais lento do que se você o configurasse para usar mais divisões.

Embaralhamentos

Certos tipos de plug-ins fazem com que os dados sejam embaralhados no cluster. Isso acontece quando os registros processados por um executor precisam ser enviados a outro executor para realizar a computação. Os embaralhamentos são operações caras porque porque envolvem muita E/S. Os plug-ins que causam o embaralhamento de dados aparecem em a seção Analytics do Pipeline Studio. Isso inclui plug-ins, como Agrupar por, Eliminar duplicação, Distinto e Combinador. Por exemplo, suponha que uma opção Group By é adicionado ao pipeline no exemplo anterior.

Suponha também que os dados lidos representam compras feitas em um mercado. Cada registro contém um campo item e um num_purchased. No campo Grupo Por etapa, configuramos o pipeline para agrupar registros no campo item e calcular a soma do campo num_purchased.

Quando o pipeline é executado, os arquivos de entrada são divididos conforme descrito anteriormente. Depois Dessa forma, cada registro é embaralhado no cluster de modo que todos os registros com a mesmo item pertence ao mesmo executor.

Conforme ilustrado no exemplo anterior, os registros de compras de maçã foram originalmente distribuído entre vários executores. Para realizar a agregação, desses registros precisavam ser enviados no cluster para o mesmo executor.

A maioria dos plug-ins que exigem uma ordem aleatória permite especificar o número de partições. para usar no embaralhamento de dados. Isso controla quantos executores são usados para processar os dados embaralhados.

No exemplo anterior, se o número de partições for definido como 2, cada executor vai calcular agregações para dois itens, em vez de um.

É possível diminuir o paralelismo do pipeline depois disso fase de Por exemplo, considere a visualização lógica do pipeline:

Se a origem dividir os dados em 500 partições, mas a função "Agrupar por" for embaralhada usando 200 partições, o nível máximo de paralelismo após "agrupar por" cai de de 500 a 200. Em vez de 500 arquivos de partes diferentes gravados em Cloud Storage, você só terá 200.

Como escolher partições

Se o número de partições for muito baixo, você não usará a capacidade total seu cluster para carregar o máximo de trabalho em paralelo. Como definir as partições também alto aumenta a quantidade de overhead desnecessária. Em geral, é melhor usar muitas partições do que poucas. A sobrecarga extra é algo com que se preocupar se o seu pipeline demorar alguns minutos para ser executado e você estiver tentando eliminar um alguns minutos. Se o pipeline levar horas para ser executado, a sobrecarga geralmente não é algo com o qual você precisa se preocupar.

Uma forma útil, mas simplista demais de determinar o número de partições para usado é defini-lo como max(cluster CPUs, input records / 500,000). Em outras palavras, pegue o número de registros de entrada e divida por 500.000. Se esse número for que seja maior que o número de CPUs do cluster, use-o para o número de partições. Caso contrário, use o número de CPUs do cluster. Por exemplo, se o cluster tiver 100 CPUs, e o estágio de embaralhamento terá 100 milhões de entradas registros, use 200 partições.

Uma resposta mais completa é que os embaralhamentos têm melhor desempenho quando os dados de embaralhamento de cada partição podem caber completamente na memória do executor. para que nada precise ser derramado no disco. O Spark reserva pouco menos de 30% do memória do executor para armazenar dados de embaralhamento. O número exato é (memória total - 300 MB) * 30%. Se presumirmos que cada executor está configurado para usar 2 GB de memória, Isso significa que cada partição não pode conter mais de (2 GB - 300 MB) * 30% = aproximadamente 500 MB de registros. Se presumimos que cada registro é compactado para 1 KB, o que significa (500 MB / partição) / (1 KB / registro) = 500.000 registros por partição. Se os executores estiverem usando mais memória ou seus registros forem menores, ajuste esse número.

Desvio de dados

No exemplo anterior, as compras de vários itens foram feitas de maneira uniforme distribuídos. Ou seja, houve três compras de maçãs, bananas cenouras e ovos. O embaralhamento em uma tecla distribuída de maneira uniforme tem o melhor desempenho tipo de embaralhamento, mas muitos conjuntos de dados não têm essa propriedade. Continuando o compra de supermercado no exemplo anterior, você esperaria ter muitos mais compras de ovos do que cartões de casamento. Quando há algumas mudanças que são muito mais comuns do que outras, você está lidando com chaves dados. Dados desviados podem ter um desempenho significativamente pior do que dados não distorcidos, porque uma que uma quantidade desproporcional de trabalho está sendo realizada por um pequeno punhado executores. Isso faz com que um pequeno subconjunto de partições seja muito maior do que todas outros.

Neste exemplo, há cinco vezes mais compras de ovos do que compras com cartão, o que significa que o agregado de ovo leva cerca de cinco vezes mais tempo para ser calculado. Ela não importa muito ao lidar com apenas 10 registros em vez de dois, mas faz uma grande diferença ao lidar com cinco bilhões de registros em vez de um bilhão. Quando há desvio de dados, o número de partições usadas em um embaralhamento não têm um grande impacto no desempenho do pipeline.

Para reconhecer o desvio dos dados, procure no gráfico os registros de saída ao longo do tempo. Se o estágio estiver gerando registros em um ritmo muito mais elevado no início do execução de pipeline e, de repente, desacelera, isso pode significar que os dados estão distorcidos.

Também é possível reconhecer o desvio de dados examinando o uso da memória do cluster ao longo do tempo. Se seu cluster atingiu a capacidade por algum tempo, mas de repente ficou baixo no uso da memória por um período, isso também é um sinal de que você está lidando com desvios de dados.

Os dados desviados afetam mais o desempenho quando uma mesclagem está sendo feita. realizada. Algumas técnicas podem ser usadas para melhorar o desempenho para junções distorcidas. Para mais informações, consulte Processamento paralelo para operações JOIN.

Ajuste adaptável para execução

Para ajustar a execução de maneira adaptável, especifique o intervalo de partições a serem usadas, não o e o número exato da partição. O número exato da partição, mesmo se definido no pipeline é ignorada quando a execução adaptável é ativada.

Se você estiver usando um cluster temporário do Dataproc, O Cloud Data Fusion define a configuração adequada automaticamente, mas para do Dataproc ou do Hadoop, as próximas duas configurações podem ser definidos:

  • spark.default.parallelism: defina-o como o número total de vCores disponíveis. no cluster. Isso garante que o cluster não fique sobrecarregado e define os limite inferior para o número de partições.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: definir como 32x do número de vCores disponíveis no cluster. Isso define a parte superior é vinculado ao número de partições.
  • Spark.sql.adaptive.enabled: para ativar as otimizações, defina esse valor como true. O Dataproc a define automaticamente, mas se você estiver usando clusters genéricos do Hadoop, é preciso garantir que ele esteja ativado .

Esses parâmetros podem ser definidos na configuração do mecanismo de um pipeline ou nas propriedades do cluster de um balanceador de carga do Dataproc aglomerado.

A seguir