Arresta una pipeline Dataflow in esecuzione

Per arrestare un job Dataflow, utilizza la console Google Cloud, Cloud Shell, un terminale locale installato con Google Cloud CLI o l'API REST Dataflow.

Puoi arrestare un job Dataflow in uno dei tre modi seguenti:

  • Annulla un job. Questo metodo si applica alle pipeline in modalità flusso e batch. L'annullamento di un job interrompe l'elaborazione da parte del servizio Dataflow di tutti i dati, inclusi i dati nel buffer. Per ulteriori informazioni, consulta Annullare un job.

  • Svuota un job. Questo metodo si applica solo alle pipeline in modalità flusso. Lo svuotamento di un job consente al servizio Dataflow di completare l'elaborazione dei dati nel buffer, interrompendo contemporaneamente l'importazione di nuovi dati. Per ulteriori informazioni, consulta Svuotamento di un job.

  • Forza l'annullamento di un job. Questo metodo si applica alle pipeline in modalità flusso e batch. Forzando l'annullamento di un job, il servizio Dataflow interrompe immediatamente l'elaborazione di tutti i dati, inclusi i dati nel buffer. Prima di forzare l'annullamento, devi tentare un annullamento standard. La forzatura dell'annullamento è destinata solo ai job bloccati nella normale procedura di annullamento. Per ulteriori informazioni, consulta Forzare l'annullamento di un job.

Quando annulli un job, non puoi riavviarlo. Se non utilizzi i modelli flessibili, puoi clonare la pipeline annullata e avviare un nuovo job dalla pipeline clonata.

Prima di arrestare una pipeline di inserimento flussi, valuta la possibilità di creare uno snapshot del job. Gli snapshot di Dataflow salvano lo stato di una pipeline in modalità flusso, per consentirti di avviare una nuova versione del job Dataflow senza perdere lo stato. Per saperne di più, consulta Utilizzo degli snapshot di Dataflow.

Se hai una pipeline complessa, valuta la possibilità di creare un modello ed eseguire il job dal modello.

Non puoi eliminare i job Dataflow, ma puoi archiviare i job completati. Tutti i job completati vengono eliminati dopo un periodo di conservazione di 30 giorni.

Annulla un job Dataflow

Quando annulli un job, il servizio Dataflow lo interrompe immediatamente.

Quando annulli un job, si verificano le seguenti azioni:

  1. Il servizio Dataflow interrompe tutte le operazioni di importazione e elaborazione dei dati.

  2. Il servizio Dataflow inizia la pulizia delle risorse Google Cloud collegate al job.

    Queste risorse potrebbero includere l'arresto delle istanze worker di Compute Engine e la chiusura delle connessioni attive alle origini o ai sink di I/O.

Informazioni importanti sull'annullamento di un job

  • L'annullamento di un job interrompe immediatamente l'elaborazione della pipeline.

  • Potresti perdere i dati in corso quando annulli un job. I dati in corso si riferiscono a dati già letti ma ancora in fase di elaborazione da parte della pipeline.

  • I dati scritti dalla pipeline in un sink di output prima dell'annullamento del job potrebbero essere ancora accessibili nel sink di output.

  • Se la perdita di dati non è un problema, l'annullamento del job garantisce che le risorse Google Cloud associate al job vengano arrestate il prima possibile.

Svuota un job Dataflow

Quando esegui lo svuotamento di un job, il servizio Dataflow completa il job nel suo stato attuale. Se vuoi evitare la perdita di dati quando utilizzi le pipeline in modalità flusso, l'opzione migliore è svuotare il job.

Quando esegui lo svuotamento di un job, si verificano le seguenti azioni:

  1. Il job interrompe l'importazione di nuovi dati dalle origini di input poco dopo aver ricevuto la richiesta di svuotamento (in genere entro pochi minuti).

  2. Il servizio Dataflow conserva tutte le risorse esistenti, ad esempio le istanze worker, per completare l'elaborazione e la scrittura dei dati nel buffer nella pipeline.

  3. Quando tutte le operazioni di elaborazione e scrittura in attesa sono state completate, il servizio Dataflow arresta le risorse Google Cloud associate al job.

Per svuotare il job, Dataflow smette di leggere i nuovi input, contrassegna l'origine con un timestamp dell'evento impostato su infinito e propaga i timestamp infinito attraverso la pipeline. Di conseguenza, le pipeline in fase di svuotamento potrebbero avere una filigrana infinita.

Informazioni importanti sullo svuotamento di un job

  • Lo svuotamento di un job non è supportato per le pipeline batch.

  • La pipeline continua a sostenere i costi di mantenimento delle risorse Google Cloud associate fino al termine dell'elaborazione e della scrittura.

  • Puoi aggiornare una pipeline in fase di svuotamento. Se la pipeline è bloccata, l'aggiornamento della pipeline con il codice che corregge l'errore che crea il problema consente di eseguire correttamente lo svuotamento senza perdita di dati.

  • Puoi annullare un job in fase di svuotamento.

  • Lo svuotamento di un job può richiedere molto tempo, ad esempio quando la pipeline ha una grande quantità di dati nel buffer.

  • Se la pipeline in modalità flusso include un DoFn divisobile, devi troncare il risultato prima di eseguire l'opzione di svuotamento. Per ulteriori informazioni sul troncamento dei file DoFns suddivisibili, consulta la documentazione di Apache Beam.

  • In alcuni casi, un job Dataflow potrebbe non essere in grado di completare l'operazione di estrazione. Puoi consultare i log del job per determinare la causa principale e intraprendere le azioni appropriate.

Conservazione dei dati

  • I flussi di dati di Dataflow tollerano il riavvio dei worker e i job non riusciti in caso di errori. Il servizio Dataflow esegue un nuovo tentativo finché non intervieni su un'azione come l'annullamento o il riavvio del job. Quando svuota il job, Dataflow continua a riprovare, il che può portare a pipeline bloccate. In questo caso, per abilitare uno svuotamento riuscito senza perdita di dati, aggiorna la pipeline con il codice che corregge l'errore che crea il problema.

  • Dataflow non conferma i messaggi finché il servizio Dataflow li esegue in modo duraturo. Ad esempio, con Kafka puoi visualizzare questo processo come un trasferimento sicuro della proprietà del messaggio da Kafka a Dataflow, eliminando il rischio di perdita di dati.

Lavori bloccati

  • Lo svuotamento non risolve le pipeline bloccate. Se lo spostamento dei dati è bloccato, la pipeline rimane bloccata dopo il comando di svuotamento. Per risolvere il problema di una pipeline bloccata, utilizza il comando update per aggiornare la pipeline con il codice che risolve l'errore che crea il problema. Puoi anche annullare i job bloccati, ma l'annullamento dei job potrebbe comportare la perdita di dati.

Timer

  • Se il codice della pipeline in modalità flusso include un timer di loop, il job potrebbe essere lento o non può essere scaricato. Poiché lo svuotamento non termina prima del completamento di tutti i timer, lo svuotamento delle pipeline con timer a loop infiniti non termina mai.

  • Dataflow attende il completamento di tutti i timer del tempo di elaborazione anziché attivarli immediatamente, il che potrebbe causare svuotamenti lenti.

Effetti dello svuotamento di un job

Quando svuota una pipeline di inserimento flussi, Dataflow chiude immediatamente tutte le finestre in-process e attiva tutti gli attivatori.

Il sistema non attende il completamento di finestre temporali in sospeso in un'operazione di svuotamento.

Ad esempio, se la pipeline ha un intervallo di dieci minuti in una finestra di due ore quando scarichi il job, Dataflow non attende il completamento della finestra rimanente. Consente di chiudere immediatamente la finestra con risultati parziali. Dataflow determina la chiusura delle finestre aperte avanzando la filigrana dei dati su infinito. Questa funzionalità è compatibile anche con origini dati personalizzate.

Durante lo svuotamento di una pipeline che utilizza una classe di origine dati personalizzata, Dataflow interrompe l'invio di richieste di nuovi dati, avanza la filigrana dei dati all'infinito e chiama il metodo finalize() dell'origine sull'ultimo checkpoint.

Lo svuotamento può causare la visualizzazione di finestre parzialmente riempite. In questo caso, se riavvii la pipeline svuotata, la stessa finestra potrebbe essere attivata una seconda volta, il che può causare problemi ai dati. Ad esempio, nel seguente scenario, i file potrebbero avere nomi in conflitto e i dati potrebbero essere sovrascritti:

Se svuota una pipeline con windowing orario alle 12:34, la finestra dalle 12:00 alle 13:00 si chiude solo con i dati che sono stati attivati entro i primi 34 minuti della finestra. La pipeline non legge i nuovi dati dopo le 12:34.

Se riavvii immediatamente la pipeline, viene attivata di nuovo la finestra dalle 12:00 alle 13:00, con solo i dati letti dalle 12:35 alle 13:00. Non vengono inviati duplicati, ma se un nome file viene ripetuto, i dati vengono sovrascritti.

Nella console Google Cloud puoi visualizzare i dettagli delle trasformazioni della tua pipeline. Il seguente diagramma mostra gli effetti di un'operazione di svuotamento in-process. Tieni presente che la filigrana viene avanzata fino al valore massimo.

Una vista graduale di un'operazione di svuotamento.

Figura 1. Una vista graduale di un'operazione di svuotamento.

Forza l'annullamento di un job Dataflow

Utilizza l'annullamento forzato solo quando non riesci ad annullare il job con altri metodi. La forzatura dell'annullamento termina il job senza eliminare tutte le risorse. Se utilizzi ripetutamente l'annullamento forzato, le risorse trapelate potrebbero accumularsi e le risorse divulgate utilizzano la tua quota.

Quando forzi l'annullamento di un job, il servizio Dataflow lo interrompe immediatamente, perdendo le VM create dal job Dataflow. L'annullamento regolare deve essere tentato almeno 30 minuti prima di forzarne l'annullamento.

Quando forzi l'annullamento di un job, si verificano le seguenti azioni:

  • Il servizio Dataflow interrompe tutte le operazioni di importazione e elaborazione dei dati.

Informazioni importanti sull'annullamento forzato di un job

  • La forzatura dell'annullamento di un job interrompe immediatamente l'elaborazione della pipeline.

  • La forzatura dell'annullamento di un job è destinata solo ai job che sono rimasti bloccati nel normale processo di annullamento.

  • Le istanze worker create dal job Dataflow non vengono rilasciate necessariamente, il che potrebbe comportare la perdita di istanze worker. Le istanze worker compromesse non contribuiscono ai costi del job, ma potrebbero utilizzare la tua quota. Una volta completata l'annullamento del job, puoi eliminare queste risorse.

    Per i job Dataflow Prime, non puoi visualizzare o eliminare le VM divulgate. Nella maggior parte dei casi, queste VM non creano problemi. Tuttavia, se le VM trapelate causano problemi, ad esempio il consumo della quota VM, contatta l'assistenza.

Arresta un job Dataflow

Prima di interrompere un job, devi comprendere gli effetti dell'annullamento, del download o dell'annullamento di un job.

Console

  1. Vai alla pagina Job di Dataflow.

    Vai a Job

  2. Fai clic sul job che vuoi arrestare.

    Per arrestare un job, lo stato deve essere in esecuzione.

  3. Nella pagina dei dettagli del job, fai clic su Arresta.

  4. Esegui una di queste operazioni:

    • Per una pipeline batch, fai clic su Annulla o Forza annullamento.

    • Per una pipeline in modalità flusso, fai clic su Annulla, Svuota o Forza annullamento.

  5. Per confermare la tua scelta, fai clic su Arresta job.

gcloud

Per svuotare o annullare un job Dataflow, puoi utilizzare il comando gcloud dataflow jobs in Cloud Shell o in un terminale locale installato con gcloud CLI.

  1. Accedi alla tua shell.

  2. Elenca gli ID dei job Dataflow attualmente in esecuzione, quindi prendi nota dell'ID del job che vuoi arrestare:

    gcloud dataflow jobs list
    

    Se il flag --region non è impostato, vengono visualizzati i job Dataflow di tutte le aree geografiche disponibili.

  3. Esegui una di queste operazioni:

    • Per svuotare un job di streaming:

      gcloud dataflow jobs drain JOB_ID
      

      Sostituisci JOB_ID con l'ID job che hai copiato in precedenza.

    • Per annullare un job batch o flusso:

      gcloud dataflow jobs cancel JOB_ID
      

      Sostituisci JOB_ID con l'ID job che hai copiato in precedenza.

    • Per forzare l'annullamento di un job batch o flusso:

      gcloud dataflow jobs cancel JOB_ID --force
      

      Sostituisci JOB_ID con l'ID job che hai copiato in precedenza.

API

Per annullare o svuotare un job utilizzando l'API REST Dataflow, puoi scegliere projects.locations.jobs.update o projects.jobs.update. Nel corpo della richiesta, passa lo stato del job richiesto nel campo requestedState dell'istanza del job dell'API scelta.

Importante: è consigliabile utilizzare projects.locations.jobs.update, in quanto projects.jobs.update consente solo di aggiornare lo stato dei job in esecuzione in us-central1.

  • Per annullare il job, imposta lo stato del job su JOB_STATE_CANCELLED.

  • Per svuotare il job, imposta lo stato del job su JOB_STATE_DRAINED.

  • Per forzare l'annullamento del job, imposta lo stato del job su JOB_STATE_CANCELLED con l'etichetta "force_cancel_job": "true". Il corpo della richiesta è:

    ​​{
      "requestedState": "JOB_STATE_CANCELLED",
      "labels": {
        "force_cancel_job": "true"
      }
    }
    

Rileva il completamento del job di Dataflow

Per rilevare il completamento dell'annullamento o dello svuotamento del job, utilizza uno dei seguenti metodi:

  • Utilizza un servizio di orchestrazione del flusso di lavoro come Cloud Composer per monitorare il job di Dataflow.
  • Eseguire la pipeline in modo sincrono in modo da bloccare le attività fino al completamento della pipeline. Per maggiori informazioni, consulta Controllo delle modalità di esecuzione in Impostare le opzioni della pipeline.
  • Utilizza lo strumento a riga di comando in Google Cloud CLI per eseguire il polling dello stato del job. Per ottenere un elenco di tutti i job Dataflow nel progetto, esegui questo comando nella shell o nel terminale:

    gcloud dataflow jobs list
    

    L'output mostra l'ID, il nome, lo stato (STATE) e altre informazioni del job per ogni job. Per ulteriori informazioni, consulta Utilizzo dell'interfaccia a riga di comando di Dataflow.

Archivia job Dataflow

Quando archivi un job Dataflow, questo viene rimosso dall'elenco dei job nella pagina Job di Dataflow nella console. Il job viene spostato in un elenco dei job archiviati. Puoi archiviare solo i job completati, inclusi i job nei seguenti stati:

  • JOB_STATE_CANCELLED
  • JOB_STATE_DRAINED
  • JOB_STATE_DONE
  • JOB_STATE_FAILED
  • JOB_STATE_UPDATED

Per ulteriori informazioni, consulta Rilevamento del completamento del job Dataflow in questo documento. Per informazioni sulla risoluzione dei problemi, consulta Errori del job di archiviazione in "Risolvere gli errori di Dataflow".

Tutti i job completati vengono eliminati dopo un periodo di conservazione di 30 giorni.

Archiviare un job

Segui questi passaggi per rimuovere un job completato dall'elenco dei job principali nella pagina Job di Dataflow.

Console

  1. Nella console Google Cloud, vai alla pagina Job di Dataflow.

    Vai a Job

    Viene visualizzato un elenco di job Dataflow insieme al relativo stato.

  2. Seleziona un job.

  3. Nella pagina Dettagli job, fai clic su Archivia. Se il job non è stato completato, l'opzione Archivia non è disponibile.

API

Per archiviare i job utilizzando l'API, utilizza il campo JobMetadata. Nel campo JobMetadata, per userDisplayProperties, utilizza la coppia chiave-valore "archived":"true".

La richiesta API deve includere anche il parametro di query updateMask.

curl --request PUT \

"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties._archived" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  --data
'{"job_metadata":{"userDisplayProperties":{"archived":"true"}}}' \
  --compressed

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto
  • REGION: una regione Dataflow
  • JOB_ID: l'ID del job Dataflow

Visualizza e ripristina job archiviati

Segui questi passaggi per visualizzare i job archiviati o per ripristinare i job archiviati nell'elenco dei job principali nella pagina Job di Dataflow.

Console

  1. Nella console Google Cloud, vai alla pagina Job di Dataflow.

    Vai a Job

  2. Fai clic sul pulsante di attivazione/disattivazione Archiviati. Viene visualizzato un elenco di job Dataflow archiviati.

  3. Seleziona un job.

  4. Per ripristinare il job nell'elenco dei job principali nella pagina Job di Dataflow, nella pagina Dettagli job fai clic su Ripristina.

API

Per ripristinare i job utilizzando l'API, utilizza il campo JobMetadata. Nel campo JobMetadata, per userDisplayProperties, utilizza la coppia chiave-valore "archived":"false".

La richiesta API deve includere anche il parametro di query updateMask.

curl --request PUT \

"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties._archived" \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  --data
'{"job_metadata":{"userDisplayProperties":{"archived":"false"}}}' \
  --compressed

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto
  • REGION: una regione Dataflow
  • JOB_ID: l'ID del job Dataflow

Passaggi successivi