Esegui l'upgrade di una pipeline in modalità flusso

Questa pagina fornisce indicazioni e suggerimenti per l'upgrade delle pipeline di streaming. Ad esempio, potresti dover eseguire l'upgrade a una versione più recente dell'SDK Apache Beam oppure aggiornare il codice della pipeline. Sono disponibili diverse opzioni per soddisfare scenari differenti.

Mentre le pipeline batch si interrompono al completamento del job, le pipeline in modalità flusso spesso vengono eseguite in modo continuo per fornire elaborazione ininterrotta. Di conseguenza, quando esegui l'upgrade di pipeline in modalità flusso, devi tenere conto dei seguenti aspetti:

  • Potrebbe essere necessario ridurre al minimo o evitare le interruzioni della pipeline. In alcuni casi, potresti tollerare un'interruzione temporanea dell'elaborazione durante il deployment di una nuova versione di una pipeline. In altri casi, l'applicazione potrebbe non tollerare alcuna interruzione.
  • I processi di aggiornamento della pipeline devono gestire le modifiche allo schema in modo da ridurre al minimo le interruzioni nell'elaborazione dei messaggi e in altri sistemi collegati. Ad esempio, se lo schema per i messaggi in una pipeline di elaborazione di eventi cambia, le modifiche allo schema potrebbero essere necessarie anche nei data sink a valle.

Puoi utilizzare uno dei seguenti metodi per aggiornare le pipeline in modalità flusso, a seconda della pipeline e dei requisiti di aggiornamento:

Per ulteriori informazioni sui problemi che potresti riscontrare durante un aggiornamento e su come evitarli, consulta Convalidare un job di sostituzione e Controllo compatibilità dei job.

best practice

  • Esegui l'upgrade della versione dell'SDK Apache Beam separatamente da eventuali modifiche al codice della pipeline.
  • Testa la pipeline dopo ogni modifica prima di apportare aggiornamenti aggiuntivi.
  • Esegui regolarmente l'upgrade della versione dell'SDK Apache Beam utilizzata dalla tua pipeline.

Esegui aggiornamenti in corso

Puoi aggiornare alcune pipeline in modalità flusso in corso senza arrestare il job. Questo scenario viene chiamato aggiornamento del job in corso. Gli aggiornamenti dei job in corso sono disponibili solo in circostanze limitate:

  • Il job deve utilizzare Streaming Engine.
  • Il job deve essere in esecuzione.
  • Stai modificando solo il numero di worker utilizzati dal job.

Per ulteriori informazioni, consulta Impostare l'intervallo di scalabilità automatica nella pagina Scalabilità automatica orizzontale.

Per istruzioni che spiegano come eseguire un aggiornamento del job in corso, consulta Aggiornare una pipeline esistente.

Avvia un job di sostituzione

Se il job aggiornato è compatibile con il job esistente, puoi aggiornare la pipeline utilizzando l'opzione update. Quando sostituisci un job esistente, un nuovo job esegue il codice della pipeline aggiornato. Il servizio Dataflow conserva il nome del job, ma esegue il job di sostituzione con un ID job aggiornato. Questo processo potrebbe causare tempi di inattività mentre il job esistente si arresta, viene eseguito il controllo della compatibilità e viene avviato il nuovo job. Per maggiori dettagli, consulta Effetti della sostituzione di un job.

Dataflow esegue un controllo di compatibilità per garantire che sia possibile eseguire il deployment del codice della pipeline aggiornato nella pipeline in esecuzione. Alcune modifiche al codice impediscono il controllo della compatibilità, ad esempio quando gli input collaterali vengono aggiunti o rimossi da un passaggio esistente. Quando il controllo della compatibilità non va a buon fine, non puoi eseguire un aggiornamento del job sul posto.

Per istruzioni che spiegano come avviare un job di sostituzione, consulta Avviare un job di sostituzione.

Se l'aggiornamento della pipeline non è compatibile con il job attuale, devi arrestare e sostituire la pipeline. Se la tua pipeline non può tollerare i tempi di inattività, esegui pipeline parallele.

Arresta e sostituisci pipeline

Se puoi interrompere temporaneamente l'elaborazione, puoi annullare o svuotare la pipeline, quindi sostituirla con la pipeline aggiornata. L'annullamento di una pipeline comporta l'interruzione immediata di Dataflow e l'arresto delle risorse il più rapidamente possibile da parte di Dataflow, il che può causare una perdita di dati in corso di elaborazione. Per evitare perdite di dati, nella maggior parte dei casi, lo svuotamento è l'azione preferita.

Lo svuotamento di una pipeline chiude immediatamente tutte le finestre in-process e attiva tutti gli attivatori. Anche se i dati in corso non vengono persi, lo svuotamento potrebbe far sì che le finestre abbiano dati incompleti. In questo caso, le finestre in-process emettono risultati parziali o incompleti. Per ulteriori informazioni, consulta Effetti dello svuotamento di un job. Dopo il completamento del job esistente, avvia un nuovo job di inserimento di flussi che contiene il codice della pipeline aggiornato, in modo da riprendere l'elaborazione.

Con questo metodo, si verifica un tempo di inattività tra il momento in cui il job di inserimento di flussi esistente viene interrotto e il momento in cui la pipeline sostitutiva è pronta per riprendere l'elaborazione dei dati. Tuttavia, annullare o svuotare una pipeline esistente e poi avviare un nuovo job con la pipeline aggiornata è meno complicato rispetto all'esecuzione di pipeline parallele.

Per istruzioni più dettagliate, consulta Svuotamento di un job Dataflow. Dopo aver svuotato il job corrente, avviane uno nuovo con lo stesso nome.

Rielaborazione dei messaggi con lo snapshot e la ricerca Pub/Sub

In alcune situazioni, dopo aver sostituito o annullato una pipeline svuotata, potrebbe essere necessario rielaborare i messaggi Pub/Sub consegnati in precedenza. Ad esempio, per rielaborare i dati potrebbe essere necessario utilizzare una logica di business aggiornata. Pub/Sub Seek è una funzionalità che consente di riprodurre i messaggi da uno snapshot Pub/Sub. Puoi utilizzare Pub/Sub Seek con Dataflow per rielaborare i messaggi dal momento della creazione dello snapshot della sottoscrizione.

Durante lo sviluppo e il test, puoi anche usare Pub/Sub Seek per riprodurre ripetutamente i messaggi noti e verificare l'output della pipeline. Quando utilizzi Pub/Sub Seek, non cercare uno snapshot della sottoscrizione quando la sottoscrizione viene utilizzata da una pipeline. Se lo fai, la ricerca può invalidare la logica della filigrana di Dataflow e potrebbe influire sull'elaborazione "exactly-once" dei messaggi Pub/Sub.

Un flusso di lavoro gcloud CLI consigliato per l'utilizzo di Pub/Sub Seek con pipeline Dataflow in una finestra del terminale è il seguente:

  1. Per creare uno snapshot della sottoscrizione, utilizza il comando gcloud pubsub snapshots create:

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. Per svuotare o annullare la pipeline, usa il comando gcloud dataflow jobs drain o gcloud dataflow jobs cancel:

    gcloud dataflow jobs drain JOB_ID
    

    o

    gcloud dataflow jobs cancel JOB_ID
    
  3. Per cercare lo snapshot, utilizza il comando gcloud pubsub subscriptions seek:

    gcloud pubsub subscriptions seek SNAPSHOT_NAME
    
  4. Eseguire il deployment di una nuova pipeline che utilizza la sottoscrizione.

Esegui pipeline parallele

Se devi evitare interruzioni della pipeline in modalità flusso durante un aggiornamento, esegui pipeline parallele. Crea un nuovo job di inserimento flussi con il codice pipeline aggiornato ed esegui la nuova pipeline in parallelo con quella esistente.

Quando crei la nuova pipeline, utilizza la stessa strategia di windowing che hai utilizzato per la pipeline esistente. Lascia che la pipeline esistente continui a essere eseguita finché la filigrana non supera il timestamp della prima finestra completa elaborata dalla pipeline aggiornata. Quindi, svuota o annulla la pipeline esistente. La pipeline aggiornata continua a essere eseguita al suo posto e prende in carico il processo di elaborazione in modo autonomo.

Il seguente diagramma illustra questo processo.

La pipeline B si sovrappone alla pipeline B per un periodo di 5 minuti.

Nel diagramma, la pipeline B è il job aggiornato che prende il controllo da pipeline A. Il valore t è il timestamp della prima finestra completa elaborata dalla pipeline B. Il valore w corrisponde alla filigrana della pipeline A. Per semplicità, si presume una filigrana perfetta senza dati recenti. I tempi di elaborazione e di esecuzione sono rappresentati sull'asse orizzontale. Entrambe le pipeline utilizzano finestre fisse (rullante) di cinque minuti. I risultati vengono attivati dopo che la filigrana supera la fine di ogni finestra.

Poiché l'output in parallelo si verifica durante il periodo di tempo in cui le due pipeline si sovrappongono, configurale per scrivere i risultati in destinazioni diverse. I sistemi downstream possono quindi utilizzare un'astrazione sui due sink di destinazione, ad esempio una vista database, per eseguire query sui risultati combinati. Questi sistemi possono utilizzare l'astrazione anche per deduplicare i risultati del periodo in sovrapposizione.

L'esempio seguente descrive l'approccio che prevede l'utilizzo di una pipeline che legge i dati di input di Pub/Sub, esegue alcune elaborazioni e scrive i risultati in BigQuery.

  1. Nello stato iniziale, la pipeline di inserimento flussi esistente (Pipeline A) è in esecuzione e legge i messaggi di un argomento Pub/Sub (Argomento) utilizzando una sottoscrizione (Sottoscrizione A). I risultati vengono scritti in una tabella BigQuery (Tabella A). I risultati vengono consumati tramite una vista BigQuery, che agisce come una facciata per mascherare le modifiche sottostanti alle tabelle. Si tratta dell'applicazione di un metodo di progettazione chiamato schema di façade. Il seguente diagramma mostra lo stato iniziale.

    Una pipeline con una sottoscrizione e scrittura in una singola tabella BigQuery.

  2. Crea una nuova sottoscrizione (Sottoscrizione B) per la pipeline aggiornata. Esegui il deployment della pipeline aggiornata (Pipeline B), che legge dall'argomento Pub/Sub (Argomento) utilizzando la sottoscrizione B e scrive in una tabella BigQuery separata (Tabella B). Il seguente diagramma illustra questo flusso.

    Due pipeline, ciascuna con un abbonamento. Ogni pipeline scrive in una tabella BigQuery separata. Una visualizzazione façade legge entrambe le tabelle.

    A questo punto, la pipeline A e la pipeline B sono in esecuzione in parallelo e scrivono i risultati in tabelle separate. Registri il tempo t come timestamp della prima finestra completa elaborata da Pipeline B.

  3. Quando la filigrana della pipeline A supera il tempo t, svuota la pipeline A. Quando scarichi la pipeline, tutte le finestre aperte si chiudono e l'elaborazione dei dati in corso viene completata. Se le finestre complete sono importanti (supponendo che non ci siano dati recenti), prima di svuotare Pipeline A, lascia l'esecuzione di entrambe le pipeline finché non avrai completato le finestre sovrapposte. Arresta il job di inserimento di flussi per pipeline A dopo che tutti i dati in corso sono stati elaborati e scritti nella Tabella A. Il seguente diagramma mostra questa fase.

    La pipeline A svuota, non legge più la sottoscrizione A e non invia più dati alla Tabella A una volta completato lo svuotamento. L'intera elaborazione viene gestita dalla seconda pipeline.

  4. Al momento è in esecuzione solo la pipeline B. Puoi eseguire query da una vista BigQuery (Vista facciata), che agisce come una facciata per la Tabella A e la Tabella B. Per le righe che hanno lo stesso timestamp in entrambe le tabelle, configura la visualizzazione in modo che restituisca le righe della Tabella B oppure, se le righe non esistono nella Tabella B, torna alla Tabella A. Il seguente schema mostra la lettura (Vista di facciata) sia della Tabella A sia della Tabella B.

    La pipeline A non è più disponibile ed viene eseguita solo la pipeline B.

    A questo punto, puoi eliminare l'Abbonamento A.

Quando vengono rilevati problemi con il deployment di una nuova pipeline, avere pipeline parallele può semplificare il rollback. In questo esempio, potresti voler mantenere in esecuzione Pipeline A mentre monitori Pipeline B per verificarne il funzionamento. Se si verificano problemi con la pipeline B, puoi eseguire il rollback alla pipeline A.

Gestire le mutazioni dello schema

I sistemi di gestione dei dati spesso devono supportare mutazioni dello schema nel tempo, a volte a causa di cambiamenti nei requisiti aziendali e altre volte per motivi tecnici. L'applicazione degli aggiornamenti dello schema in genere richiede un'attenta pianificazione ed esecuzione per evitare interruzioni dei sistemi informativi aziendali.

Prendi in considerazione una pipeline che legga i messaggi che contengono payload JSON da un argomento Pub/Sub. La pipeline converte ogni messaggio in un'istanza di TableRow, quindi scrive le righe in una tabella BigQuery. Lo schema della tabella di output è simile ai messaggi elaborati dalla pipeline. Nel diagramma seguente, lo schema è denominato Schema A.

Pipeline che legge una sottoscrizione e scrive in una tabella di output BigQuery utilizzando lo schema A.

Nel tempo, lo schema dei messaggi potrebbe variare in modo non banale. Ad esempio, i campi vengono aggiunti, rimossi o sostituiti. Lo schema A si evolve in un nuovo schema. Nella discussione che segue, il nuovo schema sarà denominato Schema B. In questo caso, è necessario aggiornare la pipeline A e lo schema della tabella di output deve supportare lo schema B.

Per la tabella di output, puoi eseguire alcune mutazioni dello schema senza il centro. Ad esempio, puoi aggiungere nuovi campi o ridurre le modalità delle colonne, ad esempio modificando REQUIRED in NULLABLE, senza tempi di inattività. Queste mutazioni di solito non influiscono sulle query esistenti. Tuttavia, le mutazioni dello schema che modificano o rimuovono campi dello schema esistenti interrompono le query o causano altre interruzioni. Il seguente approccio consente di apportare modifiche senza richiedere tempi di inattività.

Separa i dati scritti dalla pipeline in una tabella entità e in una o più tabelle temporanee. La tabella principale archivia i dati storici scritti dalla pipeline. Le tabelle temporanee archiviano l'output della pipeline più recente. Puoi definire una visualizzazione di facciata di BigQuery sulle tabelle principali e temporanee, che consente ai consumatori di eseguire query su dati storici e aggiornati.

Il seguente diagramma rivede il flusso della pipeline precedente in modo da includere una tabella temporanea (Tabella temporanea A), una tabella principale e una visualizzazione façade.

Pipeline che legge una sottoscrizione e scrive in una tabella temporanea BigQuery. Una seconda tabella (entità) ha l'output di una versione precedente dello schema. Una visualizzazione façade legge le informazioni sia dalla tabella temporanea che dalla tabella principale.

Nel flusso rivisto, pipeline A elabora i messaggi che utilizzano lo schema A e scrive l'output nella tabella temporanea A, che ha uno schema compatibile. La tabella principale contiene dati storici scritti da versioni precedenti della pipeline, nonché i risultati che vengono uniti periodicamente dalla tabella temporanea. I consumatori possono eseguire query su dati aggiornati, compresi dati storici e in tempo reale, utilizzando la vista di façade.

Quando lo schema dei messaggi cambia da Schema A a Schema B, potresti aggiornare il codice della pipeline in modo che sia compatibile con i messaggi che utilizzano lo Schema B. La pipeline esistente deve essere aggiornata con la nuova implementazione. Eseguendo pipeline parallele, puoi assicurarti che l'elaborazione dei flussi di dati continui senza interruzioni. L'interruzione e la sostituzione delle pipeline comporta un'interruzione dell'elaborazione, perché nessuna pipeline è in esecuzione per un periodo di tempo.

La pipeline aggiornata scrive in una tabella temporanea aggiuntiva (Tabella temporanea B) che utilizza lo Schema B. Puoi usare un flusso di lavoro orchestrato per creare la nuova tabella temporanea prima di aggiornare la pipeline. Aggiorna la visualizzazione della facciata in modo da includere i risultati della nuova tabella temporanea, utilizzando potenzialmente un passaggio del flusso di lavoro correlato.

Il seguente diagramma mostra il flusso aggiornato che mostra la tabella temporanea B con lo schema B e il modo in cui la visualizzazione di façade viene aggiornata in modo da includere i contenuti della tabella principale e di entrambe le tabelle temporanee.

La pipeline ora utilizza lo schema B e scrive nella tabella temporanea B. Una vista facciata legge la tabella Principal, la tabella temporanea A e la tabella temporanea B.

Come processo separato dall'aggiornamento della pipeline, puoi unire le tabelle temporanee nella tabella entità periodicamente o in base alle esigenze. Il seguente schema mostra come viene unita la tabella temporanea A nella tabella principale.

La tabella temporanea A viene unita nella tabella entità. La vista facciata legge dalla tabella temporanea B e dalla tabella principale.

Passaggi successivi