Risolvere i problemi di memoria insufficiente di Dataflow

Questa pagina descrive come trovare e risolvere gli errori di esaurimento della memoria (OOM) in Dataflow.

Trovare gli errori di memoria insufficiente

Per determinare se la pipeline sta esaurendo la memoria, utilizza uno dei seguenti metodi.

  • Nella pagina Dettagli job, nel riquadro Log, visualizza la scheda Diagnostica. Questa scheda mostra gli errori relativi a problemi di memoria e la frequenza con cui si verificano.
  • Nell'interfaccia di monitoraggio di Dataflow, utilizza il grafico Utilizzo della memoria per monitorare la capacità e l'utilizzo della memoria dei worker.
  • Nella pagina Dettagli job, nel riquadro Log, seleziona Log dei worker per trovare errori di esaurimento della memoria nei log dei worker.
  • Gli errori di memoria insufficiente potrebbero comparire anche nei log di sistema. Per visualizzarli, vai a Esplora log e utilizza la seguente query:

    resource.type="dataflow_step"
    resource.labels.job_id="JOB_ID"
    "out of memory" OR "OutOfMemory" OR "Shutting down JVM"
    

    Sostituisci JOB_ID con l'ID del tuo job.

  • Per i job Java, Java Memory Monitor genera periodicamente report sulle metriche della raccolta dei rifiuti. Se la frazione di tempo della CPU utilizzata per la raccolta dei rifiuti supera una soglia del 50% per un periodo di tempo prolungato, l'harness dell'SDK non va a buon fine. È possibile che venga visualizzato un errore simile al seguente esempio:

    Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = ...
    

    Questo errore può verificarsi quando la memoria fisica è ancora disponibile e in genere indica che l'utilizzo della memoria della pipeline è inefficiente. Per risolvere questo problema, ottimizza la pipeline.

    Il monitor della memoria Java viene configurato dall'interfaccia MemoryMonitorOptions.

Se il tuo job ha un utilizzo elevato della memoria o errori di esaurimento della memoria, segui i consigli riportati in questa pagina per ottimizzare l'utilizzo della memoria o aumentare la quantità di memoria disponibile.

Risolvere gli errori di memoria insufficiente

Le modifiche alla pipeline di Dataflow potrebbero risolvere gli errori di esaurimento della memoria o ridurre l'utilizzo della memoria. Le possibili modifiche includono le seguenti azioni:

Il seguente diagramma mostra il flusso di lavoro per la risoluzione dei problemi di Dataflow descritto in questa pagina.

Un diagramma che mostra il flusso di lavoro per la risoluzione dei problemi.

Prova le seguenti misure di mitigazione:

  • Se possibile, ottimizza la pipeline per ridurre l'utilizzo della memoria.
  • Se il job è un job batch, prova a svolgere i seguenti passaggi nell'ordine indicato:
    1. Utilizza un tipo di macchina con più memoria per vCPU.
    2. Riduci il numero di thread a un valore inferiore al numero di vCPU per worker.
    3. Utilizza un tipo di macchina personalizzata con più memoria per vCPU.
  • Se il job è un job di streaming che utilizza Python, riduci il numero di thread a meno di 12.
  • Se il job è un job di streaming che utilizza Java o Go, prova quanto segue:
    1. Riduci il numero di thread a meno di 500 per i job di Runner v2 o a meno di 300 per i job che non utilizzano Runner v2.
    2. Utilizza un tipo di macchina con più memoria.

Ottimizzare la pipeline

Diverse operazioni della pipeline possono causare errori di esaurimento della memoria. Questa sezione fornisce opzioni per ridurre l'utilizzo della memoria della pipeline. Per identificare le fasi della pipeline che consumano più memoria, utilizza Cloud Profiler per monitorare le prestazioni della pipeline.

Per ottimizzare la pipeline, puoi utilizzare le seguenti best practice:

Utilizzare i connettori I/O integrati di Apache Beam per leggere i file

Non aprire file di grandi dimensioni all'interno di un DoFn. Per leggere i file, utilizza i connettori I/O integrati di Apache Beam. I file aperti in un DoFn devono rientrare nella memoria. Poiché vengono eseguite più istanze di DoFn contemporaneamente, i file di grandi dimensioni aperti in DoFn possono causare errori di memoria insufficiente.

Riprogetta le operazioni quando utilizzi le PTransform GroupByKey

Quando utilizzi una PTransform GroupByKey in Dataflow, i valori per chiave e per finestra risultanti vengono elaborati su un singolo thread. Poiché questi dati vengono trasmessi come stream dal servizio di backend Dataflow ai worker, non devono essere inseriti nella memoria dei worker. Tuttavia, se i valori vengono raccolti in memoria, la logica di elaborazione potrebbe causare errori di memoria insufficiente.

Ad esempio, se hai una chiave che contiene dati per una finestra e aggiungi i valori della chiave a un oggetto in memoria, ad esempio un elenco, potrebbero verificarsi errori di esaurimento della memoria. In questo caso, il worker potrebbe non avere una capacità di memoria sufficiente per contenere tutti gli oggetti.

Per ulteriori informazioni sulle PTransform GroupByKey, consulta la documentazione di Apache Beam per GroupByKey in Python e GroupByKey in Java.

L'elenco seguente contiene suggerimenti per progettare la pipeline in modo da ridurre al minimo il consumo di memoria quando utilizzi le trasformazioni P GroupByKey.

  • Per ridurre la quantità di dati per chiave e per finestra, evita le chiavi con molti valori, note anche come hot key.
  • Per ridurre la quantità di dati raccolti per finestra, utilizza una finestra più piccola.
  • Se utilizzi i valori di una chiave in una finestra per calcolare un numero, utilizza una Combine trasformazione. Non eseguire il calcolo in una singola istanza di DoFn dopo aver raccolto i valori.
  • Filtra i valori o i duplicati prima dell'elaborazione. Per ulteriori informazioni, consulta la documentazione delle trasformazioni Filter per Python e Filter per Java.

Riduci i dati in entrata da origini esterne

Se effettui chiamate a un'API esterna o a un database per l'arricchimento dei dati, i dati restituiti devono rientrare nella memoria del worker. Se raggruppi le chiamate, ti consigliamo di utilizzare una trasformazione GroupIntoBatches. Se si verificano errori di memoria insufficiente, riduci la dimensione del batch. Per ulteriori informazioni sul raggruppamento in batch, consulta la documentazione delle trasformazioni GroupIntoBatches per Python e GroupIntoBatches per Java.

Condividere oggetti tra thread

La condivisione di un oggetto dati in memoria tra istanze DoFn può migliorare l'efficienza dello spazio e dell'accesso. Gli oggetti dati creati in qualsiasi metodo di DoFn, inclusi Setup, StartBundle, Process, FinishBundle e Teardown, vengono invocati per ogni DoFn. In Dataflow, ogni worker potrebbe avere più istanze DoFn. Per un utilizzo più efficiente della memoria, passa un oggetto dati come singleton per condividerlo tra più DoFn. Per ulteriori informazioni, consulta il post del blog Riutilizzo della cache tra DoFn.

Utilizza rappresentazioni degli elementi che consentono di risparmiare memoria

Valuta se puoi utilizzare rappresentazioni per gli elementi PCollection che utilizzano meno memoria. Quando utilizzi i codificatori nella pipeline, valuta la possibilità di considerare non solo le rappresentazioni degli elementi PCollection codificati, ma anche decodificati. Le matrici sparse possono spesso trarre vantaggio da questo tipo di ottimizzazione.

Riduci le dimensioni degli input laterali

Se i tuoi DoFn utilizzano ingressi laterali, riduci le dimensioni dell'ingresso laterale. Per gli input secondari che sono raccolte di elementi, ti consigliamo di utilizzare viste iterabili, come AsIterable o AsMultimap, anziché viste che materializzano contemporaneamente l'intero input secondario, come AsList.

Riduci il numero di thread

Puoi aumentare la memoria disponibile per thread riducendo il numero massimo di thread che eseguono istanze DoFn. Questa modifica riduce il parallelismo, ma consente di rendere disponibile più memoria per ogni DoFn.

La tabella seguente mostra il numero predefinito di thread creati da Dataflow:

Tipo di job SDK Python SDK Java/Go
Batch 1 thread per vCPU 1 thread per vCPU
Streaming con Runner v2 12 thread per vCPU 500 thread per VM worker
Streaming senza Runner v2 12 thread per vCPU 300 thread per VM di lavoro

Per ridurre il numero di thread dell'SDK Apache Beam, imposta la seguente opzione di pipeline:

Utilizza l'opzione della pipeline --numberOfWorkerHarnessThreads.

Utilizza l'opzione della pipeline --number_of_worker_harness_threads.

Utilizza l'opzione della pipeline --number_of_worker_harness_threads.

Per i job batch, imposta il valore su un numero inferiore al numero di vCPU.

Per i job di streaming, inizia riducendo il valore a metà del valore predefinito. Se questo passaggio non riduce il problema, continua a dimezzare il valore, osservando i risultati a ogni passaggio. Ad esempio, quando utilizzi Python, prova i valori 6, 3 e 1.

Utilizza un tipo di macchina con più memoria per vCPU

Per selezionare un worker con più memoria per vCPU, utilizza uno dei seguenti metodi.

  • Utilizza un tipo di macchina con memoria elevata nella famiglia di macchine per uso generico. I tipi di macchine con memoria elevata hanno una memoria per vCPU superiore rispetto ai tipi di macchine standard. L'utilizzo di un tipo di macchina con memoria elevata aumenta sia la memoria disponibile per ogni worker sia la memoria disponibile per thread, perché il numero di vCPU rimane invariato. Di conseguenza, l'utilizzo di un tipo di macchina con memoria elevata può essere un modo conveniente per selezionare un worker con più memoria per vCPU.
  • Per una maggiore flessibilità nella specifica del numero di vCPU e della quantità di memoria, puoi utilizzare un tipo di macchina personalizzata. Con i tipi di macchine personalizzate, puoi aumentare la memoria in incrementi di 256 MB. I prezzi di questi tipi di macchine sono diversi rispetto ai tipi di macchine standard.
  • Alcune famiglie di macchine ti consentono di utilizzare tipi di macchine personalizzate con memoria estesa. La memoria estesa consente un rapporto memoria/vCPU più elevato. Il costo è più elevato.

Per impostare i tipi di worker, utilizza la seguente opzione di pipeline. Per ulteriori informazioni, consulta Impostare le opzioni della pipeline e Opzioni della pipeline.

Utilizza l'opzione della pipeline --workerMachineType.

Utilizza l'opzione della pipeline --machine_type.

Utilizza l'opzione della pipeline --worker_machine_type.

Informazioni sull'utilizzo della memoria di Dataflow

Per risolvere i problemi di memoria insufficiente, è utile capire come le pipeline Dataflow utilizzano la memoria.

Quando Dataflow esegue una pipeline, l'elaborazione viene distribuita su più macchine virtuali (VM) Compute Engine, spesso chiamate worker. I worker elaborano gli elementi di lavoro del servizio Dataflow e li delegheranno ai processi dell'SDK Apache Beam. Un processo dell'SDK Apache Beam crea istanze di DoFn. DoFn è una classe SDK Apache Beam che definisce una funzione di elaborazione distribuita.

Dataflow avvia diversi thread su ogni worker e la memoria di ciascun worker viene condivisa tra tutti i thread. Un thread è una singola attività eseguibile in esecuzione all'interno di un processo più grande. Il numero predefinito di thread dipende da diversi fattori e varia in base ai job batch e in streaming.

Se la pipeline ha bisogno di più memoria rispetto alla quantità predefinita disponibile sui worker, potresti riscontrare errori di esaurimento della memoria.

Le pipeline Dataflow utilizzano principalmente la memoria dei worker in tre modi:

Memoria operativa del worker

I worker di Dataflow richiedono memoria per i sistemi operativi e le procedure di sistema. L'utilizzo della memoria dei worker in genere non supera 1 GB. L'utilizzo solitamente è inferiore a 1 GB.

  • Vari processi sul worker utilizzano la memoria per garantire il corretto funzionamento della pipeline. Ciascuno di questi processi potrebbe prenotare una piccola quantità di memoria per il proprio funzionamento.
  • Quando la pipeline non utilizza Streaming Engine, i processi di lavoro aggiuntivi utilizzano la memoria.

Memoria del processo SDK

I processi dell'SDK Apache Beam potrebbero creare oggetti e dati condivisi tra i thread all'interno del processo, indicati in questa pagina come oggetti e dati condivisi dell'SDK. L'utilizzo della memoria da parte di questi oggetti e dati condivisi dall'SDK è denominato memoria di processo dell'SDK. Il seguente elenco include esempi di oggetti e dati condivisi dall'SDK:

  • Input aggiuntivi
  • Modelli di machine learning
  • Oggetti singleton in memoria
  • Oggetti Python creati con il modulo apache_beam.utils.shared
  • Dati caricati da origini esterne, come Cloud Storage o BigQuery

I job di streaming che non utilizzano Streaming Engine memorizzano gli input lato client in memoria. Per le pipeline Java e Go, ogni worker ha una copia dell'input laterale. Per le pipeline in Python, ogni processo dell'SDK Apache Beam ha una copia dell'input laterale.

I job di streaming che utilizzano Streaming Engine hanno un limite di dimensioni per gli input secondari di 80 MB. Gli input laterali vengono memorizzati al di fuori della memoria del worker.

L'utilizzo della memoria da parte degli oggetti e dei dati condivisi dell'SDK cresce in modo lineare con il numero di processi dell'SDK Apache Beam. Nelle pipeline Java e Go, viene avviato un processo SDK Apache Beam per ogni worker. Nelle pipeline Python, viene avviato un processo dell'SDK Apache Beam per vCPU. Gli oggetti e i dati condivisi dell'SDK vengono riutilizzati nei thread all'interno dello stesso processo dell'SDK Apache Beam.

Utilizzo memoria di DoFn

DoFn è una classe SDK Apache Beam che definisce una funzione di elaborazione distribuita. Ogni worker può eseguire istanze DoFn simultanee. Ogni thread esegue un'istanza DoFn. Quando valuti l'utilizzo totale della memoria, potrebbe essere utile calcolare le dimensioni del set di lavoro o la quantità di memoria necessaria per il funzionamento continuo di un'applicazione. Ad esempio, se un singolo DoFn utilizza un massimo di 5 MB di memoria e un worker ha 300 thread, l'utilizzo della memoria di DoFn potrebbe raggiungere un picco di 1,5 GB, ovvero il numero di byte di memoria moltiplicato per il numero di thread. A seconda di come i worker utilizzano la memoria, un picco di utilizzo della memoria potrebbe causare il loro esaurimento.

È difficile stimare quante istanze di un DoFn vengono create da Dataflow. Il numero dipende da vari fattori, come l'SDK, il tipo di macchina e così via. Inoltre, la DoFn potrebbe essere utilizzata da più thread in successione. Il servizio Dataflow non garantisce quante volte viene invocato un DoFn, né il numero esatto di istanze DoFn create nel corso di una pipeline. Tuttavia, la tabella seguente fornisce alcune informazioni sul livello di parallelismo che puoi aspettarti e stima un limite superiore per il numero di istanze DoFn.

Batch Streaming senza Streaming Engine Streaming Engine
Parallelismo 1 processo per vCPU

1 thread per processo

1 thread per vCPU

1 processo per vCPU

12 thread per processo

12 thread per vCPU

1 processo per vCPU

12 thread per processo

12 thread per vCPU

Numero massimo di istanze DoFn concorrenti (tutti questi numeri sono soggetti a modifiche in qualsiasi momento). 1 DoFn per thread

1 DoFn per vCPU

1 DoFn per thread

12 DoFn per vCPU

1 DoFn per thread

12 DoFn per vCPU

Batch Streaming Appliance e Streaming Engine senza runner v2 Streaming Engine con runner v2
Parallelismo 1 processo per VM worker

1 thread per vCPU

1 processo per VM worker

300 thread per processo

300 thread per VM di lavoro

1 processo per VM worker

500 thread per processo

500 thread per VM worker

Numero massimo di istanze DoFn concorrenti (tutti questi numeri sono soggetti a modifiche in qualsiasi momento). 1 DoFn per thread

1 DoFn per vCPU

1 DoFn per thread

300 DoFn per VM worker

1 DoFn per thread

500 DoFn per VM worker

Ad esempio, quando utilizzi l'SDK Python con un worker n1-standard-2 Dataflow, si applica quanto segue:

  • Job batch: Dataflow avvia un processo per vCPU (due in questo caso). Ogni processo utilizza un thread e ogni thread crea un'DoFn istanza.
  • Job in streaming con Streaming Engine: Dataflow avvia un processo per vCPU (due in totale). Tuttavia, ogni processo può generare fino a 12 thread, ciascuno con la propria istanza DoFn.

Quando progetti pipeline complesse, è importante comprendere il DoFn ciclo di vita. Assicurati che le funzioni DoFn siano serializzabili ed evita di modificare l'argomento elemento direttamente al loro interno.

Quando hai una pipeline multilingue e più di un SDK Apache Beam è in esecuzione sul worker, il worker utilizza il grado più basso possibile di parallelismo thread per processo.

Differenze tra Java, Go e Python

Java, Go e Python gestiscono i processi e la memoria in modo diverso. Di conseguenza, l'approccio da seguire per la risoluzione dei problemi di esaurimento della memoria varia a seconda che la pipeline utilizzi Java, Go o Python.

Pipeline Java e Go

Nelle pipeline Java e Go:

  • Ogni worker avvia un processo dell'SDK Apache Beam.
  • Gli oggetti e i dati condivisi dall'SDK, come input secondari e cache, vengono condivisi tra tutti i thread del worker.
  • La memoria utilizzata dagli oggetti e dai dati condivisi dell'SDK in genere non viene scalata in base al numero di vCPU sul worker.

Pipeline Python

Nelle pipeline Python:

  • Ogni worker avvia un processo dell'SDK Apache Beam per vCPU.
  • Gli oggetti e i dati condivisi dall'SDK, come gli input secondari e le cache, vengono condivisi tra tutti i thread all'interno di ogni processo dell'SDK Apache Beam.
  • Il numero totale di thread sul worker è proporzionale in base al numero di vCPU. Di conseguenza, la memoria utilizzata dagli oggetti e dai dati condivisi dell'SDK aumenta in modo lineare con il numero di vCPU.
  • I thread che eseguono il lavoro sono distribuiti tra i processi. Le nuove unità di lavoro vengono assegnate a un processo senza elementi di lavoro o al processo con il minor numero di elementi di lavoro attualmente assegnati.