Arresta una pipeline Dataflow in esecuzione

Per arrestare un job Dataflow, utilizza Console Google Cloud, Cloud Shell, un terminale locale installato con Google Cloud CLI, oppure API REST Dataflow.

Puoi arrestare un job Dataflow in uno dei tre modi seguenti:

  • Annulla un lavoro. Questo metodo si applica sia a pipeline in modalità flusso che a pipeline di dati. L'annullamento di un job interrompe il servizio Dataflow mentre elabora qualsiasi dato, compresi quelli presenti nel buffer. Per ulteriori informazioni, vedi Annulla un job.

  • Svuota un job. Questo metodo si applica solo alle pipeline in modalità flusso. Il svuotamento di un job consente al servizio Dataflow di terminare di elaborare i dati presenti nel buffer e contemporaneamente interrompere l'importazione dei nuovi dati. Per ulteriori informazioni, vedi Svuotare un job.

  • Forza l'annullamento di un job. Questo metodo si applica sia a pipeline in modalità flusso che a pipeline di dati. Forzare l'annullamento di un job interrompe immediatamente al servizio Dataflow dall'elaborazione di dati, compresi quelli nel buffer e i dati di Google Cloud. Prima di forzare l'annullamento, devi prima tentare un annullamento regolare. L'opzione Forza annullamento è destinata solo ai job bloccati nella con la normale procedura di annullamento. Per ulteriori informazioni, vedi Forza l'annullamento di un job.

Quando annulli un job, non puoi riavviarlo. Se non utilizzi Flex Modelli, puoi clonare la pipeline annullata e avviare un nuovo job dalla una pipeline clonata.

Prima di arrestare una pipeline in modalità flusso, valuta la possibilità di creare uno snapshot del un lavoro. Gli snapshot Dataflow salvano lo stato di una pipeline in modalità flusso, puoi avviare una nuova versione del tuo job Dataflow senza perdere stato. Per saperne di più, vedi Utilizzo degli snapshot Dataflow.

Se hai una pipeline complessa, valuta la possibilità di creare modello ed eseguire il job dal modello.

Non puoi eliminare i job Dataflow, ma puoi archiviarli di job completati. Tutti i job completati vengono eliminati dopo un periodo di conservazione di 30 giorni.

Annullare un job Dataflow

Quando annulli un job, il servizio Dataflow lo interrompe. immediatamente.

Quando annulli un job, si verificano le seguenti azioni:

  1. Il servizio Dataflow interrompe tutte le importazione dati e i dati e l'elaborazione dei dati.

  2. Il servizio Dataflow inizia la pulizia di Google Cloud collegate al job.

    Queste risorse potrebbero includere l'arresto delle istanze worker di Compute Engine e la chiusura delle connessioni attive a origini o sink di I/O.

Informazioni importanti sull'annullamento di un lavoro

  • L'annullamento di un job interrompe immediatamente l'elaborazione della pipeline.

  • Se annulli un job, potresti perdere i dati in corso. I dati in transito si riferiscono ai dati già letti, ma ancora in fase di elaborazione da parte della pipeline.

  • Dati scritti dalla pipeline in un sink di output prima dell'annullamento potrebbe essere ancora accessibile nel sink di output.

  • Se la perdita di dati non è un problema, l'annullamento del job garantisce che Le risorse Google Cloud associate al tuo job vengono arrestate il prima possibile.

Svuotamento di un job Dataflow

Quando viene svuotato un job, il servizio Dataflow lo completa in con quello attuale. Se vuoi evitare la perdita di dati mentre riduci pipeline di flusso, l'opzione migliore è lo svuotamento del job.

Quando svuota un job, si verificano le seguenti azioni:

  1. Il job smette di importare nuovi dati dalle origini di input poco dopo aver ricevuto la richiesta di svuotamento (in genere entro pochi minuti).

  2. Il servizio Dataflow conserva tutte le risorse esistenti, come di Compute Engine, per terminare l'elaborazione e la scrittura dei dati presenti nel buffer una pipeline o un blocco note personalizzato.

  3. Quando tutte le operazioni in attesa di elaborazione e scrittura sono complete, Il servizio Dataflow arresta le risorse Google Cloud associati al job.

Per svuotare il job, Dataflow smette di leggere il nuovo input, contrassegna la con un timestamp evento all'infinito, e poi propaga i timestamp infinito attraverso la pipeline. Pertanto, delle pipeline in fase di svuotamento potrebbe avere una filigrana infinita.

Informazioni importanti sullo svuotamento di un job

  • Lo svuotamento di un job non è supportato per le pipeline batch.

  • La pipeline continuerà a sostenere il costo per il mantenimento di eventuali risorse Google Cloud fino al termine dell'elaborazione e della scrittura.

  • Puoi aggiornare una pipeline che è in fase di svuotamento. Se la pipeline è bloccata, aggiornala con codice che risolve l'errore che sta causando il problema consente uno svuotamento riuscito senza perdita di dati.

  • Puoi annullare un job attualmente in fase di svuotamento.

  • Lo svuotamento di un job può richiedere una notevole quantità di tempo, ad esempio quando la pipeline ha una grande quantità di dati nel buffer.

  • Se la pipeline in modalità flusso include DoFn divisibile devi troncare il risultato prima di eseguire l'opzione di svuotamento. Per ulteriori informazioni sul troncamento dei DoFn suddivisibili, consulta Apache Beam documentazione.

  • In alcuni casi, un job Dataflow potrebbe non essere in grado di completare di svuotamento. Puoi consultare i log del job per determinare la causa principale e intraprendere le azioni appropriate.

Conservazione dei dati

  • I flussi di dati Dataflow sono tolleranti al riavvio e al riavvio dei worker non ha esito negativo per i job di flussi di dati quando si verificano errori. Invece, Il servizio Dataflow riprova finché non esegui un'azione come come l'annullamento o il riavvio del job. Quando svuota il job, Dataflow continua a riprovare, il che può portare a pipeline bloccate. In questa situazione, per abilitare uno svuotamento riuscito senza perdita di dati, aggiornare la pipeline con il codice che corregge l'errore che sta causando il problema.

  • Dataflow non conferma i messaggi fino a quando Il servizio Dataflow esegue il commit durevole di queste attività. Ad esempio, con Kafka, puoi visualizzare questo processo come un passaggio sicuro della proprietà da Kafka a Dataflow, eliminando il rischio di perdita di dati.

Job bloccati

  • Lo svuotamento non risolve le pipeline bloccate. Se lo spostamento dei dati è bloccato, la pipeline rimane bloccato dopo il comando di svuotamento. Per risolvere una pipeline bloccata, utilizza update per aggiornare con il codice che risolve l'errore che sta creando il problema. Puoi anche annullare i job bloccati, ma l'annullamento dei job potrebbe causa la perdita di dati.

Timer

  • Se il codice della pipeline in modalità flusso include un timer di loop, il job potrebbe essere lento o non può essere svuotato. Poiché lo svuotamento non termina prima del completamento di tutti i timer, le pipeline con timer a loop infiniti non terminano mai lo svuotamento.

  • Dataflow attende il completamento di tutti i timer del tempo di elaborazione invece di attivarli immediatamente, il che potrebbe causare un drenaggio lento.

Effetti dello svuotamento di un job

Quando svuota una pipeline in modalità flusso, Dataflow si chiude immediatamente qualsiasi fase di elaborazione finestre e attiva tutti attivatori.

Il sistema non attende il completamento di eventuali finestre temporali in sospeso. di un'operazione di svuotamento.

Ad esempio, se la pipeline è composta da dieci minuti in una finestra di due ore quando svuota il job, Dataflow non attende il resto della finestra per completare l'operazione. La finestra viene chiusa immediatamente con risultati parziali. Dataflow provoca la chiusura delle finestre aperte facendo avanzare i dati dalla filigrana all'infinito. Questa funzionalità è compatibile anche con origini dati personalizzate.

Durante lo svuotamento di una pipeline che utilizza una classe di origine dati personalizzata, Dataflow interrompe l'invio di richieste di nuovi dati e fa avanzare i dati da filigrana a infinito e chiama il metodo finalize() della tua origine all'ultimo punto di controllo.

Lo svuotamento può causare il riempimento parziale delle finestre. In questo caso, riavviare la pipeline svuotata, la stessa finestra potrebbe attivarsi una seconda volta il che può causare problemi ai dati. Ad esempio, nel uno dei seguenti scenari, i file potrebbero avere nomi in conflitto e i dati potrebbero essere sovrascritti:

Se scarichi una pipeline con windowing orario alle 12:34, dalle 12:00 alle 13:00 si chiude con solo i dati che sono stati attivati entro i primi 34 minuti dal finestra. La pipeline non legge i nuovi dati dopo le 12:34.

Se poi riavvii immediatamente la pipeline, la finestra dalle 12:00 alle 13:00 vengono attivati di nuovo, con solo i dati letti dalle 12:35 alle 13:00. Non vengono inviati duplicati, ma se un nome file viene ripetuto, i dati vengono sovrascritti.

Nella console Google Cloud puoi visualizzare i dettagli delle trasformazioni della tua pipeline. Il seguente diagramma mostra gli effetti di un'operazione di svuotamento in corso. Nota che la filigrana sia stata avanzata al valore massimo.

Vista passo di un'operazione di svuotamento.

Figura 1. Vista passo di un'operazione di svuotamento.

Forza l'annullamento di un job Dataflow

Utilizza la forza dell'annullamento solo quando non sei in grado di annullare il job utilizzando altri metodi. Forza l'annullamento termina il job senza eliminare tutte le risorse. Se utilizzi Forza l'annullamento ripetutamente, le risorse divulgate potrebbero accumularsi, e le risorse divulgate usano la tua quota.

Quando forzi l'annullamento di un job, il servizio Dataflow arresta il job. con la perdita immediata di eventuali VM create dal job Dataflow. Regolare l'annullamento deve essere tentato almeno 30 minuti prima di forzare l'annullamento.

Quando forzi l'annullamento di un job si verificano le seguenti azioni:

  • Il servizio Dataflow interrompe tutte le importazione dati e i dati e l'elaborazione dei dati.

Informazioni importanti sull'annullamento dell'annullamento di un job

  • Forzare l'annullamento di un job interrompe immediatamente l'elaborazione della pipeline.

  • Forza l'annullamento di un job è destinato solo ai job bloccati in il normale processo di annullamento.

  • Le istanze worker create dal job Dataflow non vengono vengano necessariamente rilasciate, il che potrebbe causare la divulgazione di istanze worker. Le istanze worker divulgate non contribuiscono ai costi del job, ma potrebbero la tua quota. Al termine dell'annullamento del job, puoi eliminare queste risorse.

    Per i job Dataflow Prime, non puoi visualizzare o eliminare le VM divulgate. Nella maggior parte dei casi, queste VM non creano problemi. Tuttavia, se le VM divulgate causano problemi, ad esempio il consumo della quota di VM, contatta l'assistenza.

Arresta un job Dataflow

Prima di interrompere un job, devi comprendere gli effetti dell'annullamento, svuotare o forzare l'annullamento di un job.

Console

  1. Vai alla pagina Job Dataflow:

    Vai a Job

  2. Fai clic sul job che vuoi arrestare.

    Per arrestare un job, il suo stato deve essere In esecuzione.

  3. Nella pagina dei dettagli del job, fai clic su Arresta.

  4. Esegui una di queste operazioni:

    • Per una pipeline batch, fai clic su Annulla o Forza annullamento.

    • Per una pipeline in modalità flusso, fai clic su Annulla, Svuota o Forza annullamento.

  5. Per confermare la scelta, fai clic su Arresta job.

gcloud

Per svuotare o annullare un job Dataflow, puoi usare il comando gcloud dataflow jobs in Cloud Shell o in un terminale locale installato con gcloud CLI.

  1. Accedi alla tua shell.

  2. Elenca gli ID job per i job Dataflow attualmente in esecuzione e prendi nota dell'ID per il job che vuoi arrestare:

    gcloud dataflow jobs list
    

    Se il flag --region non è impostato, vengono eseguiti i job Dataflow da tutti i servizi disponibili. regioni sono visualizzate.

  3. Esegui una di queste operazioni:

    • Per svuotare un job di flussi:

      gcloud dataflow jobs drain JOB_ID
      

      Sostituisci JOB_ID con l'ID job che hai copiato in precedenza.

    • Per annullare un job batch o flusso:

      gcloud dataflow jobs cancel JOB_ID
      

      Sostituisci JOB_ID con l'ID job che hai copiato in precedenza.

    • Per forzare l'annullamento di un job batch o di flussi di dati:

      gcloud dataflow jobs cancel JOB_ID --force
      

      Sostituisci JOB_ID con l'ID job che hai copiato in precedenza.

API

Per annullare o svuotare un job utilizzando API REST Dataflow, puoi scegliere projects.locations.jobs.update o projects.jobs.update. Nel corpo della richiesta, Supera lo stato del job richiesto nel campo requestedState dell'istanza job dell'API scelta.

Importante: utilizzi projects.locations.jobs.update perché projects.jobs.update consente di aggiornare solo lo stato dei job in esecuzione in us-central1.

  • Per annullare il job, imposta il relativo stato su JOB_STATE_CANCELLED.

  • Per svuotare il job, imposta lo stato del job su JOB_STATE_DRAINED.

  • Per forzare l'annullamento del job, imposta lo stato del job su JOB_STATE_CANCELLED con l'etichetta "force_cancel_job": "true". Il corpo della richiesta è:

    ​​{
      "requestedState": "JOB_STATE_CANCELLED",
      "labels": {
        "force_cancel_job": "true"
      }
    }
    

Rileva il completamento del job Dataflow

Per rilevare il completamento dell'annullamento o dello svuotamento del job, utilizza uno dei seguenti metodi:

  • Utilizza un servizio di orchestrazione dei flussi di lavoro come Cloud Composer per monitorare il job Dataflow.
  • Esegui la pipeline in modo sincrono in modo che le attività siano bloccate fino al completamento della pipeline. Per ulteriori informazioni, consulta la sezione Controllo delle modalità di esecuzione in Impostazione delle opzioni della pipeline in corso...
  • Utilizza lo strumento a riga di comando in Google Cloud CLI per eseguire il polling dello stato del job. Per ottenere un elenco di tutti i job Dataflow nel tuo progetto, esegui questo comando nella shell o nel terminale:

    gcloud dataflow jobs list
    

    L'output mostra l'ID, il nome, lo stato (STATE) e altre informazioni di ciascun job. Per ulteriori informazioni, vedi Utilizzo dell'interfaccia a riga di comando di Dataflow.

Archivia job Dataflow

Quando archivi un job Dataflow, il job viene rimosso dall'elenco di nella pagina Job di Dataflow nella console. Il job viene spostato in un elenco di job archiviati. Puoi archiviare solo i job completati, include i job nei seguenti stati:

  • JOB_STATE_CANCELLED
  • JOB_STATE_DRAINED
  • JOB_STATE_DONE
  • JOB_STATE_FAILED
  • JOB_STATE_UPDATED

Per ulteriori informazioni, vedi Rileva il completamento del job Dataflow in questo documento. Per informazioni sulla risoluzione dei problemi, vedi Archivia errori del job in "Risolvi gli errori di Dataflow".

Tutti i job completati vengono eliminati dopo un periodo di conservazione di 30 giorni.

Archiviare un job

Segui questi passaggi per rimuovere un lavoro completato dall'elenco dei job principale nella Pagina Job di Dataflow.

Console

  1. Nella console Google Cloud, vai alla pagina Job di Dataflow.

    Vai a Job

    Viene visualizzato un elenco di job Dataflow insieme al relativo stato.

  2. Seleziona un job.

  3. Nella pagina Dettagli job, fai clic su Archivia. Se il job non è stato l'opzione Archivia non è disponibile.

API

Per archiviare i job mediante l'API, utilizza JobMetadata . Nel campo JobMetadata, per userDisplayProperties, utilizza la coppia chiave-valore accoppia "archived":"true".

La richiesta API deve includere anche updateMask parametro di query.

curl --request PUT \

"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  --data
'{"job_metadata":{"userDisplayProperties":{"archived":"true"}}}' \
  --compressed

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto
  • REGION: una regione Dataflow
  • JOB_ID: l'ID del tuo job Dataflow.

Visualizza e ripristina i job archiviati

Segui questi passaggi per visualizzare i job archiviati o ripristinarli nell'istanza principale dei job nella pagina Job di Dataflow.

Console

  1. Nella console Google Cloud, vai alla pagina Job di Dataflow.

    Vai a Job

  2. Fai clic sul pulsante di attivazione/disattivazione Archiviati. Viene visualizzato un elenco di job Dataflow archiviati.

  3. Seleziona un job.

  4. ripristinare il job nell'elenco dei job principali in Dataflow. Job, quindi fai clic su Ripristina nella pagina Dettagli job.

API

Per ripristinare i job utilizzando l'API, utilizza JobMetadata . Nel campo JobMetadata, per userDisplayProperties, utilizza la coppia chiave-valore accoppia "archived":"false".

La richiesta API deve includere anche updateMask parametro di query.

curl --request PUT \

"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  --data
'{"job_metadata":{"userDisplayProperties":{"archived":"false"}}}' \
  --compressed

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto
  • REGION: una regione Dataflow
  • JOB_ID: l'ID del tuo job Dataflow.

Passaggi successivi