As seções a seguir fornecem dicas para ajudar você a ajustar seus aplicativos do Dataproc Spark.
Usar clusters temporários
Ao usar o modelo de cluster "transitório" do Dataproc, você cria um cluster dedicado para cada job e, quando ele é concluído, o cluster é excluído. Com o modelo temporário, é possível tratar o armazenamento e a computação separadamente, salvando dados de entrada e saída do job no Cloud Storage ou no BigQuery, usando o cluster apenas para computação e armazenamento de dados temporários.
Armadilhas de clusters permanentes
O uso de clusters temporários de um único job evita as seguintes armadilhas e possíveis problemas associados ao uso de clusters "persistentes" compartilhados e de longa duração:
- Pontos únicos de falha: um estado de erro de cluster compartilhado pode fazer com que todos os jobs falhem, bloqueando todo o pipeline de dados. A investigação e recuperação de um erro pode levar horas. Como os clusters temporários mantêm apenas estados temporários no cluster, quando ocorre um erro, eles podem ser excluídos e recriados rapidamente.
- Dificuldade em manter e migrar estados de cluster no HDFS, MySQL ou sistemas de arquivos locais
- Contenções de recursos entre jobs que afetam negativamente os SLOs
- Demônios de serviço sem resposta causados pela pressão de memória
- Acúmulo de registros e arquivos temporários que podem exceder a capacidade do disco
- Falha no aumento de escala devido ao esgotamento de estoque da zona do cluster
- Falta de suporte para versões desatualizadas da imagem do cluster.
Benefícios do cluster efêmero
Por outro lado, os clusters temporários permitem:
- Configure diferentes permissões do IAM para diferentes jobs com diferentes contas de serviço de VM do Dataproc.
- Otimize as configurações de hardware e software de um cluster para cada job, mudando as configurações do cluster conforme necessário.
- Faça upgrade das versões de imagem em novos clusters para receber os patches de segurança, correções de bugs e otimizações mais recentes.
- Resolva problemas mais rapidamente em um cluster isolado de job único.
- Economize custos pagando apenas pelo tempo de execução do cluster efêmero, não pelo tempo ocioso entre os jobs em um cluster compartilhado.
Usar o Spark SQL
A API DataFrame do Spaark SQL é uma otimização significativa da API RDD. Se você interagir com o código que usa RDDs, considere ler dados como um DataFrame antes de transmitir um RDD no código. Em código Java ou Scala, use a API Dataset Spark SQL como um superconjunto de RDDs e DataFrames.
Usar o Apache Spark 3
O Dataproc 2.0 instala o Spark 3, que inclui os seguintes recursos e melhorias de desempenho:
- Suporte a GPUs
- Capacidade de ler arquivos binários
- Melhorias de desempenho
- Remoção dinâmica de partições
- Execução de consulta adaptável, que otimiza jobs do Spark em tempo real
Usar alocação dinâmica
O Apache Spark inclui um recurso de alocação dinâmica que escalona
o número de executores do Spark nos workers de um cluster. Esse recurso permite que um job use o cluster completo do Dataproc, mesmo quando ele é escalonado verticalmente. Esse recurso é ativado por padrão no Dataproc (spark.dynamicAllocation.enabled
é definido como true
). Consulte
Alocação dinâmica do Spark
para mais informações.
Usar o escalonamento automático do Dataproc.
O escalonamento automático do Dataproc adiciona e remove dinamicamente workers do Dataproc de um cluster para garantir que os jobs do Spark tenham os recursos necessários para serem concluídos rapidamente.
É uma prática recomendada configurar a política de escalonamento automático para escalonar apenas workers secundários.
Use o modo de flexibilidade aprimorado do Dataproc
Os clusters com VMs preemptivas ou uma política de escalonamento automático podem receber exceções FetchFailed quando os workers forem interrompidos ou removidos antes de terminarem de exibir dados de embaralhamento para redutores. Essa exceção pode causar novas tentativas de tarefas e tempos de conclusão de jobs mais longos.
Recomendação: use o modo de flexibilidade aprimorado do Dataproc, que não armazena dados de embaralhamento intermediários em workers secundários, para que os workers secundários possam ser interrompidos ou reduzidos com segurança.
Configurar o particionamento e o embaralhamento
O Spark armazena dados em partições temporárias no cluster. Se o aplicativo agrupar ou mesclar DataFrames, os dados serão embaralhados em novas partições de acordo com o agrupamento e a configuração de baixo nível.
O particionamento de dados afeta significativamente o desempenho do aplicativo: um número muito pequeno de partições limita o paralelismo de jobs e a utilização de recursos do cluster. Muitas partições reduzem o job devido ao processamento e particionamento adicionais de partição.
Como configurar partições
As seguintes propriedades regem o número e o tamanho das suas partições:
spark.sql.files.maxPartitionBytes
: o tamanho máximo das partições referentes à leitura de dados do Cloud Storage. O padrão é 128 MB, o que é suficientemente grande para a maioria dos aplicativos que processam menos de 100 TB.spark.sql.shuffle.partitions
: o número de partições após a execução de uma ordem aleatória. O padrão é 200, o que é adequado para clusters com menos de 100 vCPUs. Recomendação: defina como três vezes o número de vCPUs no cluster.spark.default.parallelism
: o número de partições retornadas após a execução de transformações RDD que exigem embaralhamentos, comojoin
,reduceByKey
eparallelize
. O padrão é o número total de vCPUs no cluster. Ao usar RDDs em jobs do Spark, é possível definir esse número como três vezes suas vCPUs
Limitar o número de arquivos
Há uma perda de desempenho quando o Spark lê muitos arquivos pequenos. Armazene dados em arquivos maiores, por exemplo, tamanhos de arquivo entre 256 MB e 512 MB. Da mesma forma, limite o número de arquivos de saída. Para forçar uma reprodução aleatória, consulte Evitar embaralhamentos desnecessários.
Configurar execução de consulta adaptável (Spark 3)
A execução de consulta adaptável (ativada por padrão na versão de imagem 2.0 do Dataproc) fornece melhorias de desempenho do job do Spark, incluindo:
- Como separar partições após embaralhamentos
- Como converter mesclagens de mesclagem para mesclar junções
- Otimizações para junções de desvio.
Embora as configurações padrão sejam corretas para a maioria dos casos de uso,
definir spark.sql.adaptive.advisoryPartitionSizeInBytes
como
spark.sqlfiles.maxPartitionBytes
(padrão de 128 MB) pode ser útil.
Evite embaralhamentos desnecessários
O Spark permite que os usuários acionem manualmente uma ordem aleatória para reequilibrar os dados com
a função repartition
. Como os embaralhamentos são caros, a reprodução aleatória de dados precisa ser usada com cuidado. Definir as configurações de partição de maneira apropriada deve ser suficiente para permitir que o Spark particionar automaticamente os dados.
Exceção: ao gravar dados particionados em colunas no Cloud Storage, o reparticionamento em uma coluna específica evita a gravação de muitos arquivos pequenos para conseguir tempos de gravação mais rápidos.
df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
Armazenar dados em Parquet ou Avro
Por padrão, o Spark SQL lê e grava dados em arquivos Parquet compactados Snappy. O Parquet está em um formato de arquivo em colunas eficiente que permite ao Spark ler apenas os dados necessários para executar um aplicativo. Essa é uma vantagem importante ao trabalhar com grandes conjuntos de dados. Outros formatos de colunas, como o Apache ORC, também têm bom desempenho.
Para dados em colunas, o Apache Avro fornece um formato de arquivo de linha binária eficiente. Embora normalmente seja mais lento que o Parquet, o desempenho do Avro é melhor do que os formatos baseados em texto,como CSV ou JSON.
Otimizar o tamanho do disco
A capacidade dos discos permanentes é escalonada de acordo com o tamanho do disco, o que pode afetar o desempenho do job do Spark. Isso acontece porque os jobs gravam metadados e embaralham os dados no disco. Ao usar discos permanentes padrão, o tamanho do disco precisa ser de pelo menos 1 terabytes por worker. Consulte Desempenho por tamanho do disco permanente.
Para monitorar a capacidade do disco do worker no console do Google Cloud:
- Clique no nome do cluster na página Clusters.
- Clique na guia INSTÂNCIAS de VM.
- Clique no nome de qualquer worker.
- Clique na guia "MONITORAMENTO" e role a tela para baixo até "Capacidade do disco" para ver a capacidade do worker.
Considerações sobre disco
Clusters temporários do Dataproc, que não se beneficiam do armazenamento permanente. É possível usar SSDs locais. Os SSDs locais estão fisicamente conectados ao cluster e fornecem maior capacidade do que os discos permanentes. Consulte a Tabela de desempenho. Os SSDs locais estão disponíveis em um tamanho fixo de 375 gigabytes, mas é possível adicionar vários SSDs para melhorar o desempenho.
Os SSDs locais não mantêm os dados depois que um cluster é desligado. Se você precisar de armazenamento permanente, use discos permanentes SSD, que oferecem mais capacidade para o tamanho deles do que os discos permanentes padrão. Os discos permanentes SSD também são uma boa opção se o tamanho da partição for menor que 8 KB. No entanto, evite pequenos pares.
Anexe GPUs ao cluster
O Spark 3 é compatível com GPUs. Use GPUs com a ação de inicialização RAPIDS para acelerar jobs do Spark usando o RAPIDS SQL Accelerator. A ação de inicialização do driver da GPU para configurar um cluster com GPUs.
Falhas e correções comuns de jobs
Memória insuficiente
Exemplos:
- "Executor perdido"
- "java.lang.OutOfMemoryError: limite de sobrecarga do GC excedido"
- "Contêiner encerrado pelo YARN por exceder os limites de memória"
Correções possíveis:
- Se estiver usando o PySpark, aumente
spark.executor.memoryOverhead
e diminuaspark.executor.memory
. - Use tipos de máquina com alta memória.
- Use partições menores.
Falhas na busca aleatória
Exemplos:
- "FetchFailedException" (erro do Spark)
- "Falha na conexão com..." (Erro do Spark)
- "Falha ao buscar" (erro MapReduce)
Geralmente, é causado pela remoção prematura de workers que ainda têm dados embaralhados para serem exibidos.
Causas e correções possíveis:
- VMs de workers preemptivos foram recuperadas, ou VMs de workers não preemptivos foram removidas pelo escalonador automático. Solução: use o Modo de flexibilidade aprimorado para tornar os workers secundários preemptivos ou escalonáveis com segurança.
- O executor ou mapeador falhou devido a um erro de OutOfMemory. Solução: aumentar a memória do executor ou mapeador.
- O serviço do Spark Shuffle pode estar sobrecarregado. Solução: diminuir o número de partições do job.
Os nós YARN estão NÃO ÍNTEGROS
Exemplos (de registros YARN):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
Muitas vezes, relacionados ao espaço em disco insuficiente para a reprodução aleatória de dados. Faça o diagnóstico visualizando os arquivos de registro:
- Abra a página Clusters do projeto no console do Google Cloud e clique no nome do cluster.
- Clique em VER REGISTROS.
- Filtre os registros por
hadoop-yarn-nodemanager
. - Pesquise "NÃO-ÍNTEGROS".
Possíveis correções:
- O cache do usuário é armazenado no diretório especificado pela
propriedade
yarn.nodemanager.local-dirs
noyarn-site.xml file
. Esse arquivo está localizado em/etc/hadoop/conf/yarn-site.xml
. Você pode verificar o espaço livre no caminho/hadoop/yarn/nm-local-dir
e liberar espaço excluindo a pasta de cache do usuário/hadoop/yarn/nm-local-dir/usercache
. - Se o registro informar o status "NÃO INTEGROS", recrie o cluster com espaço em disco maior, o que aumentará o limite de capacidade.
O job falha devido à memória insuficiente do driver
Ao executar jobs no modo cluster, o job falha se o tamanho da memória do nó mestre for significativamente maior que o tamanho da memória do nó de trabalho.
Exemplo de registros do driver:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
Possíveis correções:
- Defina
spark:spark.driver.memory
como menor queyarn:yarn.scheduler.maximum-allocation-mb
. - Use o mesmo tipo de máquina para os nós mestre e worker.
Para mais informações
- Consulte Ajuste de desempenho do Spark.