Nesta página, você verá informações sobre o uso da memória nos pipelines do Dataflow e as etapas para investigar e resolver erros de falta de memória (OOM, na sigla em inglês) do Dataflow.
Sobre o uso da memória do Dataflow
Para solucionar erros de falta de memória, é útil entender como os pipelines do Dataflow usam a memória.
Quando o Dataflow executa um pipeline, o processamento é distribuído
em várias máquinas virtuais (VMs) do Compute Engine, geralmente chamadas de workers.
Os workers processam itens de trabalho do serviço do Dataflow
e delegam os itens de trabalho aos processos do SDK do Apache Beam. Um processo do SDK
do Apache Beam cria instâncias
de DoFn
s. DoFn
é uma classe do SDK do Apache Beam que define uma função
de processamento distribuído.
O Dataflow inicia várias linhas de execução em cada worker, e a memória de cada worker é compartilhada em todas as linhas de execução. Uma linha de execução é uma única tarefa executável em um processo maior. O número padrão de linhas de execução depende de vários fatores e varia entre os jobs em lote e de streaming.
Se o pipeline precisar de mais memória do que a quantidade padrão disponível nos workers, talvez ocorram erros de falta de memória.
Os pipelines do Dataflow usam principalmente a memória do worker de três maneiras:
Memória operacional do worker
Os workers do Dataflow precisam de memória para os sistemas operacionais e processos do sistema. O uso da memória do worker normalmente não é maior que 1 GB. O uso normalmente é menor que 1 GB.
- Vários processos do worker usam memória para garantir que o pipeline funcione. Cada um desses processos pode reservar uma pequena quantidade de memória para a operação.
- Quando o pipeline não usa o Streaming Engine, outros processos de worker usam a memória.
Memória do processo do SDK
Os processos do SDK do Apache Beam podem criar objetos e dados compartilhados entre linhas de execução dentro do processo, chamados de objetos e dados compartilhados do SDK nesta página. O uso da memória desses objetos e dados compartilhados do SDK é chamado de memória de processo do SDK. A lista a seguir inclui exemplos de objetos e dados compartilhados do SDK:
- Entradas secundárias
- Modelos de machine learning
- Objetos singleton na memória
- Objetos Python criados com o
módulo
apache_beam.utils.shared
- Dados carregados de fontes externas, como o Cloud Storage ou o BigQuery;
Jobs de streaming que não usam entradas secundárias de armazenamento do Streaming Engine na memória. Nos pipelines de Java e Go, cada worker tem uma cópia da entrada secundária. Para pipelines de Python, cada processo do SDK do Apache Beam tem uma cópia da entrada secundária.
Os jobs de streaming que usam o Streaming Engine têm um limite de tamanho de entrada secundária de 80 MB. As entradas secundárias são armazenadas fora da memória do worker.
O uso da memória de objetos e dados compartilhados do SDK cresce de maneira linear com o número de processos do SDK do Apache Beam. Em pipelines de Java e Go, um processo do SDK do Apache Beam é iniciado por worker. Nos pipelines do Python, um processo do SDK do Apache Beam é iniciado por vCPU. Os objetos e dados compartilhados do SDK são reutilizados em linhas de execução no mesmo processo do SDK do Apache Beam.
DoFn
Uso da memória
DoFn
é uma classe do SDK do Apache Beam que define uma função
de processamento distribuído.
Cada worker pode executar instâncias DoFn
simultâneas. Cada linha de execução executa uma instância
de DoFn
. Ao avaliar o uso total da memória, calcular o tamanho do conjunto de trabalho ou
a quantidade de memória necessária para que um aplicativo continue funcionando pode ser
útil. Por exemplo, se uma DoFn
individual usar
no máximo 5 MB de memória e um worker tiver 300 linhas de execução, o uso da memória de DoFn
poderá atingir 1,5 MB ou o número de bytes de memória multiplicado pelo
número de linhas de execução. Dependendo de como os workers usam a memória, um pico no uso da memória pode fazer com que eles fiquem sem memória.
É difícil estimar quantas instâncias de um DoFn são criados. O número depende de vários fatores, como o SDK, o tipo de máquina e assim por diante. Além disso, o DoFn pode ser usado por vários threads sucessivamente.
O serviço do Dataflow não garante quantas vezes uma DoFn
é invocada,
nem garante o número exato de instâncias de DoFn
criadas durante um pipeline.
A tabela a seguir fornece alguns insights sobre o nível de paralelismo
esperado e estima um limite superior do
número de instâncias de DoFn
.
SDK do Beam para Python
Lote | Streaming sem o Streaming Engine | Streaming Engine | |
---|---|---|---|
Paralelismo |
1 processo por vCPU 1 linha de execução por processo 1 linha de execução por vCPU
|
1 processo por vCPU 12 linhas de execução por processo 12 linhas de execução por vCPU |
1 processo por vCPU 12 linhas de execução por processo 12 linhas de execução por vCPU
|
Número máximo de instâncias DoFn simultâneas (esses números estão sujeitos a alterações a qualquer momento). |
1 DoFn por linha de execução
1
|
1 DoFn por linha de execução
12
|
1 DoFn por linha de execução
12
|
SDK do Beam para Java/Go
Lote | Streaming sem o Streaming Engine | Streaming Engine | |
---|---|---|---|
Paralelismo |
1 processo por VM de worker 1 linha de execução por vCPU
|
1 processo por VM de worker 300 linhas de execução por processo 300 linhas de execução por VM de worker
|
1 processo por VM de worker 500 linhas de execução por processo 500 linhas de execução por VM de worker
|
Número máximo de instâncias DoFn simultâneas (esses números estão sujeitos a alterações a qualquer momento). |
1 DoFn por linha de execução
1
|
1 DoFn por linha de execução
300
|
1 DoFn por linha de execução
500
|
Quando você tem um pipeline de várias linguagens e mais de um SDK do Apache Beam está em execução no worker, o worker usa o menor grau de paralelismo de linhas de execução por processo.
Diferenças de Java, Go e Python
Java, Go e Python gerenciam processos e memória de maneiras diferentes. Como resultado, a abordagem que você precisa seguir para resolver erros de falta de memória varia, se o pipeline usar Java, Go ou Python.
Pipelines Java e Go
Em pipelines de Java e Go:
- Cada worker inicia um processo do SDK do Apache Beam.
- Os objetos e dados compartilhados do SDK, como entradas laterais e caches, são compartilhados entre todas as linhas de execução no worker.
- A memória usada por dados e objetos compartilhados do SDK geralmente não é escalonada com base no número de vCPUs no worker.
Pipelines do Python
Nos pipelines de Python:
- Cada worker inicia um processo do SDK do Apache Beam por vCPU.
- Objetos e dados compartilhados do SDK, como entradas secundárias e caches, são compartilhados entre todas as linhas de execução de cada processo do SDK do Apache Beam.
- O número total de linhas de execução do worker é escalonado de maneira linear com base no número de vCPUs. Como resultado, a memória usada pelos objetos e dados compartilhados do SDK cresce de maneira linear com o número de vCPUs.
- As linhas de execução que executam o trabalho são distribuídas entre os processos. Novas unidades de trabalho são atribuídas a um processo sem itens de trabalho ou ao processo com o menor número de itens de trabalho atualmente atribuídos.
Encontrar erros de memória insuficiente
Para determinar se o pipeline está ficando sem memória, use um dos métodos a seguir.
- Na página Detalhes do job, no painel Registros, consulte a guia Diagnóstico. Esta guia exibe erros relacionados a problemas de memória e com que frequência eles ocorrem.
- Na interface de monitoramento do Dataflow, use o gráfico de Utilização da memória para monitorar a capacidade e o uso da memória do worker.
Na página Detalhes do job, no painel Registros, selecione Registros de worker. Encontre os erros de memória.
Java
O monitor de memória Java, configurado pelo
MemoryMonitorOptions
,
gera relatórios periódicos de métricas de coleta de lixo. Se a fração do tempo de CPU usada
para coleta de lixo excede um limite de 50%
por um longo período, o arcabouço atual do SDK falhará.
Talvez você encontre um erro semelhante ao exemplo a seguir:
Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = ...
Esse erro de memória pode ocorrer quando a memória física ainda está disponível. O erro geralmente indica que o uso da memória do pipeline está ineficiente. Para resolver esse problema, otimize o pipeline.
Se o job tiver alto uso de memória ou erros de falta de memória, siga as recomendações desta página para otimizar o uso da memória ou aumentar a quantidade de memória disponível.
Resolver erros de falta de memória
Fazer alterações no pipeline do Dataflow pode resolver erros de falta de memória ou reduzir o uso da memória. As possíveis alterações incluem as seguintes ações:
O diagrama a seguir mostra o fluxo de trabalho da solução de problemas do Dataflow descrito nesta página.
Otimizar o pipeline
Várias operações de pipeline podem causar erros de falta de memória. Nesta seção, apresentamos opções para reduzir o uso de memória do seu pipeline. Para identificar os estágios de pipeline que consomem mais memória, use o Cloud Profiler para monitorar o desempenho do pipeline.
Para otimizar seu pipeline, siga estas práticas recomendadas:
- Usar conectores de E/S integrados do Apache Beam para ler arquivos
- Reformular operações ao usar PTransforms do
GroupByKey
- Reduzir os dados de entrada de fontes externas
- Compartilhar objetos em linhas de execução
- Usar representações de elementos com eficiência de memória
- Reduzir o tamanho das entradas secundárias
Usar conectores de E/S integrados do Apache Beam para ler arquivos
Não abra arquivos grandes em uma DoFn
. Para ler arquivos, use os conectores de E/S integrados ao Apache Beam.
Os arquivos abertos em uma DoFn
precisam caber na memória. Como várias instâncias do DoFn
são executadas
simultaneamente, arquivos grandes abertos em DoFn
s podem causar erros de falta de memória.
Reformular operações ao usar PTransforms do GroupByKey
Ao usar uma PTransform do GroupByKey
no Dataflow, os valores resultantes
por chave e janela são processados em uma única linha de execução. Como esses dados são
transmitidos como um fluxo do serviço de back-end do Dataflow para os
workers, eles não precisam se ajustar à memória do worker. No entanto, se os valores forem
coletados na memória, a lógica de processamento pode causar erros de falta de memória.
Por exemplo, se você tiver uma chave que contém dados de uma janela e adicionar os valores de chave a um objeto na memória, como uma lista, poderão ocorrer erros de falta de memória. Nesse cenário, o worker pode não ter capacidade de memória suficiente para conter todos os objetos.
Para mais informações sobre PTransforms do GroupByKey
, consulte a documentação de GroupByKey
em Python e de GroupByKey
em Java.
A lista a seguir contém sugestões para projetar o pipeline para minimizar
o consumo de memória ao usar PTransforms do GroupByKey
.
- Para reduzir a quantidade de dados por chave e por janela, evite chaves com muitos valores, também conhecidas como chaves de atalho.
- Para reduzir a quantidade de dados coletados por janela, use uma janela menor.
- Se você estiver usando valores de uma chave em uma janela para calcular um número, use uma transformação
Combine
. Não faça o cálculo em uma única instânciaDoFn
depois de coletar os valores. - Filtre valores ou cópias antes do processamento. Para mais informações, consulte a documentação de transformação
Filter
em Python eFilter
em Java.
Reduzir os dados de entrada de fontes externas
Se você fizer chamadas a uma API externa ou a um banco de dados para aprimoramento de dados,
os dados retornados devem caber na memória do worker.
Se você estiver recebendo chamadas em lote, é recomendável usar uma transformação GroupIntoBatches
.
Se você encontrar erros de memória insuficiente, reduza o tamanho do lote. Para mais informações
sobre como agrupar em lotes, consulte a
documentação de transformação
GroupIntoBatches
em Python
e GroupIntoBatches
em Java.
Compartilhar objetos em linhas de execução
O compartilhamento de um objeto de dados da memória em instâncias DoFn
pode melhorar o espaço e
a eficiência de acesso. Os objetos de dados criados em qualquer método da DoFn
, incluindo
Setup
, StartBundle
, Process
, FinishBundle
e Teardown
, são invocados
para cada método DoFn
. No Dataflow, cada worker pode ter várias instâncias
DoFn
. Para um uso da memória mais eficiente, transmita um objeto de dados como um singleton para compartilhá-lo em várias DoFn
s. Para mais informações, consulte a postagem do blog
Reutilização de cache em DoFn
s (link em inglês).
Usar representações de elementos com eficiência de memória
Avalie se é possível usar representações para elementos PCollection
que usam menos memória. Ao usar codificadores no pipeline, considere não
apenas a codificação de elementos do elemento PCollection
, mas também a decodificação. Geralmente, as matrizes esparsas
podem se beneficiar desse tipo de otimização.
Reduzir o tamanho das entradas secundárias
Se as DoFn
s usarem entradas secundárias, reduza o tamanho da entrada secundária. Para entradas
secundárias que são coleções de elementos, use visualizações iteráveis, como
AsIterable
ou AsMultimap
, em vez de visualizações que materializam toda a entrada secundária ao mesmo tempo, como
AsList
.
Disponibilizar mais memória
Para aumentar a memória disponível, é possível aumentar a quantidade total de memória disponível nos workers sem alterar a quantidade de memória disponível por linha de execução. Como alternativa, você pode aumentar a quantidade de memória disponível por linha de execução. Ao aumentar a memória por linha de execução, a memória total do worker também aumenta.
É possível aumentar a quantidade de memória disponível por linha de execução de quatro maneiras:
- Use a machine type with more memory per vCPU.
- Usar um tipo de máquina com mais vCPUs (pipelines de streaming em Java e Go).
- Reduzir o número de linhas de execução.
- Usar apenas um processo do SDK do Apache Beam (pipelines de streaming do Python e do Python Runner v2).
Usar um tipo de máquina com mais memória por vCPU
Para selecionar um worker com mais memória por vCPU, use um dos métodos a seguir.
- Use um tipo de máquina com alta memória na família de máquinas de uso geral. Os tipos de máquina com alta memória têm mais memória por vCPU do que os tipos de máquina padrão. Usar um tipo de máquina com grande quantidade de memória aumenta a memória disponível para cada worker e a memória disponível por linha de execução, porque o número de vCPUs permanece o mesmo. Como resultado, o uso de um tipo de máquina com grande quantidade de memória pode ser uma maneira econômica de selecionar um worker com mais memória por vCPU.
- Para ter mais flexibilidade ao especificar o número de vCPUs e o volume de memória, use um tipo de máquina personalizado. Com os tipos de máquina personalizados, é possível aumentar a memória em incrementos de 256 MB. Esses tipos de máquinas têm preços diferentes dos tipos padrão.
- Algumas famílias de máquinas permitem usar tipos de máquina personalizados de memória estendida. A memória estendida permite uma proporção maior de memória por vCPU. O custo é maior.
Para definir os tipos de workers, use a opção de pipeline a seguir. Para mais informações, consulte Como definir opções de pipeline e Opções de pipeline.
Java
Usar a opção de pipeline --workerMachineType
.
Python
Usar a opção de pipeline --machine_type
.
Go
Usar a opção de pipeline --worker_machine_type
.
Usar um tipo de máquina com mais vCPUs
Essa opção é recomendada apenas para pipelines de streaming em Java e Go. Os tipos de máquina com mais
vCPUs têm mais memória total, porque a quantidade de memória é escalonada de maneira linear
com o número de vCPUs. Por exemplo, um tipo de máquina n1-standard-4
com quatro
vCPUs tem 15 GB de memória. Um tipo de máquina n1-standard-8
com oito vCPUs
tem 30 GB de memória. Para saber mais sobre tipos de máquinas predefinidos, consulte
Família de máquinas de uso geral.
O uso de workers com um número maior de vCPUs pode aumentar significativamente o custo do pipeline. No entanto, é possível usar o escalonamento automático horizontal para reduzir o número total de workers para que o paralelismo permaneça o mesmo. Por exemplo, se 50 workers usarem um tipo de máquina n1-standard-4
e você mudar para
um tipo de máquina n1-standard-8
, é possível usar o escalonamento automático horizontal
e definir o número máximo de workers para reduzir o
número total de workers do pipeline para cerca de 25. Essa configuração resulta em um pipeline com um custo semelhante.
Para definir o número máximo de workers, use a opção de pipeline a seguir.
Java
Usar a opção de pipeline --maxNumWorkers
.
Para mais informações, consulte Opções de pipeline.
Go
Usar a opção de pipeline --max_num_workers
.
Para mais informações, consulte Opções de pipeline.
Esse método não é recomendado para pipelines Python. Ao usar o SDK do Python, ao mudar para um worker com um número maior de vCPUs, você aumenta a memória e o número de processos do SDK do Apache Beam. Por exemplo, o tipo de máquina n1-standard-4
tem a mesma memória
por linha de execução que o tipo de máquina n1-standard-8
para pipelines do Python. Portanto, com pipelines do Python,
a recomendação é usar um
tipo de máquina com grande quantidade de memória, reduzir o número de
linhas de execução ou usar apenas um processo do SDK do Apache Beam.
Reduzir o número de linhas de execução
Se um tipo de máquina com grande quantidade de memória não resolver o problema, aumente a memória
disponível por linha de execução reduzindo o número máximo de linhas de execução que executam instâncias de DoFn
.
Essa alteração reduz o paralelismo. Para reduzir o número de linhas de execução do SDK do Apache Beam que executam instâncias de DoFn
, use a opção de pipeline a seguir.
Java
Usar a opção de pipeline --numberOfWorkerHarnessThreads
.
Para mais informações, consulte Opções de pipeline.
Python
Usar a opção de pipeline --number_of_worker_harness_threads
.
Para mais informações, consulte Opções de pipeline.
Go
Usar a opção de pipeline --number_of_worker_harness_threads
.
Para mais informações, consulte Opções de pipeline.
Para reduzir o número de linhas de execução para pipelines em lote de Java e Go, defina o valor
da sinalização como um número menor do que o número de vCPUs no worker. Para pipelines de streaming,
defina o valor da sinalização como um número menor do que o número de linhas de execução por
processo do SDK do Apache Beam.
Para estimar as linhas de execução por processo, consulte a tabela na seção Uso da memória DoFn
desta página.
Essa personalização não está disponível para pipelines do Python executados no SDK do Apache Beam 2.20.0 ou anterior ou para pipelines do Python que não usam o Runner v2.
Usar apenas um processo do SDK do Apache Beam
Para pipelines de streaming do Python e pipelines do Python que usam o Runner v2, é possível forçar o Dataflow a iniciar apenas um processo do SDK do Apache Beam por worker. Antes de usar essa opção, tente resolver o problema usando os outros métodos. Para configurar as VMs de worker do Dataflow para iniciar somente um processo de Python conteinerizado, use a seguinte opção de pipeline:
--experiments=no_use_multiple_sdk_containers
Com essa configuração, os pipelines do Python criam um processo do SDK do Apache Beam por worker. Essa configuração impede que os objetos e dados compartilhados sejam replicados várias vezes para cada processo do SDK do Apache Beam. No entanto, isso limita o uso eficiente dos recursos de computação disponíveis no worker.
Reduzir o número de processos do SDK do Apache Beam para um não reduz necessariamente o número total de linhas de execução iniciadas no worker. Além disso, colocar todas as linhas de execução em um único processo do SDK do Apache Beam pode causar processamento lento ou fazer com que o pipeline fique travado. Portanto, você talvez precise reduzir o número de linhas de execução, conforme descrito na seção Reduzir o número de linhas de execução desta página.
Também é possível forçar os workers a usar somente um processo do SDK do Apache Beam com um tipo de máquina com apenas uma vCPU.