Ajustar o escalonamento automático horizontal para pipelines de streaming

Em pipelines de streaming com um alto volume de dados de entrada, geralmente há uma compensação entre custo e latência. Para manter a baixa latência, o Dataflow precisa adicionar workers à medida que o volume de tráfego aumenta. Outro fator é a rapidez com que o pipeline precisa escalonar verticalmente em resposta a alterações na taxa de dados de entrada.

O escalonador automático do Dataflow tem configurações padrão adequadas para muitas cargas de trabalho. No entanto, é recomendável ajustar esse comportamento para seu cenário específico. Por exemplo, uma latência média mais alta pode ser aceitável para reduzir custos, ou você pode querer que o Dataflow aumente mais rapidamente em resposta a picos de tráfego.

Para otimizar o escalonamento automático horizontal, ajuste os seguintes parâmetros:

Definir o intervalo de escalonamento automático

Ao criar um novo job de streaming, é possível definir o número inicial e máximo de workers. Para fazer isso, especifique as seguintes opções de pipeline:

Java

  • --numWorkers: o número inicial de workers disponíveis quando o pipeline começa a ser executado.
  • --maxNumWorkers: o número máximo de workers disponíveis para o pipeline.

Python

  • --num_workers: o número inicial de workers disponíveis quando o pipeline começa a ser executado.
  • --max_num_workers: o número máximo de workers disponíveis para o pipeline.

Go

  • --num_workers: o número inicial de workers disponíveis quando o pipeline começa a ser executado.
  • --max_num_workers: o número máximo de workers disponíveis para o pipeline.

Para jobs de streaming que usam o Streaming Engine, a sinalização --maxNumWorkers é opcional. O padrão é 100. Para jobs de streaming que não usam o Streaming Engine, --maxNumWorkers é necessário quando o Escalonamento automático horizontal está ativado.

O valor inicial de --maxNumWorkers também determina quantos discos permanentes são alocados para o job. Os pipelines de streaming são implantados com um pool fixo de discos permanentes equivalente ao número de --maxNumWorkers. Durante o streaming, os discos permanentes são redistribuídos para que cada worker tenha um número igual de discos anexados.

Se você definir --maxNumWorkers, verifique se o valor corresponde ao número suficiente de discos para o pipeline. Considere o crescimento futuro ao definir o valor inicial. Para informações sobre o desempenho dos discos permanentes, consulte Configurar discos permanentes e VMs. O Dataflow cobra pelo uso do disco permanente e tem cotas do Compute Engine, incluindo as de disco permanente.

Por padrão, o número mínimo de workers é 1 para jobs de streaming que usam o Streaming Engine e (maxNumWorkers/15), arredondado para cima, para jobs que não usam o Streaming Engine.

Atualizar o intervalo de escalonamento automático

Para jobs que usam o Streaming Engine, você pode ajustar o número mínimo e máximo de workers sem parar ou substituir o job. Para ajustar esses valores, use uma atualização de job em andamento. Atualize as seguintes opções de job:

  • --min-num-workers: o número mínimo de workers.
  • --max-num-workers: o número máximo de workers.

gcloud

Use o comando gcloud dataflow jobs update-options:

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  JOB_ID

Substitua:

  • REGION: o ID da região do endpoint regional do job
  • MINIMUM_WORKERS: o número mínimo de instâncias do Compute Engine
  • MAXIMUM_WORKERS: o número máximo de instâncias do Compute Engine
  • JOB_ID: o ID do job a ser atualizado

Também é possível atualizar --min-num-workers e --max-num-workers individualmente.

REST

Use o método projects.locations.jobs.update:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud do job do Dataflow
  • REGION: o ID da região do endpoint regional do job
  • JOB_ID: o ID do job a ser atualizado
  • MINIMUM_WORKERS: o número mínimo de instâncias do Compute Engine
  • MAXIMUM_WORKERS: o número máximo de instâncias do Compute Engine

Também é possível atualizar min_num_workers e max_num_workers individualmente. Especifique quais parâmetros atualizar no parâmetro de consulta updateMask e inclua os valores atualizados no campo runtimeUpdatableParams do corpo da solicitação. O exemplo a seguir atualiza min_num_workers:

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

Para jobs que não usam o Streaming Engine, é possível substituir o job atual por um valor atualizado de maxNumWorkers.

Se você atualizar um job de streaming que não use o Streaming Engine, o job atualizado terá o Escalonamento automático horizontal desativado por padrão. Para manter o escalonamento automático ativado, especifique --autoscalingAlgorithm e --maxNumWorkers para o job atualizado.

Definir a dica de utilização do worker

O Dataflow usa a utilização média da CPU como um sinal para aplicar o escalonamento automático horizontal. Por padrão, o Dataflow define uma meta de utilização da CPU de 0,8. Quando a utilização estiver fora desse intervalo, o Dataflow poderá adicionar ou remover workers.

Para ter mais controle sobre o comportamento do escalonamento automático, defina a meta de uso da CPU como um valor no intervalo [0,1, 0,9].

  • Defina um valor mais baixo de utilização da CPU se quiser alcançar latências de pico mais baixas. Um valor menor permite que o Dataflow seja escalonado de maneira mais agressiva em resposta à crescente utilização do worker e reduza a escala de maneira mais conservadora para melhorar a estabilidade. Um valor menor também oferece margem quando o pipeline está em execução em um estado estável, geralmente resultando em uma latência de cauda mais baixa. A latência de cauda mede os tempos de espera mais longos antes do processamento de um novo registro.

  • Defina um valor mais alto se você quiser economizar recursos e manter os custos mais baixos quando o tráfego aumentar. Um valor maior impede o aumento excessivo da escala, à custa de maior latência.

Para configurar a dica de utilização ao executar um job sem modelo, defina a opção de serviço worker_utilization_hint. Para um job de modelo, atualize a dica de utilização, já que as opções de serviço não têm suporte.

O exemplo a seguir mostra como usar worker_utilization_hint:

Java

--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION

Substitua TARGET_UTILIZATION por um valor no intervalo [0,1, 0,9].

Python

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

Substitua TARGET_UTILIZATION por um valor no intervalo [0,1, 0,9].

Go

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

Substitua TARGET_UTILIZATION por um valor no intervalo [0,1, 0,9].

Para novos pipelines, recomendamos que você teste sob cargas realistas usando a configuração padrão. Em seguida, avalie o comportamento do escalonamento automático conforme ele se aplica ao seu pipeline e faça os ajustes necessários.

A dica de utilização é apenas um fator usado pelo Dataflow ao decidir se precisa escalonar ou não workers. Outros fatores, como backlog e chaves disponíveis, podem modificar o valor da dica. Além disso, a dica não é um alvo rigoroso. O escalonador automático tenta manter a utilização da CPU no intervalo do valor da dica, mas a métrica de utilização agregada pode ser maior ou menor. Para mais informações, consulte Heurísticas de escalonamento automático de streaming.

Atualizar a dica de utilização

Para atualizar a dica de utilização enquanto um job está em execução, execute uma atualização em andamento da seguinte maneira:

gcloud

Use o comando gcloud dataflow jobs update-options:

gcloud dataflow jobs update-options \
  --region=REGION \
  --worker-utilization-hint=TARGET_UTILIZATION \
  JOB_ID

Substitua:

  • REGION: o ID da região do endpoint regional do job
  • JOB_ID: o ID do job a ser atualizado
  • TARGET_UTILIZATION: um valor no intervalo [0,1, 0,9].

Para redefinir a dica de utilização para o valor padrão, use o seguinte comando gcloud:

gcloud dataflow jobs update-options \
  --unset-worker-utilization-hint \
  --region=REGION \
  --project=PROJECT_ID \
  JOB_ID

REST

Use o método projects.locations.jobs.update:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint
{
  "runtime_updatable_params": {
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud do job do Dataflow.
  • REGION: o ID da região do endpoint regional do job.
  • JOB_ID: o ID do job a ser atualizado.
  • TARGET_UTILIZATION: um valor no intervalo [0,1, 0,9].

Heurística de escalonamento automático de streaming

Para pipelines de streaming, o objetivo do Escalonamento automático horizontal é minimizar o backlog e, ao mesmo tempo, maximizar a utilização e a capacidade de workers, além de permitir uma rápida reação a picos de carga.

O Dataflow considera vários fatores no escalonamento automático, incluindo:

  • Backlog. O tempo estimado do backlog é calculado com base na capacidade e nos bytes do backlog que ainda serão processados da origem de entrada. Um pipeline é considerado "com backlog" quando o tempo estimado do backlog permanece acima de 15 segundos.

  • Meta de uso da CPU. A meta padrão para o uso médio da CPU é 0,8. É possível substituir esse valor.

  • Chaves disponíveis. As chaves são a unidade fundamental de paralelismo no Dataflow.

Em alguns casos, o Dataflow usa os seguintes fatores nas decisões de escalonamento automático. Se esses fatores forem usados para seu job, será possível ver essas informações na guia de métricas Escalonamento automático.

  • A limitação baseada em chaves usa o número de chaves de processamento recebidas pelo job para calcular o limite de workers do usuário, já que cada chave só pode ser processada por um worker por vez.

  • Abrandamento de redução. Se o Dataflow detectar que houve decisões de escalonamento automático instáveis, ele diminuirá a taxa de redução para melhorar a estabilidade.

  • O aumento baseado em CPU usa a alta taxa de utilização da CPU como um critério de aumento.

  • Para jobs de streaming que não usam o Streaming Engine, o escalonamento pode ser restrito pelo número de discos permanentes. Para mais informações, consulte Definir o intervalo de escalonamento automático.

Upscaling. Se um pipeline de streaming permanecer atrasado com paralelismo suficiente nos workers por vários minutos, o Dataflow será escalonado verticalmente. O Dataflow tenta limpar o backlog em aproximadamente 150 segundos de escalonamento vertical, considerando a capacidade atual por worker. Se houver backlog, mas o worker não tiver paralelismo suficiente para workers adicionais, o pipeline não será escalonado verticalmente. Escalonar o número de workers além do número de chaves disponíveis para processamento paralelo não ajuda a processar o backlog mais rápido.

Redução de escalonamento Quando o escalonador automático toma uma decisão de redução, o backlog é o fator de prioridade mais alta. O escalonador automático visa um backlog de não mais de 15 segundos. Se o backlog cair para menos de 10 segundos e a utilização média do worker estiver abaixo da meta de utilização da CPU, o Dataflow será reduzido. Enquanto o backlog for aceitável, o escalonador automático tentará manter a utilização da CPU próxima da utilização de destino da CPU. No entanto, se a utilização já estiver suficientemente próxima da meta, o escalonador automático poderá manter o número de workers inalterado, porque cada etapa de redução tem um custo.

O Streaming Engine também usa uma técnica de escalonamento automático preditivo com base no backlog do timer. Os dados ilimitados em um pipeline de streaming são divididos em janelas agrupadas por carimbos de data/hora. No final de uma janela, os timers são acionados para cada chave processada nessa janela. O acionamento de um timer indica que a janela expirou para uma determinada chave. O Streaming Engine pode medir o backlog do timer e prever quantos timers serão acionados ao final de uma janela. Usando o backlog do timer como um sinal, o Dataflow pode estimar a quantidade de processamento que precisa acontecer quando os próximos timers forem disparados. Com base na estimativa da carga futura, o Dataflow é escalonado automaticamente para atender à demanda esperada.

Métricas

Para encontrar os limites atuais de escalonamento automático de um job, consulte as seguintes métricas:

  • job/max_worker_instances_limit: número máximo de workers
  • job/min_worker_instances_limit: número mínimo de workers

Para ver informações sobre a utilização de workers, consulte as métricas a seguir:

  • job/aggregated_worker_utilization: o uso agregado do worker.
  • job/worker_utilization_hint: a dica atual de utilização do worker.

Para receber insights sobre o comportamento do escalonador automático, consulte a métrica a seguir:

  • job.worker_utilization_hint_is_actively_used: indica se o escalonador automático está usando ativamente a dica de utilização do worker. Se outros fatores substituirem a dica durante a amostragem, o valor será false.
  • job/horizontal_worker_scaling: descreve as decisões tomadas pelo escalonador automático. Essa métrica contém os seguintes rótulos:
    • direction: especifica se o escalonador automático realizou o escalonamento vertical ou horizontal ou se não realizou nenhuma ação.
    • rationale: especifica a lógica da decisão do escalonador automático.

Para mais informações, consulte métricas do Cloud Monitoring. Essas métricas também são exibidas nos gráficos de monitoramento de escalonamento automático.

A seguir