Questo documento descrive come aggiornare un job di inserimento di flussi in corso. Ti consigliamo di aggiornare il job Dataflow esistente per i seguenti motivi:
- Vuoi migliorare o migliorare in altro modo il codice della pipeline.
- Vuoi correggere dei bug nel codice della pipeline.
- Vuoi aggiornare la pipeline per gestire le modifiche al formato dei dati o per tenere conto della versione o di altre modifiche all'origine dati.
- Vuoi applicare la patch a una vulnerabilità di sicurezza relativa a Container-Optimized OS per tutti i worker Dataflow.
- Vuoi scalare una pipeline Apache Beam in modalità flusso per utilizzare un numero diverso di worker.
Puoi aggiornare i lavori in due modi:
- Aggiornamento job in corso: per i job di elaborazione in modalità flusso che utilizzano Streaming Engine, puoi aggiornare le opzioni dei job
min-num-workers
emax-num-workers
senza interrompere il job o modificare l'ID job. - Job di sostituzione: per eseguire il codice aggiornato della pipeline o per aggiornare le opzioni dei job non supportate dagli aggiornamenti dei job in corso, avvia un nuovo job che sostituisca quello esistente. Per verificare se un job di sostituzione è valido, prima di avviare il nuovo job, convalida il relativo grafico del job.
Quando aggiorni il job, il servizio Dataflow esegue un controllo di compatibilità tra il job attualmente in esecuzione e il potenziale job di sostituzione. Il controllo della compatibilità garantisce che elementi come le informazioni sullo stato intermedio e i dati nel buffer possano essere trasferiti dal job precedente al job di sostituzione.
Puoi anche utilizzare l'infrastruttura di logging integrata dell'SDK Apache Beam per registrare le informazioni quando aggiorni il job. Per maggiori informazioni, consulta
Utilizzare i log della pipeline.
Per identificare i problemi con il codice della pipeline, utilizza il livello di logging di DEBUG
.
- Per istruzioni su come aggiornare i job di inserimento di flussi che utilizzano modelli classici, consulta Aggiornare un job di inserimento di flussi di modelli personalizzato.
- Per istruzioni su come aggiornare i job di inserimento di flussi che utilizzano modelli flessibili, segui le istruzioni di gcloud CLI in questa pagina o consulta Aggiornare un job di modello flessibile.
Aggiornamento delle opzioni del job in corso
Per un job di inserimento di flussi che utilizza Streaming Engine, puoi aggiornare le seguenti opzioni del job senza interrompere il job o modificare l'ID job:
min-num-workers
: numero minimo di istanze di Compute Engine.max-num-workers
: il numero massimo di istanze di Compute Engine.
Per altri aggiornamenti del job, devi sostituire il job attuale con quello aggiornato. Per maggiori informazioni, consulta Avviare un job di sostituzione.
Esegui un aggiornamento in corso
Per eseguire un aggiornamento delle opzioni del job in corso, svolgi i passaggi che seguono.
gcloud
Usa il comando gcloud dataflow jobs update-options
:
gcloud dataflow jobs update-options \ --region=REGION \ --min-num-workers=MINIMUM_WORKERS \ --max-num-workers=MAXIMUM_WORKERS \ JOB_ID
Sostituisci quanto segue:
- REGION: l'ID della regione del job
- JOB_ID: l'ID del job da aggiornare
Puoi anche aggiornare singolarmente --min-num-workers
e --max-num-workers
.
REST
Utilizza il metodo projects.locations.jobs.update
:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": MINIMUM_WORKERS, "max_num_workers": MAXIMUM_WORKERS } }
Sostituisci quanto segue:
- PROJECT_ID: l'ID progetto Google Cloud del job Dataflow
- REGION: l'ID della regione del job
- JOB_ID: l'ID del job da aggiornare
- MINIMUM_WORKERS: numero minimo di istanze di Compute Engine
- MAXIMUM_WORKERS: numero massimo di istanze di Compute Engine
Puoi anche aggiornare min_num_workers
e max_num_workers
singolarmente.
Specifica i parametri da aggiornare nel parametro di query updateMask
e includi i valori aggiornati nel campo runtimeUpdatableParams
del corpo della richiesta. L'esempio seguente aggiorna min_num_workers
:
PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": 5 } }
Un job deve essere in esecuzione per essere idoneo per gli aggiornamenti in corso. Si verifica un errore se il job non è stato avviato o è già stato annullato. Allo stesso modo, se avvii un job di sostituzione, attendi che inizi l'esecuzione prima di inviare eventuali aggiornamenti in corso al nuovo job.
Dopo aver inviato una richiesta di aggiornamento, ti consigliamo di attendere il completamento della richiesta prima di inviare un altro aggiornamento. Visualizza i log del job per vedere quando la richiesta viene completata.
Convalida un job di sostituzione
Per verificare se un job di sostituzione è valido, prima di avviare il nuovo job, convalida il relativo grafico del job. In Dataflow, un grafico di job è una rappresentazione grafica di una pipeline. La convalida del grafico del job consente di ridurre il rischio che la pipeline incontri errori o guasti della pipeline dopo l'aggiornamento. Inoltre, puoi convalidare gli aggiornamenti senza dover arrestare il job originale, in modo che il job non presenti tempi di inattività.
Per convalidare il grafico del job, segui i passaggi per avviare un job di sostituzione. Includi l'opzione del servizio Dataflow graph_validate_only
nel comando di aggiornamento.
Java
- Passa l'opzione
--update
. - Imposta l'opzione
--jobName
inPipelineOptions
sullo stesso nome del job che vuoi aggiornare. - Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Includi l'opzione del servizio
--dataflowServiceOptions=graph_validate_only
. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e passarla utilizzando l'opzione
--transformNameMapping
. - Se stai inviando un job di sostituzione che utilizza una versione successiva dell'SDK
Apache Beam, imposta
--updateCompatibilityVersion
sulla versione dell'SDK Apache Beam utilizzata nel job originale.
Python
- Passa l'opzione
--update
. - Imposta l'opzione
--job_name
inPipelineOptions
sullo stesso nome del job che vuoi aggiornare. - Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Includi l'opzione del servizio
--dataflow_service_options=graph_validate_only
. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e passarla utilizzando l'opzione
--transform_name_mapping
. - Se stai inviando un job di sostituzione che utilizza una versione successiva dell'SDK
Apache Beam, imposta
--updateCompatibilityVersion
sulla versione dell'SDK Apache Beam utilizzata nel job originale.
Go
- Passa l'opzione
--update
. - Imposta l'opzione
--job_name
sullo stesso nome del job che vuoi aggiornare. - Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Includi l'opzione del servizio
--dataflow_service_options=graph_validate_only
. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e passarla utilizzando l'opzione
--transform_name_mapping
.
gcloud
Per convalidare il grafico del job per un job del modello flessibile, utilizza il comando gcloud dataflow flex-template run
con l'opzione additional-experiments
:
- Passa l'opzione
--update
. - Imposta JOB_NAME sullo stesso nome del job che vuoi aggiornare.
- Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Includi l'opzione
--additional-experiments=graph_validate_only
. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e passarla utilizzando l'opzione
--transform-name-mappings
.
Ad esempio:
gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only
Sostituisci JOB_NAME con il nome del job che vuoi aggiornare.
REST
Utilizza il campo additionalExperiments
nell'oggetto
FlexTemplateRuntimeEnvironment
(modelli flessibili) o
RuntimeEnvironment
.
{
additionalExperiments : ["graph_validate_only"]
...
}
L'opzione del servizio graph_validate_only
convalida solo gli aggiornamenti della pipeline. Non usare questa opzione durante la creazione
o l'avvio delle pipeline. Per aggiornare la pipeline, avvia un job di sostituzione senza l'opzione di servizio graph_validate_only
.
Una volta completata la convalida del grafico del job, lo stato del job e i log del job mostrano i seguenti stati:
- Lo stato del job è
JOB_STATE_DONE
. - Nella console Google Cloud, lo stato del job è
Succeeded
. Nei log del job viene visualizzato il seguente messaggio:
Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
Quando la convalida del grafico del job non va a buon fine, lo stato del job e i log del job mostrano i seguenti stati:
- Lo stato del job è
JOB_STATE_FAILED
. - Nella console Google Cloud, lo stato del job è
Failed
. - Nei log del job viene visualizzato un messaggio che descrive l'errore di incompatibilità. I contenuti del messaggio dipendono dall'errore.
Avvia un job di sostituzione
Puoi sostituire un job esistente per i seguenti motivi:
- Per eseguire il codice della pipeline aggiornato.
- Per aggiornare le opzioni dei job che non supportano gli aggiornamenti in corso.
Per verificare se un job di sostituzione è valido, prima di avviare il nuovo job convalida il relativo grafico del job.
Quando avvii un job di sostituzione, imposta le seguenti opzioni della pipeline per eseguire il processo di aggiornamento, oltre alle normali opzioni del job:
Java
- Passa l'opzione
--update
. - Imposta l'opzione
--jobName
inPipelineOptions
sullo stesso nome del job che vuoi aggiornare. - Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e passarla utilizzando l'opzione
--transformNameMapping
. - Se stai inviando un job di sostituzione che utilizza una versione successiva dell'SDK
Apache Beam, imposta
--updateCompatibilityVersion
sulla versione dell'SDK Apache Beam utilizzata nel job originale.
Python
- Passa l'opzione
--update
. - Imposta l'opzione
--job_name
inPipelineOptions
sullo stesso nome del job che vuoi aggiornare. - Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e passarla utilizzando l'opzione
--transform_name_mapping
. - Se stai inviando un job di sostituzione che utilizza una versione successiva dell'SDK
Apache Beam, imposta
--updateCompatibilityVersion
sulla versione dell'SDK Apache Beam utilizzata nel job originale.
Go
- Passa l'opzione
--update
. - Imposta l'opzione
--job_name
sullo stesso nome del job che vuoi aggiornare. - Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e passarla utilizzando l'opzione
--transform_name_mapping
.
gcloud
Per aggiornare un job del modello flessibile utilizzando gcloud CLI, utilizza il comando gcloud dataflow flex-template run
. L'aggiornamento di altri job tramite gcloud CLI non è supportato.
- Passa l'opzione
--update
. - Imposta JOB_NAME sullo stesso nome del job che vuoi aggiornare.
- Imposta l'opzione
--region
sulla stessa regione del job che vuoi aggiornare. - Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e passarla utilizzando l'opzione
--transform-name-mappings
.
REST
Queste istruzioni mostrano come aggiornare job non basati sul modello utilizzando l'API REST. Per utilizzare l'API REST per aggiornare un job di modello classico, consulta Aggiornare un job di inserimento di flussi di modello personalizzato. Per utilizzare l'API REST per aggiornare un job del modello flessibile, consulta Aggiornare un job del modello flessibile.
Recupera la risorsa
job
per il job che vuoi sostituire utilizzando il metodoprojects.locations.jobs.get
. Includi il parametro di queryview
con il valoreJOB_VIEW_DESCRIPTION
. L'inclusione diJOB_VIEW_DESCRIPTION
limita la quantità di dati nella risposta in modo che la richiesta successiva non superi i limiti di dimensione. Se hai bisogno di informazioni più dettagliate sul job, utilizza il valoreJOB_VIEW_ALL
.GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
Sostituisci i seguenti valori:
- PROJECT_ID: l'ID progetto Google Cloud del job Dataflow
- REGION: la regione del job che vuoi aggiornare
- JOB_ID: l'ID del job da aggiornare
Per aggiornare il job, utilizza il metodo
projects.locations.jobs.create
. Nel corpo della richiesta, utilizza la risorsajob
che hai recuperato.POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs { "id": JOB_ID, "replaceJobId": JOB_ID, "name": JOB_NAME, "type": "JOB_TYPE_STREAMING", "transformNameMapping": { string: string, ... }, }
Sostituisci quanto segue:
- JOB_ID: lo stesso ID job del job che vuoi aggiornare.
- JOB_NAME: lo stesso nome del job che vuoi aggiornare.
Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e passarla utilizzando il campo
transformNameMapping
.(Facoltativo) Per inviare la richiesta utilizzando curl (Linux, macOS o Cloud Shell), salva la richiesta in un file JSON ed esegui questo comando:
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
Sostituisci FILE_PATH con il percorso del file JSON che contiene il corpo della richiesta.
Specifica il nome del job di sostituzione
Java
Quando avvii il job di sostituzione, il valore trasmesso per l'opzione --jobName
deve corrispondere esattamente al nome del job che vuoi sostituire.
Python
Quando avvii il job di sostituzione, il valore trasmesso per l'opzione --job_name
deve corrispondere esattamente al nome del job che vuoi sostituire.
Go
Quando avvii il job di sostituzione, il valore trasmesso per l'opzione --job_name
deve corrispondere esattamente al nome del job che vuoi sostituire.
gcloud
Quando avvii il job di sostituzione, JOB_NAME deve corrispondere esattamente al nome del job che vuoi sostituire.
REST
Imposta il valore del campo replaceJobId
sullo stesso ID job del job che vuoi aggiornare. Per trovare il valore corretto del nome job, seleziona il job precedente nell'interfaccia di monitoraggio di Dataflow.
Nel riquadro laterale Informazioni job, individua il campo ID job.
Per trovare il valore corretto del nome job, seleziona il job precedente nell'interfaccia di Dataflow Monitoring. Quindi, nel riquadro laterale Informazioni job, individua il campo Nome job:
In alternativa, esegui una query su un elenco di job esistenti utilizzando l'interfaccia a riga di comando di Dataflow.
Inserisci il comando gcloud dataflow jobs list
nella shell o nella finestra del terminale per ottenere un elenco dei job Dataflow nel tuo progetto Google Cloud e trova il campo NAME
per il job da sostituire:
JOB_ID NAME TYPE CREATION_TIME STATE REGION 2020-12-28_12_01_09-yourdataflowjobid ps-topic Streaming 2020-12-28 20:01:10 Running us-central1
Crea un mapping di trasformazione
Se la pipeline sostitutiva modifica i nomi delle trasformazioni dai nomi della pipeline precedente, il servizio Dataflow richiede una mappatura delle trasformazioni. La mappatura delle trasformazioni denominate nel codice della pipeline precedente viene mappata ai nomi nel codice della pipeline sostitutivo.
Java
Passa il mapping utilizzando l'opzione della riga di comando --transformNameMapping
, con il seguente formato generale:
--transformNameMapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Devi fornire le voci di mappatura in --transformNameMapping
solo per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e la pipeline di sostituzione.
Quando esegui con --transformNameMapping
, potrebbe essere necessario eseguire l'escape delle virgolette in base alle esigenze della tua shell. Ad esempio, in Bash:
--transformNameMapping='{"oldTransform1":"newTransform1",...}'
Python
Passa il mapping utilizzando l'opzione della riga di comando --transform_name_mapping
, con il seguente formato generale:
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Devi fornire le voci di mappatura in --transform_name_mapping
solo per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e la pipeline di sostituzione.
Quando esegui con --transform_name_mapping
, potrebbe essere necessario eseguire l'escape delle virgolette in base alle esigenze della tua shell. Ad esempio, in Bash:
--transform_name_mapping='{"oldTransform1":"newTransform1",...}'
Go
Passa il mapping utilizzando l'opzione della riga di comando --transform_name_mapping
, con il seguente formato generale:
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Devi fornire le voci di mappatura in --transform_name_mapping
solo per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e la pipeline di sostituzione.
Quando esegui con --transform_name_mapping
, potrebbe essere necessario eseguire l'escape delle virgolette in base alle esigenze della tua shell. Ad esempio, in Bash:
--transform_name_mapping='{"oldTransform1":"newTransform1",...}'
gcloud
Passa la mappatura utilizzando l'opzione --transform-name-mappings
, nel seguente formato generale:
--transform-name-mappings= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Devi fornire le voci di mappatura in --transform-name-mappings
solo per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e la pipeline di sostituzione.
Quando esegui con --transform-name-mappings
, potrebbe essere necessario eseguire l'escape delle virgolette in base alle esigenze della tua shell. Ad esempio, in Bash:
--transform-name-mappings='{"oldTransform1":"newTransform1",...}'
REST
Passa la mappatura utilizzando il campo transformNameMapping
con il seguente formato generale:
"transformNameMapping": {
oldTransform1: newTransform1,
oldTransform2: newTransform2,
...
}
Devi fornire le voci di mappatura in transformNameMapping
solo per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e la pipeline di sostituzione.
Determinare i nomi delle trasformazioni
Il nome della trasformazione in ogni istanza nella mappa è il nome che hai fornito quando hai applicato la trasformazione nella pipeline. Ad esempio:
Java
.apply("FormatResults", ParDo
.of(new DoFn<KV<String, Long>>, String>() {
...
}
}))
Python
| 'FormatResults' >> beam.ParDo(MyDoFn())
Go
// In Go, this is always the package-qualified name of the DoFn itself.
// For example, if the FormatResults DoFn is in the main package, its name
// is "main.FormatResults".
beam.ParDo(s, FormatResults, results)
Puoi anche ottenere i nomi della trasformazione per il job precedente esaminando il grafico di esecuzione del job nell'interfaccia di monitoraggio di Dataflow:
Denominazione della trasformazione composita
I nomi delle trasformazioni sono gerarchici e si basano sulla gerarchia di trasformazione nella pipeline. Se la tua pipeline ha una trasformazione composita, le trasformazioni nidificate vengono denominate in base alla trasformazione contenitore. Ad esempio, supponi che la tua pipeline contenga una trasformazione composita denominata CountWidgets
, che contiene una trasformazione interna denominata Parse
. Il nome completo
della trasformazione è CountWidgets/Parse
e devi specificarlo
nel mapping della trasformazione.
Se la nuova pipeline mappa una trasformazione composita con un nome diverso, anche tutte le trasformazioni nidificate vengono rinominate automaticamente. Devi specificare i nomi modificati per le trasformazioni interne nella mappatura.
Esegui il refactoring della gerarchia di trasformazione
Se la pipeline sostitutiva utilizza una gerarchia di trasformazione diversa rispetto alla pipeline precedente, devi dichiarare esplicitamente la mappatura. Potresti avere una gerarchia di trasformazione diversa perché hai eseguito il refactoring delle trasformazioni composte oppure la pipeline dipende da una trasformazione composita di una libreria modificata.
Ad esempio, la pipeline precedente ha applicato una trasformazione composita, CountWidgets
,
che conteneva una trasformazione interna denominata Parse
. La pipeline sostitutiva esegue il refactoring di CountWidgets
e nidifica Parse
all'interno di un'altra trasformazione denominata Scan
. Affinché l'aggiornamento abbia esito positivo, devi mappare esplicitamente il nome completo della trasformazione nella pipeline precedente (CountWidgets/Parse
) al nome della trasformazione nella nuova pipeline (CountWidgets/Scan/Parse
):
Java
--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Se elimini una trasformazione completamente nella pipeline sostitutiva, devi fornire una mappatura null. Supponiamo che la pipeline sostitutiva rimuova completamente la trasformazione CountWidgets/Parse
:
--transformNameMapping={"CountWidgets/Parse":""}
Python
--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Se elimini una trasformazione completamente nella pipeline sostitutiva, devi fornire una mappatura null. Supponiamo che la pipeline sostitutiva rimuova completamente la trasformazione CountWidgets/Parse
:
--transform_name_mapping={"CountWidgets/Parse":""}
Go
--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}
Se elimini una trasformazione completamente nella pipeline sostitutiva, devi fornire una mappatura null. Supponiamo che la pipeline sostitutiva rimuova completamente la trasformazione CountWidgets/Parse
:
--transform_name_mapping={"CountWidgets/main.Parse":""}
gcloud
--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Se elimini una trasformazione completamente nella pipeline sostitutiva, devi fornire una mappatura null. Supponiamo che la pipeline sostitutiva rimuova completamente la trasformazione CountWidgets/Parse
:
--transform-name-mappings={"CountWidgets/main.Parse":""}
REST
"transformNameMapping": {
CountWidgets/Parse: CountWidgets/Scan/Parse
}
Se elimini una trasformazione completamente nella pipeline sostitutiva, devi fornire una mappatura null. Supponiamo che la pipeline sostitutiva rimuova completamente la trasformazione CountWidgets/Parse
:
"transformNameMapping": {
CountWidgets/main.Parse: null
}
Effetti della sostituzione di un job
Quando sostituisci un job esistente, ne viene eseguito uno nuovo che esegue il codice della pipeline aggiornato. Il servizio Dataflow conserva il nome del job, ma esegue il job di sostituzione con un ID job aggiornato. Questo processo potrebbe causare tempi di inattività mentre il job esistente si arresta, viene eseguito il controllo della compatibilità e viene avviato il nuovo job.
Il job di sostituzione conserva i seguenti elementi:
- I dati sullo stato intermedio del job precedente. Le cache in memoria non vengono salvate.
- Record di dati in buffer o metadati attualmente "in corso" del job precedente. Ad esempio, alcuni record nella pipeline potrebbero essere sottoposti a buffering in attesa della risoluzione di una finestra.
- Aggiornamenti delle opzioni del job in corso applicati al job precedente.
Dati sullo stato intermedio
I dati sullo stato intermedio del job precedente vengono conservati. I dati di stato non includono cache in memoria. Se vuoi conservare i dati della cache in memoria quando aggiorni la pipeline, come soluzione alternativa, esegui il refactoring della pipeline per convertire le cache in dati di stato o input laterali. Per ulteriori informazioni sull'utilizzo degli input laterali, consulta Pattern di input laterali nella documentazione di Apache Beam.
Le pipeline in modalità flusso hanno limiti di dimensioni per ValueState
e per gli input secondari.
Di conseguenza, se hai cache di grandi dimensioni che vuoi conservare, potresti dover utilizzare un'unità di archiviazione esterna, come Memorystore o Bigtable.
Dati in corso
I dati "in corso" vengono ancora elaborati dalle trasformazioni nella nuova pipeline. Tuttavia, le ulteriori trasformazioni aggiunte al codice della pipeline sostitutiva potrebbero o non avere effetto, a seconda di dove viene eseguito il buffer dei record. In questo esempio, la pipeline esistente ha le seguenti trasformazioni:
Java
p.apply("Read", ReadStrings()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Format' >> FormatStrings()
Go
beam.ParDo(s, ReadStrings) beam.ParDo(s, FormatStrings)
Puoi sostituire il job con il codice della nuova pipeline, come segue:
Java
p.apply("Read", ReadStrings()) .apply("Remove", RemoveStringsStartingWithA()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Remove' >> RemoveStringsStartingWithA() | 'Format' >> FormatStrings()
Go
beam.ParDo(s, ReadStrings) beam.ParDo(s, RemoveStringsStartingWithA) beam.ParDo(s, FormatStrings)
Anche se aggiungi una trasformazione per escludere le stringhe che iniziano con la
lettera "A", la trasformazione successiva (FormatStrings
) potrebbe comunque visualizzare stringhe
in fase di buffer o in corso che iniziano con "A" e che sono state trasferite dal job precedente.
Modifica finestra
Puoi modificare le strategie di windowing e di attivazione per gli elementi PCollection
nella pipeline sostitutiva, ma presta attenzione.
La modifica delle strategie di windowing o di trigger non influisce sui dati già memorizzati nel buffer o in corso di pubblicazione.
Ti consigliamo di provare solo a apportare modifiche più piccole al windowing della pipeline, ad esempio cambiando la durata dei periodi con tempo fisso o scorrevole. Apportare modifiche significative a windowing o trigger, ad esempio la modifica dell'algoritmo di windowing, potrebbero avere risultati imprevedibili sull'output della pipeline.
Controllo della compatibilità del job
Quando avvii il job di sostituzione, il servizio Dataflow esegue un controllo della compatibilità tra il job di sostituzione e il job precedente. Se il controllo della compatibilità ha esito positivo, il job precedente viene interrotto. Il job di sostituzione viene quindi avviato nel servizio Dataflow mantenendo lo stesso nome del job. Se il controllo della compatibilità non va a buon fine, il job precedente continua a essere eseguito sul servizio Dataflow e il job di sostituzione restituisce un errore.
Java
A causa di una limitazione, devi utilizzare l'esecuzione di blocco per visualizzare gli errori relativi a tentativi di aggiornamento non riusciti nella console o nel terminale. La soluzione attuale è costituita dai seguenti passaggi:
- Utilizza pipeline.run().waitUntilFinish() nel codice della pipeline.
- Esegui il programma della pipeline sostitutiva con l'opzione
--update
. - Attendi che il job di sostituzione superi il controllo di compatibilità.
- Esci dal processo di blocco runner digitando
Ctrl+C
.
In alternativa, puoi monitorare lo stato del job di sostituzione nell'interfaccia di monitoraggio di Dataflow. Se il job è stato avviato correttamente, ha superato il controllo di compatibilità.
Python
A causa di una limitazione, devi utilizzare l'esecuzione di blocco per visualizzare gli errori relativi a tentativi di aggiornamento non riusciti nella console o nel terminale. La soluzione attuale è costituita dai seguenti passaggi:
- Utilizza pipeline.run().wait_until_finish() nel codice della pipeline.
- Esegui il programma della pipeline sostitutiva con l'opzione
--update
. - Attendi che il job di sostituzione superi il controllo di compatibilità.
- Esci dal processo di blocco runner digitando
Ctrl+C
.
In alternativa, puoi monitorare lo stato del job di sostituzione nell'interfaccia di monitoraggio di Dataflow. Se il job è stato avviato correttamente, ha superato il controllo di compatibilità.
Go
A causa di una limitazione, devi utilizzare l'esecuzione di blocco per visualizzare gli errori relativi a tentativi di aggiornamento non riusciti nella console o nel terminale.
In particolare, devi specificare l'esecuzione che non blocca utilizzando i flag --execute_async
o --async
. La soluzione alternativa attuale è costituita dai seguenti passaggi:
- Esegui il programma della pipeline sostitutiva con l'opzione
--update
e senza i flag--execute_async
o--async
. - Attendi che il job di sostituzione superi il controllo di compatibilità.
- Esci dal processo di blocco runner digitando
Ctrl+C
.
gcloud
A causa di una limitazione, devi utilizzare l'esecuzione di blocco per visualizzare gli errori relativi a tentativi di aggiornamento non riusciti nella console o nel terminale. La soluzione attuale è costituita dai seguenti passaggi:
- Per le pipeline Java, utilizza pipeline.run().waitUntilFinish() nel codice della pipeline. Per le pipeline Python, utilizza pipeline.run().wait_until_finish() nel codice della pipeline. Per le pipeline Go, segui i passaggi nella scheda Go.
- Esegui il programma della pipeline sostitutiva con l'opzione
--update
. - Attendi che il job di sostituzione superi il controllo di compatibilità.
- Esci dal processo di blocco runner digitando
Ctrl+C
.
REST
A causa di una limitazione, devi utilizzare l'esecuzione di blocco per visualizzare gli errori relativi a tentativi di aggiornamento non riusciti nella console o nel terminale. La soluzione attuale è costituita dai seguenti passaggi:
- Per le pipeline Java, utilizza pipeline.run().waitUntilFinish() nel codice della pipeline. Per le pipeline Python, utilizza pipeline.run().wait_until_finish() nel codice della pipeline. Per le pipeline Go, segui i passaggi nella scheda Go.
- Esegui il programma di pipeline sostitutivo con il campo
replaceJobId
. - Attendi che il job di sostituzione superi il controllo di compatibilità.
- Esci dal processo di blocco runner digitando
Ctrl+C
.
Il controllo di compatibilità utilizza la mappatura della trasformazione fornita per garantire che Dataflow possa trasferire i dati di stato intermedio dai passaggi del job precedente al job di sostituzione. Il controllo della compatibilità assicura inoltre che i PCollection
nella pipeline utilizzino gli stessi codificatori.
La modifica di un Coder
può causare l'esito negativo del controllo di compatibilità perché i dati in corso o i record con buffer potrebbero non essere serializzati correttamente nella pipeline di sostituzione.
Evitare le interruzioni di compatibilità
Determinate differenze tra la pipeline precedente e quella sostitutiva possono causare l'esito negativo del controllo della compatibilità. Queste differenze includono:
- Modifica del grafico della pipeline senza fornire una mappatura. Quando aggiorni un job, Dataflow tenta di abbinare le trasformazioni del job precedente a quelle del job di sostituzione. Questo processo di corrispondenza consente a Dataflow di trasferire i dati sullo stato intermedio per ogni passaggio. Se rinomini o rimuovi qualche passaggio, devi fornire una mappatura della trasformazione in modo che Dataflow possa abbinare i dati sullo stato di conseguenza.
- Modifica degli input secondari per un passaggio. L'aggiunta di input aggiuntivi o la loro rimozione da una trasformazione nella pipeline sostitutiva comporta l'esito negativo del controllo di compatibilità.
- Cambia il codificatore per un passaggio. Quando aggiorni un job, Dataflow conserva tutti i record di dati attualmente sottoposti a buffer e li gestisce nel job di sostituzione. Ad esempio, potrebbero verificarsi dati nel buffer durante la risoluzione del windowing. Se il job di sostituzione utilizza una codifica dei dati diversa o incompatibile, Dataflow non è in grado di serializzare o deserializzare questi record.
Rimozione di un'operazione "stateful" dalla pipeline. Se rimuovi le operazioni stateful dalla pipeline, il job di sostituzione potrebbe non superare il controllo di compatibilità. Dataflow può fondere più passaggi per una maggiore efficienza. Se rimuovi un'operazione che dipende dallo stato da un passaggio fuso, il controllo non va a buon fine. Le operazioni stateful includono:
- Trasformazioni che producono o consumano input collaterali.
- Lettura I/O.
- Trasformazioni che utilizzano lo stato con chiave.
- Trasformazioni con unione di finestre.
Modifica delle variabili
DoFn
stateful. Per i job di elaborazione in modalità flusso in corso, se la pipeline includeDoFn
stateful, la modifica delle variabiliDoFn
stateful potrebbe causare un errore della pipeline.Tentativo di eseguire il job di sostituzione in una zona geografica diversa. Esegui il job di sostituzione nella stessa zona in cui hai eseguito il job precedente.
Aggiornamento degli schemi
Apache Beam consente agli elementi PCollection
di avere schemi con campi denominati, nel qual caso non sono necessari programmatori espliciti. Se i nomi e i tipi di campi per uno schema specifico non sono stati modificati (inclusi i campi nidificati), lo schema non comporterà l'esito negativo del controllo degli aggiornamenti. Tuttavia, l'aggiornamento potrebbe essere comunque bloccato se altri segmenti della nuova pipeline non sono compatibili.
Evolvere gli schemi
Spesso è necessario evolvere lo schema di PCollection
a causa dell'evoluzione dei requisiti aziendali. Il servizio Dataflow consente di apportare le seguenti modifiche a uno schema durante l'aggiornamento della pipeline:
- Aggiunta di uno o più nuovi campi a uno schema, inclusi i campi nidificati.
- Impostazione di un tipo di campo obbligatorio (non nullo) come facoltativo (nullo).
La rimozione di campi, la modifica dei nomi dei campi o dei tipi di campi non è consentita durante l'aggiornamento.
Passare dati aggiuntivi in un'operazione ParDo esistente
Puoi passare dati aggiuntivi (fuori banda) a un'operazione ParDo esistente utilizzando uno dei seguenti metodi, a seconda del caso d'uso:
- Serializza le informazioni come campi nella sottoclasse
DoFn
. - Tutte le variabili a cui fanno riferimento i metodi in un
DoFn
anonimo vengono serializzate automaticamente. - Calcola i dati all'interno di
DoFn.startBundle()
. - Trasmetti dati utilizzando
ParDo.withSideInputs
.
Per ulteriori informazioni, consulta le seguenti pagine:
- Guida alla programmazione Apache Beam: ParDo, in particolare le sezioni sulla creazione di un DoFn e di input secondari.
- Riferimento all'SDK Apache Beam per Java: ParDo