Questa pagina fornisce indicazioni e consigli per eseguire l'upgrade del tuo streaming pipeline di dati. Ad esempio, potresti dover eseguire l'upgrade a una versione più recente SDK Apache Beam o aggiornare il codice della pipeline. Diverso sono disponibili opzioni adatte a diversi scenari.
Le pipeline batch invece si interrompono al completamento del job, le pipeline spesso vengono eseguite in modo continuo per e l'elaborazione dei dati. Quando esegui l'upgrade delle pipeline in modalità flusso, devi quindi tenere conto per le seguenti considerazioni:
- Potresti dover ridurre al minimo o evitare le interruzioni della pipeline. In alcuni casi, potresti essere in grado di tollerare un'interruzione temporanea dell'elaborazione mentre del deployment di una pipeline. In altri casi, l'applicazione potrebbe non essere in grado di tollerare interruzioni.
- I processi di aggiornamento della pipeline devono gestire le modifiche allo schema in modo da riduce al minimo le interruzioni dell'elaborazione dei messaggi e di altri sistemi collegati. Ad esempio, se lo schema per i messaggi in una pipeline di elaborazione degli eventi modifiche, potrebbero essere necessarie modifiche allo schema anche nei data sink di dati downstream.
Puoi utilizzare uno dei seguenti metodi per aggiornare le pipeline in modalità flusso, a seconda della pipeline e dei requisiti di aggiornamento:
Per ulteriori informazioni sui problemi che potresti riscontrare durante aggiornare e come prevenirli, consulta Convalidare un lavoro di sostituzione e Controllo della compatibilità dei job.
Best practice
- Esegui l'upgrade della versione dell'SDK Apache Beam separatamente da qualsiasi modifiche al codice della pipeline.
- Testa la pipeline dopo ogni modifica prima di apportare ulteriori aggiornamenti.
- Esegui regolarmente l'upgrade della versione dell'SDK Apache Beam utilizzata dalla tua pipeline.
Esegui aggiornamenti in corso
Puoi aggiornare alcune pipeline di flusso in corso senza interrompere un lavoro. Questo scenario prende il nome di aggiornamento del lavoro in corso. Aggiornamenti sulle offerte di lavoro in corso sono disponibili solo in circostanze limitate:
- Il job deve utilizzare Streaming Engine.
- Il job deve essere in esecuzione.
- Stai modificando solo il numero di worker utilizzati dal job.
Per ulteriori informazioni, vedi Imposta l'intervallo di scalabilità automatica nella pagina Scalabilità automatica orizzontale.
Per istruzioni che spiegano come eseguire un aggiornamento di un job in corso, consulta Aggiorna una pipeline esistente.
Avviare un job di sostituzione
Se il job aggiornato è compatibile con il job esistente, puoi aggiornare
mediante l'opzione update
. Quando sostituisci un job esistente, viene
il job esegue il codice aggiornato della pipeline.
Il servizio Dataflow conserva il nome del job, ma esegue la sostituzione
job con un ID job aggiornato. Questo processo potrebbe causare tempi di inattività
durante l'arresto del job esistente, l'esecuzione del controllo di compatibilità e il nuovo job
. Per ulteriori dettagli, vedi
Effetti della sostituzione di un job.
Dataflow esegue verifica di compatibilità per garantire che il codice aggiornato della pipeline possa essere il deployment nella pipeline in esecuzione. Alcune modifiche al codice causano la compatibilità controllo per non riuscire, ad esempio quando vengono aggiunti o rimossi input aggiuntivi da un passaggio. Se il controllo di compatibilità ha esito negativo, non puoi eseguire un aggiornamento del job in loco.
Per istruzioni su come avviare un job di sostituzione, consulta Avvia un job di sostituzione.
Se l'aggiornamento della pipeline non è compatibile con il job attuale, devi interrompere e sostituire la pipeline. Se la pipeline non può tollerare i tempi di inattività, eseguire pipeline parallele.
Arresta e sostituisci le pipeline
Se puoi interrompere temporaneamente l'elaborazione, puoi annulla o scarica la pipeline, per poi sostituirla con quella aggiornata. L'annullamento di una pipeline fa sì che Dataflow interrompere immediatamente l'elaborazione e arrestare le risorse il più rapidamente possibile, può causare una perdita di dati in fase di elaborazione, noti come dati "in-flight". Per evitare perdite di dati, nella maggior parte dei casi è preferibile lo svuotamento. Puoi anche usare gli snapshot Dataflow per salvare lo stato di un pipeline di flusso, che ti consente di avviare una nuova versione Job Dataflow senza perdita di stato. Per ulteriori informazioni, vedi Usa gli snapshot Dataflow.
Lo svuotamento di una pipeline chiude immediatamente qualsiasi processo in corso finestre e attiva tutti attivatori. Anche se i dati in corso non vengono persi, lo svuotamento potrebbe provocare la dati incompleti. In questo caso, le finestre in fase di elaborazione emettono parziali o incomplete che consentono di analizzare i dati e visualizzare i risultati. Per ulteriori informazioni, vedi Effetti dello svuotamento di un job. Al termine del job esistente, avvia un nuovo job di flussi di dati contiene il codice aggiornato della pipeline, che consente la ripresa dell'elaborazione.
Con questo metodo, si verificano tempi di inattività tra il momento in cui il job in modalità flusso esistente viene interrotto e la data e l'ora in cui viene eseguita la pipeline sostitutiva pronti per riprendere l'elaborazione dei dati. Tuttavia, l'annullamento o lo svuotamento di un account esistente di Cloud Shell e quindi di avviare un nuovo job con la pipeline aggiornata sarà inferiore complicata rispetto all'esecuzione di pipeline parallele.
Per istruzioni più dettagliate, vedi Svuota un job Dataflow. Dopo aver svuotato il job attuale, avviane uno nuovo con lo stesso nome.
Rielaborazione dei messaggi con snapshot Pub/Sub e Seek
In alcuni casi, dopo aver sostituito o annullato una pipeline svuotata, potresti dover rielaborare i messaggi Pub/Sub consegnati in precedenza. Ad esempio, potresti dover utilizzare la logica di business aggiornata per rielaborare i dati. Ricerca Pub/Sub è una funzionalità che consente di ripetere i messaggi da uno snapshot Pub/Sub. Puoi utilizzare la modalità Ricerca Pub/Sub con Dataflow per rielaborare i messaggi inviati dal momento della creazione dello snapshot della sottoscrizione.
Durante lo sviluppo e il test, puoi anche usare Pub/Sub Seek per ripeti ripetutamente i messaggi noti per verificare l'output dalla pipeline. Quando utilizzi Pub/Sub Seek, non cercare uno snapshot della sottoscrizione quando se l'abbonamento è utilizzato da una pipeline. In tal caso, la ricerca può invalidare Logica della filigrana di Dataflow e potrebbe influire sul metodo "exactly-once" dei messaggi Pub/Sub.
Un file consigliato gcloud CLI per l'utilizzo di Pub/Sub Seek con Dataflow delle pipeline in una finestra del terminale è il seguente:
Per creare uno snapshot della sottoscrizione, utilizza
gcloud pubsub snapshots create
:gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
Per svuotare o annullare la pipeline, utilizza
gcloud dataflow jobs drain
o il comandogcloud dataflow jobs cancel
:gcloud dataflow jobs drain JOB_ID
o
gcloud dataflow jobs cancel JOB_ID
Per eseguire la ricerca fino all'istantanea, utilizza
gcloud pubsub subscriptions seek
:gcloud pubsub subscriptions seek SNAPSHOT_NAME
Eseguire il deployment di una nuova pipeline che utilizza la sottoscrizione.
Esegui pipeline parallele
Se devi evitare interruzioni della pipeline in modalità flusso durante un aggiornamento, di eseguire pipeline parallele. Crea un nuovo job di flussi di dati con lo stato aggiornato codice della pipeline ed eseguire la nuova pipeline in parallelo con quella esistente.
Quando crei la nuova pipeline, utilizza la stessa strategia di windowing che hai utilizzato per la pipeline esistente. Lascia in esecuzione la pipeline esistente fino al giorno la filigrana supera il timestamp della prima finestra completa elaborati dalla pipeline aggiornata. Quindi svuota o annulla la pipeline esistente. La pipeline aggiornata continua a funzionare al suo posto e prende il controllo in modo efficace dell'elaborazione dei dati in modo indipendente.
Il seguente diagramma illustra questa procedura.
Nel diagramma, la pipeline B è il job aggiornato che prende il controllo Pipeline A. Il valore t è il timestamp della prima finestra completa elaborati dalla pipeline B. Il valore w corrisponde alla filigrana della pipeline A. Per semplicità, si presume una filigrana perfetta senza dati in ritardo. L'elaborazione e sono rappresentate sull'asse orizzontale. Entrambe le pipeline utilizzano finestre fisse di cinque minuti (a cascata). I risultati vengono attivati dopo la filigrana oltrepassa la fine di ogni finestra.
Poiché l'output simultaneo si verifica durante il periodo di tempo in cui le due pipeline si sovrappongono, configura le due pipeline per scrivere risultati in diversi destinazioni. I sistemi downstream possono quindi utilizzare un'astrazione nei sink di destinazione, ad esempio una vista di database, per eseguire query sui risultati combinati. Questi possono inoltre utilizzare l'astrazione per deduplicare i risultati punto.
L'esempio seguente descrive l'approccio di utilizzo di una pipeline che legge i dati di input da Pub/Sub, esegue alcune elaborazioni e scrive i risultati in BigQuery.
Nello stato iniziale, la pipeline in modalità flusso esistente (pipeline A) è in esecuzione e legge i messaggi di un argomento Pub/Sub (Argomento) mediante un abbonamento (Abbonamento A). I risultati sono scritte in una tabella BigQuery (Tabella A). I risultati sono utilizzato tramite una vista BigQuery, che funge da facciata mascherare le modifiche alla tabella sottostante. Questo processo è l'applicazione di un metodo di progettazione chiamato schema della facciata. Il seguente diagramma mostra lo stato iniziale.
Crea una nuova sottoscrizione (Sottoscrizione B) per l'aggiornamento una pipeline o un blocco note personalizzato. Esegui il deployment della pipeline aggiornata (pipeline B), che legge dall'argomento Pub/Sub (argomento) utilizzando Subscription B e scrive in una tabella BigQuery separata (Tabella B). La il seguente diagramma illustra questo flusso.
A questo punto, la pipeline A e la pipeline B sono in esecuzione in parallelo e scrivere i risultati in tabelle separate. Registriamo il tempo t come Timestamp del primo intervallo di tempo completato elaborato dalla pipeline B.
Quando la filigrana della pipeline A supera il tempo t, svuota la pipeline A. Quando svuota la pipeline, tutte le finestre aperte si chiudono e l'elaborazione per vengono completati i dati mentre sono in corso. Se la pipeline contiene finestre e viene completata sono importanti (supponendo che non siano presenti dati in ritardo), prima di svuotare la pipeline A, lascia che entrambe le pipeline vengano eseguite completamente sovrapposte. Arresta il job di flussi di dati per Pipeline A dopo l'elaborazione e la scrittura di tutti i dati in corso Tabella A. Il seguente diagramma mostra questa fase.
Al momento è in esecuzione solo la pipeline B. Puoi eseguire query da un La vista BigQuery (Façade View), che funge da facciata per Tabella A e Tabella B. Per le righe che hanno lo stesso timestamp in entrambi i valori tabelle, configura la visualizzazione in modo che restituisca le righe della Tabella B; oppure Se le righe non esistono nella Tabella B, torna alla Tabella A. Le seguenti mostra la lettura (Vista facciale) della Tabella A e della Tabella B.
A questo punto, puoi eliminare la sottoscrizione A.
Quando vengono rilevati problemi con il deployment di una nuova pipeline, per semplificare il rollback. In questo esempio, Vuoi mantenere la pipeline A in esecuzione mentre monitori la pipeline B per controllare operativa. Se si verificano problemi con la pipeline B, puoi eseguire il rollback Pipeline A.
Limitazioni
Questo approccio presenta le seguenti limitazioni:
- L'esecuzione di due pipeline sullo stesso input potrebbe generare duplicati i dati nell'output. Il sistema downstream deve essere a conoscenza della tollerare i dati duplicati.
- Durante la lettura da un'origine Pub/Sub, utilizzando la stessa sottoscrizione per più pipeline è sconsigliato e può portare a problemi di correttezza. Tuttavia, in alcuni casi d'uso, come pipeline ETL (Extract, Transform, Load), l'utilizzo della stessa sottoscrizione in due pipeline può ridurre i duplicati. In questo scenario sono probabili problemi di scalabilità automatica, ma possono essere limitati mediante la funzionalità di aggiornamento dei lavori in corso. Per ulteriori informazioni, vedi Ottimizza la scalabilità automatica per le pipeline in modalità flusso Pub/Sub.
- Durante la lettura da un'origine Pub/Sub, utilizzando una seconda sottoscrizione genera duplicati, ma non causa problemi di correttezza dei dati. e la scalabilità automatica.
Gestire le mutazioni dello schema
I sistemi di gestione dei dati spesso devono adattarsi alle mutazioni dello schema nel tempo, a volte a causa di cambiamenti nei requisiti aziendali, altre volte per motivi tecnici. L'applicazione degli aggiornamenti dello schema in genere richiede un'attenta pianificazione ed esecuzione evitare interruzioni dei sistemi informativi aziendali.
Considera una pipeline che legge i messaggi che contengono payload JSON
di un argomento Pub/Sub. La pipeline converte ogni messaggio in un
TableRow
gcloud e scrive le righe in una tabella BigQuery. Lo schema
della tabella di output è simile ai messaggi elaborati dalla pipeline.
Nel diagramma seguente, lo schema è denominato Schema A.
Nel tempo, lo schema dei messaggi potrebbe cambiare in modi non banali. Ad esempio, i campi vengono aggiunti, rimossi o sostituiti. Lo schema A si evolve in un nuovo schema. Nella Nella discussione che segue, il nuovo schema viene chiamato Schema B. Nella in questo caso, la pipeline A deve essere aggiornata e lo schema della tabella di output deve per supportare lo schema B.
Per la tabella di output, puoi eseguire alcune mutazioni dello schema senza centro.
Ad esempio, puoi aggiungere nuovi campi o
modalità a colonne,
ad esempio da REQUIRED
a NULLABLE
, senza tempi di inattività.
In genere queste mutazioni non influiscono sulle query esistenti. Tuttavia, lo schema
le mutazioni che modificano o rimuovono campi dello schema esistenti interrompono le query o i risultati
in altre interruzioni. Il seguente approccio supporta le modifiche senza
richiedono tempi di inattività.
Separa i dati scritti della pipeline in una tabella entità e in una o più tabelle temporanee. La tabella dell'entità archivia i dati storici scritti dalla pipeline. Le tabelle temporanee memorizzano l'output della pipeline più recente. Puoi definire Vista facciata di BigQuery sulle tabelle entità e di gestione temporanea che consente ai consumatori di eseguire query su dati storici e aggiornati.
Il seguente diagramma rivede il flusso della pipeline precedente per includere una tabella temporanea (Tabella temporanea A), una tabella entità e una vista facciata.
Nel flusso rivisto, la pipeline A elabora i messaggi che utilizzano lo schema A e scrive l'output nella Tabella temporanea A, che ha uno schema compatibile. La la tabella entità contiene dati storici scritti dalle versioni precedenti pipeline di dati, nonché i risultati che vengono uniti periodicamente dalla tabella di gestione temporanea. I consumatori possono eseguire query su dati aggiornati, inclusi dati storici e in tempo reale, mediante la vista delle facciate.
Quando lo schema del messaggio cambia da Schema A a Schema B, potresti aggiornare il codice della pipeline in modo che sia compatibile con i messaggi che utilizzano lo schema B. La pipeline esistente deve essere aggiornata con la nuova implementazione. Di eseguire pipeline parallele, puoi assicurarti che l'elaborazione dei flussi di dati prosegua senza interruzioni. Terminazione e sostituzione delle pipeline provoca un'interruzione dell'elaborazione, perché nessuna pipeline è in esecuzione per un periodo di nel tempo.
La pipeline aggiornata scrive in una tabella temporanea aggiuntiva (Tabella temporanea B) che utilizza Schema B. Puoi utilizzare un modello orchestrato per creare la nuova tabella temporanea prima di aggiornare la pipeline. Aggiorna la vista della facciata in modo da includere i risultati della nuova tabella temporanea, utilizzando potenzialmente un passaggio del flusso di lavoro correlato.
Il seguente diagramma mostra flusso aggiornato che mostra la Tabella temporanea B con Schema B e il modo in cui la vista facciata viene aggiornata in modo da includere i contenuti della tabella principale e entrambe le tabelle temporanee.
Come processo separato dall'aggiornamento della pipeline, puoi unire la gestione temporanea le tabelle nella tabella dell'entità, periodicamente o in base alle necessità. Le seguenti il diagramma mostra come la Tabella temporanea A viene unita nella tabella dell'entità.
Passaggi successivi
- Consulta la procedura dettagliata aggiornando una pipeline esistente.