As secções seguintes oferecem sugestões para ajudar a otimizar as suas aplicações Dataproc Spark.
Use clusters efémeros
Quando usa o modelo de cluster "efémero" do Dataproc, cria um cluster dedicado para cada tarefa e, quando a tarefa termina, elimina o cluster. Com o modelo efémero, pode tratar o armazenamento e a computação separadamente, guardando os dados de entrada e saída das tarefas no Cloud Storage ou no BigQuery, usando o cluster apenas para computação e armazenamento de dados temporários.
Armadilhas de clusters persistentes
A utilização de clusters temporários de uma tarefa evita os seguintes problemas e potenciais problemas associados à utilização de clusters "persistentes" partilhados e de longa duração:
- Pontos únicos de falha: um estado de erro de cluster partilhado pode fazer com que todas as tarefas falhem, bloqueando um pipeline de dados completo. A investigação e a recuperação de um erro podem demorar horas. Uma vez que os clusters efémeros mantêm apenas estados temporários no cluster, quando ocorre um erro, podem ser rapidamente eliminados e recriados.
- Dificuldade em manter e migrar estados de clusters no HDFS, MySQL ou sistemas de ficheiros locais
- Conflitos de recursos entre tarefas que afetam negativamente os SLOs
- Os daemons de serviço sem resposta causados pela pressão da memória
- Acumulação de registos e ficheiros temporários que podem exceder a capacidade do disco
- Falha na expansão devido à indisponibilidade de stock na zona do cluster
- Falta de suporte para versões de imagens de clusters desatualizadas.
Vantagens do cluster efémero
Por outro lado, os clusters efémeros permitem-lhe fazer o seguinte:
- Configure diferentes autorizações do IAM para diferentes tarefas com diferentes contas de serviço de VMs do Dataproc.
- Otimizar as configurações de hardware e software de um cluster para cada tarefa, alterando as configurações do cluster conforme necessário.
- Atualize as versões das imagens em novos clusters para receber os patches de segurança, as correções de erros e as otimizações mais recentes.
- Resolva problemas mais rapidamente num cluster isolado de tarefa única.
- Poupe custos pagando apenas o tempo de execução do cluster efémero e não o tempo de inatividade entre tarefas num cluster partilhado.
Use o Spark SQL
A API DataFrame do Spark SQL é uma otimização significativa da API RDD. Se interagir com código que usa RDDs, considere ler dados como um DataFrame antes de passar um RDD no código. No código Java ou Scala, considere usar a API de conjuntos de dados Spark SQL como um superconjunto de RDDs e DataFrames.
Use o Apache Spark 3
O Dataproc 2.0 instala o Spark 3, que inclui as seguintes funcionalidades e melhorias de desempenho:
- Suporte de GPU
- Capacidade de ler ficheiros binários
- Melhorias no desempenho
- Poda de partições dinâmicas
- Execução de consultas adaptável, que otimiza as tarefas do Spark em tempo real
Use a atribuição dinâmica
O Apache Spark inclui uma funcionalidade de atribuição dinâmica que dimensiona o número de executores do Spark nos trabalhadores num cluster. Esta funcionalidade permite que uma tarefa use o cluster do Dataproc completo, mesmo quando o cluster é dimensionado. Esta funcionalidade está ativada por predefinição no Dataproc (spark.dynamicAllocation.enabled
está definido como true
). Consulte o artigo
Atribuição dinâmica do Spark
para mais informações.
Use a escala automática do Dataproc
O Dataproc com dimensionamento automático adiciona e remove dinamicamente trabalhadores do Dataproc de um cluster para ajudar a garantir que as tarefas do Spark têm os recursos necessários para serem concluídas rapidamente.
É uma prática recomendada configurar a política de dimensionamento automático para dimensionar apenas trabalhadores secundários.
Use o modo de flexibilidade melhorada do Dataproc
Os clusters com VMs preemptíveis ou uma política de escalamento automático podem receber exceções FetchFailed quando os trabalhadores são preemptíveis ou removidos antes de terminarem de fornecer dados aleatórios aos redutores. Esta exceção pode causar novas tentativas de tarefas e tempos de conclusão de trabalhos mais longos.
Recomendação: use o modo de flexibilidade melhorada do Dataproc, que não armazena dados de mistura intermédios em trabalhadores secundários, para que os trabalhadores secundários possam ser interrompidos ou reduzidos em segurança.
Configure a partição e a aleatorização
O Spark armazena dados em partições temporárias no cluster. Se a sua aplicação agrupar ou juntar DataFrames, baralha os dados em novas partições de acordo com o agrupamento e a configuração de baixo nível.
A partição de dados afeta significativamente o desempenho da aplicação: um número demasiado baixo de partições limita o paralelismo das tarefas e a utilização de recursos do cluster; um número demasiado elevado de partições abranda a tarefa devido ao processamento e à atribuição aleatória de partições adicionais.
Configurar partições
As seguintes propriedades regem o número e o tamanho das suas partições:
spark.sql.files.maxPartitionBytes
: o tamanho máximo das partições quando lê dados do Cloud Storage. A predefinição é de 128 MB, o que é suficientemente grande para a maioria das aplicações que processam menos de 100 TB.spark.sql.shuffle.partitions
: o número de partições após a execução de uma mistura. A predefinição é1000
para clusters de versões de imagens2.2
e posteriores. Recomendação: defina este valor como 3 vezes o número de vCPUs no cluster.spark.default.parallelism
: o número de partições devolvidas após executar transformações de RDD que requerem misturas, comojoin
,reduceByKey
eparallelize
. O valor predefinido é o número total de vCPUs no cluster. Quando usar RDDs em tarefas do Spark, pode definir este número para 3 vezes as suas vCPUs
Limite o número de ficheiros
Existe uma perda de desempenho quando o Spark lê um grande número de ficheiros pequenos. Armazenar dados em tamanhos de ficheiros maiores, por exemplo, tamanhos de ficheiros no intervalo de 256 MB a 512 MB. Da mesma forma, limite o número de ficheiros de saída (para forçar uma ordenação aleatória, consulte a secção Evite ordenações aleatórias desnecessárias).
Configure a execução de consultas adaptativa (Spark 3)
A execução de consultas adaptativa (ativada por predefinição na versão 2.0 da imagem do Dataproc) oferece melhorias no desempenho das tarefas do Spark, incluindo:
- Unir partições após as misturas
- Converter junções de ordenação e união em junções de transmissão
- Otimizações para junções de desvio.
Embora as predefinições de configuração sejam adequadas para a maioria dos exemplos de utilização, pode ser vantajoso definir spark.sql.adaptive.advisoryPartitionSizeInBytes
como spark.sqlfiles.maxPartitionBytes
(predefinição de 128 MB).
Evite misturas desnecessárias
O Spark permite que os utilizadores acionem manualmente uma mistura para reequilibrar os respetivos dados com a função repartition
. As misturas são dispendiosas, pelo que a mistura de dados deve ser usada com cautela. A definição adequada das configurações
de partição deve ser suficiente para permitir que o Spark particione automaticamente
os seus dados.
Exceção: quando escreve dados particionados por colunas no Cloud Storage, a repartição numa coluna específica evita a escrita de muitos ficheiros pequenos para conseguir tempos de escrita mais rápidos.
df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
Armazene dados em Parquet ou Avro
Por predefinição, o Spark SQL lê e escreve dados em ficheiros Parquet comprimidos com Snappy. O Parquet é um formato de ficheiro colunar eficiente que permite ao Spark ler apenas os dados de que precisa para executar uma aplicação. Esta é uma vantagem importante quando trabalha com grandes conjuntos de dados. Outros formatos de colunas, como o Apache ORC, também têm um bom desempenho.
Para dados não colunares, o Apache Avro oferece um formato de ficheiro de linhas binárias eficiente. Embora normalmente seja mais lento do que o Parquet, o desempenho do Avro é melhor do que o dos formatos baseados em texto,como CSV ou JSON.
Otimize o tamanho do disco
A taxa de transferência dos discos persistentes é dimensionada com o tamanho do disco, o que pode afetar o desempenho da tarefa do Spark, uma vez que as tarefas escrevem metadados e misturam dados no disco. Quando usar discos persistentes padrão, o tamanho do disco deve ser de, pelo menos, 1 terabyte por trabalhador (consulte Desempenho por tamanho do disco persistente).
Para monitorizar o débito do disco do trabalhador na Google Cloud consola:
- Clique no nome do cluster na página Clusters.
- Clique no separador INSTÂNCIAS DE VM.
- Clique no nome de qualquer trabalhador.
- Clique no separador MONITORIZAÇÃO e, de seguida, desloque a página para baixo até Disk Throughput para ver o débito do trabalhador.
Considerações sobre o disco
Os clusters Dataproc efémeros, que não beneficiam do armazenamento persistente, podem usar SSDs locais. Os SSDs locais estão fisicamente associados ao cluster e oferecem um débito superior ao dos discos persistentes (consulte a tabela de desempenho). Os SSDs locais estão disponíveis num tamanho fixo de 375 gigabytes, mas pode adicionar vários SSDs para aumentar o desempenho.
Os SSDs locais não mantêm os dados após o encerramento de um cluster. Se precisar de armazenamento persistente, pode usar discos persistentes SSD, que oferecem um débito mais elevado para o respetivo tamanho do que os discos persistentes padrão. Os discos persistentes SSD também são uma boa escolha se o tamanho da partição for inferior a 8 KB (no entanto, evite partições pequenas).
Anexe GPUs ao cluster
O Spark 3 suporta GPUs. Use GPUs com a ação de inicialização do RAPIDS para acelerar as tarefas do Spark com o acelerador SQL do RAPIDS. A ação de inicialização do controlador da GPU para configurar um cluster com GPUs.
Falhas e correções comuns de tarefas
Sem memória
Exemplos:
- "Lost executor"
- "java.lang.OutOfMemoryError: GC overhead limit exceeded"
- "Container killed by YARN for exceeding memory limits" (Contentor terminado pelo YARN por exceder os limites de memória)
Possíveis correções:
- Se estiver a usar o PySpark, aumente
spark.executor.memoryOverhead
e diminuaspark.executor.memory
. - Use tipos de máquinas com muita memória.
- Use partições mais pequenas.
Falhas de obtenção aleatória
Exemplos:
- "FetchFailedException" (erro do Spark)
- "Não foi possível estabelecer ligação a…" (Erro do Spark)
- "Failed to fetch" (Falha ao obter) (erro do MapReduce)
Normalmente, é causado pela remoção prematura de trabalhadores que ainda têm dados de aleatorização para disponibilizar.
Causas e correções possíveis:
- As VMs de trabalho preemptíveis foram recuperadas ou as VMs de trabalho não preemptíveis foram removidas pelo escalador automático. Solução: use o modo de flexibilidade melhorado para tornar os trabalhadores secundários seguros, escaláveis ou com prioridade.
- O executor ou o mapeador falhou devido a um erro OutOfMemory. Solução: Aumente a memória do executor ou do mapeador.
- O serviço de aleatorização do Spark pode estar sobrecarregado. Solução: diminua o número de partições de tarefas.
Os nós do YARN estão em mau estado
Exemplos (dos registos do YARN):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
Muitas vezes, está relacionado com espaço em disco insuficiente para os dados de reprodução aleatória. Diagnostique através da visualização de ficheiros de registo:
- Abra a página Clusters do projeto na Google Cloud consola e, de seguida, clique no nome do cluster.
- Clique em VER REGISTOS.
- Filtre os registos por
hadoop-yarn-nodemanager
. - Pesquise "UNHEALTHY".
Possíveis correções:
- A cache do utilizador é armazenada no diretório especificado pela propriedade
yarn.nodemanager.local-dirs
noyarn-site.xml file
. Este ficheiro encontra-se em/etc/hadoop/conf/yarn-site.xml
. Pode verificar o espaço livre no caminho/hadoop/yarn/nm-local-dir
e libertar espaço eliminando a pasta de cache do utilizador/hadoop/yarn/nm-local-dir/usercache
. - Se o registo comunicar o estado "UNHEALTHY", recrie o cluster com um espaço em disco maior, o que aumenta o limite de débito.
A tarefa falha devido a memória do controlador insuficiente
Quando executa trabalhos no modo de cluster, o trabalho falha se o tamanho da memória do nó de trabalho for superior ao tamanho da memória do nó principal.
Exemplo de registos do condutor:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
Possíveis correções:
- Defina
spark:spark.driver.memory
como inferior ayarn:yarn.scheduler.maximum-allocation-mb
. - Use o mesmo tipo de máquina para os nós principais e de trabalho.
O que se segue?
- Saiba mais sobre a otimização do desempenho do Spark.