Questa pagina fornisce informazioni sull'utilizzo della memoria in Dataflow pipeline e i passaggi per analizzare e risolvere i problemi Errori di Dataflow di esaurimento memoria (OOM).
Informazioni sull'utilizzo della memoria di Dataflow
Per risolvere gli errori di esaurimento della memoria, è utile comprendere in che modo Le pipeline Dataflow utilizzano la memoria.
Quando Dataflow esegue una pipeline, l'elaborazione viene distribuita
su più macchine virtuali (VM) Compute Engine, spesso chiamate worker.
I worker elaborano gli elementi di lavoro dal servizio Dataflow
e delegare gli elementi di lavoro ai processi SDK Apache Beam. Un Apache Beam
Il processo SDK crea istanze
di DoFn
. DoFn
è una classe SDK Apache Beam che definisce un modello
di elaborazione.
Dataflow avvia diversi thread su ogni worker e la memoria di ciascun worker è condiviso tra tutti i thread. Un thread è un singolo eseguibile all'interno di un processo più grande. Il numero predefinito i thread dipendono da più fattori e variano tra job batch e flussi.
Se la pipeline ha bisogno di più memoria di quella predefinita sui worker, potresti riscontrare errori di memoria insufficiente.
Le pipeline Dataflow utilizzano principalmente la memoria dei worker in tre modi:
Memoria operativa dei worker
I worker Dataflow hanno bisogno di memoria per i sistemi operativi e il sistema i processi di machine learning. La memoria utilizzata dai worker in genere non è superiore a 1 GB. Utilizzo in genere è inferiore a 1 GB.
- Vari processi sul worker usano la memoria per garantire che la pipeline che funzioni. Ciascuno di questi processi di riservare una piccola quantità di memoria per le sue operazioni.
- Quando la pipeline non usano Streaming Engine, mentre i processi worker aggiuntivi utilizzano la memoria.
Memoria di processo SDK
I processi dell'SDK Apache Beam potrebbero creare oggetti e dati che vengono condivisi tra i thread durante il processo, indicati in questa pagina come dati e oggetti condivisi dall'SDK. La memoria utilizzata da questi oggetti e dati condivisi dell'SDK è indicata come SDK di processo. Il seguente elenco include esempi di dati e oggetti condivisi dall'SDK:
- Input aggiuntivi
- Modelli di machine learning
- Oggetti singleton in memoria
- Oggetti Python creati con
Modulo
apache_beam.utils.shared
- Dati caricati da origini esterne, come Cloud Storage o BigQuery
I job di flussi che non utilizzano Streaming Engine archiviano gli input laterali in memoria. Per le pipeline Java e Go, ogni worker ha una copia dell'input aggiuntivo. Per Python di pipeline di input, ciascun processo dell'SDK Apache Beam ha una copia dell'input aggiuntivo.
I job in modalità flusso che utilizzano Streaming Engine hanno un limite di dimensioni di input laterale di 80 MB. Gli input aggiuntivi sono archiviati al di fuori della memoria dei worker.
L'utilizzo della memoria da oggetti e dati condivisi dell'SDK cresce in modo lineare con il numero di processi dell'SDK Apache Beam. Nelle pipeline Java e Go, viene avviato un processo dell'SDK Apache Beam per worker. Nelle pipeline Python, viene avviato un processo dell'SDK Apache Beam per vCPU. Gli oggetti e i dati condivisi dell'SDK vengono riutilizzati nei thread all'interno dello stesso processo dell'SDK Apache Beam.
Memoria utilizzata da DoFn
DoFn
è una classe SDK Apache Beam che definisce una funzione di elaborazione distribuita.
Ogni worker può eseguire istanze DoFn
simultanee. Ogni thread esegue un DoFn
in esecuzione in un'istanza Compute Engine. Durante la valutazione dell'utilizzo totale della memoria, il calcolo delle dimensioni del set di lavoro o
necessaria per far funzionare un'applicazione, potrebbe essere
utili. Ad esempio, se un singolo DoFn
utilizza
un massimo di 5 MB di memoria e un worker ha 300 thread, quindi DoFn
di memoria
potrebbe raggiungere 1, 5 GB o il numero di byte di memoria moltiplicato
il numero di thread. A seconda del modo in cui i worker usano la memoria, un picco di memoria
potrebbe causare l'esaurimento della memoria da parte dei worker.
È difficile stimare quante istanze
DoFn
creati da Dataflow. Il numero dipende da vari fattori, ad esempio dall'SDK,
il tipo di macchina e così via. Inoltre, il DoFn potrebbe essere utilizzato da più thread in successione.
Il servizio Dataflow non garantisce quante volte viene richiamato DoFn
,
né garantisce il numero esatto di istanze DoFn
create nel corso di una pipeline.
Tuttavia, la tabella seguente fornisce alcune informazioni sul livello
di parallelismo ci si può aspettare e che stima un limite superiore
il numero di istanze DoFn
.
SDK Beam Python
Batch | Flussi di dati senza Streaming Engine | Streaming Engine | |
---|---|---|---|
Parallelismo |
1 processo per vCPU 1 thread per processo 1 thread per vCPU
|
1 processo per vCPU 12 thread per processo 12 thread per vCPU |
1 processo per vCPU 12 thread per processo 12 thread per vCPU
|
Numero massimo di istanze DoFn simultanee (tutti questi numeri sono soggetti a modifica in qualsiasi momento) |
1 DoFn per thread
1
|
1 DoFn per thread
12
|
1 DoFn per thread
12
|
SDK Beam Java/Go
Batch | Flussi di dati senza Streaming Engine | Streaming Engine | |
---|---|---|---|
Parallelismo |
1 processo per VM worker 1 thread per vCPU
|
1 processo per VM worker 300 thread per processo 300 thread per VM worker
|
1 processo per VM worker 500 thread per processo 500 thread per VM worker
|
Numero massimo di istanze DoFn simultanee (tutti questi numeri sono soggetti a modifica in qualsiasi momento) |
1 DoFn per thread
1
|
1 DoFn per thread
300
|
1 DoFn per thread
500
|
Quando hai una pipeline multilingue e viene eseguito più di un SDK Apache Beam in esecuzione sul worker, il worker utilizza il livello più basso di thread per processo parallelismo possibile.
Differenze tra Java, Go e Python
Java, Go e Python gestiscono processi e memoria in modo diverso. Di conseguenza, l'approccio che dovresti eseguire quando vuoi risolvere gli errori relativi all'esaurimento della memoria varia in base al indipendentemente dal fatto che la pipeline utilizzi Java, Go o Python.
Pipeline Java e Go
Nelle pipeline Java e Go:
- Ogni worker avvia un processo dell'SDK Apache Beam.
- Gli oggetti e i dati condivisi dell'SDK, come input aggiuntivi e cache, vengono condivisi tra tutti i thread sul worker.
- La memoria utilizzata dagli oggetti e dai dati condivisi dell'SDK di solito non viene scalata in base il numero di vCPU sul worker.
Pipeline Python
Nelle pipeline Python:
- Ogni worker avvia un processo SDK Apache Beam per vCPU.
- Gli oggetti e i dati condivisi dell'SDK, come gli input aggiuntivi e le cache, vengono condivisi tra tutti i thread all'interno di ciascun processo dell'SDK Apache Beam.
- Il numero totale di thread sul worker scala in modo lineare in base al numero di vCPU. Di conseguenza, la memoria utilizzata da oggetti e dati condivisi dell'SDK aumenta in modo lineare con il numero di vCPU.
- I thread che eseguono il lavoro sono distribuiti tra i processi. Le nuove unità di lavoro sono assegnati a un processo senza elementi di lavoro oppure al processo con il minor numero di elementi di lavoro attualmente assegnati.
Individuare gli errori di memoria insufficiente
Per determinare se la pipeline sta per esaurire la memoria, utilizza una uno dei seguenti metodi.
- Nella pagina Dettagli job, nel riquadro Log, visualizza la scheda Diagnostica. Questa scheda mostra gli errori relativi a problemi di memoria e la frequenza con cui si verificano.
- Nell'interfaccia di monitoraggio di Dataflow, usa il grafico sull'utilizzo della memoria per monitorare la capacità e l'utilizzo della memoria dei worker.
Nella pagina Dettagli job, nel riquadro Log, seleziona Log worker. Trova gli errori di memoria.
Java
Java Memory Monitor, configurato dal
Interfaccia di MemoryMonitorOptions
,
segnala periodicamente le metriche di garbage collection. Se la frazione di tempo di CPU utilizzata
La garbage collection supera una soglia del 50% per
per un periodo di tempo prolungato, l'attuale cablaggio dell'SDK ha esito negativo.
Potresti visualizzare un errore simile al seguente esempio:
Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = ...
Questo errore di memoria può verificarsi quando la memoria fisica è ancora disponibili. L'errore di solito indica che la memoria utilizzata dalla pipeline è poco efficiente. Per risolvere il problema, ottimizza la pipeline.
Se il job presenta errori di utilizzo elevato o di memoria insufficiente, segui i consigli in questa pagina per ottimizzare l'utilizzo della memoria o aumentare la quantità di memoria disponibili.
Risolvere gli errori di memoria insufficiente
Le modifiche alla pipeline Dataflow potrebbero esaurire la memoria o ridurre l'utilizzo della memoria. Le possibili modifiche includono le seguenti azioni:
Il seguente diagramma mostra il flusso di lavoro per la risoluzione dei problemi di Dataflow descritto in questa pagina.
Ottimizza la tua pipeline
Diverse operazioni della pipeline possono causare errori di memoria insufficiente. Questa sezione fornisce e opzioni per ridurre l'utilizzo della memoria da parte della pipeline. a identificare la pipeline. fasi che consumano più memoria, utilizzare Cloud Profiler per monitorare le prestazioni della pipeline.
Puoi utilizzare le seguenti best practice per ottimizzare la pipeline:
- Usa i connettori I/O integrati di Apache Beam per leggere i file
- Riprogettare le operazioni quando si utilizzano
GroupByKey
PTransform - Riduci i dati in entrata da origini esterne
- Condividere oggetti tra i thread
- Utilizzare rappresentazioni di elementi efficienti dalla memoria
- Riduci le dimensioni degli input aggiuntivi
Utilizza i connettori I/O integrati di Apache Beam per leggere i file
Non aprire file di grandi dimensioni all'interno di un file DoFn
. Per leggere i file, utilizza
Connettori I/O integrati Apache Beam.
I file aperti in un DoFn
devono rientrare nella memoria. Poiché vengono eseguite più istanze di DoFn
contemporaneamente, i file di grandi dimensioni aperti in DoFn
secondi possono causare errori di memoria esaurita.
Riprogetta le operazioni quando si utilizzano GroupByKey
PTransform
Quando utilizzi un'istruzione PTransform GroupByKey
in Dataflow,
i valori per chiave e per finestra vengono elaborati su un singolo thread. Poiché questi dati
viene passato come flusso dal servizio di backend Dataflow
worker, non deve necessariamente
rientrare nella memoria dei worker. Tuttavia, se i valori sono
raccolti in memoria, la logica di elaborazione potrebbe causare errori di memoria insufficiente.
Ad esempio, se hai una chiave che contiene dati di una finestra e aggiungi il parametro a un oggetto in memoria, come un elenco, gli errori di esaurimento della memoria potrebbero che si verificano. In questo scenario, il worker potrebbe non avere capacità di memoria sufficiente per che contengono tutti gli oggetti.
Per ulteriori informazioni sulle trasformazioni PTransform GroupByKey
, consulta Apache Beam
Python GroupByKey
e Java GroupByKey
documentazione.
L'elenco seguente contiene suggerimenti per la progettazione della pipeline in modo da ridurre al minimo
il consumo di memoria quando si utilizzano GroupByKey
PTransform.
- Per ridurre la quantità di dati per chiave e per finestra, evita chiavi con molte o chiavi di scelta rapida,
- Per ridurre la quantità di dati raccolti per finestra, utilizza una finestra di dimensioni inferiori.
- Se utilizzi i valori di una chiave in una finestra per calcolare un numero, utilizza un
Trasformazione
Combine
: Non eseguire il calcolo in una singola istanza diDoFn
dopo aver raccolto i valori. - Filtra i valori o i duplicati prima dell'elaborazione. Per ulteriori informazioni, consulta
Python
Filter
e JavaFilter
e trasformare la documentazione.
Riduci i dati in entrata da origini esterne
Se effettui chiamate a un'API esterna o a un database per l'arricchimento dei dati,
i dati restituiti devono rientrare
nella memoria del worker.
Se esegui il raggruppamento in batch delle chiamate, è consigliabile utilizzare una trasformazione GroupIntoBatches
.
Se si verificano errori di memoria insufficiente, riduci le dimensioni del batch. Per ulteriori informazioni
sul raggruppamento in batch, vedi
Python GroupIntoBatches
e Java GroupIntoBatches
e trasformare la documentazione.
Condividi oggetti tra i thread
La condivisione di un oggetto dati in memoria tra DoFn
istanze può migliorare lo spazio e
e l'efficienza dell'accesso. Oggetti dati creati con qualsiasi metodo di DoFn
, tra cui
Setup
, StartBundle
, Process
, FinishBundle
e Teardown
, sono richiamati
per ogni DoFn
. In Dataflow, ogni worker potrebbe avere diversi DoFn
di Compute Engine. Per un utilizzo più efficiente della memoria, passa un oggetto dati come singleton
condividerlo tra diversi DoFn
. Per ulteriori informazioni, vedi il post del blog
Riutilizzo della cache in DoFn
.
Utilizzare rappresentazioni di elementi a elevata efficienza di memoria
Valuta se puoi utilizzare le rappresentazioni per PCollection
che utilizzano meno memoria. Quando utilizzi programmatori nella tua pipeline, non considerare
solo rappresentazioni di elementi PCollection
codificate ma anche decodificate. Sparsa
spesso le matrici possono trarre vantaggio da questo tipo di ottimizzazione.
Riduci le dimensioni degli input aggiuntivi
Se DoFn
utilizza input aggiuntivi, riduci le dimensioni dell'input aggiuntivo. Da lato
ovvero raccolte di elementi, valuta la possibilità di utilizzare viste iterabili, come
AsIterable
o AsMultimap
, invece di visualizzazioni che materializzano contemporaneamente l'intero input aggiuntivo, come
AsList
Rendi disponibile più memoria
Per aumentare la memoria disponibile, puoi aumentare la quantità totale di memoria disponibile. sui worker senza modificare la quantità di memoria disponibile per thread. In alternativa, puoi aumentare la quantità di memoria disponibile per thread. Quando aumenti il valore per thread, aumenti anche la memoria totale sul worker.
Puoi aumentare la quantità di memoria disponibile per thread in quattro modi:
- Utilizza un tipo di macchina con più memoria per vCPU.
- Utilizza un tipo di macchina con più vCPU (pipeline di flusso Java e Go).
- Riduci il numero di thread.
- Utilizza un solo processo SDK Apache Beam (flussi di dati Python e pipeline Python Runner v2).
Utilizza un tipo di macchina con più memoria per vCPU
Per selezionare un worker con più memoria per vCPU, utilizza uno dei seguenti metodi.
- Utilizza un tipo di macchina con memoria elevata nel famiglia di macchine per uso generico. I tipi di macchina con memoria elevata hanno una memoria per vCPU maggiore rispetto a dei tipi di macchina standard. L'utilizzo di un tipo di macchina con memoria elevata aumenta memoria disponibile per ogni worker e la memoria disponibile per thread, il numero di vCPU rimane invariato. Di conseguenza, l'utilizzo di una macchina con memoria elevata può essere un modo conveniente per selezionare un worker con più memoria per vCPU.
- Per una maggiore flessibilità nella specifica del numero di vCPU e della quantità memoria, puoi utilizzare un tipo di macchina personalizzata. Con i tipi di macchine personalizzate, possono aumentare la memoria con incrementi di 256 MB. Questi tipi di macchine hanno un prezzo diverso rispetto ai tipi di macchina standard.
- Alcune famiglie di macchine ti consentono memoria estesa personalizzati. La memoria estesa consente un rapporto memoria per vCPU più elevato. Il costo è più alto.
Per impostare i tipi di worker, utilizza la seguente opzione di pipeline. Per ulteriori informazioni, consulta Impostare le opzioni della pipeline e le Opzioni pipeline.
Java
Utilizza l'opzione pipeline --workerMachineType
.
Python
Utilizza l'opzione pipeline --machine_type
.
Vai
Utilizza l'opzione pipeline --worker_machine_type
.
Utilizza un tipo di macchina con più vCPU
Questa opzione è consigliata solo per le pipeline di flusso Java e Go. Tipi di macchina con più
Le vCPU hanno più memoria totale, perché la quantità di memoria scala in modo lineare
con il numero di vCPU. Ad esempio, un tipo di macchina n1-standard-4
con quattro
Le vCPU hanno 15 GB di memoria. Un tipo di macchina n1-standard-8
con otto vCPU
ha 30 GB di memoria. Per ulteriori informazioni sui tipi di macchina predefinita, consulta
Famiglia di macchine per uso generico.
L'utilizzo dei worker con un numero più elevato di vCPU potrebbe
aumenta significativamente il costo della pipeline. Puoi però usare lo spazio
della scalabilità automatica per ridurre il numero totale di worker
rimane la stessa. Ad esempio, se hai 50 worker che utilizzano un tipo di macchina n1-standard-4
e
passi a un tipo di macchina n1-standard-8
, puoi usare la scalabilità automatica orizzontale
e impostare il numero massimo di worker per ridurre il numero totale
di circa 25 worker nella pipeline. Questa configurazione genera una pipeline con un costo simile.
Per impostare il numero massimo di worker, utilizza la seguente opzione della pipeline.
Java
Utilizza l'opzione pipeline --maxNumWorkers
.
Per ulteriori informazioni, vedi Opzioni pipeline.
Vai
Utilizza l'opzione pipeline --max_num_workers
.
Per ulteriori informazioni, vedi Opzioni pipeline.
Questo metodo non è consigliato per le pipeline Python. Quando utilizzi il
SDK Python, se passi a un worker con un numero
alle vCPU, non solo aumenti la memoria, ma aumenti anche il numero
Processi dell'SDK Apache Beam. Ad esempio, il tipo di macchina n1-standard-4
ha la stessa memoria
per thread come tipo di macchina n1-standard-8
per le pipeline Python. Di conseguenza, con le pipeline Python,
è consigliabile usare un tipo di macchina con memoria elevata,
o utilizzare un solo processo SDK Apache Beam.
Riduci il numero di thread
Se l'utilizzo di un tipo di macchina con memoria elevata non risolve il problema, aumenta la quantità di memoria
disponibile per thread riducendo il numero massimo di thread che eseguono DoFn
istanze.
Questa modifica riduce il parallelismo. ridurre il numero di applicazioni Apache Beam
I thread dell'SDK che eseguono DoFn
istanze, usa la seguente opzione di pipeline.
Java
Utilizza l'opzione pipeline --numberOfWorkerHarnessThreads
.
Per ulteriori informazioni, vedi Opzioni pipeline.
Python
Utilizza l'opzione pipeline --number_of_worker_harness_threads
.
Per ulteriori informazioni, vedi Opzioni pipeline.
Vai
Utilizza l'opzione pipeline --number_of_worker_harness_threads
.
Per ulteriori informazioni, vedi Opzioni pipeline.
Per ridurre il numero di thread per le pipeline batch Java e Go, imposta il valore
del flag a un numero inferiore al numero di vCPU sul worker. Per le pipeline in modalità flusso,
il valore del flag su un numero inferiore al numero di thread per
Processo SDK Apache Beam.
Per stimare i thread per processo, consulta la tabella nella sezione Memoria utilizzata da DoFn
in questa pagina.
Questa personalizzazione non è disponibile per le pipeline Python in esecuzione su Apache Beam SDK 2.20.0 o versioni precedenti o per le pipeline Python che non utilizzano Runner v2.
Utilizza un solo processo dell'SDK Apache Beam
Per le pipeline di flusso Python e le pipeline Python che utilizzano Runner v2, puoi obbliga Dataflow ad avviare un solo processo SDK Apache Beam per worker. Prima del giorno con questa opzione, prova innanzitutto a risolvere il problema utilizzando altri metodi. A configurare le VM worker Dataflow in modo da avviare una sola per il processo Python, utilizza il seguente opzione pipeline:
--experiments=no_use_multiple_sdk_containers
Con questa configurazione, le pipeline Python creano un solo processo SDK Apache Beam per worker. Questa configurazione impedisce che gli oggetti e i dati condivisi replicati più volte per ogni processo dell'SDK Apache Beam. Tuttavia, limita l'uso efficiente delle risorse di calcolo disponibili sul worker.
Ridurre il numero di processi SDK Apache Beam a uno non necessariamente ridurre il numero totale di thread avviati sul worker. Inoltre, avere tutti i thread su un singolo processo SDK Apache Beam potrebbero causare un'elaborazione lenta o causare della pipeline per bloccarsi. Potresti quindi dover ridurre anche il numero di thread, come descritto nella sezione Ridurre il numero di thread in questa pagina.
Puoi anche forzare i worker a utilizzare un solo processo SDK Apache Beam utilizzando un tipo di macchina con una sola vCPU.