Risolvere gli errori di esaurimento della memoria di Dataflow

Questa pagina fornisce informazioni sull'utilizzo della memoria nelle pipeline Dataflow e sui passaggi per indagare e risolvere i problemi relativi agli errori di esaurimento della memoria di Dataflow.

Informazioni sull'utilizzo della memoria di Dataflow

Per risolvere gli errori di esaurimento della memoria, è utile comprendere in che modo le pipeline di Dataflow utilizzano la memoria.

Quando Dataflow esegue una pipeline, l'elaborazione viene distribuita su più macchine virtuali (VM) di Compute Engine, spesso chiamate worker. I worker elaborano gli elementi di lavoro dal servizio Dataflow e li delegano ai processi dell'SDK Apache Beam. Un processo SDK Apache Beam crea istanze di DoFn. DoFn è una classe SDK Apache Beam che definisce una funzione di elaborazione distribuita.

Dataflow avvia diversi thread su ogni worker e la memoria di ogni worker viene condivisa tra tutti i thread. Un thread è una singola attività eseguibile in esecuzione all'interno di un processo più grande. Il numero predefinito di thread dipende da diversi fattori e varia tra job batch e flussi.

Se la pipeline ha bisogno di più memoria di quella predefinita disponibile sui worker, potrebbero verificarsi errori di memoria esaurita.

Le pipeline Dataflow utilizzano principalmente la memoria dei worker in tre modi:

Memoria operativa dei worker

I worker Dataflow hanno bisogno di memoria per i sistemi operativi e i processi di sistema. La memoria utilizzata dai worker in genere non è superiore a 1 GB. In genere l'utilizzo è inferiore a 1 GB.

  • Diversi processi sul worker utilizzano la memoria per garantire che la pipeline sia in ordine. Ciascuno di questi processi potrebbe riservare una piccola quantità di memoria per il suo funzionamento.
  • Quando la pipeline non utilizza Streaming Engine, i processi worker aggiuntivi utilizzano la memoria.

Memoria di processo SDK

I processi SDK Apache Beam potrebbero creare oggetti e dati che vengono condivisi tra thread all'interno del processo, indicati in questa pagina come oggetti e dati condivisi dell'SDK. La memoria utilizzata da questi oggetti e dati condivisi dell'SDK è indicata come memoria di processo SDK. Il seguente elenco include esempi di dati e oggetti condivisi dall'SDK:

  • Input aggiuntivi
  • Modelli di machine learning
  • Oggetti singleton in memoria
  • Oggetti Python creati con il modulo apache_beam.utils.shared
  • Dati caricati da origini esterne, come Cloud Storage o BigQuery

I job di flussi che non utilizzano Streaming Engine archiviano gli input laterali in memoria. Per le pipeline Java e Go, ogni worker ha una copia dell'input aggiuntivo. Per le pipeline Python, ogni processo dell'SDK Apache Beam ha una copia dell'input aggiuntivo.

I job di flussi che utilizzano Streaming Engine hanno un limite di dimensioni di input laterale di 80 MB. Gli input aggiuntivi sono archiviati al di fuori della memoria dei worker.

L'utilizzo della memoria da oggetti e dati condivisi dell'SDK aumenta in modo lineare in base al numero di processi dell'SDK Apache Beam. Nelle pipeline Java e Go, viene avviato un processo SDK Apache Beam per worker. Nelle pipeline Python, viene avviato un processo dell'SDK Apache Beam per vCPU. Gli oggetti e i dati condivisi dell'SDK vengono riutilizzati nei thread all'interno dello stesso processo dell'SDK Apache Beam.

Memoria utilizzata da DoFn

DoFn è una classe SDK Apache Beam che definisce una funzione di elaborazione distribuita. Ogni worker può eseguire istanze DoFn simultanee. Ogni thread esegue un'istanza DoFn. Durante la valutazione dell'utilizzo totale della memoria, può essere utile calcolare le dimensioni del set di lavoro o la quantità di memoria necessaria per far sì che un'applicazione continui a funzionare. Ad esempio, se un singolo DoFn utilizza un massimo di 5 MB di memoria e un worker ha 300 thread, l'utilizzo della memoria di DoFn potrebbe raggiungere 1,5 GB, ovvero il numero di byte di memoria moltiplicato per il numero di thread. A seconda del modo in cui i worker utilizzano la memoria, un picco di utilizzo della memoria potrebbe esaurire la memoria da parte dei worker.

È difficile stimare quante istanze di un Dataflow DoFn vengono create. Il numero dipende da vari fattori, come l'SDK, il tipo di macchina e così via. Inoltre, il DoFn potrebbe essere utilizzato da più thread in successione. Il servizio Dataflow non garantisce quante volte viene richiamato DoFn né garantisce il numero esatto di istanze DoFn create nel corso di una pipeline. Tuttavia, la tabella seguente fornisce alcuni insight sul livello di parallelismo previsto e stima un limite superiore al numero di DoFn istanze.

SDK Beam Python

Batch Flussi di dati senza Streaming Engine Streaming Engine
Parallelismo 1 processo per vCPU

1 thread per processo

1 thread per vCPU

1 processo per vCPU

12 thread per processo

12 thread per vCPU

1 processo per vCPU

12 thread per processo

12 thread per vCPU

Numero massimo di istanze DoFn simultanee (tutti questi numeri sono soggetti a modifica in qualsiasi momento) 1 DoFn per thread

1 DoFn per vCPU

1 DoFn per thread

12 DoFn per vCPU

1 DoFn per thread

12 DoFn per vCPU

SDK Beam Java/Go

Batch Flussi di dati senza Streaming Engine Streaming Engine
Parallelismo 1 processo per VM worker

1 thread per vCPU

1 processo per VM worker

300 thread per processo

300 thread per VM worker

1 processo per VM worker

500 thread per processo

500 thread per VM worker

Numero massimo di istanze DoFn simultanee (tutti questi numeri sono soggetti a modifica in qualsiasi momento) 1 DoFn per thread

1 DoFn per vCPU

1 DoFn per thread

300 DoFn per VM worker

1 DoFn per thread

500 DoFn per VM worker

Quando hai una pipeline multilingue e sul worker è in esecuzione più di un SDK Apache Beam, il worker utilizza il grado più basso di parallelismo thread per processo possibile.

Differenze tra Java, Go e Python

Java, Go e Python gestiscono processi e memoria in modo diverso. Di conseguenza, l'approccio da adottare per la risoluzione degli errori di esaurimento della memoria varia a seconda che la pipeline utilizzi o meno Java, Go o Python.

Pipeline Java e Go

Nelle pipeline Java e Go:

  • Ogni worker avvia un processo dell'SDK Apache Beam.
  • Gli oggetti e i dati condivisi dell'SDK, come input aggiuntivi e cache, vengono condivisi tra tutti i thread sul worker.
  • La memoria utilizzata dai dati e dagli oggetti condivisi dell'SDK solitamente non viene scalata in base al numero di vCPU sul worker.

Pipeline Python

Nelle pipeline Python:

  • Ogni worker avvia un processo SDK Apache Beam per vCPU.
  • Gli oggetti e i dati condivisi dell'SDK, come gli input aggiuntivi e le cache, vengono condivisi tra tutti i thread all'interno di ciascun processo dell'SDK Apache Beam.
  • Il numero totale di thread sul worker scala in modo lineare in base al numero di vCPU. Di conseguenza, la memoria utilizzata da oggetti e dati condivisi dell'SDK aumenta in modo lineare con il numero di vCPU.
  • I thread che eseguono il lavoro sono distribuiti tra i processi. Le nuove unità di lavoro vengono assegnate a un processo senza elementi di lavoro oppure al processo con il minor numero di elementi di lavoro attualmente assegnati.

Individuare gli errori di memoria insufficiente

Per determinare se la tua pipeline sta per esaurire la memoria, utilizza uno dei seguenti metodi.

Java

Java Memory Monitor, configurato dall'interfaccia di MemoryMonitorOptions, segnala periodicamente le metriche di garbage collection. Se la frazione di tempo di CPU utilizzata per la garbage collection supera una soglia del 50% per un periodo di tempo prolungato, l'attuale cablaggio dell'SDK non riesce.

Potresti visualizzare un errore simile al seguente esempio:

Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = ...

Questo errore di memoria può verificarsi quando la memoria fisica è ancora disponibile. L'errore di solito indica che l'utilizzo della memoria della pipeline è inefficiente. Per risolvere il problema, ottimizza la pipeline.

Se il job ha un utilizzo elevato della memoria o errori di memoria insufficiente, segui i suggerimenti in questa pagina per ottimizzare l'utilizzo della memoria o aumentare la quantità di memoria disponibile.

Risolvere gli errori di memoria insufficiente

Le modifiche alla pipeline Dataflow potrebbero risolvere errori di memoria o ridurre la memoria utilizzata. Le possibili modifiche includono le seguenti azioni:

Il seguente diagramma mostra il flusso di lavoro per la risoluzione dei problemi di Dataflow descritto in questa pagina.

Diagramma che mostra il flusso di lavoro per la risoluzione dei problemi.

Ottimizza la tua pipeline

Diverse operazioni della pipeline possono causare errori di memoria insufficiente. Questa sezione fornisce opzioni per ridurre l'utilizzo della memoria della pipeline. Per identificare le fasi della pipeline che consumano più memoria, utilizza Cloud Profiler per monitorare le prestazioni della pipeline.

Puoi utilizzare le seguenti best practice per ottimizzare la pipeline:

Utilizza i connettori I/O integrati di Apache Beam per leggere i file

Non aprire file di grandi dimensioni all'interno di un file DoFn. Per leggere i file, utilizza i connettori I/O integrati di Apache Beam. I file aperti in un DoFn devono rientrare nella memoria. Poiché più istanze DoFn vengono eseguite contemporaneamente, i file di grandi dimensioni aperti in DoFn possono causare errori di memoria insufficiente.

Riprogetta le operazioni quando si utilizzano GroupByKey PTransform

Quando utilizzi un'istanza PTransform GroupByKey in Dataflow, i valori per chiave e per finestra risultanti vengono elaborati su un singolo thread. Poiché questi dati vengono passati come flusso dal servizio di backend Dataflow ai worker, non è necessario che rientrino nella memoria dei worker. Tuttavia, se i valori vengono raccolti in memoria, la logica di elaborazione potrebbe causare errori di memoria insufficiente.

Ad esempio, se hai una chiave che contiene dati per una finestra e aggiungi le coppie chiave-valore a un oggetto in memoria come un elenco, possono verificarsi errori di memoria insufficiente. In questo scenario, il worker potrebbe non avere una capacità di memoria sufficiente per contenere tutti gli oggetti.

Per ulteriori informazioni su PTransform GroupByKey, consulta la documentazione di Apache Beam Python GroupByKey e Java GroupByKey.

L'elenco seguente contiene suggerimenti per la progettazione della pipeline in modo da ridurre al minimo il consumo di memoria quando utilizzi le trasformazioni PTransform GroupByKey.

  • Per ridurre la quantità di dati per chiave e per finestra, evita chiavi con molti valori, note anche come chiavi rapide.
  • Per ridurre la quantità di dati raccolti per finestra, utilizza una finestra di dimensioni inferiori.
  • Se usi i valori di una chiave in una finestra per calcolare un numero, usa una trasformazione Combine. Non eseguire il calcolo in una singola istanza di DoFn dopo aver raccolto i valori.
  • Filtra i valori o i duplicati prima dell'elaborazione. Per ulteriori informazioni, consulta la documentazione su Python Filter e Java Filter.

Riduci i dati in entrata da origini esterne

Se effettui chiamate a un'API esterna o a un database per l'arricchimento dei dati, i dati restituiti devono rientrare nella memoria del worker. Se esegui il raggruppamento in batch delle chiamate, è consigliabile utilizzare una trasformazione GroupIntoBatches. Se si verificano errori di memoria insufficiente, riduci le dimensioni del batch. Per ulteriori informazioni sul raggruppamento in batch, consulta la documentazione su Python GroupIntoBatches e Java GroupIntoBatches.

Condividi oggetti tra i thread

La condivisione di un oggetto dati in memoria tra DoFn istanze può migliorare lo spazio e l'efficienza dell'accesso. Gli oggetti dati creati con qualsiasi metodo di DoFn, tra cui Setup, StartBundle, Process, FinishBundle e Teardown, vengono richiamati per ogni DoFn. In Dataflow, ogni worker può avere diverse istanze DoFn. Per un utilizzo più efficiente della memoria, passa un oggetto dati come singleton per condividerlo tra più DoFn. Per ulteriori informazioni, consulta il post del blog Riutilizzo della cache in tutti i DoFn.

Utilizza rappresentazioni di elementi a elevata efficienza di memoria

Valuta se puoi utilizzare le rappresentazioni per gli elementi PCollection che utilizzano meno memoria. Quando utilizzi programmatori nella pipeline, considera non solo le rappresentazioni degli elementi PCollection codificate, ma anche decodificate. Le matrici sparse possono spesso trarre vantaggio da questo tipo di ottimizzazione.

Riduci le dimensioni degli input aggiuntivi

Se i tuoi DoFn utilizzano input aggiuntivi, riduci le dimensioni dell'input aggiuntivo. Per gli input collaterali che sono raccolte di elementi, valuta la possibilità di utilizzare visualizzazioni iterabili, come AsIterable o AsMultimap, anziché viste che materializzano l'intero input aggiuntivo contemporaneamente, ad esempio AsList.

Rendi disponibile più memoria

Per aumentare la memoria disponibile, puoi aumentare la quantità totale di memoria disponibile sui worker senza modificare la quantità di memoria disponibile per thread. In alternativa, puoi aumentare la quantità di memoria disponibile per thread. Aumentando la memoria per thread, aumenti anche la memoria totale sul worker.

Puoi aumentare la quantità di memoria disponibile per thread in quattro modi:

Utilizza un tipo di macchina con più memoria per vCPU

Per selezionare un worker con più memoria per vCPU, utilizza uno dei seguenti metodi.

  • Nella famiglia di macchine per uso generico, utilizza un tipo di macchina con memoria elevata. I tipi di macchine con memoria elevata hanno una memoria per vCPU maggiore rispetto ai tipi di macchine standard. L'utilizzo di un tipo di macchina con memoria elevata aumenta sia la memoria disponibile per ogni worker sia la memoria disponibile per thread, perché il numero di vCPU rimane lo stesso. Di conseguenza, l'utilizzo di un tipo di macchina con memoria elevata può essere un modo conveniente per selezionare un worker con più memoria per vCPU.
  • Per una maggiore flessibilità nella specifica del numero di vCPU e della quantità di memoria, puoi utilizzare un tipo di macchina personalizzata. Con i tipi di macchine personalizzate puoi aumentare la memoria con incrementi di 256 MB. Il prezzo di questi tipi di macchine è diverso rispetto ai tipi standard.
  • Alcune famiglie di macchine consentono di utilizzare tipi di macchine personalizzate con memoria estesa. La memoria estesa consente un rapporto memoria per vCPU più elevato. Il costo è più alto.

Per impostare i tipi di worker, utilizza la seguente opzione di pipeline. Per ulteriori informazioni, consulta Impostare le opzioni della pipeline e Opzioni della pipeline.

Java

Utilizza l'opzione pipeline --workerMachineType.

Python

Utilizza l'opzione pipeline --machine_type.

Go

Utilizza l'opzione pipeline --worker_machine_type.

Utilizza un tipo di macchina con più vCPU

Questa opzione è consigliata solo per le pipeline di flusso Java e Go. I tipi di macchina con più vCPU hanno una memoria totale maggiore, poiché la quantità di memoria scala in modo lineare seguendo il numero di vCPU. Ad esempio, un tipo di macchina n1-standard-4 con quattro vCPU ha 15 GB di memoria. Un tipo di macchina n1-standard-8 con otto vCPU ha 30 GB di memoria. Per ulteriori informazioni sui tipi di macchine predefinite, consulta la pagina Famiglia di macchine per uso generico.

L'utilizzo di worker con un numero più elevato di vCPU potrebbe aumentare significativamente il costo della pipeline. Tuttavia, puoi utilizzare la scalabilità automatica orizzontale per ridurre il numero di worker totali in modo che il parallelismo rimanga lo stesso. Ad esempio, se hai 50 worker che utilizzano un tipo di macchina n1-standard-4 e passi a un tipo di macchina n1-standard-8, puoi utilizzare la scalabilità automatica orizzontale e impostare il numero massimo di worker per ridurre il numero totale di worker nella pipeline a circa 25. Questa configurazione genera una pipeline con un costo simile.

Per impostare il numero massimo di worker, utilizza la seguente opzione della pipeline.

Java

Utilizza l'opzione pipeline --maxNumWorkers.

Per maggiori informazioni, consulta Opzioni pipeline.

Go

Utilizza l'opzione pipeline --max_num_workers.

Per maggiori informazioni, consulta Opzioni pipeline.

Questo metodo non è consigliato per le pipeline Python. Quando utilizzi l'SDK Python, se passi a un worker con un numero più elevato di vCPU, non solo aumenti la memoria, ma aumenti anche il numero di processi SDK Apache Beam. Ad esempio, il tipo di macchina n1-standard-4 ha la stessa memoria per thread del tipo di macchina n1-standard-8 per le pipeline Python. Pertanto, con le pipeline Python, si consiglia di utilizzare un tipo di macchina con memoria elevata, ridurre il numero di thread o utilizzare un solo processo SDK Apache Beam.

Riduci il numero di thread

Se l'utilizzo di un tipo di macchina con memoria elevata non risolve il problema, aumenta la memoria disponibile per thread riducendo il numero massimo di thread che eseguono istanze di DoFn. Questa modifica riduce il parallelismo. Per ridurre il numero di thread dell'SDK Apache Beam che eseguono istanze DoFn, utilizza la seguente opzione di pipeline.

Java

Utilizza l'opzione pipeline --numberOfWorkerHarnessThreads.

Per maggiori informazioni, consulta Opzioni pipeline.

Python

Utilizza l'opzione pipeline --number_of_worker_harness_threads.

Per maggiori informazioni, consulta Opzioni pipeline.

Go

Utilizza l'opzione pipeline --number_of_worker_harness_threads.

Per maggiori informazioni, consulta Opzioni pipeline.

Per ridurre il numero di thread per le pipeline batch Java e Go, imposta il valore del flag su un numero inferiore al numero di vCPU sul worker. Per le pipeline in modalità flusso, imposta il valore del flag su un numero inferiore al numero di thread per processo SDK Apache Beam. Per stimare i thread per processo, consulta la tabella nella sezione Utilizzo della memoria DoFn in questa pagina.

Questa personalizzazione non è disponibile per le pipeline Python in esecuzione sull'SDK Apache Beam 2.20.0 o versioni precedenti o per le pipeline Python che non utilizzano Runner v2.

Utilizza un solo processo dell'SDK Apache Beam

Per le pipeline di flusso Python e le pipeline Python che utilizzano Runner v2, puoi forzare Dataflow ad avviare un solo processo dell'SDK Apache Beam per worker. Prima di provare questa opzione, prova a risolvere il problema utilizzando altri metodi. Per configurare le VM worker Dataflow in modo da avviare un solo processo Python containerizzato, utilizza la seguente opzione di pipeline:

--experiments=no_use_multiple_sdk_containers

Con questa configurazione, le pipeline Python creano un processo SDK Apache Beam per worker. Questa configurazione impedisce che gli oggetti e i dati condivisi vengano replicati più volte per ciascun processo dell'SDK Apache Beam. Tuttavia, limita l'uso efficiente delle risorse di calcolo disponibili sul worker.

Ridurre il numero di processi dell'SDK Apache Beam a uno non riduce necessariamente il numero totale di thread avviati sul worker. Inoltre, la presenza di tutti i thread su un singolo processo SDK Apache Beam potrebbe causare un'elaborazione lenta o causare il blocco della pipeline. Pertanto, potrebbe essere necessario ridurre anche il numero di thread, come descritto nella sezione Ridurre il numero di thread in questa pagina.

Puoi anche forzare i worker a utilizzare un solo processo SDK Apache Beam utilizzando un tipo di macchina con una sola vCPU.