Esegui l'upgrade di una pipeline in modalità flusso

Questa pagina fornisce indicazioni e suggerimenti per l'upgrade delle pipeline in modalità flusso. Ad esempio, potresti dover eseguire l'upgrade a una versione più recente dell'SDK Apache Beam oppure aggiornare il codice della pipeline. Sono disponibili diverse opzioni a seconda degli scenari.

Mentre le pipeline batch si interrompono al completamento del job, le pipeline in modalità flusso spesso vengono eseguite in modo continuo per fornire un'elaborazione ininterrotta. Pertanto, quando esegui l'upgrade delle pipeline in modalità flusso, devi tenere conto delle seguenti considerazioni:

  • Potresti dover ridurre al minimo o evitare le interruzioni della pipeline. In alcuni casi, potresti essere in grado di tollerare un'interruzione temporanea dell'elaborazione durante il deployment di una nuova versione di una pipeline. In altri casi, l'applicazione potrebbe non essere in grado di tollerare le interruzioni.
  • I processi di aggiornamento della pipeline devono gestire le modifiche allo schema in modo da ridurre al minimo le interruzioni nell'elaborazione dei messaggi e in altri sistemi collegati. Ad esempio, se cambia lo schema per i messaggi in una pipeline di elaborazione eventi, potrebbero essere necessarie modifiche allo schema anche nei data sink a valle.

Puoi utilizzare uno dei seguenti metodi per aggiornare le pipeline in modalità flusso, a seconda della pipeline e dei requisiti di aggiornamento:

Per ulteriori informazioni sui problemi che potresti riscontrare durante un aggiornamento e su come prevenirli, consulta Convalida di un job di sostituzione e Controllo di compatibilità del job.

Best practice

  • Esegui l'upgrade della versione dell'SDK Apache Beam separatamente da eventuali modifiche al codice della pipeline.
  • Testa la pipeline dopo ogni modifica prima di apportare ulteriori aggiornamenti.
  • Esegui regolarmente l'upgrade della versione dell'SDK Apache Beam utilizzata dalla tua pipeline.

Esegui aggiornamenti in corso

Puoi aggiornare alcune pipeline di flussi di dati in corso senza interrompere il job. Questo scenario prende il nome di aggiornamento del lavoro in corso. Gli aggiornamenti dei lavori in corso sono disponibili solo in circostanze limitate:

  • Il job deve utilizzare Streaming Engine.
  • Il job deve essere in esecuzione.
  • Stai modificando solo il numero di worker utilizzati dal job.

Per ulteriori informazioni, consulta Impostare l'intervallo di scalabilità automatica nella pagina Scalabilità automatica orizzontale.

Per istruzioni che spiegano come eseguire un aggiornamento del job in corso, consulta Aggiornare una pipeline esistente.

Avviare un job di sostituzione

Se il job aggiornato è compatibile con il job esistente, puoi aggiornare la pipeline utilizzando l'opzione update. Quando sostituisci un job esistente, un nuovo job esegue il codice aggiornato della pipeline. Il servizio Dataflow conserva il nome del job, ma esegue il job di sostituzione con un ID job aggiornato. Questo processo potrebbe causare tempi di inattività durante l'arresto del job esistente, l'esecuzione del controllo di compatibilità e l'avvio del nuovo job. Per ulteriori dettagli, consulta Effetti della sostituzione di un job.

Dataflow esegue un controllo di compatibilità per garantire che il codice aggiornato della pipeline possa essere distribuito in modo sicuro sulla pipeline in esecuzione. Alcune modifiche al codice comportano l'esito negativo del controllo della compatibilità, ad esempio quando vengono aggiunti o rimossi input aggiuntivi da un passaggio esistente. Se il controllo di compatibilità ha esito negativo, non puoi eseguire un aggiornamento del job in loco.

Per istruzioni che spiegano come avviare un job di sostituzione, consulta Avviare un job di sostituzione.

Se l'aggiornamento della pipeline non è compatibile con il job attuale, devi interrompere e sostituire la pipeline. Se la tua pipeline non può tollerare i tempi di inattività, esegui pipeline parallele.

Arresta e sostituisci le pipeline

Se puoi interrompere temporaneamente l'elaborazione, puoi annullare o svuotare la pipeline, per poi sostituirla con la pipeline aggiornata. L'annullamento di una pipeline provoca l'interruzione immediata dell'elaborazione da parte di Dataflow e l'arresto delle risorse il più rapidamente possibile, il che può causare una perdita di dati in fase di elaborazione, noti come dati in fase di elaborazione. Per evitare perdite di dati, nella maggior parte dei casi è preferibile lo svuotamento. Puoi anche utilizzare gli snapshot Dataflow per salvare lo stato di una pipeline di flusso, in modo da avviare una nuova versione del job Dataflow senza perdere lo stato. Per ulteriori informazioni, consulta Utilizzare gli snapshot Dataflow.

Lo svuotamento di una pipeline chiude immediatamente tutte le finestre in elaborazione e attiva tutti gli attivatori. Anche se i dati in corso non vanno persi, lo svuotamento potrebbe causare la conservazione di dati incompleti nelle finestre. In questo caso, le finestre in fase di elaborazione emettono risultati parziali o incompleti. Per maggiori informazioni, consulta Effetti dello svuotamento di un job. Al termine del job esistente, avvia un nuovo job di flussi di dati contenente il codice aggiornato della pipeline, che consente la ripresa dell'elaborazione.

Con questo metodo, si verificano tempi di inattività tra il momento in cui viene interrotto il job in modalità flusso esistente e il momento in cui la pipeline sostitutiva è pronta per riprendere l'elaborazione dei dati. Tuttavia, annullare o svuotare una pipeline esistente e quindi avviare un nuovo job con la pipeline aggiornata è meno complicato rispetto all'esecuzione di pipeline parallele.

Per istruzioni più dettagliate, consulta Svuotamento di un job Dataflow. Dopo aver svuotato il job attuale, avviane uno nuovo con lo stesso nome.

Rielaborazione dei messaggi con snapshot Pub/Sub e Seek

In alcune situazioni, dopo aver sostituito o annullato una pipeline svuotata, potrebbe essere necessario rielaborare i messaggi Pub/Sub consegnati in precedenza. Ad esempio, potresti dover utilizzare la logica di business aggiornata per rielaborare i dati. Pub/Sub Seek è una funzionalità che consente di ripetere i messaggi da uno snapshot Pub/Sub. Puoi utilizzare Pub/Sub Seek con Dataflow per rielaborare i messaggi dal momento in cui viene creato lo snapshot della sottoscrizione.

Durante lo sviluppo e il test, puoi anche utilizzare Pub/Sub Seek per riprodurre ripetutamente i messaggi noti per verificare l'output dalla pipeline. Quando utilizzi Pub/Sub Seek, non cercare uno snapshot della sottoscrizione mentre l'abbonamento viene utilizzato da una pipeline. In questo caso, la ricerca può invalidare la logica della filigrana di Dataflow e influire sull'elaborazione "exactly-once" dei messaggi Pub/Sub.

Di seguito è riportato un flusso di lavoro consigliato per l'interfaccia a riga di comando gcloud per utilizzare Pub/Sub Seek con le pipeline Dataflow in una finestra del terminale:

  1. Per creare uno snapshot della sottoscrizione, utilizza il comando gcloud pubsub snapshots create:

    gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
    
  2. Per svuotare o annullare la pipeline, utilizza il comando gcloud dataflow jobs drain o gcloud dataflow jobs cancel:

    gcloud dataflow jobs drain JOB_ID
    

    o

    gcloud dataflow jobs cancel JOB_ID
    
  3. Per eseguire la ricerca fino allo snapshot, utilizza il comando gcloud pubsub subscriptions seek:

    gcloud pubsub subscriptions seek SNAPSHOT_NAME
    
  4. Eseguire il deployment di una nuova pipeline che utilizza la sottoscrizione.

Esegui pipeline parallele

Se devi evitare interruzioni della pipeline in modalità flusso durante un aggiornamento, esegui pipeline parallele. Creare un nuovo job di flusso con il codice aggiornato della pipeline ed eseguire la nuova pipeline in parallelo con quella esistente.

Quando crei la nuova pipeline, utilizza la stessa strategia di windowing utilizzata per la pipeline esistente. Lascia che la pipeline esistente continui a essere eseguita finché la filigrana non supera il timestamp della prima finestra di completamento elaborata dalla pipeline aggiornata. Quindi svuota o annulla la pipeline esistente. La pipeline aggiornata continua a essere eseguita al suo posto e assume in modo efficace il processo di elaborazione in modo autonomo.

Il seguente diagramma illustra questa procedura.

La pipeline B si sovrappone alla pipeline B per un periodo di 5 minuti.

Nel diagramma, la pipeline B è il job aggiornato che prende il controllo dalla pipeline A. Il valore t è il timestamp della prima finestra completa elaborata dalla pipeline B. Il valore w corrisponde alla filigrana della pipeline A. Per semplicità, si presume una filigrana perfetta senza dati in ritardo. Il tempo di elaborazione e il tempo di esecuzione sono rappresentati sull'asse orizzontale. Entrambe le pipeline usano finestre fisse di cinque minuti (a cascata). I risultati vengono attivati dopo che la filigrana supera la fine di ogni finestra.

Poiché gli output simultanei si verificano durante il periodo di tempo in cui le due pipeline si sovrappongono, configurale in modo da scrivere i risultati in destinazioni diverse. I sistemi downstream possono quindi utilizzare un'astrazione sui due sink di destinazione, come una vista di database, per eseguire query sui risultati combinati. Questi sistemi possono anche utilizzare l'astrazione per deduplicare i risultati dal periodo in sovrapposizione.

L'esempio seguente descrive l'approccio di utilizzo di una pipeline che legge i dati di input da Pub/Sub, esegue alcune elaborazioni e scrive i risultati in BigQuery.

  1. Nello stato iniziale, la pipeline di inserimento flussi esistente (pipeline A) è in esecuzione e legge i messaggi di un argomento Pub/Sub (argomento) utilizzando una sottoscrizione (sottoscrizione A). I risultati vengono scritti in una tabella BigQuery (Tabella A). I risultati vengono visualizzati in una vista di BigQuery, che funge da facciata per mascherare le modifiche sottostanti alla tabella. Questo processo è l'applicazione di un metodo di progettazione chiamato pattern di facciata. Il seguente diagramma mostra lo stato iniziale.

    Una pipeline con una sola sottoscrizione e scrittura in una singola tabella BigQuery.

  2. Crea una nuova sottoscrizione (sottoscrizione B) per la pipeline aggiornata. Esegui il deployment della pipeline aggiornata (pipeline B), che legge dall'argomento Pub/Sub (Argomento) utilizzando Subscription B e scrive in una tabella BigQuery separata (tabella B). Il seguente diagramma illustra questo flusso.

    Due pipeline, ciascuna con una sottoscrizione. Ogni pipeline scrive in una tabella BigQuery separata. Una vista facciata legge da entrambe le tabelle.

    A questo punto, la pipeline A e la pipeline B sono in esecuzione in parallelo e scrivono i risultati in tabelle separate. Registri il tempo t come timestamp della prima finestra di completamento elaborata dalla pipeline B.

  3. Quando la filigrana della pipeline A supera il tempo t, svuota la pipeline A. Quando svuota la pipeline, tutte le finestre aperte vengono chiuse e l'elaborazione dei dati in corso viene completata. Se la pipeline contiene finestre e finestre complete (a condizione che non siano presenti dati in ritardo), prima di svuotare la pipeline A, consenti l'esecuzione di entrambe le pipeline fino a quando non avrai completato le finestre sovrapposte. Interrompi il job di flusso per la pipeline A dopo che tutti i dati in corso sono stati elaborati e scritti nella tabella A. Il seguente diagramma mostra questa fase.

    La pipeline A svuota e non legge più la sottoscrizione A. Inoltre, non invia più dati alla tabella A al termine dello svuotamento. L'intera elaborazione è gestita dalla seconda pipeline.

  4. Al momento è in esecuzione solo la pipeline B. Puoi eseguire query da una vista BigQuery (Vista Façade), che funge da facciata per la Tabella A e la Tabella B. Per le righe che hanno lo stesso timestamp in entrambe le tabelle, configura la visualizzazione in modo che restituisca le righe della Tabella B oppure, se le righe non esistono nella Tabella B, torna alla Tabella A. Il seguente diagramma mostra la lettura (Façade View) dalla Tabella A e dalla Tabella B.

    La pipeline A non c'è più e viene eseguita solo la pipeline B.

    A questo punto, puoi eliminare la sottoscrizione A.

Quando vengono rilevati problemi con il deployment di una nuova pipeline, la presenza di pipeline parallele semplifica il rollback. In questo esempio, potresti voler mantenere in esecuzione la pipeline A mentre monitori la pipeline B per il corretto funzionamento. Se si verificano problemi con la pipeline B, puoi eseguire il rollback alla pipeline A.

Limitazioni

Questo approccio presenta le seguenti limitazioni:

  • L'esecuzione di due pipeline sullo stesso input può generare dati duplicati nell'output. Il sistema downstream deve essere a conoscenza e in grado di tollerare i dati duplicati.
  • Durante la lettura da un'origine Pub/Sub, non è consigliato utilizzare la stessa sottoscrizione per più pipeline, in quanto questo può causare problemi di correttezza. Tuttavia, in alcuni casi d'uso, come le pipeline ETL (Extract, Transform, Load), l'utilizzo della stessa sottoscrizione tra due pipeline potrebbe ridurre la duplicazione. È probabile che i problemi di scalabilità automatica in questo scenario siano mitigati utilizzando la funzionalità di aggiornamento dei job in corso. Per maggiori informazioni, consulta Ottimizzare la scalabilità automatica delle pipeline di flusso Pub/Sub.
  • Durante la lettura da un'origine Pub/Sub, l'utilizzo di una seconda sottoscrizione genera duplicati, ma non causa problemi di correttezza dei dati e scalabilità automatica.

Gestire le mutazioni dello schema

I sistemi di gestione dei dati spesso devono supportare le mutazioni dello schema nel tempo, talvolta a causa di cambiamenti nei requisiti aziendali e altre volte per motivi tecnici. L'applicazione degli aggiornamenti dello schema in genere richiede un'attenta pianificazione ed esecuzione per evitare interruzioni dei sistemi informativi aziendali.

Consideriamo una pipeline che legge i messaggi che contengono payload JSON da un argomento Pub/Sub. La pipeline converte ogni messaggio in un'istanza di TableRow e poi scrive le righe in una tabella BigQuery. Lo schema della tabella di output è simile a quello dei messaggi elaborati dalla pipeline. Nel diagramma seguente, lo schema è denominato Schema A.

Pipeline che legge una sottoscrizione e scrive in una tabella di output BigQuery utilizzando lo schema A.

Nel tempo, lo schema dei messaggi potrebbe cambiare in modi non banali. Ad esempio, i campi vengono aggiunti, rimossi o sostituiti. Lo schema A si evolve in un nuovo schema. Nella discussione che segue, il nuovo schema è denominato Schema B. In questo caso, la pipeline A deve essere aggiornata e lo schema della tabella di output deve supportare lo schema B.

Per la tabella di output, puoi eseguire alcune mutazioni dello schema senza centro. Ad esempio, puoi aggiungere nuovi campi o allentare le modalità colonna, ad esempio modificando REQUIRED in NULLABLE, senza tempi di inattività. In genere queste mutazioni non influiscono sulle query esistenti. Tuttavia, le mutazioni dello schema che modificano o rimuovono i campi dello schema esistenti interrompono le query o comportano altre interruzioni. Il seguente approccio supporta le modifiche senza tempi di inattività.

Separa i dati scritti dalla pipeline in una tabella entità e in una o più tabelle temporanee. La tabella dell'entità archivia i dati storici scritti dalla pipeline. Le tabelle temporanee memorizzano l'output della pipeline più recente. Puoi definire una vista di facciata di BigQuery sulle tabelle principale e di gestione temporanea, che consente ai consumatori di eseguire query su dati storici e aggiornati.

Il seguente diagramma rivede il flusso della pipeline precedente per includere una tabella temporanea (Tabella temporanea A), una tabella dell'entità e una vista di facciata.

Pipeline che legge una sottoscrizione e scrive in una tabella temporanea di BigQuery. Una seconda tabella (entità) ha l'output di una versione precedente dello schema. Una vista di facciata esegue la lettura sia della tabella temporanea che della tabella dell'entità.

Nel flusso rivisto, la pipeline A elabora i messaggi che utilizzano Schema A e scrive l'output nella tabella temporanea A, che ha uno schema compatibile. La tabella dell'entità contiene dati storici scritti dalle versioni precedenti della pipeline, nonché risultati che vengono uniti periodicamente dalla tabella di gestione temporanea. I consumatori possono eseguire query su dati aggiornati, compresi i dati storici e in tempo reale, utilizzando la vista di facciata.

Quando lo schema dei messaggi cambia da Schema A a Schema B, potresti aggiornare il codice della pipeline in modo che sia compatibile con i messaggi che utilizzano lo Schema B. La pipeline esistente deve essere aggiornata con la nuova implementazione. L'esecuzione di pipeline parallele garantisce che l'elaborazione dei flussi di dati prosegua senza interruzioni. La terminazione e la sostituzione delle pipeline comporta un'interruzione nell'elaborazione, perché nessuna pipeline è in esecuzione per un periodo di tempo.

La pipeline aggiornata scrive in una tabella temporanea aggiuntiva (Tabella temporanea B) che utilizza lo schema B. Puoi utilizzare un flusso di lavoro orchestrato per creare la nuova tabella temporanea prima di aggiornare la pipeline. Aggiorna la vista della facciata in modo da includere i risultati della nuova tabella temporanea, potenzialmente utilizzando un passaggio del flusso di lavoro correlato.

Il seguente diagramma mostra il flusso aggiornato che mostra la tabella temporanea B con lo schema B e il modo in cui la vista facciata viene aggiornata per includere i contenuti della tabella principale e di entrambe le tabelle temporanee.

La pipeline ora utilizza lo schema B e scrive nella tabella temporanea B. Una vista facciata legge le informazioni dalla tabella Entità, dalla Tabella temporanea A e dalla Tabella temporanea B.

Come processo separato dall'aggiornamento della pipeline, puoi unire le tabelle temporanee alla tabella entità, periodicamente o secondo necessità. Il seguente diagramma mostra come la Tabella temporanea A viene unita nella tabella dell'entità.

La tabella temporanea A viene unita nella tabella dell'entità. La vista facciata legge la tabella temporanea B e quella dell'entità.

Passaggi successivi