O modo de flexibilidade otimizada (EFM, na sigla em inglês) do Dataproc gerencia dados embaralhados para minimizar os atrasos de progresso do job causados pela remoção de nós de um cluster em execução. O EFM descarrega dados embaralhados em um dos dois modos selecionáveis pelo usuário:
Embaralhamento do worker principal. Mapeadores gravam dados nos principais workers. Os workers extraem esses nós remotos durante a fase de redução. Esse modo só está disponível e é recomendado para jobs do Spark.
Embaralhamento de HCFS (Hadoop Compatible File System) Mapeadores gravam dados em uma implementação do HCFS (HDFS por padrão). Assim como no modo de worker primário, somente os workers principais participam das implementações do HDFS e do HCFS. Se o HCFS usar o Conector do Cloud Storage, os dados serão armazenados fora do cluster. Esse modo pode beneficiar jobs com pequenas quantidades de dados, mas, devido a limitações de escalonamento, ele não é recomendado para jobs maiores.
Como os dois modos de EFM não armazenam dados de shuffle intermediários em workers secundários, o EFM é adequado para clusters que usam VMs preemptivas ou apenas escalonam automaticamente o grupo de workers secundário.
- Os jobs do Apache Hadoop YARN que não são compatíveis com a realocação do AppMaster podem falhar no modo de flexibilidade aprimorada. Consulte Quando esperar que o AppMasters termine.
- O modo de flexibilidade aprimorada não é recomendado:
- em um cluster que tem apenas workers principais
- em jobs de streaming, já que pode levar até 30 minutos após a conclusão do job para limpar os dados de embaralhamento intermediários.
- O Modo de flexibilidade aprimorado não é compatível:
- quando o escalonamento automático do worker primário está ativado. Na maioria dos casos, os workers principais ainda armazenam dados embaralhados que não são migrados automaticamente. A redução do grupo de workers primário diminui os benefícios do EFM.
- quando os jobs do Spark são executados em um cluster com desativação otimizada ativada. A desativação otimizada e a EFM podem funcionar de forma cruzada, já que o mecanismo de desativação otimizada do YARN mantém os nós DECOMMISSIONING até que todos os aplicativos envolvidos sejam concluídos.
Como usar o modo de flexibilidade aprimorada
O modo de flexibilidade aprimorado é configurado por mecanismo de execução e precisa ser configurado durante a criação do cluster.
A implementação do EEFM do Spark é configurada com a propriedade de cluster
dataproc:efm.spark.shuffle
. Valores de propriedade válidos:primary-worker
para embaralhamento de worker primário (recomendado)hcfs
para embaralhamento baseado no HCFS. Esse modo foi descontinuado e está disponível apenas em clusters que executam a versão de imagem 1.5. Não recomendado para novos fluxos de trabalho.
A implementação do Hadoop MapReduce é configurada com a propriedade de cluster
dataproc:efm.mapreduce.shuffle
. Valores de propriedade válidos:hcfs
Exemplo: crie um cluster com o embaralhamento principal do worker para o Spark e o HCFS embaralhado para o MapReduce:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --worker-machine-type=n1-highmem-8 \ --num-workers=25 \ --num-worker-local-ssds=2 \ --secondary-worker-type=preemptible \ --secondary-worker-boot-disk-size=500GB \ --num-secondary-workers=25
Exemplo do Apache Spark
- Execute um job WordCount em texto de Shakespeare público usando o jar de exemplos do Spark
no cluster do EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Exemplo de MapReduce do Apache Hadoop
Execute um job pequeno do teragen para gerar dados de entrada no Cloud Storage para um job do terater posterior usando o jar de exemplos de MapReduce no cluster do EFM.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
Execute um job de terasort nos dados.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- terasort gs://terasort/input gs://terasort/output
Como configurar SSDs locais para embaralhamento de worker primário
As implementações de embaralhamento primário e de HDFS gravam dados embaralhados intermediários em discos conectados à VM e se beneficiam da capacidade e das IOPS adicionais oferecidas pelos SSDs locais. Para facilitar a alocação de recursos, segmente uma meta de aproximadamente uma partição SSD local por quatro vCPUs ao configurar máquinas de trabalho principais.
Para anexar SSDs locais, transmita a flag --num-worker-local-ssds
para o comando
gcloud dataproc clusters create.
Geralmente, não é necessário ter SSDs locais em workers secundários.
Adicionar SSDs locais aos workers secundários de um cluster (usando a
flag --num-secondary-worker-local-ssds
)
geralmente é menos importante porque os workers secundários não gravam dados embaralhados localmente.
No entanto, como os SSDs locais melhoram o desempenho do disco local, você pode adicionar
SSDs locais a workers secundários se espera que
os jobs sejam limitados a E/S
devido ao uso do disco local: o job usa um disco local significativo para
espaço de trabalho ou suas partições são
grandes demais para caber na memória e serão transferidas para o disco.
Proporção de workers secundários
Como os workers secundários gravam os dados embaralhados nos workers principais, o
cluster precisa conter um número suficiente de workers principais com recursos de CPU,
memória e disco suficientes para acomodar a carga de embaralhamento do job. Para
clusters de escalonamento automático, para impedir que o grupo principal seja
escalonado e cause um comportamento indesejado, defina minInstances
como o valor maxInstances
na
política de escalonamento automático
para o grupo de workers principal.
Se você tiver uma alta proporção de workers secundários em relação ao primário, por exemplo, 10:1, monitore a utilização da CPU, a rede e o uso de disco para workers principais para determinar se eles estão sobrecarregados. Para fazer isso:
Acesse a página Instâncias de VM no console do Google Cloud.
Clique na caixa de seleção do lado esquerdo do worker primário.
Clique na guia "MONITORAMENTO" para visualizar a utilização da CPU do worker primário, as IOPS de disco, os bytes de rede e outras métricas.
Se os workers principais estiverem sobrecarregados, considere aumentar os workers manualmente manualmente.
Como redimensionar o grupo de trabalho principal
O grupo de workers primário pode ser escalonado com segurança, mas a redução do grupo de workers primário
pode afetar negativamente o progresso do job. As operações que reduzem o
grupo do worker primário precisam usar a
desativação otimizada, que é ativada definindo a flag --graceful-decommission-timeout
.
Clusters com escalonamento automático: o escalonamento do grupo de workers principais é desativado em clusters de EFM com políticas de escalonamento automático. Para redimensionar o grupo de workers primário em um cluster com escalonamento automático:
Desative o escalonamento automático.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Escalone o grupo principal.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Reative o escalonamento automático:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Como monitorar o uso do disco do worker principal
Os workers principais precisam ter espaço em disco suficiente para os dados embaralhados do cluster.
É possível monitorar esse recurso indiretamente por meio da métrica remaining HDFS capacity
.
Conforme o disco local é preenchido, o espaço fica indisponível para o HDFS, e
a capacidade restante diminui.
Por padrão, quando o disco local de um worker primário excede 90% da capacidade, o nó é marcado como UNHEALTHY na IU do nó YARN. Se você tiver problemas de capacidade do disco, exclua dados não utilizados do HDFS ou escalone o pool de workers principal.
Configuração avançada
Particionamento e paralelismo
Ao enviar um job do MapReduce ou Spark, configure um nível apropriado de particionamento. A decisão sobre o número de partições de entrada e saída para um estágio de embaralhamento envolve um equilíbrio entre diferentes características de desempenho. É melhor testar valores que funcionem para as formas do job.
Partições de entrada
O particionamento de entrada do MapReduce e do Spark é determinado pelo conjunto de dados de entrada. Durante a leitura de arquivos do Cloud Storage, cada tarefa processa aproximadamente um "tamanho de bloco" de dados.
Para jobs do Spark SQL, o tamanho máximo da partição é controlado por
spark.sql.files.maxPartitionBytes
. Considere aumentá-lo para 1 GB:spark.sql.files.maxPartitionBytes=1073741824
.Para jobs do MapReduce e Spark RDDs, o tamanho da partição normalmente é controlado com
fs.gs.block.size
, que tem um padrão de 128 MB. Considere aumentá-lo para 1 GB. Também é possível definir propriedades específicas deInputFormat
, comomapreduce.input.fileinputformat.split.minsize
emapreduce.input.fileinputformat.split.maxsize
.- Para jobs do MapReduce:
--properties fs.gs.block.size=1073741824
- Para RDDs do Spark:
--properties spark.hadoop.fs.gs.block.size=1073741824
- Para jobs do MapReduce:
Partições de saída
O número de tarefas em estágios subsequentes é controlado por várias propriedades. Em jobs maiores que processam mais de 1 TB, considere ter pelo menos 1 GB por partição.
Para jobs do MapReduce, o número de partições de saída é controlado por
mapreduce.job.reduces
.No Spark SQL, o número de partições de saída é controlado por
spark.sql.shuffle.partitions
.Para jobs do Spark usando a API RDD, é possível especificar o número de partições de saída ou definir
spark.default.parallelism
.
Ordem de embaralhamento para embaralhamento de worker primário
A propriedade mais significativa é --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
.
Observe que essa é uma propriedade YARN no nível do cluster, já que o servidor do
Shuffle no Spark é executado como parte do Node Manager. O padrão é o dobro de vezes (2x) de núcleos na máquina
(por exemplo, 16 linhas de execução em um n1-highmem-8). Se "Tempo bloqueado de leitura de embaralhamento" for
maior que 1 segundo e os workers principais não atingirem os limites de rede, CPU ou de disco,
considere aumentar o número de linhas de execução do servidor embaralhadas.
Em tipos de máquinas maiores, considere aumentar spark.shuffle.io.numConnectionsPerPeer
,
que tem o padrão 1. Por exemplo, defina-o com cinco conexões por par de hosts.
Como aumentar as tentativas
É possível configurar o número máximo de tentativas permitidas para mestres, tarefas e etapas de apps, definindo as seguintes propriedades:
yarn:yarn.resourcemanager.am.max-attempts mapred:mapreduce.map.maxattempts mapred:mapreduce.reduce.maxattempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Como os mestres e as tarefas dos aplicativos são frequentemente encerrados em clusters que usam muitas VMs preemptivas ou escalonamento automático sem desativação otimizada, o aumento dos valores das propriedades acima nesses clusters pode ajudar. Observe que o uso do EFM com o Spark e a desativação otimizada não são compatíveis.
Como configurar o HDFS para embaralhamento de HCFS
Para melhorar o desempenho de embaralhamentos grandes, é possível diminuir a contenção de bloqueio no
NameNode definindo dfs.namenode.fslock.fair=false
. Observe que isso corre o risco
de atender a solicitações individuais, mas pode melhorar a capacidade em todo o cluster.
Para melhorar ainda mais o desempenho do NameNode, é possível anexar SSDs locais ao
nó mestre definindo --num-master-local-ssds
. Também é possível adicionar SSDs locais a
workers primários para melhorar o desempenho do DataNode. Basta definir
--num-worker-local-ssds
.
Outros sistemas de arquivos compatíveis com Hadoop para embaralhamento de HCFS
Por padrão, os dados de embaralhamento EFM HCFS são gravados no HDFS, mas é possível usar qualquer
Sistema de arquivos compatível com Hadoop (HCFS, na sigla em inglês).
Por exemplo, você pode gravar aleatoriamente no Cloud Storage ou no HDFS de um cluster diferente. Para especificar um sistema de arquivos, aponte
fs.defaultFS
para o sistema de arquivos de destino ao enviar um job para o cluster.
Desativação otimizada do YARN em clusters de EFM
A Desativação otimizada do YARN pode ser usada para remover rapidamente os nós com impacto mínimo na execução de aplicativos. Para clusters de escalonamento automático, o tempo limite de desativação otimizada pode ser definido em uma AutoscalingPolicy anexada ao cluster de EFM.
Melhorias na EFM do MapReduce para a desativação otimizada
Como os dados intermediários são armazenados em um sistema de arquivos distribuído, os nós podem ser removidos de um cluster de EFM assim que todos os contêineres em execução nesses nós forem concluídos. Em comparação, os nós não são removidos nos clusters padrão do Dataproc até que o aplicativo seja concluído.
A remoção do nó não aguarda a conclusão dos mestres de aplicativos em um nó. Quando o contêiner mestre do aplicativo é encerrado, ele é reprogramado em outro nó que não está sendo desativado. O progresso do job não é perdido: o novo mestre do app recupera rapidamente o estado do mestre anterior do app lendo o histórico do job.
Como usar a desativação otimizada em um cluster de EFM com MapReduce
Crie um cluster da EFM com o mesmo número de workers principais e secundários.
gcloud dataproc clusters create cluster-name \ --properties=dataproc:efm.mapreduce.shuffle=hcfs \ --region=region \ --num-workers=5 \ --num-secondary-workers=5
Execute um job mapreduce que calcula o valor de pi usando o jar de exemplos do mapreduce no cluster.
gcloud dataproc jobs submit hadoop \ --cluster=cluster-name \ --region=region \ --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \ -- pi 1000 10000000
Enquanto o job estiver em execução, reduza o cluster usando a desativação otimizada.
Os nós serão removidos do cluster rapidamente, antes de o job ser concluído, minimizando a perda de progresso do job. As pausas temporárias no progresso do job podem ocorrer devido a estes fatores:gcloud dataproc clusters update cluster-name \ --region=region \ --num-secondary-workers=0 \ --graceful-decommission-timeout=1h
- Failover de mestre do aplicativo. Se o progresso do job cair para 0% e, em seguida, ir imediatamente para o valor da ação de soltar, o mestre do aplicativo poderá ter sido encerrado e um novo mestre do aplicativo pode ter recuperado o estado. Isso não deve afetar significativamente o progresso do job, já que o failover acontece rapidamente.
- Preempção de VM: Como o HDFS preserva apenas as saídas de tarefas de mapeamento completas e não parciais, as pausas temporárias no andamento do job podem ocorrer quando uma VM é interrompida enquanto trabalha em uma tarefa de mapeamento.
Para acelerar a remoção de nós, é possível reduzir escala vertical o
cluster sem a desativação otimizada omitindo a
flag --graceful-decommission-timeout
no exemplo de comando
gcloud
anterior. O progresso do job de tarefas de mapa concluídas será preservado, mas a saída de tarefa de mapa parcialmente concluída será perdida (as tarefas de mapa serão executadas novamente).