Dicas de ajuste de jobs do Spark

As seções a seguir oferecem dicas para ajudar você a ajustar seu Dataproc aplicativos Spark.

Usar clusters temporários

Ao usar a configuração "efêmero" do Dataproc, modelo de cluster, você cria um cluster dedicado para cada job e, quando o job terminar, ele será excluído. No modelo temporário, é possível tratar o armazenamento e a computação separadamente, salvar dados de entrada e saída do job no Cloud Storage ou no BigQuery; usando o cluster apenas para computação e armazenamento de dados temporário.

Armadilhas de clusters permanentes

O uso de clusters efêmeros de um job evita as armadilhas a seguir e as possíveis problemas associados ao uso de "persistentes" compartilhados e clusters:

  • Pontos únicos de falha: um estado de erro de cluster compartilhado pode fazer com que todos os jobs falhem. bloquear um pipeline de dados inteiro. A investigação e recuperação de um erro pode levar horas. Como os clusters efêmeros mantêm apenas estados temporários neles, quando ocorre um erro, elas podem ser rapidamente excluídas e recriadas.
  • Dificuldade em manter e migrar estados de cluster no HDFS, MySQL ou sistemas de arquivos locais
  • Contenções de recursos entre jobs que afetam negativamente os SLOs
  • Demônios de serviço sem resposta causados pela pressão de memória
  • Acúmulo de registros e arquivos temporários que podem exceder a capacidade do disco
  • Falha no escalonamento vertical devido ao estoque esgotado da zona do cluster
  • Falta de suporte para versões desatualizadas da imagem do cluster.

Benefícios do cluster efêmero

Pelo lado positivo, os clusters efêmeros permitem fazer o seguinte:

  • Configure diferentes permissões do IAM para diferentes jobs com diferentes contas de serviço de VM do Dataproc.
  • Otimize as configurações de hardware e software de um cluster para cada job, mudando as configurações do cluster conforme necessário.
  • Faça upgrade das versões de imagem em novos clusters para receber os patches de segurança, correções de bugs e otimizações mais recentes.
  • Resolva problemas mais rapidamente em um cluster isolado de job único.
  • Economize custos pagando apenas pelo tempo de execução do cluster efêmero, não pelo tempo ocioso entre os jobs em um cluster compartilhado.

Usar o Spark SQL

A API DataFrame do Spaark SQL é uma otimização significativa da API RDD. Se você interagir com o código que usa RDDs, considere ler dados como um DataFrame antes de transmitir um RDD no código. Em código Java ou Scala, use a API Dataset Spark SQL como um superconjunto de RDDs e DataFrames.

Usar o Apache Spark 3

O Dataproc 2.0 instala o Spark 3, que inclui os seguintes recursos e melhorias de desempenho:

  • Suporte a GPUs
  • Capacidade de ler arquivos binários
  • Melhorias de desempenho
  • Remoção dinâmica de partições
  • Execução de consulta adaptável, que otimiza jobs do Spark em tempo real

Usar alocação dinâmica

O Apache Spark inclui um recurso de alocação dinâmica que escalona o número de executores do Spark nos workers de um cluster. Esse recurso permite que um job use o cluster completo do Dataproc, mesmo quando ele é escalonado verticalmente. Esse recurso é ativado por padrão no Dataproc (spark.dynamicAllocation.enabled é definido como true). Consulte Alocação dinâmica do Spark para mais informações.

Usar o escalonamento automático do Dataproc.

O escalonamento automático do Dataproc adiciona e remove dinamicamente workers do Dataproc de um cluster para garantir que os jobs do Spark tenham os recursos necessários para serem concluídos rapidamente.

É uma prática recomendada configurar a política de escalonamento automático para escalonar apenas workers secundários.

Use o modo de flexibilidade aprimorado do Dataproc

Os clusters com VMs preemptivas ou uma política de escalonamento automático podem receber exceções FetchFailed quando os workers forem interrompidos ou removidos antes de terminarem de exibir dados de embaralhamento para redutores. Essa exceção pode causar novas tentativas de tarefas e tempos de conclusão de jobs mais longos.

Recomendação: use o modo de flexibilidade aprimorado do Dataproc, que não armazena dados de embaralhamento intermediários em workers secundários, para que os workers secundários possam ser interrompidos ou reduzidos com segurança.

Configurar o particionamento e o embaralhamento

O Spark armazena dados em partições temporárias no cluster. Se o aplicativo agrupar ou mesclar DataFrames, os dados serão embaralhados em novas partições de acordo com o agrupamento e a configuração de baixo nível.

O particionamento de dados afeta significativamente o desempenho do aplicativo: um número muito pequeno de partições limita o paralelismo de jobs e a utilização de recursos do cluster. Muitas partições reduzem o job devido ao processamento e particionamento adicionais de partição.

Como configurar partições

As seguintes propriedades regem o número e o tamanho das suas partições:

  • spark.sql.files.maxPartitionBytes: o tamanho máximo das partições referentes à leitura de dados do Cloud Storage. O padrão é 128 MB, o que é suficientemente grande para a maioria dos aplicativos que processam menos de 100 TB.

  • spark.sql.shuffle.partitions: o número de partições após a execução de uma ordem aleatória. O padrão é 200, o que é adequado para clusters com menos de 100 vCPUs. Recomendação: defina como três vezes o número de vCPUs no cluster.

  • spark.default.parallelism: o número de partições retornadas após a execução de transformações RDD que exigem embaralhamentos, como join, reduceByKey e parallelize. O padrão é o número total de vCPUs no cluster. Ao usar RDDs em jobs do Spark, é possível definir esse número como três vezes suas vCPUs

Limitar o número de arquivos

Há uma perda de desempenho quando o Spark lê muitos arquivos pequenos. Armazene dados em arquivos maiores, por exemplo, tamanhos de arquivo na faixa de 256 MB a 512 MB. Da mesma forma, limite o número de arquivos de saída. Para forçar uma reprodução aleatória, consulte Evitar embaralhamentos desnecessários.

Configurar execução de consulta adaptável (Spark 3)

A execução de consulta adaptável (ativada por padrão na versão de imagem 2.0 do Dataproc) fornece melhorias de desempenho do job do Spark, incluindo:

Embora as configurações padrão sejam corretas para a maioria dos casos de uso, definir spark.sql.adaptive.advisoryPartitionSizeInBytes como spark.sqlfiles.maxPartitionBytes (padrão de 128 MB) pode ser útil.

Evite embaralhamentos desnecessários

O Spark permite que os usuários acionem manualmente uma ordem aleatória para reequilibrar os dados com a função repartition. Como os embaralhamentos são caros, a reprodução aleatória de dados precisa ser usada com cuidado. Definir as configurações de partição de maneira apropriada deve ser suficiente para permitir que o Spark particionar automaticamente os dados.

Exceção: ao gravar dados particionados em colunas no Cloud Storage, o reparticionamento em uma coluna específica evita a gravação de muitos arquivos pequenos para conseguir tempos de gravação mais rápidos.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Armazenar dados em Parquet ou Avro

Por padrão, o Spark SQL lê e grava dados em arquivos Parquet compactados Snappy. O Parquet está em um formato de arquivo em colunas eficiente que permite ao Spark ler apenas os dados necessários para executar um aplicativo. Essa é uma vantagem importante ao trabalhar com grandes conjuntos de dados. Outros formatos de colunas, como o Apache ORC, também têm bom desempenho.

Para dados em colunas, o Apache Avro fornece um formato de arquivo de linha binária eficiente. Embora normalmente seja mais lento que o Parquet, o desempenho do Avro é melhor do que os formatos baseados em texto,como CSV ou JSON.

Otimizar o tamanho do disco

A capacidade dos discos permanentes é escalonada de acordo com o tamanho do disco, o que pode afetar o desempenho do job do Spark. Isso acontece porque os jobs gravam metadados e embaralham os dados no disco. Ao usar discos permanentes padrão, o tamanho do disco precisa ser de pelo menos 1 terabytes por worker. Consulte Desempenho por tamanho do disco permanente.

Para monitorar a capacidade do disco do worker no console do Google Cloud:

  1. Clique no nome do cluster na página Clusters.
  2. Clique na guia INSTÂNCIAS de VM.
  3. Clique no nome de qualquer worker.
  4. Clique na guia "MONITORAMENTO" e role a tela para baixo até "Capacidade do disco" para ver a capacidade do worker.

Considerações sobre disco

Clusters temporários do Dataproc, que não se beneficiam do armazenamento permanente. É possível usar SSDs locais. Os SSDs locais estão fisicamente conectados ao cluster e fornecem maior capacidade do que os discos permanentes. Consulte a Tabela de desempenho. Os SSDs locais estão disponíveis em um tamanho fixo de 375 gigabytes, mas é possível adicionar vários SSDs para melhorar o desempenho.

Os SSDs locais não mantêm os dados depois que um cluster é desligado. Se você precisar de armazenamento permanente, use discos permanentes SSD, que oferecem mais capacidade para o tamanho deles do que os discos permanentes padrão. Os discos permanentes SSD também são uma boa opção se o tamanho da partição for menor que 8 KB. No entanto, evite pequenos pares.

Anexe GPUs ao cluster

O Spark 3 é compatível com GPUs. Usar GPUs com a Ação de inicialização do RAPIDS para acelerar jobs do Spark usando o Acelerador de SQL do RAPIDS O Ação de inicialização do driver da GPU para configurar um cluster com GPUs.

Falhas e correções comuns de jobs

Memória insuficiente

Exemplos:

  • "Executor perdido"
  • "java.lang.OutOfMemoryError: limite de sobrecarga do GC excedido"
  • "Contêiner encerrado pelo YARN por exceder os limites de memória"

Correções possíveis:

Falhas na busca aleatória

Exemplos:

  • "FetchFailedException" (erro do Spark)
  • "Falha na conexão com..." (Erro do Spark)
  • "Falha ao buscar" (erro MapReduce)

Geralmente, é causado pela remoção prematura de workers que ainda têm dados embaralhados para serem exibidos.

Causas e correções possíveis:

  • VMs de workers preemptivos foram recuperadas, ou VMs de workers não preemptivos foram removidas pelo escalonador automático. Solução: use o Modo de flexibilidade aprimorado para tornar os workers secundários preemptivos ou escalonáveis com segurança.
  • O executor ou mapeador falhou devido a um erro de OutOfMemory. Solução: aumentar a memória do executor ou mapeador.
  • O serviço do Spark Shuffle pode estar sobrecarregado. Solução: diminuir o número de partições do job.

Os nós YARN estão NÃO ÍNTEGROS

Exemplos (de registros YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Muitas vezes, relacionados ao espaço em disco insuficiente para a reprodução aleatória de dados. Faça o diagnóstico visualizando os arquivos de registro:

  • Abra o Clusters página no console do Google Cloud e, em seguida, clique no nome do cluster.
  • Clique em VER REGISTROS.
  • Filtre os registros por hadoop-yarn-nodemanager.
  • Pesquise "NÃO-ÍNTEGROS".

Possíveis correções:

  • O cache do usuário é armazenado no diretório especificado pela propriedade yarn.nodemanager.local-dirs no yarn-site.xml file. Esse arquivo está localizado em /etc/hadoop/conf/yarn-site.xml: É possível verificar o espaço livre no caminho /hadoop/yarn/nm-local-dir e libere espaço em excluindo a pasta de cache do usuário /hadoop/yarn/nm-local-dir/usercache.
  • Se o registro informar o status "NÃO INTEGROS", recrie o cluster com espaço em disco maior, o que aumentará o limite de capacidade.

O job falha devido à memória insuficiente do driver

Ao executar jobs no modo de cluster, o job falhará se o tamanho da memória do mestre é significativamente maior do que o tamanho da memória do nó de trabalho.

Exemplo de registros do driver:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Possíveis correções:

  • Defina spark:spark.driver.memory como menor que yarn:yarn.scheduler.maximum-allocation-mb.
  • Use o mesmo tipo de máquina para os nós mestre e worker.

Para mais informações