Processamento paralelo

Os pipelines são executados em clusters de máquinas. Eles alcançam um alto throughput dividindo o trabalho que precisa ser feito e executando-o em paralelo nos vários executores espalhados pelo cluster. Em geral, quanto maior o número de divisões (também chamadas de partições), mais rápido o pipeline pode ser executado. O nível de paralelismo no pipeline é determinado pelas origens e estágios de embaralhamento no pipeline.

Fontes

No início de cada execução de pipeline, todas as origens no pipeline calculam quais dados precisam ser lidos e como eles podem ser divididos. Por exemplo, considere um pipeline básico que lê do Cloud Storage, realiza algumas transformações do Wrangler e grava de volta no Cloud Storage.

Pipeline básico mostrando a origem do Cloud Storage, a transformação do Wrangler e a sink do Cloud Storage

Quando o pipeline é iniciado, a origem do Cloud Storage examina os arquivos de entrada e os divide com base no tamanho deles. Por exemplo, um arquivo de um gigabyte pode ser dividido em 100 divisões de 10 MB. Cada executor lê os dados da divisão, executa as transformações do Wrangler e grava a saída em um arquivo part.

Dados particionados no Cloud Storage em transformações paralelas do Wrangler em arquivos de parte

Se o pipeline estiver lento, uma das primeiras coisas a verificar é se as origens estão criando divisões suficientes para aproveitar ao máximo o paralelismo. Por exemplo, alguns tipos de compactação impedem a divisão de arquivos de texto simples. Se você estiver lendo arquivos compactados com gzip, seu pipeline vai ser executado muito mais lentamente do que se você estivesse lendo arquivos descompactados ou compactados com BZIP (que é divisível). Da mesma forma, se você estiver usando a fonte do banco de dados e tiver configurado para usar apenas uma divisão, ela será executada muito mais lentamente do que se você configurar para usar mais divisões.

Embaralhamentos

Alguns tipos de plug-ins fazem com que os dados sejam misturados no cluster. Isso acontece quando os registros processados por um executor precisam ser enviados a outro executor para realizar o cálculo. As ordenações aleatórias são operações caras porque envolvem muitas E/S. Os plug-ins que causam a mistura de dados aparecem na seção Analytics do Pipeline Studio. Eles incluem plug-ins como Group By, Deduplicate, Distinct e Joiner. Por exemplo, suponha que uma etapa Group By seja adicionada ao pipeline no exemplo anterior.

Suponha também que os dados lidos representem compras feitas em um supermercado. Cada registro contém um campo item e um campo num_purchased. No estágio Agrupar por, configuramos o pipeline para agrupar registros no campo item e calcular a soma do campo num_purchased.

Quando o pipeline é executado, os arquivos de entrada são divididos conforme descrito anteriormente. Depois disso, cada registro é embaralhado no cluster para que cada registro com o mesmo item pertença ao mesmo executor.

Como ilustrado no exemplo anterior, os registros de compras da Apple foram originalmente distribuídos entre vários executores. Para realizar a agregação, todos esses registros precisavam ser enviados pelo cluster para o mesmo executor.

A maioria dos plug-ins que exigem uma ordem aleatória permitem que você especifique o número de partições a serem usadas ao embaralhar os dados. Isso controla quantos executores são usados para processar os dados embaralhados.

No exemplo anterior, se o número de partições for definido como 2, cada executor vai calcular agregações para dois itens em vez de um.

É possível diminuir o paralelismo do pipeline após essa etapa. Por exemplo, considere a visualização lógica do pipeline:

Se a fonte dividir os dados em 500 partições, mas o agrupamento por vai usar 200 partições, o nível máximo de paralelismo depois do agrupamento por vai cair de 500 para 200. Em vez de 500 arquivos de parte diferentes gravados no Cloud Storage, você tem apenas 200.

Como escolher partições

Se o número de partições for muito baixo, você não vai usar toda a capacidade do cluster para paralelizar o máximo de trabalho possível. Definir as partições muito altas aumenta a quantidade de sobrecarga desnecessária. Em geral, é melhor usar muitas partições do que poucas. O overhead extra é algo com que se preocupar se o pipeline leva alguns minutos para ser executado e você está tentando economizar alguns minutos. Se o pipeline leva horas para ser executado, geralmente não é necessário se preocupar com a sobrecarga.

Uma maneira útil, mas muito simplista, de determinar o número de partições a ser usado é definir como max(cluster CPUs, input records / 500,000). Em outras palavras, divida o número de registros de entrada por 500.000. Se esse número for maior que o número de CPUs do cluster, use esse número para o número de partições. Caso contrário, use o número de CPUs do cluster. Por exemplo, se o cluster tiver 100 CPUs e a etapa de embaralhamento tiver 100 milhões de registros de entrada, use 200 partições.

Uma resposta mais completa é que os embaralhamentos têm melhor desempenho quando os dados de embaralhamento intermediário de cada partição podem caber completamente na memória de um executor para que nada precise ser despejado no disco. O Spark reserva pouco menos de 30% da memória de um executor para armazenar dados de shuffle. O número exato é (memória total - 300 MB) * 30%. Se presumirmos que cada executor está configurado para usar 2 GB de memória, isso significa que cada partição não pode conter mais do que (2 GB - 300 MB) * 30% = aproximadamente 500 MB de registros. Se presumirmos que cada registro é comprimido para 1 KB, isso significa (500 MB / partição) / (1 KB / registro) = 500.000 registros por partição. Se os executores estiverem usando mais memória ou se os registros forem menores, ajuste esse número de acordo.

Desvio de dados

No exemplo anterior, as compras de vários itens foram distribuídas de maneira uniforme. Ou seja, houve três compras de cada um dos itens: maçãs, bananas, cenouras e ovos. A ordenação em uma chave distribuída de maneira uniforme é o tipo de ordenação com melhor desempenho, mas muitos conjuntos de dados não têm essa propriedade. Continuando com a compra de supermercado no exemplo anterior, você esperaria ter muito mais compras de ovos do que de convites de casamento. Quando há algumas chaves de embaralhamento muito mais comuns do que outras, você está lidando com dados desequilibrados. Os dados distorcidos podem ter um desempenho significativamente pior do que os dados não distorcidos, porque uma quantidade desproporcional de trabalho está sendo realizada por um pequeno número de executores. Isso faz com que um pequeno subconjunto de partições seja muito maior do que todas as demais.

Nesse exemplo, há cinco vezes mais compras de ovos do que de cartões, o que significa que o agregado de ovos leva cerca de cinco vezes mais tempo para ser calculado. Isso não é muito importante quando se trata de apenas 10 registros, em vez de dois, mas faz uma grande diferença quando se trata de cinco bilhões de registros em vez de um bilhão. Quando há distorção de dados, o número de partições usadas em uma mistura aleatória não tem um grande impacto no desempenho do pipeline.

É possível reconhecer a distorção dos dados examinando o gráfico para registros de saída ao longo do tempo. Se a etapa estiver gerando registros em um ritmo muito mais rápido no início da execução do pipeline e, de repente, ficar mais lenta, isso pode significar que há dados distorcidos.

Também é possível reconhecer o desvio de dados examinando o uso da memória do cluster ao longo do tempo. Se o cluster estiver com capacidade por algum tempo, mas de repente tiver um uso de memória baixo por um período, isso também é um sinal de que você está lidando com distorção de dados.

Os dados distorcidos afetam de forma mais significativa a performance quando uma mesclagem está sendo realizada. Há algumas técnicas que podem ser usadas para melhorar o desempenho de mesclagens distorcidas. Para mais informações, consulte Processamento paralelo para operações JOIN.

Ajuste adaptativo para execução

Para ajustar a execução de forma adaptativa, especifique o intervalo de partições a serem usadas, não o número exato da partição. O número exato da partição, mesmo que definido na configuração do pipeline, é ignorado quando a execução adaptativa está ativada.

Se você estiver usando um cluster do Dataproc temporário, o Cloud Data Fusion vai definir a configuração adequada automaticamente. No entanto, para clusters estáticos do Dataproc ou do Hadoop, os dois parâmetros de configuração a seguir podem ser definidos:

  • spark.default.parallelism: defina como o número total de vCores disponíveis no cluster. Isso garante que o cluster não seja descarregado e define o limite inferior para o número de partições.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: defina como 32 vezes o número de vCores disponíveis no cluster. Isso define o limite superior para o número de partições.
  • Spark.sql.adaptive.enabled: para ativar as otimizações, defina esse valor como true. O Dataproc define automaticamente, mas, se você estiver usando clusters genéricos do Hadoop, é necessário garantir que ele esteja ativado .

Esses parâmetros podem ser definidos na configuração do mecanismo de um pipeline específico ou nas propriedades do cluster de um cluster estático do Dataproc.

A seguir