Como fazer o escalonamento automático de clusters

É difícil estimar o número "certo" de workers de cluster (nodes) para uma carga de trabalho e o uso de uma configuração única para um canal inteiro não é o procedimento ideal. Há duas maneiras de enfrentar esse desafio com o Cloud Dataproc:

  1. Escalonamento de cluster iniciado pelo Usuário
  2. Escalonamento automático de cluster, que escalona os clusters para mais e para menos, de forma dinâmica, a partir dos limites mínimo e máximo definidos pelo usuário, após avaliar as métricas do cluster do da YARN do Apache Hadoop encontradas no Stackdriver Monitoring

Recomendações para o escalonamento automático

Use o escalonamento automático com:

clusters que armazenam dados em serviços externos, como o Cloud Storage ou o BigQuery

clusters que processam muitos jobs

Use o escalonamento automático para:

escalonar aumento de clusters de job único

O escalonamento automático não é recomendado com/para:

  • HDFS: o escalonamento automático não se destina ao escalonamento de HDFS no cluster. Se você usar o escalonamento automático com HDFS, verifique se o número mínimo de workers principais é suficiente para lidar com todos os dados do HDFS. Observe também que, desativar os Datanodes HDFS pode atrasar o processo de remoção de workers.

  • Structured Streaming do Spark: o escalonamento automático não é compatível com o Structured Streaming do Spark. Veja Escalonamento automático e Structured Streaming do Spark.

  • Clusters inativos: o escalonamento automático não é recomendado para escalonar um cluster até seu tamanho mínimo, quando ele estiver ocioso. O tempo necessário para a criação ou o redimensionamento de um cluster é o mesmo, por isso, pense em excluir clusters inativos e criar novos. As seguintes ferramentas são compatíveis com esse modelo “efêmero”:

    Para agendar um grupo de jobs em um cluster dedicado, use os fluxos de trabalho do Cloud Dataproc e exclua o cluster quando as tarefas terminarem. Para uma orquestração mais avançada, use o Cloud Composer, baseado no Apache Airflow.

    Para clusters que processam consultas ad hoc ou cargas de trabalho agendadas externamente, use a Exclusão agendada de cluster para excluir o cluster após um determinado período de ociosidade ou em um horário específico.

Como criar um cluster de escalonamento automático

É possível criar um cluster de escalonamento automático definindo todas as propriedades de escalonamento automático obrigatórias. As propriedades são especificadas com um prefixo "dataproc: alpha.autoscaling.". Os sufixos de propriedade de escalonamento automático estão listados na tabela Propriedades de escalonamento automático.

No exemplo a seguir, é criado um cluster de escalonamento automático com a definição das propriedades de escalonamento automático necessárias:

gcloud beta dataproc clusters create cluster-name --properties "\
dataproc:alpha.autoscaling.enabled=true,\
dataproc:alpha.autoscaling.primary.max_workers=100,\
dataproc:alpha.autoscaling.secondary.max_workers=100,\
dataproc:alpha.autoscaling.cooldown_period=1h,\
dataproc:alpha.autoscaling.scale_up.factor=0.05,\
dataproc:alpha.autoscaling.graceful_decommission_timeout=1h"

Tamanho inicial do cluster

Quando um cluster de escalonamento automático é criado, não é necessário definir o número inicial de workers principais e secundários. O escalonamento automático será iniciado a partir do tamanho padrão do cluster (2 workers principais e 0 secundários) e será escalonado conforme necessário. No entanto, a definição de um número inicial de workers principais pode melhorar o desempenho. Por exemplo, se você tiver certeza de que precisa de um cluster grande, será possível defini-lo com um tamanho grande na criação. Assim, evita-se a necessidade de espera durante os frequentes períodos de escalonamento automático, necessários para que o cluster seja ampliado.

Como o escalonamento automático funciona

As métricas da YARN do Hadoop do cluster são verificadas pelo escalonamento automático, no decorrer de cada período de "resfriamento", para definir se o cluster deve ser escalonado e qual a magnitude da atualização.

  1. Em cada avaliação, a média da memória de cluster pendente e disponível durante o último cooldown_period é examinada pelo escalonamento automático, para determinar a alteração exata necessária no número de workers:

    exact Δworkers = avg(pending memory - available memory) / memory per worker

    • pending memory é um sinal de que há tarefas em fila no cluster, ainda não executadas, e que talvez seja necessário escaloná-lo para que haja melhor tratamento da respectiva carga de trabalho.
    • available memory é um sinal de que o cluster tem largura de banda extra e pode ser necessário reduzi-lo para economizar recursos.
    • Para mais informações sobre essas métricas do YARN do Apache Hadoop, veja Escalonamento automático com o Hadoop e o Spark.
  2. Uma vez definida a alteração necessária no número de workers, é usado um scale_up.factor ou scale_down.factor no escalonamento automático para calcular qual é de fato a alteração a ser feita no número de workers:

    actual Δworkers = exact Δworkers * scale_factor

    Um fator de escala de 1.0 mostra que o escalonamento automático será feito de forna que a memória pendente/disponível seja 0, utilização perfeita.

  3. Uma vez definida a alteração necessária no número de workers, o min_worker_fraction é usado como um limite para determinar se o cluster será escalonado pelo escalonamento automático. Um min_worker_fraction pequeno mostra que o escalonamento automático deve ser realizado mesmo que o Δworkers seja pequeno. Um min_worker_fraction maior mostra que o escalonamento só deve ocorrer quando o Δworkers for grande.

    if (Δworkers > min_worker_fraction * cluster size) then scale

  4. Se o número de trabalhadores a escalonar for grande o suficiente para acionar o processo, no escalonamento automático são usados os limites min/max dos grupos de workers e o secondary_worker_fraction configurado para determinar como dividir o número de trabalhadores entre os grupos de instâncias principais e secundárias. O resultado desses cálculos é a alteração final de escalonamento automático no cluster para o período de escalonamento.

Propriedades do escalonamento automático

O escalonamento automático é configurado a partir da definição de propriedades. Na tabela a seguir:

  • os workers padrão (nodes) são chamados de workers "principais";
  • os workers preemptivos (nodes) são chamados de workers "secundários";
  • as propriedades obrigatórias não têm padrão;
  • os sufixos de propriedade estão listados na tabela. Os sufixos são adicionados no final do prefixo dataproc:alpha.autoscaling. para formar o nome da propriedade (por exemplo, o sufixo da propriedade usada para ativar o escalonamento automático é enabled e o nome completo da propriedade é dataproc:alpha.autoscaling.enabled).
Sufixo de propriedade Descrição
enabled Permite escalonamento automático
Obrigatório: booleano - true ou false
primary.min_workers Número mínimo de workers principais
Opcional: Limites int: [2, primary.max_workers] Padrão: 2
primary.max_workers Número máximo de workers principais
Obrigatório : Limites int: [primary.min_workers,)
min_workers Número mínimo de workers secundários
Opcional: Limites int: [0, secondary.max_workers] Padrão: 0
secondary.max_workers Número máximo de workers secundários. Como as VMs secundárias são encerradas pelo menos a cada 24 horas, elas são mais adequadas para cargas de trabalho curtas e não críticas. Defina como 0 para evitar o uso de workers secundários.
Obrigatório: Limites int: [secondary.max_workers,)
secondary_worker_fraction Fração alvo de workers secundários. O cluster pode não atingir essa fração se os limites de contagem de workers não permitirem isso. Por exemplo, se secondary.max_workers=0, apenas workers principais serão adicionados. O cluster também pode ficar desbalanceado quando for criado.
Opcional: Limites duplos : [0,0, 1,0] Padrão: 0,5
cooldown_period Duração entre eventos de escalonamento. Um período de escalonamento começa após a conclusão da atualização do evento anterior.
Obrigatório: Limites de duração (s, m, h, d): [10m, )
scale_up.factor Fração de memória pendente média no último período de "resfriamento" para adicionar workers. Um fator de aumento de escala de 1,0 resultará no escalonamento para que não haja memória pendente restante após a atualização (escalonamento mais agressivo). Um fator de aumento de escala mais próximo de 0 resultará em uma magnitude menor de escalonamento (escalonamento menos agressivo).
Obrigatório: Limites duplos : [0,0, 1,0]
scale_up.min_worker_fraction Limite mínimo de aumento de escala como uma fração do tamanho total do cluster antes de ocorrer o escalonamento. Por exemplo, em um cluster com 20 workers, um limite de 0,1 mostra que o autoescalador precisa recomendar um aumento de pelo menos 2 workers, para que o cluster escalone. Um limite de 0 mostra que o autoescalador escalonará um aumento em qualquer alteração recomendada.
Opcional: Limites duplos: [0,0, 1,0] Padrão: 0,0
scale_down.factor Fração de memória pendente média no último período de resfriamento a ser usada para remover workers. Um fator de redução de escala 1 resultará na redução de escala para que não haja memória disponível restante após a atualização (escalonamento mais agressivo). Um fator de redução de escala de 0 desabilita a remoção de workers, o que pode ser benéfico para o escalonamento automático de um único job.
Opcional :Limites duplos: [0,0, 1,0] Padrão: 1,0
min_worker_fraction Limite mínimo de redução de escala como uma fração do tamanho total do cluster antes de ocorrer o escalonamento. Por exemplo, em um cluster de 20 workers, um limite de 0,1 mostra que o autoescalador precisa recomendar que o cluster escalone uma redução de pelo menos 2 workers. Um limite de 0 mostra que o autoescalador reduzirá em qualquer alteração recomendada.
Opcional: Limites duplos: [0,0, 1,0] Padrão: 0,0
graceful_decommission_timeout Tempo limite para desativação otimizada da YARN dos administradores do node. Especifica o tempo de espera para que os jobs sejam concluídos antes de remover os workers e potencialmente interromper os jobs. Aplicável apenas a operações de redução da necessidade de escalonamento.
Obrigatório: Limites de duração (s, m, h, d): [0s, 1d]

Como fazer o escalonamento automático com o Apache Hadoop e o Apache Spark

Nas seções a seguir, abordamos a maneira de interoperação do escalonamento automático com o YARN do Hadoop e o Hadoop Mapreduce e com o Apache Spark, o Spark Streaming e o Spark Structured Streaming.

Métricas do YARN do Hadoop

A configuração do YARN do Hadoop para agendar jobs com base nas solicitações de memória do YARN é feita no escalonamento automático e não nas solicitações principais do YARN.

O escalonamento automático é centralizado nas seguintes métricas do YARN do Hadoop:

  1. Allocated memory com que é feita referência à memória total do YARN, ocupada com a execução de contêineres em todo o cluster. Se houver 6 contêineres em execução que podem usar até 1 GB, haverá 6 GB de memória alocada.

  2. Available memory é a memória YARN no cluster, não utilizada pelos contêineres alocados. Se houver 10 GB de memória em todos os gerenciadores de node e 6 GB de memória alocada, há 4 GB de memória disponível. Se houver memória disponível (não utilizada) no cluster, com o escalonamento automático é possível remover workers do cluster.

  3. Pending memory é a soma das solicitações de memória do YARN para contêineres pendentes. Os contêineres pendentes aguardam espaço para serem executados no YARN. A memória pendente só é diferente de zero se a memória disponível for zero ou pequena demais para ser alocada no próximo container. Se houver contêineres pendentes, com o escalonamento automático será possível adicionar trabalhadores ao cluster.

É possível visualizar essas métricas no Stackdriver monitoring. A memória do YARN será 0,8 * de memória total no cluster. A memória restante é reservada para outros daemons e para o uso do sistema operacional, por exemplo, para realizar o cache de páginas.

Escalonamento automático e o Hadoop MapReduce

Com o MapReduce é executado cada mapa e reduzida a tarefa como um contêiner YARN separado. Quando um job começa, por meio do MapReduce são enviadas solicitações de contêiner para cada tarefa do mapa, resultando em grande aumento na memória pendente do YARN. À medida que as tarefas do mapa terminam, a memória pendente diminui.

Quando mapreduce.job.reduce.slowstart.completedmaps são concluídos (95% por padrão no Cloud Dataproc), com o MapReduce são enfileiradas as solicitações de contêiner para todos os redutores, resultando em outro pico na memória pendente.

A menos que o mapeamento e a tarefas de redução demorem vários minutos ou mais, não defina um valor alto para dataproc:alpha.autoscaling.scale_up.factor. Adicionar workers ao cluster demora pelo menos 1,5 minuto, portanto, verifique se há trabalho pendente suficiente para utilizar o novo worker por vários minutos. Um bom ponto de partida é definir dataproc:alpha.autoscaling.scale_up.factor como 0,05 (5%) ou 0,1 (10%) da memória pendente.

Escalonamento automático e o Spark

Por meio do Spark, uma camada extra de agendamento é adicionada no topo do YARN. Com a alocação dinâmica do Spark Core, são feitas solicitações ao YARN para que os contêineres rodem os executores do Spark e, em seguida, é feito o agendamento das tarefas do Spark, nas threads desses executores. Com os clusters do Cloud Dataproc é feita a alocação dinâmica por padrão, portanto, os executores são adicionados e removidos conforme necessário.

No Spark, sempre é feita a solicitação de contêineres para o YARN, mas sem a alocação dinâmica, a solicitação só é feita no início do job. Com a alocação dinâmica, será feita a solicitação para a remoção de contêineres ou criação de novos, conforme a necessidade.

O Spark é iniciado com um número reduzido de executores - 2, em clusters de escalonamento automático - e o número de executores é duplicado enquanto houver tarefas acumuladas. Isso suaviza a memória pendente (menor número de picos de memória pendentes). É recomendado definir dataproc:alpha.autoscaling.scale_up.factor para um número grande, como 1.0 (100%), para jobs do Spark.

Como um padrão do Spark, os executores com dados armazenados em cache não saem (spark.dynamicAllocation.cachedExecutorIdleTimeout=0) e os executores sem dados em cache sairão após 60 segundos (spark.dynamicAllocation.executorIdleTimeout=60). No Cloud Dataproc, a configuração do serviço de shuffle é baseada em YARN externo, por isso, os executores geralmente saem antes que seus dados de reprodução aleatória sejam exibidos. Portanto, defina um cachedExecutorIdleTimeout diferente de zero para que os executores com dados em cache saiam. Defina também um dataproc:alpha.autoscaling.graceful_decommission_timeout diferente de zero para drenar dados aleatórios dos workers antes que o escalonamento automático os elimine.

Como desativar a alocação dinâmica do Spark

Se você estiver executando trabalhos separados do Spark, que não aproveitem a alocação dinâmica do Spark, será possível desabilitar essa funcionalidade configurando spark.dynamicAllocation.enabled=false e definindo spark.executor.instances. É possível ainda usar o escalonamento automático para escalonar os clusters para mais ou para menos, enquanto os jobs separados do Spark são executados.

Escalonamento automático e o Spark Streaming

Para usar o Spark Streaming com o escalonamento automático:

  1. Como o Spark Streaming tem a própria versão de alocação dinâmica, que usa sinais específicos de fluxo para adicionar e remover executores, defina spark.streaming.dynamicAllocation.enabled=true e desative a alocação dinâmica do Spark Core definindo spark.dynamicAllocation.enabled=false.

  2. Até que o problema da alocação dinâmica do Spark Streaming deve respeitar spark.executor.instances seja corrigido, use uma ação de inicialização para remover spark.executor.instances de /etc/spark/conf/spark-defaults.conf.

  3. A desativação otimizada (dataproc:alpha.autoscaling.graceful_decommission_timeout) não se aplica a tarefas do Spark Streaming. Em vez disso, para remover com segurança o worker com o escalonamento automático, configure o ponto de verificação para tolerância a falhas.

Como alternativa, para usar o Spark Streaming sem o escalonamento automático:

  1. desative a alocação dinâmica do Spark Core (spark.dynamicAllocation.enabled=false);
  2. defina o número de executores (spark.executor.instances) para seu job. Veja Propriedades do cluster.

Escalonamento automático e o Spark Structured Streaming

O escalonamento automático não é compatível com o Spark Structured Streaming, porque este último não oferece suporte à alocação dinâmica. Veja SPARK-24815: o Structured Streaming deve suportar a alocação dinâmica.

Considerações e recomendações da redução da necessidade de escalonamento

  • Dados do shuffle: no Hadoop MapReduce, os dados do shuffle são gravados no disco local pelas tarefas do mapa. O disco local é veiculado para tarefas menores por meio de um servidor em execução no gerenciador de nodes. A remoção de workers com dados do shuffle é problemática, mesmo quando as tarefas não estão em execução, porque isso pode atrasar o andamento do job se for necessário executar as tarefas do mapa novamente. No Spark, os dados também são embaralhados entre os limites do cenário. Se for detectada ausência de arquivos do shuffle, todo o cenário será executado novamente.

    Recomendação: em clusters com vários jobs, defina dataproc:alpha.autoscaling.graceful_decommission_timeout para que os jobs em andamento tenham tempo suficiente para serem concluídos antes que o escalonamento automático remova os workers. É melhor definir o tempo limite conforme a duração do job mais demorado, para garantir que todos os trabalhos sejam concluídos antes que os workers sejam removidos.

    Estratégia alternativa 1: para que os workers não sejam removidos, independentemente do tamanho da memória disponível, desative a redução da necessidade do escalonamento automático definindo dataproc:alpha.autoscaling.scale_down.factor=0.0. Essa estratégia pode ser útil para clusters de trabalho único, em que redução da necessidade de escalonamento pode atrasar o andamento do job.

    Estratégia alternativa 2: para reduzir o escalonamento apenas quando o cluster estiver inativo, configure o cluster definindo dataproc:alpha.autoscaling.scale_down.factor=1.0 e dataproc:alpha.autoscaling.scale_down.min_worker_fraction=1.0.

  • Dados em cache: com o Spark, é possível armazenar conjuntos de dados em cache, na memória do executor ou no disco. Os executores do Spark saem quando não há trabalho para processar, no entanto, se houver dados em cache, por padrão, eles nunca saem. Portanto, para notebooks e aplicativos que mantenham dados em cache, os executores permanecerão ativos depois que os aplicativos não forem mais necessários, e o escalonamento automático não escalonará o cluster.

    Recomendação: para garantir que os executores com dados em cache saiam, configure spark.dynamicAllocation.cachedExecutorIdleTimeout ou retire os conjuntos de dados do cache quando não forem mais necessários.

Como controlar o escalonamento automático com particionamento e paralelismo

Enquanto o paralelismo é geralmente definido ou determinado por recursos de cluster (por exemplo, o número de HDFS bloqueia os controles pelo número de tarefas), com o escalonamento automático ocorre o contrário. Os recursos de cluster (workers) são definidos no escalonamento automático, de acordo com o paralelismo de jobs. A seguir, as diretrizes para definir o paralelismo de jobs:

  • Embora o Cloud Dataproc defina o número padrão de tarefas de redução do MapReduce com base no tamanho inicial do cluster, é possível definir mapreduce.job.reduces para aumentar o paralelismo da fase de redução.
  • O paralelismo do Spark SQL e do Dataframe é determinado por spark.sql.shuffle.partitions e o padrão é 200.
  • As funções RDD do Spark são padronizadas para spark.default.parallelism, que está relacionado ao número de executores quando o job é iniciado. No entanto, todas as funções RDD que criam shuffles usam um parâmetro para o número de partições, o que substitui o spark.default.parallelism.

É preciso garantir que os dados sejam particionados de maneira uniforme. Se houver uma distorção significativa de chave, uma ou mais tarefas podem demorar bem mais do que outras, resultando em baixa utilização.

Configurações de propriedades padrão de escalonamento automático do Spark/Hadoop

Os clusters de escalonamento automático têm valores de propriedade de cluster padrão que ajudam a evitar a falha nos jobs, quando os workers principais são removidos ou quando os workers secundários são preteridos. É possível modificar os valores padrão ao criar um cluster com escalonamento automático. Veja Propriedades do cluster.

Padrões para aumentar o número máximo de novas tentativas para tarefas, mestres de aplicativos e cenários:

yarn:yarn.resourcemanager.am.max-attempts=10
mapred:mapreduce.map.maxattempts=10
mapred:mapreduce.reduce.maxattempts=10
spark:spark.task.maxFailures=10
spark:spark.stage.maxConsecutiveAttempts=10

Padrões para redefinir os contadores de novas tentativas (útil para jobs de execução lenta do Spark Streaming):

spark:spark.yarn.am.attemptFailuresValidityInterval=1h
spark:spark.yarn.executor.failuresValidityInterval=1h

Padrões para fazer com que o mecanismo de alocação dinâmica do Spark de inicialização lenta comece com um tamanho pequeno:

spark:spark.executor.instances=2

Métricas e registros de escalonamento automático

Os recursos e ferramentas a seguir podem ajudá-lo a monitorar operações de escalonamento automático e respectivos efeitos no seu cluster e nos seus jobs.

Stackdriver Monitoring

Use o Stackdriver Monitoring para:

  • ver as métricas usadas pelo escalonamento automático;
  • ver o número de administradores de nodes no cluster;
  • entender porque o escalonamento automático fez ou não o escalonamento do cluster.autoscaling-stackdriver1autoscaling-stackdriver2autoscaling-stackdriver3

Registro de auditoria do Stackdriver

Use o Cloud Audit Logging para verificar como o escalonamento automático redimensionou seu cluster.

Perguntas frequentes

O escalonamento automático pode ser ativado em clusters de alta disponibilidade e em clusters de node único?

O escalonamento automático pode ser ativado em clusters de alta disponibilidade, mas não em clusters de node único, porque estes não são compatíveis com redimensionamento.

É possível modificar as propriedades de escalonamento automático após a criação do cluster ou em um cluster em execução?

Não. As propriedades de escalonamento automático não podem ser modificadas após a criação de um cluster, nem podem ser adicionadas a clusters em execução.

É possível redimensionar manualmente um cluster de escalonamento automático?

Sim. É possível redimensionar manualmente um cluster para interromper intervalos, se o escalonamento automático estiver tomando decisões erradas ou para ajustar as configurações de carga de trabalho. No entanto, essas alterações terão apenas um efeito temporário, e o escalonamento automático acabará redimensionando novamente o cluster.

Em vez de redimensionar manualmente um cluster, pense em:

excluir o cluster e recriá-lo com uma configuração de carga de trabalho melhor.

Como acessar a ajuda do Cloud Dataproc.

Quais versões de imagens são compatíveis com o escalonamento automático? Quais são as versões da API?

O escalonamento automático é compatível com as versões 1.2.22+ e 1.3.0+. O escalonamento automático não é compatível com as versões 1.0 e 1.1. Veja a lista de versões do Cloud Dataproc . O escalonamento automático está na versão Alfa, mas pode ser ativado por meio das APIs do Cloud Dataproc v1 ou v1beta2.

Qual a diferença entre o escalonamento automático do Cloud Dataproc e do Cloud Dataflow?

Veja Comparação entre o escalonamento automático do Cloud Dataflow, do Spark e do Hadoop

O escalonamento automático tem muitas propriedades configuráveis. Existe maneira mais simples de criar clusters de escalonamento automático?

É possível criar clusters a partir de arquivos do YAML. O formato do arquivo corresponde diretamente à API REST. Você precisa codificar as Propriedades de escalonamento automático como strings de propriedades do cluster.

Exemplo:

gcloud beta dataproc clusters create-from-file --file=cluster.yaml
$ cat cluster.yaml
projectId: PROJECT
clusterName: NAME
config:
  gceClusterConfig:
    zoneUri: us-central1-a
  softwareConfig:
    properties:
      dataproc:alpha.autoscaling.enabled: 'true'
      dataproc:alpha.autoscaling.primary.max_workers: '100'
      dataproc:alpha.autoscaling.secondary.max_workers: '100'
      dataproc:alpha.autoscaling.cooldown_period: '1h'
      dataproc:alpha.autoscaling.scale_up.factor: '0.05'
      dataproc:alpha.autoscaling.graceful_decommission_timeout: '1h'
Esta página foi útil? Conte sua opinião sobre:

Enviar comentários sobre…

Documentação do Cloud Dataproc
Precisa de ajuda? Acesse nossa página de suporte.