Modo de flexibilidade aprimorado do Dataproc

O modo de flexibilidade otimizada (EFM, na sigla em inglês) do Dataproc gerencia dados embaralhados para minimizar os atrasos de progresso do job causados pela remoção de nós de um cluster em execução. O EFM descarrega dados embaralhados em um dos dois modos selecionáveis pelo usuário:

  1. Embaralhamento do worker principal. Mapeadores gravam dados nos principais workers. Os workers extraem esses nós remotos durante a fase de redução. Esse modo só está disponível e é recomendado para jobs do Spark.

  2. Embaralhamento de HCFS (Hadoop Compatible File System) Mapeadores gravam dados em uma implementação do HCFS (HDFS por padrão). Assim como no modo de worker primário, somente os workers principais participam das implementações do HDFS e do HCFS. Se o HCFS usar o Conector do Cloud Storage, os dados serão armazenados fora do cluster. Esse modo pode beneficiar jobs com pequenas quantidades de dados, mas, devido a limitações de escalonamento, ele não é recomendado para jobs maiores.

Como os dois modos de EFM não armazenam dados de shuffle intermediários em workers secundários, o EFM é adequado para clusters que usam VMs preemptivas ou apenas escalonam automaticamente o grupo de workers secundário.

Limitações:

  • Os jobs do Apache Hadoop YARN que não são compatíveis com a realocação do AppMaster podem falhar no modo de flexibilidade aprimorada. Consulte Quando esperar que o AppMasters seja concluído.
  • O modo de flexibilidade aprimorada não é recomendado:
    • em um cluster que tem apenas workers principais
    • em jobs de streaming, já que pode levar até 30 minutos após a conclusão do job para limpar os dados de embaralhamento intermediários.
  • O Modo de flexibilidade aprimorado não é compatível:
    • quando o escalonamento automático do worker primário está ativado. Na maioria dos casos, os workers principais ainda armazenam dados embaralhados que não são migrados automaticamente. A redução do grupo de workers primário diminui os benefícios do EFM.
    • quando os jobs do Spark são executados em um cluster com desativação otimizada ativada. A desativação otimizada e a EFM podem funcionar de forma cruzada, já que o mecanismo de desativação otimizada do YARN mantém os nós DECOMMISSIONING até que todos os aplicativos envolvidos sejam concluídos.

Como usar o modo de flexibilidade aprimorada

O modo de flexibilidade aprimorado é configurado por mecanismo de execução e precisa ser configurado durante a criação do cluster.

  • A implementação do EEFM do Spark é configurada com a propriedade de cluster dataproc:efm.spark.shuffle. Valores de propriedade válidos:

    • primary-worker para embaralhamento de worker primário (recomendado)
    • hcfs para embaralhamento baseado no HCFS. Esse modo foi descontinuado e está disponível apenas em clusters que executam a versão de imagem 1.5. Não recomendado para novos fluxos de trabalho.
  • A implementação do Hadoop MapReduce é configurada com a propriedade de cluster dataproc:efm.mapreduce.shuffle. Valores de propriedade válidos:

    • hcfs

Exemplo: crie um cluster com o embaralhamento principal do worker para o Spark e o HCFS embaralhado para o MapReduce:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Exemplo do Apache Spark

  1. Execute um job WordCount em texto de Shakespeare público usando o jar de exemplos do Spark no cluster do EFM.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Exemplo de MapReduce do Apache Hadoop

  1. Execute um job pequeno do teragen para gerar dados de entrada no Cloud Storage para um job do terater posterior usando o jar de exemplos de MapReduce no cluster do EFM.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    

  2. Execute um job de terasort nos dados.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Como configurar SSDs locais para embaralhamento de worker primário

As implementações de embaralhamento primário e de HDFS gravam dados embaralhados intermediários em discos conectados à VM e se beneficiam da capacidade e das IOPS adicionais oferecidas pelos SSDs locais. Para facilitar a alocação de recursos, segmente uma meta de aproximadamente uma partição SSD local por quatro vCPUs ao configurar máquinas de trabalho principais.

Para anexar SSDs locais, transmita a flag --num-worker-local-ssds para o comando gcloud dataproc clusters create.

Geralmente, não é necessário ter SSDs locais em workers secundários. Adicionar SSDs locais aos workers secundários de um cluster (usando a flag --num-secondary-worker-local-ssds) geralmente é menos importante porque os workers secundários não gravam dados embaralhados localmente. No entanto, como os SSDs locais melhoram o desempenho do disco local, você pode adicionar SSDs locais a workers secundários se espera que os jobs sejam limitados a E/S devido ao uso do disco local: o job usa um disco local significativo para espaço de trabalho ou suas partições são grandes demais para caber na memória e serão transferidas para o disco.

Proporção de workers secundários

Como os workers secundários gravam os dados embaralhados nos workers principais, o cluster precisa conter um número suficiente de workers principais com recursos de CPU, memória e disco suficientes para acomodar a carga de embaralhamento do job. Para clusters de escalonamento automático, para impedir que o grupo principal seja escalonado e cause um comportamento indesejado, defina minInstances como o valor maxInstances na política de escalonamento automático para o grupo de workers principal.

Se você tiver uma alta proporção de workers secundários em relação ao primário, por exemplo, 10:1, monitore a utilização da CPU, a rede e o uso de disco para workers principais para determinar se eles estão sobrecarregados. As etapas para fazer isso:

  1. Acesse a página Instâncias de VM no console do Google Cloud .

  2. Clique na caixa de seleção do lado esquerdo do worker primário.

  3. Clique na guia "MONITORAMENTO" para visualizar a utilização da CPU do worker primário, as IOPS de disco, os bytes de rede e outras métricas.

Se os workers principais estiverem sobrecarregados, considere aumentar os workers manualmente manualmente.

Como redimensionar o grupo de trabalho principal

O grupo de workers primário pode ser escalonado com segurança, mas a redução do grupo de workers primário pode afetar negativamente o progresso do job. As operações que reduzem o grupo do worker primário precisam usar a desativação otimizada, que é ativada definindo a flag --graceful-decommission-timeout.

Clusters com escalonamento automático: o escalonamento do grupo de workers principais é desativado em clusters de EFM com políticas de escalonamento automático. Para redimensionar o grupo de workers primário em um cluster com escalonamento automático:

  1. Desative o escalonamento automático.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. Escalone o grupo principal.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. Reative o escalonamento automático:

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

Como monitorar o uso do disco do worker principal

Os workers principais precisam ter espaço em disco suficiente para os dados embaralhados do cluster. É possível monitorar esse recurso indiretamente por meio da métrica remaining HDFS capacity. Conforme o disco local é preenchido, o espaço fica indisponível para o HDFS, e a capacidade restante diminui.

Por padrão, quando o disco local de um worker primário excede 90% da capacidade, o nó é marcado como UNHEALTHY na IU do nó YARN. Se você tiver problemas de capacidade do disco, exclua dados não utilizados do HDFS ou escalone o pool de workers principal.

Configuração avançada

Particionamento e paralelismo

Ao enviar um job do MapReduce ou Spark, configure um nível apropriado de particionamento. A decisão sobre o número de partições de entrada e saída para um estágio de embaralhamento envolve um equilíbrio entre diferentes características de desempenho. É melhor testar valores que funcionem para as formas do job.

Partições de entrada

O particionamento de entrada do MapReduce e do Spark é determinado pelo conjunto de dados de entrada. Durante a leitura de arquivos do Cloud Storage, cada tarefa processa aproximadamente um "tamanho de bloco" de dados.

  • Para jobs do Spark SQL, o tamanho máximo da partição é controlado por spark.sql.files.maxPartitionBytes. Considere aumentá-lo para 1 GB: spark.sql.files.maxPartitionBytes=1073741824.

  • Para jobs do MapReduce e Spark RDDs, o tamanho da partição normalmente é controlado com fs.gs.block.size, que tem um padrão de 128 MB. Considere aumentá-lo para 1 GB. Também é possível definir propriedades específicas de InputFormat, como mapreduce.input.fileinputformat.split.minsize e mapreduce.input.fileinputformat.split.maxsize.

    • Para jobs do MapReduce: --properties fs.gs.block.size=1073741824
    • Para RDDs do Spark: --properties spark.hadoop.fs.gs.block.size=1073741824

Partições de saída

O número de tarefas em estágios subsequentes é controlado por várias propriedades. Em jobs maiores que processam mais de 1 TB, considere ter pelo menos 1 GB por partição.

  • Para jobs do MapReduce, o número de partições de saída é controlado por mapreduce.job.reduces.

  • No Spark SQL, o número de partições de saída é controlado por spark.sql.shuffle.partitions.

  • Para jobs do Spark usando a API RDD, é possível especificar o número de partições de saída ou definir spark.default.parallelism.

Ordem de embaralhamento para embaralhamento de worker primário

A propriedade mais significativa é --properties yarn:spark.shuffle.io.serverThreads=<num-threads>. Observe que essa é uma propriedade YARN no nível do cluster, já que o servidor do Shuffle no Spark é executado como parte do Node Manager. O padrão é o dobro de vezes (2x) de núcleos na máquina (por exemplo, 16 linhas de execução em um n1-highmem-8). Se "Tempo bloqueado de leitura de embaralhamento" for maior que 1 segundo e os workers principais não atingirem os limites de rede, CPU ou de disco, considere aumentar o número de linhas de execução do servidor embaralhadas.

Em tipos de máquinas maiores, considere aumentar spark.shuffle.io.numConnectionsPerPeer, que tem o padrão 1. Por exemplo, defina-o com cinco conexões por par de hosts.

Como aumentar as tentativas

É possível configurar o número máximo de tentativas permitidas para mestres, tarefas e etapas de apps, definindo as seguintes propriedades:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

Como os mestres e as tarefas dos aplicativos são frequentemente encerrados em clusters que usam muitas VMs preemptivas ou escalonamento automático sem desativação otimizada, o aumento dos valores das propriedades acima nesses clusters pode ajudar. Observe que o uso do EFM com o Spark e a desativação otimizada não são compatíveis.

Como configurar o HDFS para embaralhamento de HCFS

Para melhorar o desempenho de embaralhamentos grandes, é possível diminuir a contenção de bloqueio no NameNode definindo dfs.namenode.fslock.fair=false. Observe que isso corre o risco de atender a solicitações individuais, mas pode melhorar a capacidade em todo o cluster. Para melhorar ainda mais o desempenho do NameNode, é possível anexar SSDs locais ao nó mestre definindo --num-master-local-ssds. Também é possível adicionar SSDs locais a workers primários para melhorar o desempenho do DataNode. Basta definir --num-worker-local-ssds.

Outros sistemas de arquivos compatíveis com Hadoop para embaralhamento de HCFS

Por padrão, os dados de embaralhamento EFM HCFS são gravados no HDFS, mas é possível usar qualquer Sistema de arquivos compatível com Hadoop (HCFS, na sigla em inglês). Por exemplo, você pode gravar aleatoriamente no Cloud Storage ou no HDFS de um cluster diferente. Para especificar um sistema de arquivos, aponte fs.defaultFS para o sistema de arquivos de destino ao enviar um job para o cluster.

Desativação otimizada do YARN em clusters de EFM

A Desativação otimizada do YARN pode ser usada para remover rapidamente os nós com impacto mínimo na execução de aplicativos. Para clusters de escalonamento automático, o tempo limite de desativação otimizada pode ser definido em uma AutoscalingPolicy anexada ao cluster de EFM.

Melhorias na EFM do MapReduce para a desativação otimizada

  1. Como os dados intermediários são armazenados em um sistema de arquivos distribuído, os nós podem ser removidos de um cluster de EFM assim que todos os contêineres em execução nesses nós forem concluídos. Em comparação, os nós não são removidos nos clusters padrão do Dataproc até que o aplicativo seja concluído.

  2. A remoção do nó não aguarda a conclusão dos mestres de aplicativos em um nó. Quando o contêiner mestre do aplicativo é encerrado, ele é reprogramado em outro nó que não está sendo desativado. O progresso do job não é perdido: o novo mestre do app recupera rapidamente o estado do mestre anterior do app lendo o histórico do job.

Como usar a desativação otimizada em um cluster de EFM com MapReduce

  1. Crie um cluster da EFM com o mesmo número de workers principais e secundários.

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. Execute um job mapreduce que calcula o valor de pi usando o jar de exemplos do mapreduce no cluster.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. Enquanto o job estiver em execução, reduza o cluster usando a desativação otimizada.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    Os nós serão removidos do cluster rapidamente, antes de o job ser concluído, minimizando a perda de progresso do job. As pausas temporárias no progresso do job podem ocorrer devido a estes fatores:

    • Failover de mestre do aplicativo. Se o progresso do job cair para 0% e, em seguida, ir imediatamente para o valor da ação de soltar, o mestre do aplicativo poderá ter sido encerrado e um novo mestre do aplicativo pode ter recuperado o estado. Isso não deve afetar significativamente o progresso do job, já que o failover acontece rapidamente.
    • Preempção de VM: Como o HDFS preserva apenas as saídas de tarefas de mapeamento completas e não parciais, as pausas temporárias no andamento do job podem ocorrer quando uma VM é interrompida enquanto trabalha em uma tarefa de mapeamento.

Para acelerar a remoção de nós, é possível reduzir escala vertical o cluster sem a desativação otimizada omitindo a flag --graceful-decommission-timeout no exemplo de comando gcloud anterior. O progresso do job de tarefas de mapa concluídas será preservado, mas a saída de tarefa de mapa parcialmente concluída será perdida (as tarefas de mapa serão executadas novamente).