Per arrestare un job Dataflow, utilizza la console Google Cloud , Cloud Shell, un terminale locale installato con Google Cloud CLI o l'API REST Dataflow.
Puoi interrompere un job Dataflow in uno dei tre modi seguenti:
Annullare un job. Questo metodo si applica sia alle pipeline di streaming sia alle pipeline batch. L'annullamento di un job impedisce al servizio Dataflow di elaborare i dati, inclusi quelli nel buffer. Per ulteriori informazioni, consulta la sezione Annullare un job.
Svuota un job. Questo metodo si applica solo alle pipeline di streaming. Lo svuotamento di un job consente al servizio Dataflow di completare l'elaborazione dei dati nel buffer e contemporaneamente di interrompere l'importazione di nuovi dati. Per ulteriori informazioni, consulta la sezione Scaricare un job.
Forza l'annullamento di un job. Questo metodo si applica sia alle pipeline di streaming sia alle pipeline batch. L'annullamento forzato di un job interrompe immediatamente l'elaborazione di qualsiasi dato da parte del servizio Dataflow, inclusi i dati nel buffer. Prima di eseguire l'annullamento forzato, devi prima provare un annullamento normale. L'annullamento forzato è destinato esclusivamente ai job bloccati nel processo di annullamento normale. Per ulteriori informazioni, consulta la sezione Annullare forzatamente un job.
Quando annulli un job, non puoi riavviarlo. Se non utilizzi i modelli Flex, puoi clonare la pipeline annullata e avviare un nuovo job dalla pipeline clonata.
Prima di interrompere una pipeline di streaming, ti consigliamo di creare uno snapshot del job. Gli snapshot di Dataflow salvano lo stato di una pipeline di inserimento flussi, quindi puoi avviare una nuova versione del job Dataflow senza perdere lo stato. Per scoprire di più, consulta Utilizzo degli snapshot di Dataflow.
Se hai una pipeline complessa, ti consigliamo di creare un modello e di eseguire il job dal modello.
Non puoi eliminare i job Dataflow, ma puoi archiviarli se sono stati completati. Tutti i job completati, inclusi quelli nell'elenco dei job archiviati, vengono eliminati dopo un periodo di conservazione di 30 giorni.
Annullare un job Dataflow
Quando annulli un job, il servizio Dataflow lo interrompe immediatamente.
Quando annulli un job, vengono eseguite le seguenti azioni:
Il servizio Dataflow interrompe tutte le operazioni di importazione ed elaborazione dei dati.
Il servizio Dataflow inizia a ripulire le risorse Google Cloud collegate al tuo job.
Queste risorse potrebbero includere lo spegnimento delle istanze di worker Compute Engine e la chiusura delle connessioni attive alle origini o alle destinazioni di I/O.
Informazioni importanti sull'annullamento di un job
L'annullamento di un job interrompe immediatamente l'elaborazione della pipeline.
Potresti perdere i dati in transito quando annulli un job. I dati in transito si riferiscono ai dati già letti, ma che sono ancora in fase di elaborazione dalla pipeline.
I dati scritti dalla pipeline in un destinazione di output prima dell'annullamento del job potrebbero essere ancora accessibili nella destinazione di output.
Se la perdita di dati non è un problema, l'annullamento del job garantisce che le risorseGoogle Cloud associate al job vengano chiuse il prima possibile.
Svuotare un job Dataflow
Quando scarichi un job, il servizio Dataflow lo completa nel suo stato corrente. Se vuoi evitare la perdita di dati durante l'interruzione delle pipeline di streaming, l'opzione migliore è svuotare il job.
Quando scarichi un job, vengono eseguite le seguenti azioni:
Il job interrompe l'importazione di nuovi dati dalle origini di input poco dopo aver ricevuto la richiesta di svuotamento (in genere entro pochi minuti).
Il servizio Dataflow conserva le risorse esistenti, ad esempio le istanze di worker, per completare l'elaborazione e la scrittura di eventuali dati nel buffer della pipeline.
Al termine di tutte le operazioni di scrittura e di elaborazione in attesa, il servizio Dataflow arresta le risorse Google Cloud associate al tuo job.
Per svuotare il job, Dataflow smette di leggere il nuovo input, contrassegna la fonte con un timestamp dell'evento infinito e poi propaga i timestamp infiniti nella pipeline. Pertanto, le pipeline in fase di svuotamento potrebbero avere una filigrana infinita.
Informazioni importanti sullo svuotamento di un job
Lo svuotamento di un job non è supportato per le pipeline batch.
La pipeline continua a sostenere il costo della manutenzione di eventuali risorse associateGoogle Cloud fino al termine di tutte le operazioni di elaborazione e scrittura.
Puoi aggiornare una pipeline in fase di svuotamento. Se la pipeline è bloccata, aggiornandola con il codice che corregge l'errore che sta causando il problema, puoi eseguire lo scarico correttamente senza perdita di dati.
Puoi annullare un job che è attualmente in fase di svuotamento.
Lo svuotamento di un job può richiedere molto tempo, ad esempio quando la pipeline contiene una grande quantità di dati in buffer.
Se la pipeline di streaming include un DoFn suddividibile, devi troncare il risultato prima di eseguire l'opzione di scarico. Per ulteriori informazioni sulla troncatura delle funzioni DoFn suddividibili, consulta la documentazione di Apache Beam.
In alcuni casi, un job Dataflow potrebbe non essere in grado di completare l'operazione di svuotamento. Puoi consultare i log del job per determinare la causa principale e intraprendere le azioni appropriate.
Conservazione dei dati
L'inserimento flussi di Dataflow è tollerante al riavvio dei worker e non fa fallire i job di inserimento flussi quando si verificano errori. Il servizio Dataflow esegue invece ripetuti tentativi finché non esegui un'azione come l'annullamento o il riavvio del job. Quando svuoti il job, Dataflow continua a riprovare, il che può causare il blocco delle pipeline. In questa situazione, per eseguire un svuotamento riuscito senza perdita di dati, aggiorna la pipeline con il codice che corregge l'errore che sta causando il problema.
Dataflow non conferma i messaggi finché il servizio Dataflow non li esegue in modo permanente. Ad esempio, con Kafka puoi considerare questa procedura come un trasferimento sicuro della proprietà del messaggio da Kafka a Dataflow, eliminando il rischio di perdita di dati.
Job bloccati
- Lo svuotamento non corregge le pipeline bloccate. Se il trasferimento dei dati è bloccato, la pipeline rimane bloccata dopo il comando di scarico. Per risolvere un problema con una pipeline bloccata, utilizza il comando update per aggiornare la pipeline con il codice che risolve l'errore che sta causando il problema. Puoi anche annullare i job bloccati, ma l'annullamento dei job potrebbe comportare la perdita di dati.
Timer
Se il codice della pipeline di streaming include un timer in loop, il job potrebbe essere lento o impossibile da svuotare. Poiché lo svuotamento non termina finché non sono stati completati tutti i timer, le pipeline con timer di loop infinito non terminano mai lo svuotamento.
Dataflow attende il completamento di tutti i timer di tempo di elaborazione instead of firing them right away, che potrebbe comportare uno svuotamento lento.
Effetti del ricalcolo di un job
Quando svuoti una pipeline di streaming, Dataflow chiude immediatamente tutte le finestre in elaborazione e attiva tutti gli attivatori.
Il sistema non attende il completamento di eventuali finestre basate sul tempo in un'operazione di svuotamento.
Ad esempio, se la pipeline è in esecuzione da dieci minuti in un intervallo di due ore quando svuoti il job, Dataflow non attende il termine del resto dell'intervallo. La finestra si chiude immediatamente con risultati parziali. Dataflow chiude le finestre aperte facendo avanzare la marcatura temporale dei dati all'infinito. Questa funzionalità funziona anche con le origini dati personalizzate.
Quando scarichi una pipeline che utilizza un'origine dati personalizzata, Dataflow interrompe l'invio di richieste di nuovi dati, imposta la marcatura temporale dei dati su infinito e chiama il metodo finalize()
dell'origine nell'ultimo checkpoint.
Lo svuotamento può comportare finestre parzialmente riempite. In questo caso, se riavvii la pipeline svuotata, la stessa finestra potrebbe essere attivata una seconda volta, il che può causare problemi con i dati. Ad esempio, nel seguente scenario, i file potrebbero avere nomi in conflitto e i dati potrebbero essere sovrascritti:
Se scarichi una pipeline con finestre orarie alle 12:34, la finestra dalle 12:00 alle 13:00 si chiude con solo i dati attivati nei primi 34 minuti della finestra. La pipeline non legge nuovi dati dopo le 12:34.
Se riavvii immediatamente la pipeline, la finestra dalle 12:00 alle 13:00 viene nuovamente attivata, con solo i dati letti dalle 12:35 alle 13:00. Non vengono inviati duplicati, ma se un nome file viene ripetuto, i dati vengono sovrascritti.
Nella console Google Cloud , puoi visualizzare i dettagli delle trasformazioni della pipeline. Il seguente diagramma mostra gli effetti di un'operazione di svuotamento in corso. Tieni conto che la filigrana viene avanzata al valore massimo.
Figura 1. Una visualizzazione passo passo di un'operazione di scarico.
Annullare forzatamente un job Dataflow
Utilizza l'annullamento forzato solo quando non riesci ad annullare il job utilizzando altri metodi. L'annullamento forzato termina il job senza ripulire tutte le risorse. Se utilizzi l'annullamento forzato ripetutamente, le risorse perse potrebbero accumularsi e utilizzare la tua quota.
Quando annulli forzatamente un job, il servizio Dataflow lo interrompe immediatamente, causando la perdita di tutte le VM create dal job Dataflow. L'annullamento normale deve essere tentato almeno 30 minuti prima dell'annullamento forzato.
Quando annulli forzatamente un job, si verificano le seguenti azioni:
- Il servizio Dataflow interrompe tutte le operazioni di importazione ed elaborazione dei dati.
Informazioni importanti sull'annullamento forzato di un job
L'annullamento forzato di un job interrompe immediatamente l'elaborazione della pipeline.
L'annullamento forzato di un job è destinato solo ai job bloccati nel processo di annullamento normale.
Le istanze worker create dal job Dataflow non vengono necessariamente rilasciate, il che potrebbe comportare la fuga di istanze worker. Le istanze di worker con perdite non contribuiscono ai costi del job, ma potrebbero utilizzare la tua quota. Al termine dell'annullamento del job, puoi eliminare queste risorse.
Per i job Dataflow Prime, non puoi vedere o eliminare le VM con accesso non autorizzato. Nella maggior parte dei casi, queste VM non creano problemi. Tuttavia, se le VM con accesso non autorizzato causano problemi, ad esempio il consumo della quota di VM, contatta l'assistenza.
Interrompere un job Dataflow
Prima di interrompere un job, devi comprendere gli effetti dell'annullamento, del svuotamento o dell'annullamento forzato di un job.
Console
Vai alla pagina Job Dataflow:
Fai clic sul job che vuoi interrompere.
Per interrompere un job, lo stato del job deve essere in esecuzione.
Nella pagina dei dettagli del job, fai clic su Interrompi.
Esegui una di queste operazioni:
Per una pipeline batch, fai clic su Annulla o Forza annullamento.
Per una pipeline di streaming, fai clic su Annulla, Svuotamento o Forza annullamento.
Per confermare la tua scelta, fai clic su Interrompi job.
gcloud
Per svuotare o annullare un job Dataflow, puoi utilizzare il comando gcloud dataflow jobs
in Cloud Shell o in un terminale locale installato con gcloud CLI.
Accedi alla shell.
Elenca gli ID job dei job Dataflow attualmente in esecuzione, quindi prendi nota dell'ID job che vuoi interrompere:
gcloud dataflow jobs list
Se il flag
--region
non è impostato, vengono visualizzati i job Dataflow di tutte le regioni disponibili.Esegui una di queste operazioni:
Per svuotare un job di streaming:
gcloud dataflow jobs drain JOB_ID
Sostituisci
JOB_ID
con l'ID job che hai copiato in precedenza.Per annullare un job batch o di streaming:
gcloud dataflow jobs cancel JOB_ID
Sostituisci
JOB_ID
con l'ID job che hai copiato in precedenza.Per annullare forzatamente un job batch o di streaming:
gcloud dataflow jobs cancel JOB_ID --force
Sostituisci
JOB_ID
con l'ID job che hai copiato in precedenza.
API
Per annullare o svuotare un job utilizzando l'API REST Dataflow, puoi scegliere projects.locations.jobs.update
o projects.jobs.update
.
Nel corpo della richiesta,
trasmetti lo stato del job obbligatorio
nel campo requestedState
dell'istanza del job dell'API scelta.
Importante: ti consigliamo di utilizzare projects.locations.jobs.update
, poiché projects.jobs.update
consente solo di aggiornare lo stato dei job in esecuzione in us-central1
.
Per annullare il job, imposta lo stato del job su
JOB_STATE_CANCELLED
.Per svuotare il job, imposta lo stato su
JOB_STATE_DRAINED
.Per annullare forzatamente il job, imposta lo stato su
JOB_STATE_CANCELLED
con l'etichetta"force_cancel_job": "true"
. Il corpo della richiesta è:{ "requestedState": "JOB_STATE_CANCELLED", "labels": { "force_cancel_job": "true" } }
Rileva il completamento del job Dataflow
Per rilevare il completamento dell'annullamento o dello svuotamento del job, utilizza uno dei seguenti metodi:
- Utilizza un servizio di orchestrazione del flusso di lavoro come Cloud Composer per monitorare il job Dataflow.
- Esegui la pipeline in modo sincrono in modo che le attività vengano bloccate fino al completamento della pipeline. Per ulteriori informazioni, consulta Controllo delle modalità di esecuzione in Impostazione delle opzioni della pipeline.
Utilizza lo strumento a riga di comando in Google Cloud CLI per eseguire il polling dello stato del job. Per visualizzare un elenco di tutti i job Dataflow nel progetto, esegui il seguente comando nella shell o nel terminale:
gcloud dataflow jobs list
L'output mostra l'ID, il nome, lo stato (
STATE
) e altre informazioni per ogni job. Per ulteriori informazioni, consulta Utilizzare l'interfaccia a riga di comando di Dataflow.
Archivia i job Dataflow
Quando archivi un job Dataflow, questo viene rimosso dall'elenco dei job nella pagina Job di Dataflow nella console. Il job viene spostato in un elenco di job archiviati. Puoi archiviare solo i job completati, inclusi quelli nei seguenti stati:
JOB_STATE_CANCELLED
JOB_STATE_DRAINED
JOB_STATE_DONE
JOB_STATE_FAILED
JOB_STATE_UPDATED
Per ulteriori informazioni, consulta Rileva il completamento dei job Dataflow in questo documento. Per informazioni sulla risoluzione dei problemi, consulta Archiviare gli errori dei job in "Risolvere gli errori di Dataflow".
Tutti i job archiviati vengono eliminati dopo un periodo di conservazione di 30 giorni.
Archiviare un job
Per rimuovere un job completato dall'elenco principale dei job nella pagina Job di Dataflow, segui questi passaggi.
Console
Nella console Google Cloud , vai alla pagina Job di Dataflow.
Viene visualizzato un elenco di job Dataflow con il relativo stato.
Seleziona un job.
Nella pagina Dettagli job, fai clic su Archivia. Se il job non è stato completato, l'opzione Archivia non è disponibile.
API
Per archiviare i job utilizzando l'API, utilizza il
campo JobMetadata
. Nel campo JobMetadata
, per userDisplayProperties
, utilizza la coppia chiave-valore "archived":"true"
.
La richiesta API deve includere anche il parametro di query updateMask.
curl --request PUT \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
--data
'{"job_metadata":{"userDisplayProperties":{"archived":"true"}}}' \
--compressed
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progettoREGION
: una regione DataflowJOB_ID
: l'ID del job Dataflow
Visualizzare e ripristinare i job archiviati
Segui questi passaggi per visualizzare i job archiviati o per ripristinarli nell'elenco principale dei job nella pagina Job di Dataflow.
Console
Nella console Google Cloud , vai alla pagina Job di Dataflow.
Fai clic sul pulsante di attivazione/disattivazione Archiviato. Viene visualizzato un elenco di job Dataflow archiviati.
Seleziona un job.
Per ripristinare il job nell'elenco principale dei job nella pagina Job di Dataflow, fai clic su Ripristina nella pagina Dettagli job.
API
Per ripristinare i job utilizzando l'API, utilizza il
campo JobMetadata
. Nel campo JobMetadata
, per userDisplayProperties
, utilizza la coppia chiave-valore "archived":"false"
.
La richiesta API deve includere anche il parametro di query updateMask.
curl --request PUT \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
--data
'{"job_metadata":{"userDisplayProperties":{"archived":"false"}}}' \
--compressed
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progettoREGION
: una regione DataflowJOB_ID
: l'ID del job Dataflow
Passaggi successivi
- Esplora la riga di comando Dataflow.
- Esplora la API REST Dataflow.
- Esplora la interfaccia di monitoraggio di Dataflow nella console Google Cloud .
- Scopri di più sull'aggiornamento di una pipeline.