Questa pagina fornisce informazioni sull'utilizzo della memoria nelle pipeline di Dataflow e sui passaggi per analizzare e risolvere i problemi relativi agli errori OOM (Dataflow esaurimento memoria).
Informazioni sull'utilizzo della memoria di Dataflow
Per risolvere gli errori di memoria insufficiente, è utile capire in che modo le pipeline Dataflow utilizzano la memoria.
Quando Dataflow esegue una pipeline, l'elaborazione viene distribuita
tra più macchine virtuali (VM) di Compute Engine, spesso chiamate worker.
I worker elaborano gli elementi di lavoro dal servizio Dataflow e delegano gli elementi di lavoro ai processi dell'SDK Apache Beam. Un processo dell'SDK Apache Beam crea istanze di DoFn
. DoFn
è una classe dell'SDK Apache Beam che definisce una funzione di elaborazione distribuita.
Dataflow avvia diversi thread su ciascun worker e la memoria di ogni worker è condivisa tra tutti i thread. Un thread è una singola attività eseguibile in esecuzione all'interno di un processo più ampio. Il numero predefinito di thread dipende da più fattori e varia tra job in batch e in flussi.
Se la tua pipeline richiede più memoria rispetto alla quantità predefinita disponibile per i worker, potresti riscontrare errori di memoria insufficiente.
Le pipeline Dataflow utilizzano principalmente la memoria worker in tre modi:
Memoria operativa worker
I worker Dataflow hanno bisogno di memoria per i loro sistemi operativi e processi di sistema. La memoria del worker di solito non è superiore a 1 GB. Generalmente l'utilizzo è inferiore a 1 GB.
- Vari processi sul worker utilizzano la memoria per garantire il corretto funzionamento della pipeline. Ciascuno di questi processi potrebbe riservare una piccola quantità di memoria per il suo funzionamento.
- Quando la tua pipeline non utilizza Streaming Engine, i processi worker aggiuntivi utilizzano la memoria.
Memoria di processo SDK
I processi dell'SDK Apache Beam potrebbero creare oggetti e dati condivisi tra i thread all'interno del processo, indicati in questa pagina come oggetti e dati condivisi dell'SDK. La memoria utilizzata da questi oggetti e dati condivisi dell'SDK è definita memoria di elaborazione dell'SDK. Il seguente elenco include esempi di dati e oggetti condivisi dell'SDK:
- Input secondari
- Modelli di machine learning
- Oggetti singleton in memoria
- Oggetti Python creati con il modulo
apache_beam.utils.shared
- Dati caricati da origini esterne, come Cloud Storage o BigQuery
I job di streaming che non utilizzano Streaming Engine archiviano gli input lato in memoria. Per le pipeline Java e Go, ogni worker ha una copia dell'input secondario. Per le pipeline Python, ogni processo dell'SDK Apache Beam ha una copia dell'input secondario.
I job di streaming che utilizzano Streaming Engine hanno un limite di dimensione di input laterale di 80 MB. Gli input secondari sono archiviati al di fuori della memoria del worker.
L'utilizzo della memoria da parte di oggetti e dati condivisi dell'SDK aumenta in modo lineare con il numero di processi dell'SDK Apache Beam. Nelle pipeline Java e Go, viene avviato un processo dell'SDK Apache Beam per worker. Nelle pipeline Python, viene avviato un processo dell'SDK Apache Beam per vCPU. Gli oggetti e i dati condivisi dell'SDK vengono riutilizzati in thread all'interno dello stesso processo dell'SDK Apache Beam.
Memoria utilizzata: DoFn
DoFn
è una classe dell'SDK Apache Beam che definisce una funzione di elaborazione distribuita.
Ogni worker può eseguire istanze DoFn
in parallelo. Ogni thread esegue un'istanza DoFn
. Potrebbe essere utile calcolare l'utilizzo totale della memoria, calcolare le dimensioni del set di lavoro o la quantità di memoria necessaria affinché un'applicazione continui a funzionare. Ad esempio, se un singolo DoFn
usa un massimo di 5 MB di memoria e un worker ha 300 thread, l'utilizzo della memoria di DoFn
potrebbe raggiungere un picco di 1, 5 GB o il numero di byte di memoria moltiplicato per il numero di thread. A seconda di come i worker utilizzano la memoria, un picco nell'utilizzo della memoria potrebbe esaurire la memoria dei worker.
È difficile stimare quante istanze di un
DoFn
Dataflow crea. Il numero dipende da vari fattori, come l'SDK,
il tipo di macchina e così via. Inoltre, il DoFn potrebbe essere utilizzato da più thread in successione.
Il servizio Dataflow non garantisce il numero di volte in cui viene richiamato un DoFn
, né il numero esatto di istanze DoFn
create nel corso di una pipeline.
Tuttavia, la tabella seguente fornisce informazioni sul livello di parallelismo previsto e stima un limite superiore sul numero di istanze DoFn
.
SDK Python Beam
Batch | Streaming senza Streaming Engine | Streaming Engine | |
---|---|---|---|
Parallelismo |
1 processo per vCPU 1 thread per processo 1 thread per vCPU
|
1 processo per vCPU 12 thread per processo 12 thread per vCPU |
1 processo per vCPU 12 thread per processo 12 thread per vCPU
|
Numero massimo di istanze DoFn simultanee (tutti questi numeri sono soggetti a modifica in qualsiasi momento). |
1 DoFn per thread
1
|
1 DoFn per thread
12
|
1 DoFn per thread
12
|
SDK Java/Go Beam
Batch | Streaming senza Streaming Engine | Streaming Engine | |
---|---|---|---|
Parallelismo |
1 processo per VM worker 1 thread per vCPU
|
1 processo per VM worker 300 thread per processo 300 thread per VM worker
|
1 processo per VM worker 500 thread per processo 500 thread per VM worker
|
Numero massimo di istanze DoFn simultanee (tutti questi numeri sono soggetti a modifica in qualsiasi momento). |
1 DoFn per thread
1
|
1 DoFn per thread
300
|
1 DoFn per thread
500
|
Se hai una pipeline multilingue e più di un SDK Apache Beam è in esecuzione sul worker, il worker utilizza il grado più basso possibile di parallelismo di thread per processo.
Differenze tra Java, Go e Python
Java, Go e Python gestiscono i processi e la memoria in modo diverso. Di conseguenza, l'approccio da adottare per la risoluzione degli errori di memoria varia a seconda che la pipeline utilizzi Java, Go o Python.
Pipeline Java e Go
Nelle pipeline Java e Go:
- Ogni worker avvia un processo dell'SDK Apache Beam.
- Gli oggetti e i dati condivisi dall'SDK, come input collaterali e cache, vengono condivisi tra tutti i thread nel worker.
- La memoria utilizzata dagli oggetti e dai dati condivisi con l'SDK non viene generalmente scalata in base al numero di vCPU sul worker.
Pipeline Python
Nelle pipeline Python:
- Ogni worker avvia un processo dell'SDK Apache Beam per vCPU.
- Gli oggetti e i dati condivisi dall'SDK, come input collaterali e cache, sono condivisi tra tutti i thread all'interno di ogni processo dell'SDK Apache Beam.
- Il numero totale di thread sul worker viene scalato in modo lineare in base al numero di vCPU. Di conseguenza, la memoria utilizzata dagli oggetti e dai dati condivisi dell'SDK cresce in modo lineare con il numero di vCPU.
- I thread che eseguono il lavoro vengono distribuiti tra i processi. Le nuove unità di lavoro vengono assegnate a un processo senza elementi di lavoro o a quello con il minor numero di elementi di lavoro attualmente assegnati.
Individuare gli errori di memoria
Per determinare se la tua pipeline sta per esaurire la memoria, utilizza uno dei seguenti metodi.
- Nella pagina Dettagli job, nel riquadro Log, visualizza la scheda Diagnostica. Questa scheda mostra gli errori relativi ai problemi di memoria e la frequenza con cui si verificano.
- Nell'interfaccia di monitoraggio Dataflow, utilizza il grafico Utilizzo memoria per monitorare la capacità e l'utilizzo della memoria del worker.
Se il job presenta un utilizzo elevato della memoria o errori di memoria insufficiente, segui i consigli in questa pagina per ottimizzare l'utilizzo della memoria o per aumentare la quantità di memoria disponibile.
Risolvi gli errori di memoria insufficiente
Le modifiche alla pipeline Dataflow potrebbero risolvere gli errori di memoria insufficiente o ridurre l'utilizzo della memoria. Le modifiche possibili includono le seguenti azioni:
Il seguente diagramma mostra il flusso di lavoro per la risoluzione dei problemi di Dataflow descritto in questa pagina.
Ottimizza la pipeline
Diverse operazioni della pipeline possono causare errori di memoria esaurita. Questa sezione fornisce opzioni per ridurre l'utilizzo della memoria della pipeline. Per identificare le fasi della pipeline che consumano più memoria, utilizza Cloud Profiler per monitorare le prestazioni della pipeline.
Per ottimizzare la pipeline, puoi utilizzare le seguenti best practice:
- Utilizzare i connettori I/O integrati di Apache Beam per leggere i file
- Riprogettare le operazioni quando si utilizzano
GroupByKey
PTransforms - Ridurre i dati in entrata da origini esterne
- Condividere oggetti tra thread
- Utilizzare rappresentazioni degli elementi in modo efficiente in termini di memoria
- Riduci le dimensioni degli input secondari
Usa i connettori I/O integrati Apache Beam per leggere i file
Non aprire file di grandi dimensioni all'interno di un DoFn
. Per leggere i file, utilizza i
connettori I/O integrati di Apache Beam.
I file aperti in un file DoFn
devono essere memorizzati in memoria. Poiché più istanze DoFn
vengono eseguite contemporaneamente, i file di grandi dimensioni aperti in DoFn
possono causare errori di memoria insufficiente.
Riprogettare le operazioni con l'utilizzo di GroupByKey
PTransforms
Quando utilizzi una classe GroupByKey
PTransform in Dataflow, i valori risultanti per chiave e finestra vengono elaborati in un singolo thread. Poiché questi dati vengono passati come flusso dal servizio di backend Dataflow ai worker, non è necessario che rientrino nella memoria worker. Tuttavia, se i valori vengono raccolti in memoria, la logica di elaborazione potrebbe causare errori di esaurimento della memoria.
Ad esempio, se hai una chiave che contiene dati per una finestra e aggiungi le coppie chiave-valore a un oggetto in memoria, come un elenco, potrebbero verificarsi errori di memoria insufficiente. In questo scenario, il worker potrebbe non avere capacità di memoria sufficiente per contenere tutti gli oggetti.
Per ulteriori informazioni su GroupByKey
PTransforms, consulta la documentazione di Apache Beam
Python GroupByKey
e Java GroupByKey
.
Il seguente elenco contiene suggerimenti per progettare la pipeline al fine di ridurre al minimo il consumo di memoria quando utilizzi GroupByKey
PTransforms.
- Per ridurre la quantità di dati per chiave e per finestra, evita le chiavi con molti valori, note anche come chiavi di scelta rapida.
- Per ridurre la quantità di dati raccolti per ogni finestra, utilizza una finestra di dimensioni inferiori.
- Se utilizzi i valori di una chiave in una finestra per calcolare un numero, utilizza una trasformazione
Combine
. Non eseguire il calcolo in una singola istanzaDoFn
dopo aver raccolto i valori. - Filtra i valori o i duplicati prima dell'elaborazione. Per ulteriori informazioni, consulta la documentazione su Python
Filter
e la documentazione sulla trasformazione di JavaFilter
.
Riduci i dati in entrata da origini esterne
Se stai effettuando chiamate a un'API esterna o a un database per l'arricchimento dei dati, i dati restituiti devono rientrare nella memoria worker.
Se stai raggruppando le chiamate, ti consigliamo di utilizzare una trasformazione GroupIntoBatches
.
Se si verificano errori di memoria esaurita, riduci le dimensioni del batch. Per ulteriori informazioni sul raggruppamento in batch, consulta la documentazione di Python GroupIntoBatches
e Java GroupIntoBatches
.
Condividi oggetti tra thread
La condivisione di un oggetto dati in memoria tra istanze DoFn
può migliorare lo spazio e
l'efficienza di accesso. Gli oggetti di dati creati in qualsiasi metodo di DoFn
, tra cui
Setup
, StartBundle
, Process
, FinishBundle
e Teardown
, vengono richiamati
per ogni DoFn
. In Dataflow, ogni worker può avere diverse istanze DoFn
. Per un utilizzo della memoria più efficiente, trasmetti un oggetto dati come un singleton per condividerlo tra più DoFn
. Per ulteriori informazioni, consulta il post del blog Riutilizzo della cache tra DoFn
.
Usa rappresentazioni degli elementi efficienti in memoria
Valuta se puoi utilizzare rappresentazioni per gli elementi PCollection
che utilizzano meno memoria. Quando utilizzi i programmatori nella pipeline, prendi in considerazione non solo le rappresentazioni degli elementi PCollection
codificate, ma anche decodificate. Spesso le matrici sparse possono trarre vantaggio da questo tipo di ottimizzazione.
Riduci le dimensioni degli input secondari
Se i tuoi DoFn
utilizzano input collaterali, riduci le dimensioni dell'input secondario. Per gli input
collaterali che sono raccolte di elementi, valuta la possibilità di utilizzare viste iterabili, come
AsIterable
o AsMultimap
, anziché viste che materializzano contemporaneamente l'intero input laterale, ad esempio
AsList
.
Rendi disponibile più memoria
Per aumentare la memoria disponibile, puoi aumentare la quantità totale di memoria disponibile sui worker senza modificare la quantità di memoria disponibile per thread. In alternativa, puoi aumentare la quantità di memoria disponibile per thread. Aumentando la memoria per thread, aumenti anche la memoria totale sul worker.
Puoi aumentare la quantità di memoria disponibile per thread in quattro modi:
- Utilizza un tipo di macchina con più memoria per vCPU.
- Utilizza un tipo di macchina con più vCPU (pipeline in modalità flusso Java e Go).
- Riduci il numero di thread.
- Utilizza un solo processo dell'SDK Apache Beam (flussi di Python e pipeline Python Runner v2).
Utilizza un tipo di macchina con più memoria per vCPU
Per selezionare un worker con più memoria per vCPU, utilizza uno dei seguenti metodi.
- Utilizzare un tipo di macchina con memoria elevata nella famiglia di macchine per uso generico. I tipi di macchine con memoria elevata hanno una memoria per vCPU maggiore rispetto ai tipi di macchina standard. L'utilizzo di un tipo di macchina con memoria elevata aumenta sia la memoria disponibile per ciascun worker sia la memoria disponibile per thread, poiché il numero di vCPU rimane lo stesso. Di conseguenza, l'utilizzo di un tipo di macchina con memoria elevata può essere un modo conveniente per selezionare un worker con più memoria per vCPU.
- Per una maggiore flessibilità quando specifichi il numero di vCPU e la quantità di memoria, puoi utilizzare un tipo di macchina personalizzata. Con i tipi di macchine personalizzate, puoi aumentare la memoria in incrementi di 256 MB. Il costo di questi tipi di macchine è diverso rispetto ai tipi di macchina standard.
- Alcune famiglie di macchine consentono di utilizzare tipi di macchine personalizzate con memoria estesa. La memoria estesa consente un rapporto memoria per vCPU più elevato. Il costo è più alto.
Per impostare i tipi di worker, utilizza la seguente opzione della pipeline. Per ulteriori informazioni, consulta Impostazione delle opzioni della pipeline e Opzioni di pipeline.
Java
Utilizza l'opzione della pipeline --workerMachineType
.
Python
Utilizza l'opzione della pipeline --machine_type
.
Go
Utilizza l'opzione della pipeline --worker_machine_type
.
Utilizza un tipo di macchina con più vCPU
Questa opzione è consigliata solo per le pipeline in modalità flusso Java e Go. I tipi di macchina con più vCPU hanno più memoria totale, poiché la quantità di memoria scala in modo lineare con il numero di vCPU. Ad esempio, un tipo di macchina n1-standard-4
con quattro vCPU ha 15 GB di memoria. Un tipo di macchina n1-standard-8
con otto vCPU
ha 30 GB di memoria. Per ulteriori informazioni sui tipi di macchine predefinite, consulta Famiglia di macchine per uso generico.
L'utilizzo di worker con un numero maggiore di vCPU potrebbe
aumentare significativamente il costo della pipeline. Tuttavia, puoi utilizzare la scalabilità automatica orizzontale per ridurre il numero di worker totali in modo che il parallelismo rimanga lo stesso. Ad esempio, se hai 50 worker che utilizzano un tipo di macchina n1-standard-4
e passi a un tipo di macchina n1-standard-8
, puoi utilizzare la scalabilità automatica orizzontale e impostare il numero massimo di worker per ridurre il numero totale di worker nella pipeline a circa 25. Questa configurazione genera una pipeline con un costo simile.
Per impostare il numero massimo di worker, utilizza la seguente opzione di pipeline.
Java
Utilizza l'opzione della pipeline --maxNumWorkers
.
Per ulteriori informazioni, consulta la sezione Opzioni di pipeline.
Go
Utilizza l'opzione della pipeline --max_num_workers
.
Per ulteriori informazioni, consulta la sezione Opzioni di pipeline.
Questo metodo non è consigliato per le pipeline Python. Quando utilizzi l'SDK Python, se passi a un worker con un numero maggiore di vCPU, non solo aumenti la memoria, ma aumenti anche il numero di processi dell'SDK Apache Beam. Ad esempio, il tipo di macchina n1-standard-4
ha la stessa memoria per thread del tipo di macchina n1-standard-8
per le pipeline Python. Di conseguenza, per le pipeline Python, si consiglia di utilizzare un tipo di macchina con memoria elevata, ridurre il numero di thread o utilizzare un solo processo dell'SDK Apache Beam.
Riduci il numero di thread
Se l'utilizzo di un tipo di macchina con memoria elevata non risolve il problema, aumenta la memoria disponibile per thread riducendo il numero massimo di thread che eseguono DoFn
istanze.
Questa modifica riduce il parallelismo. Per ridurre il numero di thread dell'SDK Apache Beam che eseguono DoFn
istanze, utilizza la seguente opzione di pipeline.
Java
Utilizza l'opzione della pipeline --numberOfWorkerHarnessThreads
.
Per ulteriori informazioni, consulta la sezione Opzioni di pipeline.
Python
Utilizza l'opzione della pipeline --number_of_worker_harness_threads
.
Per ulteriori informazioni, consulta la sezione Opzioni di pipeline.
Go
Utilizza l'opzione della pipeline --number_of_worker_harness_threads
.
Per ulteriori informazioni, consulta la sezione Opzioni di pipeline.
Per ridurre il numero di thread per le pipeline batch Java e Go, imposta il valore del flag su un numero inferiore al numero di vCPU sul worker. Per le pipeline in modalità flusso,
imposta il valore del flag su un numero inferiore al numero di thread per
processo dell'SDK Apache Beam.
Per stimare i thread per processo, consulta la tabella nella sezione Utilizzo memoria DoFn
in questa pagina.
Questa personalizzazione non è disponibile per le pipeline Python in esecuzione sull'SDK Apache Beam 2.20.0 o versioni precedenti o per le pipeline Python che non utilizzano Runner v2.
Utilizza un solo processo dell'SDK Apache Beam
Per le pipeline in modalità flusso Python e le pipeline Python che utilizzano Runner v2, puoi forzare Dataflow ad avviare un solo processo dell'SDK Apache Beam per worker. Prima di provare questa opzione, prova a risolvere il problema utilizzando gli altri metodi. Per configurare le VM worker Dataflow in modo che avviino un solo processo Python containerizzato, utilizza la seguente opzione pipeline:
--experiments=no_use_multiple_sdk_containers
Con questa configurazione, le pipeline Python creano un processo dell'SDK Apache Beam per worker. Questa configurazione impedisce che i dati e gli oggetti condivisi vengano replicati più volte per ogni processo dell'SDK Apache Beam. Tuttavia, limita l'uso efficiente delle risorse di calcolo disponibili sul worker.
La riduzione a uno del numero di processi dell'SDK Apache Beam non riduce necessariamente il numero totale di thread avviati sul worker. Inoltre, avere tutti i thread in un singolo processo dell'SDK Apache Beam potrebbe causare un rallentamento dell'elaborazione o un blocco della pipeline. Di conseguenza, potrebbe anche essere necessario ridurre il numero di thread, come descritto nella sezione Ridurre il numero di thread in questa pagina.
Puoi anche forzare i worker a utilizzare un solo processo dell'SDK Apache Beam utilizzando un tipo di macchina con una sola vCPU.