Dicas de ajuste de jobs do Spark

As seções a seguir fornecem dicas para ajudar você a ajustar seus aplicativos Spark do Dataproc.

Usar clusters temporários

Ao usar o modelo de cluster "temporário" do Dataproc, você cria um cluster dedicado para cada job e, quando o job termina, exclui o cluster. Com o modelo temporário, é possível tratar o armazenamento e a computação separadamente, salvando dados de entrada e saída de jobs no Cloud Storage ou no BigQuery, usando o cluster apenas para computação e armazenamento temporário de dados.

Armadilhas dos clusters persistentes

O uso de clusters temporários de um job evita as armadilhas e os possíveis problemas associados ao uso de clusters "persistentes" compartilhados e de longa duração:

  • Pontos únicos de falha: um estado de erro de cluster compartilhado pode fazer com que todos os jobs falhem, bloqueando um pipeline de dados inteiro. A investigação e a recuperação de um erro podem levar horas. Como os clusters temporários mantêm apenas os estados temporários no cluster, quando ocorre um erro, eles podem ser rapidamente excluídos e recriados.
  • Dificuldade para manter e migrar estados do cluster em HDFS, MySQL ou sistemas de arquivos locais
  • Contenções de recursos entre jobs que afetam negativamente os SLOs
  • Daemons de serviço sem resposta causados pela pressão da memória
  • Acúmulo de registros e arquivos temporários que podem exceder a capacidade do disco
  • Falha no escalonamento vertical devido ao estoque esgotado da zona do cluster
  • falta de suporte para versões desatualizadas de imagem de cluster;

Benefícios do cluster temporário

No lado positivo, os clusters efêmeros permitem:

  • Configurar diferentes permissões do IAM para jobs distintos com contas de serviço da VM do Dataproc diferentes.
  • Otimizar as configurações de hardware e software de um cluster para cada job, alterando as configurações de cluster conforme necessário.
  • Faça upgrade das versões de imagem em novos clusters para receber os patches de segurança, as correções de bugs e as otimizações mais recentes.
  • Resolver problemas mais rapidamente em um cluster isolado de job único.
  • Economize custos pagando apenas pelo tempo de execução do cluster temporário, não pelo tempo de inatividade entre jobs em um cluster compartilhado.

Usar o Spark SQL

A API DataFrame do Spaark SQL é uma otimização significativa da API RDD. Se você interagir com o código que usa RDDs, considere ler dados como um DataFrame antes de transmitir um RDD no código. Em código Java ou Scala, use a API Dataset Spark SQL como um superconjunto de RDDs e DataFrames.

Usar o Apache Spark 3

O Dataproc 2.0 instala o Spark 3, que inclui os seguintes recursos e melhorias de desempenho:

  • Suporte a GPUs
  • Capacidade de ler arquivos binários
  • Melhorias de desempenho
  • Remoção dinâmica de partições
  • Execução de consulta adaptável, que otimiza jobs do Spark em tempo real

Usar alocação dinâmica

O Apache Spark inclui um recurso de alocação dinâmica que escalona o número de executores do Spark nos workers de um cluster. Esse recurso permite que um job use o cluster completo do Dataproc, mesmo quando ele é escalonado verticalmente. Esse recurso é ativado por padrão no Dataproc (spark.dynamicAllocation.enabled é definido como true). Consulte Alocação dinâmica do Spark para mais informações.

Usar o escalonamento automático do Dataproc.

O escalonamento automático do Dataproc adiciona e remove dinamicamente workers do Dataproc de um cluster para garantir que os jobs do Spark tenham os recursos necessários para serem concluídos rapidamente.

É uma prática recomendada configurar a política de escalonamento automático para escalonar apenas workers secundários.

Use o modo de flexibilidade aprimorado do Dataproc

Os clusters com VMs preemptivas ou uma política de escalonamento automático podem receber exceções FetchFailed quando os workers forem interrompidos ou removidos antes de terminarem de exibir dados de embaralhamento para redutores. Essa exceção pode causar novas tentativas de tarefas e tempos de conclusão de jobs mais longos.

Recomendação: use o modo de flexibilidade aprimorado do Dataproc, que não armazena dados de embaralhamento intermediários em workers secundários, para que os workers secundários possam ser interrompidos ou reduzidos com segurança.

Configurar o particionamento e o embaralhamento

O Spark armazena dados em partições temporárias no cluster. Se o aplicativo agrupar ou mesclar DataFrames, os dados serão embaralhados em novas partições de acordo com o agrupamento e a configuração de baixo nível.

O particionamento de dados afeta significativamente o desempenho do aplicativo: um número muito pequeno de partições limita o paralelismo de jobs e a utilização de recursos do cluster. Muitas partições reduzem o job devido ao processamento e particionamento adicionais de partição.

Como configurar partições

As seguintes propriedades regem o número e o tamanho das suas partições:

  • spark.sql.files.maxPartitionBytes: o tamanho máximo das partições referentes à leitura de dados do Cloud Storage. O padrão é 128 MB, o que é suficientemente grande para a maioria dos aplicativos que processam menos de 100 TB.

  • spark.sql.shuffle.partitions: o número de partições após a execução de uma ordem aleatória. O padrão é 200, o que é adequado para clusters com menos de 100 vCPUs. Recomendação: defina como três vezes o número de vCPUs no cluster.

  • spark.default.parallelism: o número de partições retornadas após a execução de transformações RDD que exigem embaralhamentos, como join, reduceByKey e parallelize. O padrão é o número total de vCPUs no cluster. Ao usar RDDs em jobs do Spark, é possível definir esse número como três vezes suas vCPUs

Limitar o número de arquivos

Há uma perda de desempenho quando o Spark lê muitos arquivos pequenos. Armazene dados em tamanhos de arquivo maiores, por exemplo, no intervalo de 256 MB a 512 MB. Da mesma forma, limite o número de arquivos de saída. Para forçar um embaralhamento, consulte Evitar embaralhamentos desnecessários.

Configurar execução de consulta adaptável (Spark 3)

A execução de consulta adaptável (ativada por padrão na versão de imagem 2.0 do Dataproc) fornece melhorias de desempenho do job do Spark, incluindo:

Embora as configurações padrão sejam corretas para a maioria dos casos de uso, definir spark.sql.adaptive.advisoryPartitionSizeInBytes como spark.sqlfiles.maxPartitionBytes (padrão de 128 MB) pode ser útil.

Evite embaralhamentos desnecessários

O Spark permite que os usuários acionem manualmente uma ordem aleatória para reequilibrar os dados com a função repartition. Como os embaralhamentos são caros, a reprodução aleatória de dados precisa ser usada com cuidado. Definir as configurações de partição de maneira apropriada deve ser suficiente para permitir que o Spark particionar automaticamente os dados.

Exceção: ao gravar dados particionados em colunas no Cloud Storage, o reparticionamento em uma coluna específica evita a gravação de muitos arquivos pequenos para conseguir tempos de gravação mais rápidos.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

Armazenar dados em Parquet ou Avro

Por padrão, o Spark SQL lê e grava dados em arquivos Parquet compactados Snappy. O Parquet está em um formato de arquivo em colunas eficiente que permite ao Spark ler apenas os dados necessários para executar um aplicativo. Essa é uma vantagem importante ao trabalhar com grandes conjuntos de dados. Outros formatos de colunas, como o Apache ORC, também têm bom desempenho.

Para dados em colunas, o Apache Avro fornece um formato de arquivo de linha binária eficiente. Embora normalmente seja mais lento que o Parquet, o desempenho do Avro é melhor do que os formatos baseados em texto,como CSV ou JSON.

Otimizar o tamanho do disco

A capacidade dos discos permanentes é escalonada de acordo com o tamanho do disco, o que pode afetar o desempenho do job do Spark. Isso acontece porque os jobs gravam metadados e embaralham os dados no disco. Ao usar discos permanentes padrão, o tamanho do disco precisa ser de pelo menos 1 terabytes por worker. Consulte Desempenho por tamanho do disco permanente.

Para monitorar a capacidade do disco do worker no Console do Google Cloud:

  1. Clique no nome do cluster na página Clusters.
  2. Clique na guia INSTÂNCIAS de VM.
  3. Clique no nome de qualquer worker.
  4. Clique na guia "MONITORAMENTO" e role a tela para baixo até "Capacidade do disco" para ver a capacidade do worker.

Considerações sobre disco

Os clusters temporários do Dataproc, que não se beneficiam do armazenamento permanente, podem usar SSDs locais. Os SSDs locais estão fisicamente conectados ao cluster e fornecem maior capacidade do que os discos permanentes. Consulte a Tabela de desempenho. Os SSDs locais estão disponíveis em um tamanho fixo de 375 gigabytes, mas é possível adicionar vários SSDs para melhorar o desempenho.

Os SSDs locais não mantêm os dados depois que um cluster é encerrado. Se você precisar de armazenamento permanente, use discos permanentes SSD, que oferecem uma capacidade maior para o tamanho do que os discos permanentes padrão. Os discos permanentes SSD também são uma boa opção se o tamanho da partição for menor que 8 KB. No entanto, evite pequenos pares.

Anexe GPUs ao cluster

O Spark 3 é compatível com GPUs. Use GPUs com a ação de inicialização RAPIDS para acelerar jobs do Spark usando o RAPIDS SQL Accelerator. A ação de inicialização do driver da GPU para configurar um cluster com GPUs.

Falhas e correções comuns de jobs

Memória insuficiente

Exemplos:

  • "Executor perdido"
  • "java.lang.OutOfMemoryError: limite de sobrecarga do GC excedido"
  • "Contêiner encerrado pelo YARN por exceder os limites de memória"

Correções possíveis:

Falhas na busca aleatória

Exemplos:

  • "FetchFailedException" (erro do Spark)
  • "Falha na conexão com..." (Erro do Spark)
  • "Falha ao buscar" (erro MapReduce)

Geralmente, é causado pela remoção prematura de workers que ainda têm dados embaralhados para serem exibidos.

Causas e correções possíveis:

  • VMs de workers preemptivos foram recuperadas, ou VMs de workers não preemptivos foram removidas pelo escalonador automático. Solução: use o Modo de flexibilidade aprimorado para tornar os workers secundários preemptivos ou escalonáveis com segurança.
  • O executor ou mapeador falhou devido a um erro de OutOfMemory. Solução: aumentar a memória do executor ou mapeador.
  • O serviço do Spark Shuffle pode estar sobrecarregado. Solução: diminuir o número de partições do job.

Os nós YARN estão NÃO ÍNTEGROS

Exemplos (de registros YARN):

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

Muitas vezes, relacionados ao espaço em disco insuficiente para a reprodução aleatória de dados. Faça o diagnóstico visualizando os arquivos de registro:

  • Abra a página Clusters do projeto no console do Google Cloud e clique no nome do cluster.
  • Clique em VER REGISTROS.
  • Filtre os registros por hadoop-yarn-nodemanager.
  • Pesquise "NÃO-ÍNTEGROS".

Possíveis correções:

  • O cache do usuário é armazenado no diretório especificado pela propriedade yarn.nodemanager.local-dirs no yarn-site.xml file. Esse arquivo está localizado em /etc/hadoop/conf/yarn-site.xml. Verifique o espaço livre no caminho /hadoop/yarn/nm-local-dir e libere espaço excluindo a pasta de cache do usuário /hadoop/yarn/nm-local-dir/usercache.
  • Se o registro relatar o status "UNHEALTHY", recrie o cluster com espaço em disco maior, o que aumentará o limite da capacidade.

O job falha devido à memória insuficiente do driver

Na execução de jobs no modo de cluster, o job falhará se o tamanho da memória do nó mestre for significativamente maior que o do nó de trabalho.

Exemplo de registros do driver:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

Possíveis correções:

  • Defina spark:spark.driver.memory menor que yarn:yarn.scheduler.maximum-allocation-mb.
  • Use o mesmo tipo de máquina para os nós mestre e de trabalho.

Para mais informações