Il scaling dinamico dei thread fa parte della suite di funzionalità di scaling verticale di Dataflow. Completa la funzionalità di scalabilità automatica orizzontale di Dataflow regolando il numero di attività parallele, chiamate anche bundle, che vengono eseguite da ciascun worker Dataflow. L'obiettivo è aumentare l'efficienza complessiva della pipeline Dataflow.
Quando Dataflow esegue una pipeline, l'elaborazione viene distribuita su più macchine virtuali (VM) Compute Engine, note anche come worker. Un thread è una singola attività eseguibile in esecuzione all'interno di un processo più grande. Dataflow avvia diversi thread su ogni worker.
Quando la scalabilità dinamica dei thread è abilitata, il servizio Dataflow sceglie automaticamente il numero appropriato di thread da eseguire su ogni worker Dataflow. Poiché ogni thread esegue un'attività, l'aumento del numero di thread consente di eseguire più attività in parallelo su un worker. Quando utilizzi questa funzionalità con la funzionalità di scalabilità automatica orizzontale, il numero totale di thread utilizzati dalla pipeline rimane invariato, ma vengono utilizzati meno worker.
La scalabilità dinamica dei thread utilizza un algoritmo per determinare il numero di thread di cui ha bisogno ciascun worker in base agli indicatori di utilizzo delle risorse generati durante l'esecuzione della pipeline. Per ulteriori informazioni, consulta la sezione Come funziona di questa pagina.
Vantaggi
Il scaling dinamico dei thread presenta i seguenti potenziali vantaggi.
- Consente ai worker di Dataflow di elaborare i dati in modo più efficiente migliorando l'utilizzo della CPU e della memoria per ciascun worker.
- Migliora l'elaborazione parallela regolando il numero di thread worker disponibili per l'esecuzione delle attività in parallelo durante l'esecuzione della pipeline.
- Riduce il numero di worker necessari per elaborare set di dati di grandi dimensioni, il che potrebbe ridurre i costi.
Supporto e limitazioni
- Il scaling dinamico dei thread è disponibile per le pipeline che utilizzano gli SDK Java, Python e Go.
- Il job Dataflow deve utilizzare Runner v2.
- Sono supportate solo le pipeline batch.
- Le pipeline che richiedono un uso intensivo di CPU o memoria potrebbero non trarre vantaggio dalla scalabilità dinamica dei thread.
- La scalabilità dinamica dei thread non riduce il tempo necessario per completare un job Dataflow.
Come funziona
La scalabilità dinamica dei thread utilizza i principi di ottimizzazione automatica per aumentare o diminuire dinamicamente il numero di thread su ciascun worker nel pool di worker Dataflow. Il numero di thread viene scalato in modo indipendente su ogni worker. Ogni thread esegue un'attività. L'aumento del numero di thread consente di eseguire più attività in parallelo su un worker. Man mano che le attività vengono completate e i thread non sono più necessari, il numero di thread viene ridotto. Un algoritmo determina il numero di thread necessari per ogni worker.
Il numero di thread su un worker viene scalato fino a un massimo di due thread per vCPU quando sono soddisfatte entrambe le seguenti condizioni:
- L'utilizzo della memoria sul worker è inferiore al 50%.
- L'utilizzo della CPU sul worker è inferiore al 65%.
Il numero di thread su un worker viene ridotto a un minimo di un thread per vCPU quando viene soddisfatta la seguente condizione:
- L'utilizzo della memoria sul worker è superiore al 70%.
Per visualizzare l'utilizzo della memoria e della CPU per il job, utilizza la scheda Metriche job dell'interfaccia web di Dataflow.
Per garantire la validità dei consigli, Dataflow attende che l'utilizzo delle risorse si stabilizzi prima di inviare i consigli ai lavoratori. Ad esempio, l'utilizzo della memoria e della CPU potrebbe rientrare nell'intervallo per la scalabilità, ma poiché l'utilizzo delle risorse è ancora in crescita, Dataflow non invia un consiglio. Dopo che l'utilizzo delle risorse si è stabilizzato, Dataflow invia un consiglio.
Se si verifica un errore di esaurimento della memoria (OOM), la scalabilità dei thread viene disabilitata automaticamente e la pipeline viene eseguita con un thread per vCPU.
Attivare il scaling dinamico dei thread
Per attivare il scaling dinamico dei thread, utilizza la seguente opzione del servizio Dataflow.
Java
--dataflowServiceOptions=enable_dynamic_thread_scaling
Python
--dataflow_service_options=enable_dynamic_thread_scaling
Vai
--dataflow_service_options=enable_dynamic_thread_scaling
Quando la scalabilità dinamica dei thread è attiva, puoi anche impostare il numero iniziale e massimo di worker disponibili per la pipeline durante l'esecuzione. Per ulteriori informazioni, consulta Opzioni pipeline.
Verificare che la scalabilità dinamica dei thread sia abilitata
Quando il scaling dinamico dei thread è abilitato, nei file di log dei worker viene visualizzato il seguente messaggio:
Enabling thread vertical scaling feature in worker.
Per visualizzare i file di log dei worker, in Esplora log, utilizza il riquadro Query per filtrare i log in base a Nome log. Utilizza il seguente nome del log nel filtro:
projects/PROJECT_ID/logs/dataflow.googleapis.com%2Fharness
Puoi vedere il numero consigliato di thread nei file di log dei worker. Il seguente messaggio include il numero di thread consigliato:
worker_thread_scaling_report_response { recommended_thread_count: NUMBER }
Se l'utilizzo delle risorse non rientra nell'intervallo per la scalabilità, il valore visualizzato corrisponde al numero di vCPU sul worker.
Puoi anche utilizzare la console Google Cloud per verificare se il ridimensionamento dinamico dei thread è attivo. Quando è attivata, nel riquadro Informazioni job di Dataflow, nella riga dataflowServiceOptions della sezione Opzioni pipeline viene visualizzato enable_dynamic_thread_scaling
.
Risoluzione dei problemi
Questa sezione fornisce istruzioni per la risoluzione dei problemi comuni relativi al scaling dinamico dei thread.
Il rendimento peggiora con il scaling dinamico dei thread abilitato
L'aumento del numero di thread potrebbe causare problemi di prestazioni nei seguenti casi:
- Quando più processi tentano di utilizzare la stessa risorsa, un processo è in grado di utilizzarla mentre gli altri devono attendere. Questa situazione è nota come concorrenza per le risorse. Quando si verifica la contesa delle risorse, le prestazioni della pipeline potrebbero diminuire.
- Quando si verificano errori di memoria insufficiente, il scaling dinamico dei thread viene disattivato. In alcuni casi, gli errori di memoria insufficiente potrebbero causare il mancato completamento della pipeline.
Verifica se il numero di thread è aumentato. Per informazioni su come verificare il numero di thread consigliato, consulta Verificare che la scalabilità dei thread sia abilitata in questa pagina.
Se il ridimensionamento dei thread è attivo, per risolvere il problema, quando esegui la pipeline, non includere l'opzione del servizio di ridimensionamento dinamico dei thread.
Worker unificato … sia abilitato che disabilitato
Dopo aver attivato il scaling dinamico dei thread, il job potrebbe non riuscire con il seguente errore:
The workflow could not be created. Causes: (ID): Unified worker misconfigured by user and was both enabled and disabled.
Questo errore si verifica quando Runner v2 è disabilitato esplicitamente.
Per risolvere il problema, attiva Runner v2. Per ulteriori informazioni, consulta la sezione Attivare Dataflow Runner v2 nella pagina "Utilizzare Dataflow Runner v2".
Esegui l'upgrade dell'SDK
Dopo aver attivato il scaling dinamico dei thread, il job potrebbe non riuscire con il seguente errore:
Java
Dataflow Runner v2 requires the Apache Beam Java SDK version 2.29.0 or higher. Please upgrade your SDK and resubmit your job.
Python
Dataflow Runner v2 requires the Apache Beam SDK, version 2.21.0 or higher. Please upgrade your SDK and resubmit your job.
Questo errore si verifica quando non è possibile attivare Runner 2 perché la versione dell'SDK non lo supporta.
Per risolvere il problema, utilizza una versione dell'SDK che supporti Runner 2.
Impossibile attivare la funzionalità di ridimensionamento verticale dei thread
Dopo aver attivato il scaling dinamico dei thread, il job potrebbe non riuscire con il seguente errore:
The workflow could not be created. Causes: (ID): Thread vertical scaling feature can not be enabled while number_of_worker_harness_threads is specified.
Questo errore si verifica quando la pipeline imposta esplicitamente il numero di thread per worker utilizzando l'opzione della pipeline numberOfWorkerHarnessThreads
o number_of_worker_harness_threads
.
Per risolvere il problema, rimuovi l'opzione della pipeline numberOfWorkerHarnessThreads
o
number_of_worker_harness_threads
dalla pipeline.