Modo de flexibilidade aprimorado do Dataproc

O modo de flexibilidade otimizada (EFM, na sigla em inglês) do Dataproc gerencia dados embaralhados para minimizar os atrasos de progresso do job causados pela remoção de nós de um cluster em execução. O EFM grava dados de embaralhamento do Spark nos workers principais. Os workers extraem esses nós remotos durante a fase de redução.

Como o EFM não armazena dados de embaralhamento intermediários em workers secundários, ele é adequado para uso em clusters que usam VMs preemptivas ou apenas escalona automaticamente o grupo de workers secundários.

Limitações:

  • Os jobs do Apache Hadoop Hadoop que não são compatíveis com a realocação do AppMaster podem falhar no modo de flexibilidade avançada. Consulte Quando esperar a conclusão do AppMasters.
  • O modo de flexibilidade aprimorado não é recomendado:
    • em um cluster que tem apenas workers principais.
  • O modo de flexibilidade aprimorado não tem suporte:
    • quando o escalonamento automático do worker primário está ativado. Na maioria dos casos, os workers principais ainda armazenam dados embaralhados que não são migrados automaticamente. A redução do grupo de workers primário diminui os benefícios do EFM.
    • quando jobs do Spark são executados em um cluster com a desativação otimizada ativada. A desativação otimizada e o EFM podem funcionar em diferentes finalidades, já que o mecanismo de desativação otimizada do YaN mantém os nós de DECOMISSÃO até que todos os aplicativos envolvidos sejam concluídos.

Como usar o modo de flexibilidade aprimorada

O modo de flexibilidade aprimorado é configurado por mecanismo de execução e precisa ser configurado durante a criação do cluster. A implementação do EFM do Spark é configurada com a propriedade de cluster dataproc:efm.spark.shuffle=primary-worker.

Exemplo:crie um cluster com o embaralhamento do worker primário para o Spark:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Exemplo do Apache Spark

  1. Execute um job WordCount no texto público de Shakespeare usando o jar de exemplos do Spark no cluster EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Como configurar SSDs locais para embaralhamento de worker primário

As implementações de embaralhamento primário e de HDFS gravam dados embaralhados intermediários em discos conectados à VM e se beneficiam da capacidade e das IOPS adicionais oferecidas pelos SSDs locais. Para facilitar a alocação de recursos, segmente uma meta de aproximadamente uma partição SSD local por quatro vCPUs ao configurar máquinas de trabalho principais.

Para anexar SSDs locais, transmita a sinalização --num-worker-local-ssds para o comando gcloud dataproc clusters create.

Proporção de workers secundários

Como os workers secundários gravam dados de embaralhamento nos workers principais, o cluster precisa conter um número suficiente de workers principais com recursos suficientes de CPU, memória e disco para acomodar a carga de embaralhamento do job. Para clusters de escalonamento automático, para impedir que o grupo principal seja escalonado e cause comportamentos indesejados, defina minInstances como o valor maxInstances na política de escalonamento automático do grupo de workers principal.

Se você tiver uma alta proporção de workers secundários em relação ao primário, por exemplo, 10:1, monitore a utilização da CPU, a rede e o uso de disco para workers principais para determinar se eles estão sobrecarregados. Para isso, siga estas etapas:

  1. Acesse a página Instâncias de VM no console do Google Cloud.

  2. Clique na caixa de seleção do lado esquerdo do worker primário.

  3. Clique na guia "MONITORAMENTO" para visualizar a utilização da CPU do worker primário, as IOPS de disco, os bytes de rede e outras métricas.

Se os workers principais estiverem sobrecarregados, considere aumentar os workers manualmente manualmente.

Como redimensionar o grupo de trabalho principal

O grupo de workers primário pode ser escalonado com segurança, mas a redução do grupo de workers primário pode afetar negativamente o progresso do job. As operações que reduzem o grupo do worker primário precisam usar a desativação otimizada, que é ativada definindo a sinalização --graceful-decommission-timeout.

Clusters com escalonamento automático: o escalonamento do grupo de workers principais é desativado em clusters de EFM com políticas de escalonamento automático. Para redimensionar o grupo de workers primário em um cluster com escalonamento automático:

  1. Desative o escalonamento automático.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Dimensionar o grupo principal.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Reative o escalonamento automático:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Como monitorar o uso do disco do worker principal

Os workers principais precisam ter espaço em disco suficiente para os dados embaralhados do cluster. É possível monitorar esse recurso indiretamente por meio da métrica remaining HDFS capacity. Conforme o disco local é preenchido, o espaço fica indisponível para o HDFS, e a capacidade restante diminui.

Por padrão, quando o disco local de um worker primário excede 90% da capacidade, o nó é marcado como UNHEALTHY na IU do nó YARN. Se você tiver problemas de capacidade do disco, exclua dados não utilizados do HDFS ou escalone o pool de workers principal.

Os dados de embaralhamento intermediários geralmente não são limpos até o final de um job. Ao usar o embaralhamento do worker primário com o Spark, isso pode levar até 30 minutos após a conclusão de um job.

Configuração avançada

Particionamento e paralelismo

Ao enviar um job do MapReduce ou Spark, configure um nível apropriado de particionamento. A decisão sobre o número de partições de entrada e saída para um estágio de embaralhamento envolve um equilíbrio entre diferentes características de desempenho. É melhor testar valores que funcionem para as formas do job.

Partições de entrada

O particionamento de entrada do MapReduce e do Spark é determinado pelo conjunto de dados de entrada. Durante a leitura de arquivos do Cloud Storage, cada tarefa processa aproximadamente um "tamanho de bloco" de dados.

  • Para jobs do Spark SQL, o tamanho máximo da partição é controlado por spark.sql.files.maxPartitionBytes. Considere aumentá-lo para 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Para jobs do MapReduce e Spark RDDs, o tamanho da partição normalmente é controlado com fs.gs.block.size, que tem um padrão de 128 MB. Considere aumentá-lo para 1 GB. Também é possível definir propriedades específicas de InputFormat, como mapreduce.input.fileinputformat.split.minsize e mapreduce.input.fileinputformat.split.maxsize.

    • Para jobs do MapReduce: --properties fs.gs.block.size=1073741824
    • Para RDDs do Spark: --properties spark.hadoop.fs.gs.block.size=1073741824

Partições de saída

O número de tarefas em estágios subsequentes é controlado por várias propriedades. Em jobs maiores que processam mais de 1 TB, considere ter pelo menos 1 GB por partição.

  • Para jobs do MapReduce, o número de partições de saída é controlado por mapreduce.job.reduces.

  • No Spark SQL, o número de partições de saída é controlado por spark.sql.shuffle.partitions.

  • Para jobs do Spark usando a API RDD, é possível especificar o número de partições de saída ou definir spark.default.parallelism.

Ordem de embaralhamento para embaralhamento de worker primário

A propriedade mais significativa é --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Observe que essa é uma propriedade YARN no nível do cluster, já que o servidor do Shuffle no Spark é executado como parte do Node Manager. O padrão é o dobro de vezes (2x) de núcleos na máquina (por exemplo, 16 linhas de execução em um n1-highmem-8). Se "Tempo bloqueado de leitura de embaralhamento" for maior que 1 segundo e os workers principais não atingirem os limites de rede, CPU ou de disco, considere aumentar o número de linhas de execução do servidor embaralhadas.

Em tipos de máquinas maiores, considere aumentar spark.shuffle.io.numConnectionsPerPeer, que tem o padrão 1. Por exemplo, defina-o com cinco conexões por par de hosts.

Como aumentar as tentativas

É possível configurar o número máximo de tentativas permitidas para mestres, tarefas e etapas de apps, definindo as seguintes propriedades:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Como as tarefas e os mestres do app são encerrados com mais frequência em clusters que usam muitas VMs preemptivas ou escalonamento automático sem desativação otimizada, aumentar os valores das propriedades anteriores nesses clusters pode ajudar. não há suporte ao uso de EFM com Spark e desativação otimizada.

Desativação otimizada do YARN em clusters de EFM

A Desativação otimizada do YARN pode ser usada para remover rapidamente os nós com impacto mínimo na execução de aplicativos. Para clusters de escalonamento automático, o tempo limite de desativação otimizada pode ser definido em uma AutoscalingPolicy anexada ao cluster de EFM.

Melhorias no EFM para desativação otimizada

  1. Como os dados intermediários são armazenados em um sistema de arquivos distribuído, os nós podem ser removidos de um cluster de EFM assim que todos os contêineres em execução nesses nós forem concluídos. Em comparação, os nós não são removidos nos clusters padrão do Dataproc até que o aplicativo seja concluído.

  2. A remoção do nó não aguarda a conclusão dos mestres de aplicativos em um nó. Quando o contêiner mestre do aplicativo é encerrado, ele é reprogramado em outro nó que não está sendo desativado. O progresso do job não é perdido: o novo mestre do app recupera rapidamente o estado do mestre anterior do app lendo o histórico do job.