Aggiornare una pipeline esistente

Questo documento descrive come aggiornare un job di streaming in corso. Potresti voler aggiornare le impostazioni Job Dataflow per i seguenti motivi:

  • Vuoi migliorare o ottimizzare il codice della pipeline.
  • Vuoi correggere i bug nel codice della pipeline.
  • Vuoi aggiornare la pipeline per gestire le modifiche al formato dei dati o per tenere conto della versione o di altre modifiche nell'origine dati.
  • Vuoi applicare la patch a una vulnerabilità di sicurezza relativa a Container-Optimized OS per tutti i worker Dataflow.
  • Vuoi scalare una pipeline Apache Beam in modalità flusso usano un numero diverso di worker.

Puoi aggiornare i lavori in due modi:

  • Aggiornamento dei job in esecuzione: per i job di streaming che utilizzano Streaming Engine, puoi aggiornare le opzioni dei job min-num-workers e max-num-workers senza interrompere il job o modificare l'ID job.
  • Job sostitutivo: per eseguire il codice aggiornato della pipeline o per aggiornare le opzioni di job non supportate dagli aggiornamenti in corso, avvia un nuovo job che sostituisca quello esistente. Per verificare se un job sostitutivo è valido, prima di avviare il nuovo job, convalida il relativo grafico del job.

Quando aggiorni il job, il servizio Dataflow esegue un controllo di compatibilità tra il job in esecuzione e il potenziale job sostitutivo. Il controllo di compatibilità assicura che, ad esempio, informazioni sullo stato intermedio e i dati memorizzati nel buffer possono essere quello precedente a quello di sostituzione.

Puoi anche utilizzare l'infrastruttura di logging integrata dell'SDK Apache Beam per registrare le informazioni quando aggiorni il job. Per ulteriori informazioni, consulta Utilizzare i log della pipeline. Per identificare i problemi con il codice della pipeline, utilizza il livello di logging DEBUG.

Aggiornamento dell'opzione del job in corso

Per un job di flussi di dati che utilizza Streaming Engine, puoi aggiornare le seguenti opzioni del job senza interrompere il job o modificarne l'ID:

  • min-num-workers: il numero minimo di istanze Compute Engine.
  • max-num-workers: il numero massimo di istanze Compute Engine.
  • worker-utilization-hint: il utilizzo CPU target, nell'intervallo [0,1, 0,9]

Per gli altri aggiornamenti del job, devi sostituire il job corrente con il job aggiornato. Per ulteriori informazioni, consulta Lanciare un job sostitutivo.

Esegui un aggiornamento in corso

Per eseguire l'aggiornamento delle opzioni di job in corso di pubblicazione, svolgi i passaggi che seguono.

gcloud

Usa il comando gcloud dataflow jobs update-options:

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  --worker-utilization-hint=TARGET_UTILIZATION \
  JOB_ID

Sostituisci quanto segue:

  • REGION: l'ID della regione del job
  • MINIMUM_WORKERS: il numero minimo di Compute Engine istanze
  • MAXIMUM_WORKERS: il numero massimo di Compute Engine istanze
  • TARGET_UTILIZATION: un valore nell'intervallo [0,1, 0,9]
  • JOB_ID: l'ID del job da aggiornare

Puoi anche aggiornare --min-num-workers, --max-num-workers e worker-utilization-hint singolarmente.

REST

Utilizza il metodo projects.locations.jobs.update:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=MASK
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS,
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

Sostituisci quanto segue:

  • MASK: un elenco separato da virgole di parametri da aggiornare, dal seguenti:
    • runtime_updatable_params.max_num_workers
    • runtime_updatable_params.min_num_workers
    • runtime_updatable_params.worker_utilization_hint
  • PROJECT_ID: l'ID progetto Google Cloud del job Dataflow
  • REGION: l'ID della regione del job
  • JOB_ID: l'ID del job da aggiornare
  • MINIMUM_WORKERS: il numero minimo di Compute Engine istanze
  • MAXIMUM_WORKERS: il numero massimo di Compute Engine istanze
  • TARGET_UTILIZATION: un valore compreso nell'intervallo [0,1, 0,9]

Puoi anche aggiornare min_num_workers, max_num_workers e worker_utilization_hint singolarmente. Specifica quali parametri aggiornare nel parametro di query updateMask. includi i valori aggiornati nel campo runtimeUpdatableParams del corpo della richiesta. L'esempio seguente aggiorna min_num_workers:

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

Un job deve essere in stato di esecuzione per essere idoneo per gli aggiornamenti in corso. Un l'errore si verifica se il job non è stato avviato o è già stato annullato. Analogamente, se avvii un job sostitutivo, attendi che inizi prima di inviare aggiornamenti in corso al nuovo job.

Dopo aver inviato una richiesta di aggiornamento, ti consigliamo di attendere il suo completamento prima di inviare un altro aggiornamento. Visualizza i log del job per vedere quando la richiesta viene completata.

Convalida un job di sostituzione

Per verificare se un job sostitutivo è valido, prima di avviare il nuovo job, per convalidare il grafico del job. In Dataflow, un grafo di job è una rappresentazione grafica di una pipeline. Convalidando il grafico del job, riduci il rischio che la pipeline abbia errori o guasti della pipeline dopo l'aggiornamento. Inoltre, puoi convalidare gli aggiornamenti senza dover interrompere quello originale in modo che non presenti tempi di inattività.

Per convalidare il grafo dei job, segui i passaggi per avviare un job sostitutivo. Includi graph_validate_only Opzione di servizio Dataflow nel comando update.

Java

  • Passa l'opzione --update.
  • Imposta l'opzione --jobName in PipelineOptions con lo stesso nome del job che vuoi aggiornare.
  • Imposta l'opzione --region sulla stessa regione del job che vuoi aggiornare.
  • Includi l'opzione per il servizio --dataflowServiceOptions=graph_validate_only.
  • Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e trasmetterla utilizzando l'opzione--transformNameMapping.
  • Se stai inviando un job sostitutivo che utilizza una versione successiva del SDK Apache Beam, imposta --updateCompatibilityVersion sul Versione dell'SDK Apache Beam utilizzata nel job originale.

Python

  • Supera l'opzione --update.
  • Imposta l'opzione --job_name in PipelineOptions con lo stesso nome del job che vuoi aggiornare.
  • Imposta l'opzione --region sulla stessa regione della regione del job che vuoi aggiornare.
  • Includi l'opzione per il servizio --dataflow_service_options=graph_validate_only.
  • Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire trasforma la mappatura e lo passi utilizzando il metodo Opzione --transform_name_mapping.
  • Se invii un job sostitutivo che utilizza una versione successiva dell'SDK Apache Beam, imposta --updateCompatibilityVersion sulla versione dell'SDK Apache Beam utilizzata nel job originale.

Vai

  • Passa l'opzione --update.
  • Imposta l'opzione --job_name con lo stesso nome del job che vuoi aggiornare.
  • Imposta l'opzione --region sulla stessa regione della regione del job che vuoi aggiornare.
  • Includi l'opzione per il servizio --dataflow_service_options=graph_validate_only.
  • Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire trasforma la mappatura e lo passi utilizzando il metodo Opzione --transform_name_mapping.

gcloud

Per convalidare il grafico per un job di modello flessibile, utilizza la gcloud dataflow flex-template run con l'opzione additional-experiments:

  • Supera l'opzione --update.
  • Imposta JOB_NAME sullo stesso nome del job da aggiornare.
  • Imposta l'opzione --region sulla stessa regione del job che vuoi aggiornare.
  • Includi l'opzione --additional-experiments=graph_validate_only.
  • Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e trasmetterla utilizzando l'opzione--transform-name-mappings.

Ad esempio:

gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only

Sostituisci JOB_NAME con il nome del job da aggiornare.

REST

Utilizza il campo additionalExperiments nell'oggetto FlexTemplateRuntimeEnvironment (modelli flessibili) o RuntimeEnvironment.

{
  additionalExperiments : ["graph_validate_only"]
  ...
}

L'opzione di servizio graph_validate_only convalida solo gli aggiornamenti della pipeline. Non utilizzare questa opzione per creare o avviare le pipeline. Per aggiornare la pipeline, avvia un job di sostituzione senza Opzione di servizio graph_validate_only.

Quando la convalida del grafico del job è andata a buon fine, lo stato del job e i log del job mostrano i seguenti stati:

  • Lo stato del job è JOB_STATE_DONE.
  • Nella console Google Cloud, lo stato del job è Succeeded.
  • Nei log dei job viene visualizzato il seguente messaggio:

    Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
    

Quando la convalida del grafico del job ha esito negativo, lo stato del job i log dei job mostrano i seguenti stati:

  • Lo stato del job è JOB_STATE_FAILED.
  • Nella console Google Cloud, lo stato del job è Failed.
  • Viene visualizzato un messaggio nella log dei job che descrivono i errore di incompatibilità. I contenuti del messaggio dipendono dall'errore.

Avvia un job sostitutivo

Potresti sostituire un job esistente per i seguenti motivi:

  • Per eseguire il codice aggiornato della pipeline.
  • Per aggiornare le opzioni dei job che non supportano in tempo reale.

Per verificare se un job sostitutivo è valido, prima di lanciare il nuovo job, convalida il relativo grafo dei job.

Quando avvii un job di sostituzione, imposta le seguenti opzioni della pipeline per eseguire il processo di aggiornamento, oltre alle opzioni standard del job:

Java

  • Passa l'opzione --update.
  • Imposta l'opzione --jobName in PipelineOptions con lo stesso nome del job che vuoi aggiornare.
  • Imposta l'opzione --region sulla stessa regione del job che vuoi aggiornare.
  • Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire trasforma la mappatura e lo passi utilizzando il metodo Opzione --transformNameMapping.
  • Se invii un job sostitutivo che utilizza una versione successiva dell'SDK Apache Beam, imposta --updateCompatibilityVersion sulla versione dell'SDK Apache Beam utilizzata nel job originale.

Python

  • Supera l'opzione --update.
  • Imposta l'opzione --job_name in PipelineOptions con lo stesso nome del job che vuoi aggiornare.
  • Imposta l'opzione --region sulla stessa regione della regione del job che vuoi aggiornare.
  • Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire trasforma la mappatura e lo passi utilizzando il metodo Opzione --transform_name_mapping.
  • Se invii un job sostitutivo che utilizza una versione successiva dell'SDK Apache Beam, imposta --updateCompatibilityVersion sulla versione dell'SDK Apache Beam utilizzata nel job originale.

Vai

  • Passa l'opzione --update.
  • Imposta l'opzione --job_name con lo stesso nome del job che vuoi aggiornare.
  • Imposta l'opzione --region sulla stessa regione della regione del job che vuoi aggiornare.
  • Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire trasforma la mappatura e lo passi utilizzando il metodo Opzione --transform_name_mapping.

gcloud

Per aggiornare un job del modello flessibile utilizzando gcloud CLI, utilizza gcloud dataflow flex-template run . L'aggiornamento di altri job utilizzando gcloud CLI non è supportato.

  • Passa l'opzione --update.
  • Imposta JOB_NAME sullo stesso nome del job da aggiornare.
  • Imposta l'opzione --region sulla stessa regione del job che vuoi aggiornare.
  • Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e trasmetterla utilizzando l'opzione--transform-name-mappings.

REST

Queste istruzioni mostrano come aggiornare i job non modello utilizzando il REST tramite Google Cloud CLI o tramite l'API Compute Engine. Per utilizzare l'API REST per aggiornare un job di streaming con modello classico, consulta Aggiornare un job di streaming con modello personalizzato. Per utilizzare l'API REST per aggiornare un job di modello flessibile, consulta Aggiornare un job di modello flessibile.

  1. Recupera la risorsa job per il job che vuoi sostituire utilizzando il metodo projects.locations.jobs.get. Includi il parametro di query view con il valore JOB_VIEW_DESCRIPTION. L'inclusione di JOB_VIEW_DESCRIPTION limita la quantità di dati nella risposta in modo che la richiesta successiva non superi i limiti di dimensione. Se hai bisogno informazioni più dettagliate sul job, utilizza il valore JOB_VIEW_ALL.

    GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
    

    Sostituisci i seguenti valori:

    • PROJECT_ID: l'ID del progetto Google Cloud della Job Dataflow
    • REGION: la regione del job da aggiornare
    • JOB_ID: l'ID del job da aggiornare
  2. Per aggiornare il job, utilizza il metodo projects.locations.jobs.create. Nel corpo della richiesta, utilizza la risorsa job che hai recuperato.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    {
      "id": JOB_ID,
      "replaceJobId": JOB_ID,
      "name": JOB_NAME,
      "type": "JOB_TYPE_STREAMING",
      "transformNameMapping": {
        string: string,
        ...
      },
    }
    

    Sostituisci quanto segue:

    • JOB_ID: lo stesso ID job dell'attività che vuoi aggiornare.
    • JOB_NAME: lo stesso nome del job che vuoi aggiornare.

    Se i nomi delle trasformazioni nella pipeline sono cambiati, devi fornire una mappatura delle trasformazioni e trasmetterla utilizzando il campo transformNameMapping.

  3. (Facoltativo) Per inviare la richiesta utilizzando curl (Linux, macOS o Cloud Shell), salva il a un file JSON ed esegui questo comando:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    

    Sostituisci FILE_PATH con il percorso del file JSON contenente il corpo della richiesta.

Specifica il nome del job sostitutivo

Java

Quando lanci il job di sostituzione, il valore che passi per --jobName deve corrispondere esattamente al nome del job da sostituire.

Python

Quando avvii il job sostitutivo, il valore che passi per l'opzione --job_name deve corrispondere esattamente al nome del job che vuoi sostituire.

Vai

Quando avvii il job sostitutivo, il valore che passi per l'opzione --job_name deve corrispondere esattamente al nome del job che vuoi sostituire.

gcloud

Quando avvii il job sostitutivo, JOB_NAME deve corrispondere esattamente al nome del job che vuoi sostituire.

REST

Imposta il valore del campo replaceJobId sullo stesso ID job del job da aggiornare. Per trovare il valore corretto per il nome del job, seleziona il job precedente nel Interfaccia di monitoraggio di Dataflow. Poi, nel riquadro laterale Informazioni sul job, individua il campo ID job.

Per trovare il valore corretto del nome del job, seleziona il job precedente nell'interfaccia di monitoraggio Dataflow. Quindi, nel Nel riquadro laterale Informazioni lavoro, individua il campo Nome job:

Il riquadro laterale Informazioni sul job per un job Dataflow in esecuzione.
Figura 1: riquadro laterale Informazioni sul job per un job Dataflow in esecuzione con il campo Nome job.

In alternativa, esegui una query su un elenco di job esistenti utilizzando l'interfaccia a riga di comando Dataflow. Inserisci il comando gcloud dataflow jobs list nella finestra della shell o del terminale per ottenere un elenco di job Dataflow nel tuo progetto Google Cloud e individua il campo NAME per il job che vuoi sostituire:

JOB_ID                                    NAME                        TYPE       CREATION_TIME        STATE    REGION
2020-12-28_12_01_09-yourdataflowjobid     ps-topic                    Streaming  2020-12-28 20:01:10  Running  us-central1

Creare una mappatura di trasformazione

Se la pipeline sostitutiva cambia i nomi delle trasformazioni rispetto ai nomi presenti precedente, il servizio Dataflow richiede una trasformazione il mapping. La mappatura delle trasformazioni mappa le trasformazioni denominate nel codice della pipeline precedente ai nomi nel codice della pipeline di sostituzione.

Java

Passa la mappatura utilizzando l'opzione a riga di comando --transformNameMapping, utilizzando il seguente formato generale:

--transformNameMapping= . 
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Devi fornire le voci di mappatura in --transformNameMapping solo per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e la pipeline di sostituzione.

Quando esegui il comando con --transformNameMapping, potresti dover eseguire l'escape delle virgolette in base alla tua shell. Ad esempio, in Bash:

--transformNameMapping='{"oldTransform1":"newTransform1",...}'

Python

Trasmetti il mapping utilizzando l'opzione della riga di comando --transform_name_mapping. utilizzando il seguente formato generale:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Devi fornire le voci di mappatura in --transform_name_mapping solo per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e la pipeline di sostituzione.

Quando esegui il comando con --transform_name_mapping, potresti dover eseguire l'escape delle virgolette in base alla tua shell. Ad esempio, in Bash:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

Vai

Trasmetti il mapping utilizzando l'opzione della riga di comando --transform_name_mapping. utilizzando il seguente formato generale:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Devi fornire le voci di mappatura in --transform_name_mapping solo per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e la pipeline di sostituzione.

Quando corri con --transform_name_mapping, potresti dover fuggire le citazioni appropriate per la tua shell. Ad esempio, in Bash:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

gcloud

Trasmetti la mappatura utilizzando --transform-name-mappings usando il seguente formato generale:

--transform-name-mappings= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Devi fornire le voci di mappatura in --transform-name-mappings solo per trasformare i nomi che sono cambiati tra la pipeline precedente e una pipeline di sostituzione.

Quando esegui il comando con --transform-name-mappings, potrebbe essere necessario inserire i caratteri di escape nelle virgolette in base alla tua shell. Per esempio, in Bash:

--transform-name-mappings='{"oldTransform1":"newTransform1",...}'

REST

Passa la mappatura utilizzando il campo transformNameMapping , con il seguente formato generale:

"transformNameMapping": {
  oldTransform1: newTransform1,
  oldTransform2: newTransform2,
  ...
}

Devi fornire le voci di mappatura in transformNameMapping solo per i nomi delle trasformazioni che sono cambiati tra la pipeline precedente e la pipeline di sostituzione.

Determinare i nomi delle trasformazioni

Il nome della trasformazione in ogni istanza della mappa è il nome che hai fornito quando hai applicato la trasformazione nella pipeline. Ad esempio:

Java

  .apply("FormatResults", ParDo
    .of(new DoFn<KV<String, Long>>, String>() {
      ...
     }
  }))

Python

  | 'FormatResults' >> beam.ParDo(MyDoFn())

Vai

  // In Go, this is always the package-qualified name of the DoFn itself.
  // For example, if the FormatResults DoFn is in the main package, its name
  // is "main.FormatResults".
  beam.ParDo(s, FormatResults, results)

Puoi anche ottenere i nomi delle trasformazioni per il tuo job precedente esaminando di esecuzione del job Interfaccia di monitoraggio di Dataflow:

Il grafico di esecuzione di una pipeline WordCount.
Figura 2: grafico di esecuzione per una pipeline WordCount in Dataflow Interfaccia di monitoraggio.

Denominazione delle trasformazioni composite

I nomi delle trasformazioni sono gerarchici, basati sulla gerarchia delle trasformazioni una pipeline o un blocco note personalizzato. Se la tua pipeline ha un trasformazione composita, alle trasformazioni nidificate vengono denominate in base alla trasformazione contenitore. Ad esempio, supponiamo che la pipeline contenga una trasformazione composita denominata CountWidgets, che contiene una trasformazione interna denominata Parse. Il nome completo della trasformazione è CountWidgets/Parse e devi specificare che il nome completo nella mappatura della trasformazione.

Se la nuova pipeline mappa una trasformazione composita a un nome diverso, anche tutte le trasformazioni nidificate vengono rinominate automaticamente. Devi specificare i nomi modificati per le trasformazioni interne nella mappatura delle trasformazioni.

Ristruttura la gerarchia delle trasformazioni

Se la pipeline di sostituzione utilizza una gerarchia di trasformazione diversa rispetto alla precedente, devi dichiarare esplicitamente la mappatura. Potresti avere una gerarchia di trasformazioni diversa perché hai sottoposto a refactoring le trasformazioni composite o perché la tua pipeline dipende da una trasformazione composita di una libreria modificata.

Ad esempio, la pipeline precedente applicava una trasformazione composita, CountWidgets, che conteneva una trasformazione interna denominata Parse. La pipeline sostitutiva esegue il refactoring di CountWidgets e nidifica Parse all'interno di un'altra trasformazione denominata Scan. Affinché l'aggiornamento vada a buon fine, devi mappare esplicitamente il codice completo nome della trasformazione nella pipeline precedente (CountWidgets/Parse) alla trasformazione nella nuova pipeline (CountWidgets/Scan/Parse):

Java

--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Se elimini completamente una trasformazione nella pipeline di sostituzione, devi fornire una mappatura nulla. Supponiamo che la pipeline di sostituzione rimuova completamente la trasformazione CountWidgets/Parse:

--transformNameMapping={"CountWidgets/Parse":""}

Python

--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Se elimini una trasformazione completamente nella pipeline sostitutiva, devi fornisce un mapping nullo. Supponiamo che la pipeline di sostituzione rimuova completamente la trasformazione CountWidgets/Parse:

--transform_name_mapping={"CountWidgets/Parse":""}

Vai

--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}

Se elimini una trasformazione completamente nella pipeline sostitutiva, devi fornisce un mapping nullo. Supponiamo che la pipeline di sostituzione rimuova Trasformazione completa di CountWidgets/Parse:

--transform_name_mapping={"CountWidgets/main.Parse":""}

gcloud

--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Se elimini una trasformazione completamente nella pipeline sostitutiva, devi fornisce un mapping nullo. Supponiamo che la pipeline di sostituzione rimuova completamente la trasformazione CountWidgets/Parse:

--transform-name-mappings={"CountWidgets/main.Parse":""}

REST

"transformNameMapping": {
  CountWidgets/Parse: CountWidgets/Scan/Parse
}

Se elimini una trasformazione completamente nella pipeline sostitutiva, devi fornisce un mapping nullo. Supponiamo che la pipeline di sostituzione rimuova completamente la trasformazione CountWidgets/Parse:

"transformNameMapping": {
  CountWidgets/main.Parse: null
}

Effetti della sostituzione di un job

Quando sostituisci un job esistente, un nuovo job esegue il codice aggiornato della pipeline. Il servizio Dataflow conserva il nome del job, ma esegue il job di sostituzione con un ID job aggiornato. Questo processo potrebbe causare tempi di inattività durante l'arresto del job esistente, l'esecuzione del controllo di compatibilità e il nuovo job .

Il job di sostituzione conserva i seguenti elementi:

Dati dello stato intermedio

I dati dello stato intermedio del job precedente vengono conservati. I dati di stato non includono le cache in memoria. Se vuoi conservare i dati della cache in memoria quando aggiorni la pipeline, come soluzione alternativa, esegui il refactoring della pipeline per convertire memorizza nella cache dati relativi allo stato o a input aggiuntivi. Per ulteriori informazioni sull'utilizzo degli input aggiuntivi, vedi Pattern di input aggiuntivi nella documentazione di Apache Beam.

Le pipeline di streaming hanno limiti di dimensioni per ValueState e per gli input laterali. Di conseguenza, se hai cache di grandi dimensioni che vuoi conservare, potresti dover utilizzare uno spazio di archiviazione esterno, come Memorystore o Bigtable.

Dati in volo

"In volo" mentre i dati vengono ancora elaborati dalle trasformazioni nella nuova pipeline. Tuttavia, le trasformazioni aggiuntive che aggiungi nel codice della pipeline sostitutiva potrebbe diventare effettiva o meno, a seconda della posizione in cui i record sono memorizzati nel buffer. In questo esempio, la pipeline esistente contiene le seguenti trasformazioni:

Java

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Format' >> FormatStrings()

Vai

   beam.ParDo(s, ReadStrings)
   beam.ParDo(s, FormatStrings)

Puoi sostituire il job con il nuovo codice della pipeline come segue:

Java

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Remove' >> RemoveStringsStartingWithA()
    | 'Format' >> FormatStrings()

Vai

  beam.ParDo(s, ReadStrings)
  beam.ParDo(s, RemoveStringsStartingWithA)
  beam.ParDo(s, FormatStrings)

Anche se aggiungi una trasformazione per filtrare le stringhe che iniziano con la lettera "A", la trasformazione successiva (FormatStrings) potrebbe comunque vedere stringhe in buffer o in transito che iniziano con "A" e che sono state trasferite dal job precedente.

Cambia windowing

Puoi modificare le strategie di definizione delle finestre e di attivazione per gli elementi PCollection nella pipeline di sostituzione, ma fai attenzione. La modifica delle strategie di windowing o trigger non influisce sui dati che vengono già nel buffer o comunque in corso.

Ti consigliamo di provare solo piccole modifiche alla finestra temporale della pipeline, come la modifica della durata delle finestre temporali fisse o scorrevoli. Apportare modifiche sostanziali alle finestre o agli attivatori, ad esempio modificare l'algoritmo di definizione delle finestre, potrebbe avere risultati imprevedibili sull'output della pipeline.

Verifica della compatibilità dei job

Quando avvii il job sostitutivo, il servizio Dataflow esegue un controllo di compatibilità tra il job sostitutivo e quello precedente. Se il controllo di compatibilità va a buon fine, il job precedente viene interrotto. Il job di sostituzione viene quindi avviato sul servizio Dataflow mantenendo lo stesso nome job. Se il controllo di compatibilità non va a buon fine, il job precedente continua a essere eseguito nel servizio Dataflow e il job sostitutivo restituisce un errore.

Java

A causa di una limitazione, devi utilizzare l'esecuzione bloccata per visualizzare gli errori di tentativi di aggiornamento non riusciti nella console o nel terminale. La soluzione alternativa attuale prevede i seguenti passaggi:

  1. Utilizza pipeline.run().waitUntilFinish() nel codice della pipeline.
  2. Esegui il programma della pipeline di sostituzione con l'opzione --update.
  3. Attendi che il job sostitutivo superi il controllo di compatibilità.
  4. Esci dalla procedura dell'esecutore di blocco digitando Ctrl+C.

In alternativa, puoi monitorare lo stato del job di sostituzione nel Interfaccia di monitoraggio di Dataflow. Se il job è stato avviato correttamente, ha superato anche il controllo di compatibilità.

Python

A causa di una limitazione, devi utilizzare l'esecuzione bloccata per visualizzare gli errori di tentativi di aggiornamento non riusciti nella console o nel terminale. La soluzione alternativa attuale prevede i seguenti passaggi:

  1. Utilizza pipeline.run().wait_until_finish() nel codice della pipeline.
  2. Esegui il programma della pipeline di sostituzione con l'opzione --update.
  3. Attendi che il job sostitutivo superi il controllo di compatibilità.
  4. Esci dal processo di esecuzione bloccata digitando Ctrl+C.

In alternativa, puoi monitorare lo stato del job di sostituzione nel Interfaccia di monitoraggio di Dataflow. Se il job è stato avviato correttamente, ha superato anche il controllo di compatibilità.

Vai

A causa di una limitazione, devi usare il blocco per visualizzare gli errori di tentativi di aggiornamento non riusciti nella console o nel terminale. Nello specifico, devi specificare l'esecuzione non bloccante utilizzando i flag --execute_async o --async. L'attualeworkaround consiste nei seguenti passaggi:

  1. Esegui il programma della pipeline di sostituzione con l'opzione --update e senza i flag --execute_async o --async.
  2. Attendi che il job di sostituzione superi il controllo di compatibilità.
  3. Esci dal processo di esecuzione bloccata digitando Ctrl+C.

gcloud

A causa di una limitazione, devi utilizzare l'esecuzione bloccata per visualizzare gli errori di tentativi di aggiornamento non riusciti nella console o nel terminale. La soluzione alternativa attuale prevede i seguenti passaggi:

  1. Per le pipeline Java, usa pipeline.run().waitUntilFinish() nel codice della pipeline. Per le pipeline Python, utilizza pipeline.run().wait_until_finish() nel codice della pipeline. Per le pipeline Go, segui i passaggi nella scheda Go.
  2. Esegui il programma della pipeline di sostituzione con l'opzione --update.
  3. Attendi che il job sostitutivo superi il controllo di compatibilità.
  4. Esci dalla procedura dell'esecutore di blocco digitando Ctrl+C.

REST

A causa di una limitazione, devi utilizzare l'esecuzione bloccata per visualizzare gli errori di tentativi di aggiornamento non riusciti nella console o nel terminale. La soluzione alternativa attuale prevede i seguenti passaggi:

  • Per le pipeline Java, utilizza pipeline.run().waitUntilFinish() nel codice della pipeline. Per le pipeline Python, utilizza pipeline.run().wait_until_finish() nel codice della pipeline. Per le pipeline Go, segui i passaggi nella scheda Go.
  • Esegui il programma della pipeline sostitutiva con il campo replaceJobId.
  • Attendi che il job di sostituzione superi il controllo di compatibilità.
  • Esci dal processo di esecuzione bloccata digitando Ctrl+C.

Il controllo di compatibilità utilizza la mappatura delle trasformazioni fornita per garantire che Dataflow possa trasferire i dati dello stato intermedio dai passaggi del job precedente al job sostitutivo. Il controllo di compatibilità garantisce anche che gli elementi PCollection della pipeline utilizzano gli stessi Codificatori. La modifica di un Coder può causare il fallimento del controllo di compatibilità perché eventuali dati in transito o record in buffer potrebbero non essere serializzati correttamente nella pipeline di sostituzione.

Previeni le interruzioni di compatibilità

Alcune differenze tra la pipeline precedente e quella sostitutiva la verifica della compatibilità potrebbe non andare a buon fine. Queste differenze includono:

  • Modifica del grafico della pipeline senza fornire una mappatura. Quando aggiorni un job, Dataflow tenta di abbinare le trasformazioni del job precedente a quelle del job sostitutivo. Questa procedura di corrispondenza Dataflow trasferisce i dati intermedi sullo stato per ogni passaggio. Se rinomini o rimuovi i passaggi, devi fornire una mappatura delle trasformazioni in modo che Dataflow possa abbinare i dati di stato di conseguenza.
  • Modifica degli input laterali di un passaggio. Aggiunta in corso... input aggiuntivi o la loro rimozione da una trasformazione nella pipeline di sostituzione, la verifica della compatibilità non vada a buon fine.
  • Cambia il programmatore per un passaggio. Quando aggiorni un job, Dataflow conserva tutti i record di dati attualmente presenti nel buffer li gestisce nel job di sostituzione. Ad esempio, potrebbero verificarsi dati presenti nel buffer durante il windowing in fase di risoluzione. Se il job di sostituzione utilizza una codifica dei dati diversa o incompatibile, Dataflow non è in grado di serializzare o deserializzare questi record.
  • Rimozione di una classe "stateful" operativa dalla tua pipeline. Se rimuovi operazioni stateful dalla pipeline, il job di sostituzione potrebbe non riuscire controllo di compatibilità. Dataflow può Fondere più passaggi per l'efficienza. Se rimuovi un'operazione dipendente dallo stato da un passaggio unito, il controllo non va a buon fine. Le operazioni con stato includono:

    • Trasformazioni che producono o consumano input aggiuntivi.
    • Letture I/O.
    • Trasformazioni che usano lo stato con chiave.
    • Trasformazioni con unione delle finestre.
  • Modifica delle variabili DoFn stateful. Per i job di streaming in corso, se la pipeline include DoFn con stato, la modifica delle variabili DoFn con stato potrebbe causare un errore della pipeline.

  • Tentativo di eseguire il job di sostituzione in una zona geografica diversa. Esegui il job di sostituzione nella stessa zona in cui hai eseguito il job precedente.

Aggiornamento degli schemi

Apache Beam consente agli PCollection di avere schemi con campi denominati, nel qual caso non servono programmatori espliciti. Se i nomi e i tipi di campo relativi a un determinato schema sono invariati (compresi i campi nidificati), questo schema non causa la controllo degli aggiornamenti. Tuttavia, l'aggiornamento potrebbe essere ancora bloccato se della nuova pipeline non sono compatibili.

Evolvere gli schemi

Spesso è necessario evolvere lo schema di un PCollection a causa dell'evoluzione delle esigenze aziendali. Il servizio Dataflow consente di apportare le seguenti modifiche a uno schema durante l'aggiornamento della pipeline:

  • Aggiunta di uno o più nuovi campi a uno schema, inclusi i campi nidificati.
  • Rendere facoltativo un tipo di campo obbligatorio (non annullabile).

La rimozione di campi, la modifica dei nomi dei campi o dei tipi di campo non è consentita durante l'aggiornamento.

Passare dati aggiuntivi a un'operazione ParDo esistente

Puoi passare dati aggiuntivi (out-of-band) a un'operazione ParDo esistente utilizzando uno dei seguenti metodi, a seconda del caso d'uso:

  • Serializza le informazioni come campi nella sottoclasse DoFn.
  • Tutte le variabili a cui fanno riferimento i metodi in un elemento DoFn anonimo vengono serializzato automaticamente.
  • Esegui il calcolo dei dati all'interno di DoFn.startBundle().
  • Passa i dati utilizzando ParDo.withSideInputs.

Per ulteriori informazioni, consulta le seguenti pagine: