Per arrestare un job Dataflow, utilizza Console Google Cloud, Cloud Shell, un terminale locale installato con Google Cloud CLI, oppure API REST Dataflow.
Puoi arrestare un job Dataflow in uno dei tre modi seguenti:
Annulla un lavoro. Questo metodo si applica sia a pipeline in modalità flusso che a pipeline di dati. L'annullamento di un job impedisce al servizio Dataflow di elaborare i dati, inclusi quelli nel buffer. Per ulteriori informazioni, consulta la sezione Annullare un job.
Svuota un job. Questo metodo si applica solo alle pipeline in modalità flusso. Il svuotamento di un job consente al servizio Dataflow di terminare di elaborare i dati presenti nel buffer e contemporaneamente interrompere l'importazione dei nuovi dati. Per ulteriori informazioni, vedi Svuotare un job.
Forza l'annullamento di un job. Questo metodo si applica sia alle pipeline di streaming sia alle pipeline batch. Forzare l'annullamento di un job interrompe immediatamente al servizio Dataflow dall'elaborazione di dati, compresi quelli nel buffer e i dati di Google Cloud. Prima di procedere con l'annullamento forzato, devi prima tentare un annullamento regolare. L'annullamento forzato è destinato solo ai job bloccati nel processo di annullamento normale. Per ulteriori informazioni, consulta la sezione Annullare forzatamente un job.
Quando annulli un job, non puoi riavviarlo. Se non utilizzi Flex Modelli, puoi clonare la pipeline annullata e avviare un nuovo job dalla una pipeline clonata.
Prima di interrompere una pipeline di streaming, ti consigliamo di creare uno snapshot del job. Gli snapshot di Dataflow salvano lo stato di una pipeline di inserimento flussi, quindi puoi avviare una nuova versione del job Dataflow senza perdere lo stato. Per saperne di più, vedi Utilizzo degli snapshot Dataflow.
Se hai una pipeline complessa, ti consigliamo di creare un modello e di eseguire il job dal modello.
Non puoi eliminare i job Dataflow, ma puoi archiviarli se sono stati completati. Tutti i job completati, inclusi quelli nell'elenco dei job archiviati, vengono eliminati dopo un periodo di conservazione di 30 giorni.
Annullare un job Dataflow
Quando annulli un job, il servizio Dataflow lo interrompe. immediatamente.
Quando annulli un job, si verificano le seguenti azioni:
Il servizio Dataflow interrompe tutte le importazione dati e i dati e l'elaborazione dei dati.
Il servizio Dataflow inizia a ripulire le risorse Google Cloud collegate al tuo job.
Queste risorse potrebbero includere l'arresto delle istanze worker di Compute Engine e la chiusura delle connessioni attive a origini o sink di I/O.
Informazioni importanti sull'annullamento di un job
L'annullamento di un job interrompe immediatamente l'elaborazione della pipeline.
Se annulli un job, potresti perdere i dati in corso. I dati in transito si riferiscono ai dati già letti, ma ancora in fase di elaborazione da parte della pipeline.
I dati scritti dalla pipeline in un'area di destinazione di output prima dell'annullamento del job potrebbero essere ancora accessibili nell'area di destinazione di output.
Se la perdita di dati non è un problema, l'annullamento del job garantisce che Le risorse Google Cloud associate al tuo job vengono arrestate il prima possibile.
Svuotare un job Dataflow
Quando scarichi un job, il servizio Dataflow lo completa nel suo stato corrente. Se vuoi evitare la perdita di dati mentre riduci pipeline di flusso, l'opzione migliore è lo svuotamento del job.
Quando svuota un job, si verificano le seguenti azioni:
Il job interrompe l'importazione di nuovi dati dalle origini di input poco dopo aver ricevuto la richiesta di svuotamento (in genere entro pochi minuti).
Il servizio Dataflow conserva le risorse esistenti, ad esempio le istanze di worker, per completare l'elaborazione e la scrittura di eventuali dati nel buffer della pipeline.
Una volta completate tutte le operazioni di scrittura e di elaborazione in attesa, il servizio Dataflow arresta le risorse Google Cloud associate al job.
Per svuotare il job, Dataflow smette di leggere il nuovo input, contrassegna la con un timestamp evento all'infinito, e poi propaga i timestamp infinito attraverso la pipeline. Pertanto, delle pipeline in fase di svuotamento potrebbe avere una filigrana infinita.
Informazioni importanti sullo svuotamento di un job
Lo svuotamento di un job non è supportato per le pipeline batch.
La pipeline continuerà a sostenere il costo per il mantenimento di eventuali risorse Google Cloud fino al termine dell'elaborazione e della scrittura.
Puoi aggiornare una pipeline che è in fase di svuotamento. Se la pipeline è bloccata, aggiornala con codice che risolve l'errore che sta causando il problema consente uno svuotamento riuscito senza perdita di dati.
Puoi annullare un job attualmente in fase di svuotamento.
Lo svuotamento di un job può richiedere molto tempo, ad esempio quando la pipeline contiene una grande quantità di dati in buffer.
Se la pipeline in modalità flusso include DoFn divisibile devi troncare il risultato prima di eseguire l'opzione di svuotamento. Per ulteriori informazioni sul troncamento dei DoFn suddivisibili, consulta Apache Beam documentazione.
In alcuni casi, un job Dataflow potrebbe non essere in grado di completare di svuotamento. Puoi consultare i log del job per determinare la causa principale e intraprendere le azioni appropriate.
Conservazione dei dati
I flussi di dati Dataflow sono tolleranti al riavvio e al riavvio dei worker non ha esito negativo per i job di flussi di dati quando si verificano errori. Invece, Il servizio Dataflow riprova finché non esegui un'azione come come l'annullamento o il riavvio del job. Quando svuota il job, Dataflow continua a riprovare, il che può portare a pipeline bloccate. In questa situazione, per abilitare uno svuotamento riuscito senza perdita di dati, aggiornare la pipeline con il codice che corregge l'errore che sta causando il problema.
Dataflow non conferma i messaggi fino a quando Il servizio Dataflow esegue il commit durevole di queste attività. Ad esempio, con Kafka puoi considerare questa procedura come un trasferimento sicuro della proprietà del messaggio da Kafka a Dataflow, eliminando il rischio di perdita di dati.
Job bloccati
- Lo svuotamento non corregge le pipeline bloccate. Se il trasferimento dei dati è bloccato, la pipeline rimane bloccata dopo il comando di scarico. Per risolvere un problema con una pipeline bloccata, utilizza il comando update per aggiornare la pipeline con il codice che risolve l'errore che sta causando il problema. Puoi anche annullare i job bloccati, ma l'annullamento dei job potrebbe comportare la perdita di dati.
Timer
Se il codice della pipeline in modalità flusso include un timer di loop, il job potrebbe essere lento o non può essere svuotato. Poiché lo svuotamento non termina finché non sono stati completati tutti i timer, le pipeline con timer con loop infinito non terminano mai lo svuotamento.
Dataflow attende il completamento di tutti i timer del tempo di elaborazione invece di attivarli immediatamente, il che potrebbe causare un drenaggio lento.
Effetti dello svuotamento di un job
Quando svuota una pipeline in modalità flusso, Dataflow si chiude immediatamente qualsiasi fase di elaborazione finestre e attiva tutti attivatori.
Il sistema non attende il completamento di eventuali finestre basate sul tempo in un'operazione di scarico.
Ad esempio, se la pipeline è in esecuzione da dieci minuti in un intervallo di due ore quando svuoti il job, Dataflow non attende il termine del resto dell'intervallo. La finestra si chiude immediatamente con risultati parziali. Dataflow provoca la chiusura delle finestre aperte facendo avanzare i dati dalla filigrana all'infinito. Questa funzionalità funziona anche con le origini dati personalizzate.
Quando scarichi una pipeline che utilizza un'origine dati personalizzata, Dataflow interrompe l'invio di richieste di nuovi dati, imposta la marcatura temporale dei dati su infinito e chiama il metodo finalize()
dell'origine nell'ultimo checkpoint.
Lo svuotamento può comportare finestre parzialmente riempite. In questo caso, se riavvii la pipeline svuotata, la stessa finestra potrebbe essere attivata una seconda volta, il che può causare problemi con i dati. Ad esempio, nel uno dei seguenti scenari, i file potrebbero avere nomi in conflitto e i dati potrebbero essere sovrascritti:
Se scarichi una pipeline con windowing orario alle 12:34, dalle 12:00 alle 13:00 si chiude con solo i dati che sono stati attivati entro i primi 34 minuti dal finestra. La pipeline non legge i nuovi dati dopo le 12:34.
Se poi riavvii immediatamente la pipeline, la finestra dalle 12:00 alle 13:00 vengono attivati di nuovo, con solo i dati letti dalle 12:35 alle 13:00. Non vengono inviati duplicati, ma se un nome file viene ripetuto, i dati vengono sovrascritti.
Nella console Google Cloud puoi visualizzare i dettagli delle trasformazioni della pipeline. Il seguente diagramma mostra gli effetti di un'operazione di svuotamento in corso. Tieni conto che la filigrana viene avanzata al valore massimo.
Figura 1. Vista passo di un'operazione di svuotamento.
Forza l'annullamento di un job Dataflow
Utilizza l'annullamento forzato solo quando non riesci ad annullare il job utilizzando altri metodi. L'annullamento forzato termina il job senza ripulire tutte le risorse. Se utilizzi Forza l'annullamento ripetutamente, le risorse divulgate potrebbero accumularsi, e le risorse divulgate usano la tua quota.
Quando forzi l'annullamento di un job, il servizio Dataflow arresta il job. con la perdita immediata di eventuali VM create dal job Dataflow. L'annullamento normale deve essere tentato almeno 30 minuti prima dell'annullamento forzato.
Quando annulli forzatamente un job, si verificano le seguenti azioni:
- Il servizio Dataflow interrompe tutte le importazione dati e i dati e l'elaborazione dei dati.
Informazioni importanti sull'annullamento forzato di un job
L'annullamento forzato di un job interrompe immediatamente l'elaborazione della pipeline.
Forza l'annullamento di un job è destinato solo ai job bloccati in il normale processo di annullamento.
Le istanze worker create dal job Dataflow non vengono vengano necessariamente rilasciate, il che potrebbe causare la perdita di istanze worker. Le istanze di worker con perdite non contribuiscono ai costi del job, ma potrebbero utilizzare la tua quota. Al termine dell'annullamento del job, puoi eliminare queste risorse.
Per i job Dataflow Prime, non puoi vedere o eliminare le VM con accesso non autorizzato. Nella maggior parte dei casi, queste VM non creano problemi. Tuttavia, se le VM divulgate causano problemi, ad esempio il consumo della quota di VM, contatta l'assistenza.
Interrompere un job Dataflow
Prima di interrompere un job, devi comprendere gli effetti dell'annullamento, svuotare o forzare l'annullamento di un job.
Console
Vai alla pagina Job Dataflow:
Fai clic sul job che vuoi arrestare.
Per interrompere un job, lo stato del job deve essere in esecuzione.
Nella pagina dei dettagli del job, fai clic su Arresta.
Esegui una di queste operazioni:
Per una pipeline batch, fai clic su Annulla o Forza annullamento.
Per una pipeline di streaming, fai clic su Annulla, Svuotamento o Forza annullamento.
Per confermare la scelta, fai clic su Arresta job.
gcloud
Per svuotare o annullare un job Dataflow, puoi utilizzare il comando gcloud dataflow jobs
in Cloud Shell o in un terminale locale installato con gcloud CLI.
Accedi alla tua shell.
Elenca gli ID job per i job Dataflow attualmente in esecuzione e prendi nota dell'ID per il job che vuoi arrestare:
gcloud dataflow jobs list
Se il flag
--region
non è impostato, vengono visualizzati i job Dataflow di tutte le regioni disponibili.Esegui una di queste operazioni:
Per svuotare un job di streaming:
gcloud dataflow jobs drain JOB_ID
Sostituisci
JOB_ID
con l'ID job che hai copiato in precedenza.Per annullare un job batch o flusso:
gcloud dataflow jobs cancel JOB_ID
Sostituisci
JOB_ID
con l'ID job che hai copiato in precedenza.Per annullare forzatamente un job batch o di streaming:
gcloud dataflow jobs cancel JOB_ID --force
Sostituisci
JOB_ID
con l'ID job che hai copiato in precedenza.
API
Per annullare o svuotare un job utilizzando l'API REST Dataflow, puoi scegliere projects.locations.jobs.update
o projects.jobs.update
.
Nel corpo della richiesta,
Supera lo stato del job richiesto
nel campo requestedState
dell'istanza job dell'API scelta.
Importante: utilizzi projects.locations.jobs.update
perché projects.jobs.update
consente di aggiornare solo lo stato dei job in esecuzione in us-central1
.
Per annullare il job, imposta il relativo stato su
JOB_STATE_CANCELLED
.Per svuotare il job, imposta lo stato del job su
JOB_STATE_DRAINED
.Per forzare l'annullamento del job, imposta lo stato del job su
JOB_STATE_CANCELLED
con l'etichetta"force_cancel_job": "true"
. Il corpo della richiesta è:{ "requestedState": "JOB_STATE_CANCELLED", "labels": { "force_cancel_job": "true" } }
Rileva il completamento del job Dataflow
Per rilevare il completamento dell'annullamento o dello svuotamento del job, utilizza uno dei seguenti metodi:
- Utilizza un servizio di orchestrazione dei flussi di lavoro come Cloud Composer per monitorare il job Dataflow.
- Esegui la pipeline in modo sincrono in modo che le attività vengano bloccate fino al completamento della pipeline. Per ulteriori informazioni, consulta la sezione Controllo delle modalità di esecuzione in Impostazione delle opzioni della pipeline in corso...
Utilizza lo strumento a riga di comando in Google Cloud CLI per eseguire il polling dello stato del job. Per ottenere un elenco di tutti i job Dataflow nel tuo progetto, esegui questo comando nella shell o nel terminale:
gcloud dataflow jobs list
L'output mostra l'ID, il nome, lo stato (
STATE
) e altre informazioni di ciascun job. Per ulteriori informazioni, vedi Utilizzo dell'interfaccia a riga di comando di Dataflow.
Archivia job Dataflow
Quando archivi un job Dataflow, questo viene rimosso dall'elenco dei job nella pagina Job di Dataflow nella console. Il job viene spostato in un elenco di job archiviati. Puoi archiviare solo i job completati, inclusi quelli nei seguenti stati:
JOB_STATE_CANCELLED
JOB_STATE_DRAINED
JOB_STATE_DONE
JOB_STATE_FAILED
JOB_STATE_UPDATED
Per ulteriori informazioni, consulta Rileva il completamento dei job Dataflow in questo documento. Per informazioni sulla risoluzione dei problemi, consulta Archiviare gli errori dei job in "Risolvere gli errori di Dataflow".
Tutti i job archiviati vengono eliminati dopo un periodo di conservazione di 30 giorni.
Archiviare un job
Segui questi passaggi per rimuovere un lavoro completato dall'elenco dei job principale nella Pagina Job di Dataflow.
Console
Nella console Google Cloud, vai alla pagina Job di Dataflow.
Viene visualizzato un elenco di job Dataflow insieme al relativo stato.
Seleziona un job.
Nella pagina Dettagli job, fai clic su Archivia. Se il job non è stato l'opzione Archivia non è disponibile.
API
Per archiviare i job utilizzando l'API, utilizza il
campo JobMetadata
. Nel campo JobMetadata
, per userDisplayProperties
, utilizza la coppia chiave-valore "archived":"true"
.
La richiesta API deve includere anche updateMask parametro di query.
curl --request PUT \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
--data
'{"job_metadata":{"userDisplayProperties":{"archived":"true"}}}' \
--compressed
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progettoREGION
: una regione DataflowJOB_ID
: l'ID del tuo job Dataflow.
Visualizza e ripristina i job archiviati
Segui questi passaggi per visualizzare i job archiviati o ripristinarli nell'istanza principale dei job nella pagina Job di Dataflow.
Console
Nella console Google Cloud, vai alla pagina Job di Dataflow.
Fai clic sul pulsante di attivazione/disattivazione Archiviati. Viene visualizzato un elenco di job Dataflow archiviati.
Seleziona un job.
Per ripristinare il job nell'elenco principale dei job nella pagina Job di Dataflow, fai clic su Ripristina nella pagina Dettagli job.
API
Per ripristinare i job utilizzando l'API, utilizza il
campo JobMetadata
. Nel campo JobMetadata
, per userDisplayProperties
, utilizza la coppia chiave-valore "archived":"false"
.
La richiesta API deve includere anche updateMask parametro di query.
curl --request PUT \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
--data
'{"job_metadata":{"userDisplayProperties":{"archived":"false"}}}' \
--compressed
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progettoREGION
: una regione DataflowJOB_ID
: l'ID del tuo job Dataflow.
Passaggi successivi
- Esplora il Riga di comando di Dataflow.
- Esplora il API REST Dataflow.
- Esplora il Interfaccia di monitoraggio di Dataflow nella console Google Cloud.
- Scopri di più su l'aggiornamento di una pipeline.