Resolver erros de falta de memória do Dataflow

Nesta página, você verá informações sobre o uso da memória nos pipelines do Dataflow e as etapas para investigar e resolver erros de falta de memória (OOM, na sigla em inglês) do Dataflow.

Sobre o uso da memória do Dataflow

Para solucionar erros de falta de memória, é útil entender como os pipelines do Dataflow usam a memória.

Quando o Dataflow executa um pipeline, o processamento é distribuído em várias máquinas virtuais (VMs) do Compute Engine, geralmente chamadas de workers. Os workers processam itens de trabalho do serviço do Dataflow e delegam os itens de trabalho aos processos do SDK do Apache Beam. Um processo do SDK do Apache Beam cria instâncias de DoFns. DoFn é uma classe do SDK do Apache Beam que define uma função de processamento distribuído.

O Dataflow inicia várias linhas de execução em cada worker, e a memória de cada worker é compartilhada em todas as linhas de execução. Uma linha de execução é uma única tarefa executável em um processo maior. O número padrão de linhas de execução depende de vários fatores e varia entre os jobs em lote e de streaming.

Se o pipeline precisar de mais memória do que a quantidade padrão disponível nos workers, talvez ocorram erros de falta de memória.

Os pipelines do Dataflow usam principalmente a memória do worker de três maneiras:

Memória operacional do worker

Os workers do Dataflow precisam de memória para os sistemas operacionais e processos do sistema. O uso da memória do worker normalmente não é maior que 1 GB. O uso normalmente é menor que 1 GB.

  • Vários processos do worker usam memória para garantir que o pipeline funcione. Cada um desses processos pode reservar uma pequena quantidade de memória para a operação.
  • Quando o pipeline não usa o Streaming Engine, outros processos de worker usam a memória.

Memória do processo do SDK

Os processos do SDK do Apache Beam podem criar objetos e dados compartilhados entre linhas de execução dentro do processo, chamados de objetos e dados compartilhados do SDK nesta página. O uso da memória desses objetos e dados compartilhados do SDK é chamado de memória de processo do SDK. A lista a seguir inclui exemplos de objetos e dados compartilhados do SDK:

  • Entradas secundárias
  • Modelos de machine learning
  • Objetos singleton na memória
  • Objetos Python criados com o módulo apache_beam.utils.shared
  • Dados carregados de fontes externas, como o Cloud Storage ou o BigQuery;

Jobs de streaming que não usam entradas secundárias de armazenamento do Streaming Engine na memória. Nos pipelines de Java e Go, cada worker tem uma cópia da entrada secundária. Para pipelines de Python, cada processo do SDK do Apache Beam tem uma cópia da entrada secundária.

Os jobs de streaming que usam o Streaming Engine têm um limite de tamanho de entrada secundária de 80 MB. As entradas secundárias são armazenadas fora da memória do worker.

O uso da memória de objetos e dados compartilhados do SDK cresce de maneira linear com o número de processos do SDK do Apache Beam. Em pipelines de Java e Go, um processo do SDK do Apache Beam é iniciado por worker. Nos pipelines do Python, um processo do SDK do Apache Beam é iniciado por vCPU. Os objetos e dados compartilhados do SDK são reutilizados em linhas de execução no mesmo processo do SDK do Apache Beam.

DoFn Uso da memória

DoFn é uma classe do SDK do Apache Beam que define uma função de processamento distribuído. Cada worker pode executar instâncias DoFn simultâneas. Cada linha de execução executa uma instância de DoFn. Ao avaliar o uso total da memória, calcular o tamanho do conjunto de trabalho ou a quantidade de memória necessária para que um aplicativo continue funcionando pode ser útil. Por exemplo, se uma DoFn individual usar no máximo 5 MB de memória e um worker tiver 300 linhas de execução, o uso da memória de DoFn poderá atingir 1,5 MB ou o número de bytes de memória multiplicado pelo número de linhas de execução. Dependendo de como os workers usam a memória, um pico no uso da memória pode fazer com que eles fiquem sem memória.

É difícil estimar quantas instâncias de um DoFn são criados. O número depende de vários fatores, como o SDK, o tipo de máquina e assim por diante. Além disso, o DoFn pode ser usado por vários threads sucessivamente. O serviço do Dataflow não garante quantas vezes uma DoFn é invocada, nem garante o número exato de instâncias de DoFn criadas durante um pipeline. A tabela a seguir fornece alguns insights sobre o nível de paralelismo esperado e estima um limite superior do número de instâncias de DoFn.

SDK do Beam para Python

Lote Streaming sem o Streaming Engine Streaming Engine
Paralelismo 1 processo por vCPU

1 linha de execução por processo

1 linha de execução por vCPU

1 processo por vCPU

12 linhas de execução por processo

12 linhas de execução por vCPU

1 processo por vCPU

12 linhas de execução por processo

12 linhas de execução por vCPU

Número máximo de instâncias DoFn simultâneas (esses números estão sujeitos a alterações a qualquer momento). 1 DoFn por linha de execução

1 DoFn por vCPU

1 DoFn por linha de execução

12 DoFn por vCPU

1 DoFn por linha de execução

12 DoFn por vCPU

SDK do Beam para Java/Go

Lote Streaming sem o Streaming Engine Streaming Engine
Paralelismo 1 processo por VM de worker

1 linha de execução por vCPU

1 processo por VM de worker

300 linhas de execução por processo

300 linhas de execução por VM de worker

1 processo por VM de worker

500 linhas de execução por processo

500 linhas de execução por VM de worker

Número máximo de instâncias DoFn simultâneas (esses números estão sujeitos a alterações a qualquer momento). 1 DoFn por linha de execução

1 DoFn por vCPU

1 DoFn por linha de execução

300 DoFn por VM de worker

1 DoFn por linha de execução

500 DoFn por VM de worker

Por exemplo, ao usar o SDK do Python com um worker do Dataflow n1-standard-2, o seguinte se aplica:

  • Jobs em lote: o Dataflow inicia um processo por vCPU (dois, neste caso). Cada processo usa uma linha de execução, e cada linha de execução cria uma instância de DoFn.
  • Jobs de streaming com o Streaming Engine: o Dataflow inicia um processo por vCPU (dois no total). No entanto, cada processo pode gerar até 12 linhas de execução, cada uma com a própria instância de DoFn.

Ao projetar pipelines complexos, é importante entender o ciclo de vida do DoFn. Garanta que as funções DoFn sejam serializáveis e evite modificar o argumento do elemento diretamente nelas.

Quando você tem um pipeline de várias linguagens e mais de um SDK do Apache Beam está em execução no worker, o worker usa o menor grau de paralelismo de linhas de execução por processo.

Diferenças de Java, Go e Python

Java, Go e Python gerenciam processos e memória de maneiras diferentes. Como resultado, a abordagem que você precisa seguir para resolver erros de falta de memória varia, se o pipeline usar Java, Go ou Python.

Pipelines Java e Go

Em pipelines de Java e Go:

  • Cada worker inicia um processo do SDK do Apache Beam.
  • Os objetos e dados compartilhados do SDK, como entradas laterais e caches, são compartilhados entre todas as linhas de execução no worker.
  • A memória usada por dados e objetos compartilhados do SDK geralmente não é escalonada com base no número de vCPUs no worker.

Pipelines do Python

Nos pipelines de Python:

  • Cada worker inicia um processo do SDK do Apache Beam por vCPU.
  • Objetos e dados compartilhados do SDK, como entradas secundárias e caches, são compartilhados entre todas as linhas de execução de cada processo do SDK do Apache Beam.
  • O número total de linhas de execução do worker é escalonado de maneira linear com base no número de vCPUs. Como resultado, a memória usada pelos objetos e dados compartilhados do SDK cresce de maneira linear com o número de vCPUs.
  • As linhas de execução que executam o trabalho são distribuídas entre os processos. Novas unidades de trabalho são atribuídas a um processo sem itens de trabalho ou ao processo com o menor número de itens de trabalho atualmente atribuídos.

Encontrar erros de memória insuficiente

Para determinar se o pipeline está ficando sem memória, use um dos métodos a seguir.

Java

O monitor de memória Java, configurado pelo MemoryMonitorOptions, gera relatórios periódicos de métricas de coleta de lixo. Se a fração do tempo de CPU usada para coleta de lixo excede um limite de 50% por um longo período, o arcabouço atual do SDK falhará.

Talvez você encontre um erro semelhante ao exemplo a seguir:

Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = ...

Esse erro de memória pode ocorrer quando a memória física ainda está disponível. O erro geralmente indica que o uso da memória do pipeline está ineficiente. Para resolver esse problema, otimize o pipeline.

Se o job tiver alto uso de memória ou erros de falta de memória, siga as recomendações desta página para otimizar o uso da memória ou aumentar a quantidade de memória disponível.

Resolver erros de falta de memória

Fazer alterações no pipeline do Dataflow pode resolver erros de falta de memória ou reduzir o uso da memória. As possíveis alterações incluem as seguintes ações:

O diagrama a seguir mostra o fluxo de trabalho da solução de problemas do Dataflow descrito nesta página.

Um diagrama que mostra o fluxo de trabalho da solução de problemas.

Otimizar o pipeline

Várias operações de pipeline podem causar erros de falta de memória. Nesta seção, apresentamos opções para reduzir o uso de memória do seu pipeline. Para identificar os estágios de pipeline que consomem mais memória, use o Cloud Profiler para monitorar o desempenho do pipeline.

Para otimizar seu pipeline, siga estas práticas recomendadas:

Usar conectores de E/S integrados do Apache Beam para ler arquivos

Não abra arquivos grandes em uma DoFn. Para ler arquivos, use os conectores de E/S integrados ao Apache Beam. Os arquivos abertos em uma DoFn precisam caber na memória. Como várias instâncias do DoFn são executadas simultaneamente, arquivos grandes abertos em DoFns podem causar erros de falta de memória.

Reformular operações ao usar PTransforms do GroupByKey

Ao usar uma PTransform do GroupByKey no Dataflow, os valores resultantes por chave e janela são processados em uma única linha de execução. Como esses dados são transmitidos como um fluxo do serviço de back-end do Dataflow para os workers, eles não precisam se ajustar à memória do worker. No entanto, se os valores forem coletados na memória, a lógica de processamento pode causar erros de falta de memória.

Por exemplo, se você tiver uma chave que contém dados de uma janela e adicionar os valores de chave a um objeto na memória, como uma lista, poderão ocorrer erros de falta de memória. Nesse cenário, o worker pode não ter capacidade de memória suficiente para conter todos os objetos.

Para mais informações sobre PTransforms do GroupByKey, consulte a documentação de GroupByKey em Python e de GroupByKey em Java.

A lista a seguir contém sugestões para projetar o pipeline para minimizar o consumo de memória ao usar PTransforms do GroupByKey.

  • Para reduzir a quantidade de dados por chave e por janela, evite chaves com muitos valores, também conhecidas como chaves de atalho.
  • Para reduzir a quantidade de dados coletados por janela, use uma janela menor.
  • Se você estiver usando valores de uma chave em uma janela para calcular um número, use uma transformação Combine. Não faça o cálculo em uma única instância DoFn depois de coletar os valores.
  • Filtre valores ou cópias antes do processamento. Para mais informações, consulte a documentação de transformação Filter em Python e Filter em Java.

Reduzir os dados de entrada de fontes externas

Se você fizer chamadas a uma API externa ou a um banco de dados para aprimoramento de dados, os dados retornados devem caber na memória do worker. Se você estiver recebendo chamadas em lote, é recomendável usar uma transformação GroupIntoBatches. Se você encontrar erros de memória insuficiente, reduza o tamanho do lote. Para mais informações sobre como agrupar em lotes, consulte a documentação de transformação GroupIntoBatches em Python e GroupIntoBatches em Java.

Compartilhar objetos em linhas de execução

O compartilhamento de um objeto de dados da memória em instâncias DoFn pode melhorar o espaço e a eficiência de acesso. Os objetos de dados criados em qualquer método da DoFn, incluindo Setup, StartBundle, Process, FinishBundle e Teardown, são invocados para cada método DoFn. No Dataflow, cada worker pode ter várias instâncias DoFn. Para um uso da memória mais eficiente, transmita um objeto de dados como um singleton para compartilhá-lo em várias DoFns. Para mais informações, consulte a postagem do blog Reutilização de cache em DoFns (link em inglês).

Usar representações de elementos com eficiência de memória

Avalie se é possível usar representações para elementos PCollection que usam menos memória. Ao usar codificadores no pipeline, considere não apenas a codificação de elementos do elemento PCollection, mas também a decodificação. Geralmente, as matrizes esparsas podem se beneficiar desse tipo de otimização.

Reduzir o tamanho das entradas secundárias

Se as DoFns usarem entradas secundárias, reduza o tamanho da entrada secundária. Para entradas secundárias que são coleções de elementos, use visualizações iteráveis, como AsIterable ou AsMultimap, em vez de visualizações que materializam toda a entrada secundária ao mesmo tempo, como AsList.

Disponibilizar mais memória

Para aumentar a memória disponível, é possível aumentar a quantidade total de memória disponível nos workers sem alterar a quantidade de memória disponível por linha de execução. Como alternativa, você pode aumentar a quantidade de memória disponível por linha de execução. Ao aumentar a memória por linha de execução, a memória total do worker também aumenta.

É possível aumentar a quantidade de memória disponível por linha de execução de quatro maneiras:

Usar um tipo de máquina com mais memória por vCPU

Para selecionar um worker com mais memória por vCPU, use um dos métodos a seguir.

  • Use um tipo de máquina com alta memória na família de máquinas de uso geral. Os tipos de máquina com alta memória têm mais memória por vCPU do que os tipos de máquina padrão. Usar um tipo de máquina com grande quantidade de memória aumenta a memória disponível para cada worker e a memória disponível por linha de execução, porque o número de vCPUs permanece o mesmo. Como resultado, o uso de um tipo de máquina com grande quantidade de memória pode ser uma maneira econômica de selecionar um worker com mais memória por vCPU.
  • Para ter mais flexibilidade ao especificar o número de vCPUs e o volume de memória, use um tipo de máquina personalizado. Com os tipos de máquina personalizados, é possível aumentar a memória em incrementos de 256 MB. Esses tipos de máquinas têm preços diferentes dos tipos padrão.
  • Algumas famílias de máquinas permitem usar tipos de máquina personalizados de memória estendida. A memória estendida permite uma proporção maior de memória por vCPU. O custo é maior.

Para definir os tipos de workers, use a opção de pipeline a seguir. Para mais informações, consulte Como definir opções de pipeline e Opções de pipeline.

Java

Usar a opção de pipeline --workerMachineType.

Python

Usar a opção de pipeline --machine_type.

Go

Usar a opção de pipeline --worker_machine_type.

Usar um tipo de máquina com mais vCPUs

Essa opção é recomendada apenas para pipelines de streaming em Java e Go. Os tipos de máquina com mais vCPUs têm mais memória total, porque a quantidade de memória é escalonada de maneira linear com o número de vCPUs. Por exemplo, um tipo de máquina n1-standard-4 com quatro vCPUs tem 15 GB de memória. Um tipo de máquina n1-standard-8 com oito vCPUs tem 30 GB de memória. Para saber mais sobre tipos de máquinas predefinidos, consulte Família de máquinas de uso geral.

O uso de workers com um número maior de vCPUs pode aumentar significativamente o custo do pipeline. No entanto, é possível usar o escalonamento automático horizontal para reduzir o número total de workers para que o paralelismo permaneça o mesmo. Por exemplo, se 50 workers usarem um tipo de máquina n1-standard-4 e você mudar para um tipo de máquina n1-standard-8, é possível usar o escalonamento automático horizontal e definir o número máximo de workers para reduzir o número total de workers do pipeline para cerca de 25. Essa configuração resulta em um pipeline com um custo semelhante.

Para definir o número máximo de workers, use a opção de pipeline a seguir.

Java

Usar a opção de pipeline --maxNumWorkers.

Para mais informações, consulte Opções de pipeline.

Go

Usar a opção de pipeline --max_num_workers.

Para mais informações, consulte Opções de pipeline.

Esse método não é recomendado para pipelines Python. Ao usar o SDK do Python, ao mudar para um worker com um número maior de vCPUs, você aumenta a memória e o número de processos do SDK do Apache Beam. Por exemplo, o tipo de máquina n1-standard-4 tem a mesma memória por linha de execução que o tipo de máquina n1-standard-8 para pipelines do Python. Portanto, com pipelines do Python, a recomendação é usar um tipo de máquina com grande quantidade de memória, reduzir o número de linhas de execução ou usar apenas um processo do SDK do Apache Beam.

Reduzir o número de linhas de execução

Se um tipo de máquina com grande quantidade de memória não resolver o problema, aumente a memória disponível por linha de execução reduzindo o número máximo de linhas de execução que executam instâncias de DoFn. Essa alteração reduz o paralelismo. Para reduzir o número de linhas de execução do SDK do Apache Beam que executam instâncias de DoFn, use a opção de pipeline a seguir.

Java

Usar a opção de pipeline --numberOfWorkerHarnessThreads.

Para mais informações, consulte Opções de pipeline.

Python

Usar a opção de pipeline --number_of_worker_harness_threads.

Para mais informações, consulte Opções de pipeline.

Go

Usar a opção de pipeline --number_of_worker_harness_threads.

Para mais informações, consulte Opções de pipeline.

Para reduzir o número de linhas de execução para pipelines em lote de Java e Go, defina o valor da sinalização como um número menor do que o número de vCPUs no worker. Para pipelines de streaming, defina o valor da sinalização como um número menor do que o número de linhas de execução por processo do SDK do Apache Beam. Para estimar as linhas de execução por processo, consulte a tabela na seção Uso da memória DoFn desta página.

Essa personalização não está disponível para pipelines do Python executados no SDK do Apache Beam 2.20.0 ou anterior ou para pipelines do Python que não usam o Runner v2.

Usar apenas um processo do SDK do Apache Beam

Para pipelines de streaming do Python e pipelines do Python que usam o Runner v2, é possível forçar o Dataflow a iniciar apenas um processo do SDK do Apache Beam por worker. Antes de usar essa opção, tente resolver o problema usando os outros métodos. Para configurar as VMs de worker do Dataflow para iniciar somente um processo de Python conteinerizado, use a seguinte opção de pipeline:

--experiments=no_use_multiple_sdk_containers

Com essa configuração, os pipelines do Python criam um processo do SDK do Apache Beam por worker. Essa configuração impede que os objetos e dados compartilhados sejam replicados várias vezes para cada processo do SDK do Apache Beam. No entanto, isso limita o uso eficiente dos recursos de computação disponíveis no worker.

Reduzir o número de processos do SDK do Apache Beam para um não reduz necessariamente o número total de linhas de execução iniciadas no worker. Além disso, colocar todas as linhas de execução em um único processo do SDK do Apache Beam pode causar processamento lento ou fazer com que o pipeline fique travado. Portanto, você talvez precise reduzir o número de linhas de execução, conforme descrito na seção Reduzir o número de linhas de execução desta página.

Também é possível forçar os workers a usar somente um processo do SDK do Apache Beam com um tipo de máquina com apenas uma vCPU.