Migrazione degli ambienti a Airflow 2

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa pagina spiega come trasferire i DAG, i dati e la configurazione dai tuoi ambienti Airflow 1.10.* esistenti agli ambienti con Airflow 2 e versioni successive di Airflow.

Altre guide alla migrazione

Da A Metodo Guida
Cloud Composer 1, Airflow 2 Cloud Composer 2, Airflow 2 Affiancate, utilizzando gli snapshot Guida alla migrazione (snapshot)
Cloud Composer 1, Airflow 1 Cloud Composer 2, Airflow 2 Affiancate, utilizzando gli snapshot Guida alla migrazione (snapshot)
Cloud Composer 1, Airflow 2 Cloud Composer 2, Airflow 2 Trasferimento manuale fianco a fianco Guida alla migrazione manuale
Cloud Composer 1, Airflow 1 Cloud Composer 2, Airflow 2 Trasferimento manuale fianco a fianco Guida alla migrazione manuale
Flusso d'aria 1 Flusso d'aria 2 Trasferimento manuale fianco a fianco Questa guida (migrazione manuale)

Upgrade affiancati

Cloud Composer fornisce lo script di trasferimento del database Cloud Composer per eseguire la migrazione del database di metadati, dei DAG, dei dati e dei plug-in dagli ambienti Cloud Composer con Airflow 1.10.14 e Airflow 1.10.15 ad ambienti Cloud Composer esistenti con Airflow 2.0.1 e versioni successive di Airflow.

Si tratta di un percorso alternativo a quello descritto in questa guida. Alcune parti di questa guida sono ancora valide quando utilizzi lo script fornito. Ad esempio, potresti voler controllare la compatibilità dei DAG con Airflow 2 prima di eseguirne la migrazione o assicurarti che le esecuzioni di DAG simultanee non si verifichino e che non siano presenti esecuzioni di DAG aggiuntive o mancanti.

Prima di iniziare

Prima di iniziare a utilizzare gli ambienti Cloud Composer con Airflow 2, considera le modifiche che Airflow 2 apporta agli ambienti Cloud Composer.

Alta disponibilità scheduler

Puoi utilizzare più di uno scheduler Airflow nel tuo ambiente. Puoi impostare il numero di scheduler quando crei un ambiente o aggiornando un ambiente esistente.

Esecutore Celery + Kubernetes

Airflow 2 Celery + Kubernetes Executor è supportato in Cloud Composer 3.

Modifiche che provocano un errore

Airflow 2 introduce molti cambiamenti importanti, alcuni dei quali si stanno interrompendo:

Differenze tra gli ambienti con Airflow 2 e Airflow 1.10.*

Principali differenze tra gli ambienti Cloud Composer con Airflow 1.10.* e gli ambienti con Airflow 2:

  • Gli ambienti con Airflow 2 utilizzano Python 3.8. Si tratta di una versione più recente di quella utilizzata negli ambienti Airflow 1.10.*. Python 2, Python 3.6, Python 3.7 non è supportato.
  • Airflow 2 utilizza un formato dell'interfaccia a riga di comando diverso. Cloud Composer supporta il nuovo formato in ambienti con Airflow 2 tramite il comando gcloud composer environments run.
  • I pacchetti PyPI preinstallati sono diversi negli ambienti Airflow 2. Per un elenco dei pacchetti PyPI preinstallati, vedi Elenco delle versioni di Cloud Composer.
  • La serie DAG è sempre abilitata in Airflow 2. Di conseguenza, il caricamento dei DAG asincrono non è più necessario e non è supportato in Airflow 2. Di conseguenza, la configurazione dei parametri [core]store_serialized_dags e [core]store_dag_code non è supportata per Airflow 2 e i tentativi di impostarli verranno segnalati come errori.
  • I plug-in del server web di Airflow non sono supportati. Questo non influisce sui plug-in di scheduler o worker, inclusi gli operatori e i sensori di Airflow.
  • Negli ambienti Airflow 2, il ruolo utente predefinito di Airflow è Op. Per gli ambienti con Airflow 1.10.*, il ruolo predefinito è Admin.

Passaggio 1: controlla la compatibilità con Airflow 2

Per verificare la presenza di potenziali conflitti con Airflow 2, utilizza gli script di controllo dell'upgrade forniti da Airflow nel tuo ambiente Airflow 1.10.* esistente.

gcloud

  1. Se il tuo ambiente utilizza Airflow 1.10.14 e versioni precedenti, esegui l'upgrade dell'ambiente a una versione di Cloud Composer che utilizza Airflow 1.10.15 e versioni successive. Cloud Composer supporta i comandi di controllo degli upgrade a partire da Airflow 1.10.15.

  2. Esegui i controlli dell'upgrade tramite il comando gcloud composer environments run. Alcuni controlli dell'upgrade pertinenti per Airflow autonomo 1.10.15 non sono rilevanti per Cloud Composer. Il seguente comando esclude questi controlli.

    gcloud composer environments run \
        AIRFLOW_1_ENV  \
        --location=AIRFLOW_1_LOCATION \
        upgrade_check \
        -- --ignore VersionCheckRule --ignore LoggingConfigurationRule \
        --ignore PodTemplateFileRule --ignore SendGridEmailerMovedRule
    

    Sostituisci:

    • AIRFLOW_1_ENV con il nome del tuo ambiente Airflow 1.10.*
    • AIRFLOW_1_LOCATION con la regione in cui si trova l'ambiente.
  3. Verifica l'output del comando. Gli script di controllo degli aggiornamenti segnalano potenziali problemi di compatibilità negli ambienti esistenti.

  4. Implementa altre modifiche ai DAG, come descritto nella guida all'upgrade ad Airflow 2.0 o versioni successive, nella sezione sull'upgrade dei DAG.

Passaggio 2: crea un ambiente Airflow 2, trasferisci gli override della configurazione e le variabili di ambiente

Crea un ambiente Airflow 2 e trasferisci gli override della configurazione e le variabili di ambiente:

  1. Segui i passaggi per creare un ambiente. Prima di creare un ambiente, specifica anche gli override della configurazione e le variabili di ambiente, come spiegato ulteriormente.

  2. Quando selezioni un'immagine, scegli un'immagine con Airflow 2.

  3. Trasferisci manualmente i parametri di configurazione dal tuo ambiente Airflow 1.10.* al nuovo ambiente Airflow 2.

    Console

    1. Quando crei un ambiente, espandi la sezione Networking, override della configurazione di Airflow e funzionalità aggiuntive.

    2. In Override della configurazione di Airflow, fai clic su Aggiungi override della configurazione di Airflow.

    3. Copia tutti gli override della configurazione dal tuo ambiente Airflow 1.10.*

      Alcune opzioni di configurazione utilizzano un nome e una sezione diversi in Airflow 2. Per maggiori informazioni, consulta Modifiche alla configurazione.

    4. In Variabili di ambiente, fai clic su Aggiungi variabile di ambiente.

    5. Copia tutte le variabili di ambiente dal tuo ambiente Airflow 1.10.*

    6. Fai clic su Crea per creare un ambiente.

Passaggio 3: installa i pacchetti PyPI nell'ambiente Airflow 2

Dopo aver creato il tuo ambiente Airflow 2, installa i pacchetti PyPI al suo interno:

Console

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Seleziona il tuo ambiente Airflow 2.

  3. Vai alla scheda Pacchetti PyPI e fai clic su Modifica.

  4. Copia i requisiti del pacchetto PyPI dal tuo ambiente Airflow 1.10.*. Fai clic su Salva e attendi che l'ambiente venga aggiornato.

    Poiché gli ambienti Airflow 2 utilizzano un insieme diverso di pacchetti preinstallati e una versione Python diversa, potresti riscontrare conflitti nei pacchetti PyPI difficili da risolvere. Un modo per diagnosticare i problemi di dipendenza dei pacchetti è verificare la presenza di errori del pacchetto PyPI installando i pacchetti in un pod worker Airflow.

Passaggio 4: trasferisci variabili e pool su Airflow 2

Airflow 1.10.* supporta l'esportazione di variabili e pool in file JSON. Puoi quindi importare questi file nel tuo ambiente Airflow 2.

Devi trasferire i pool solo se hai pool personalizzati diversi da default_pool. In caso contrario, salta i comandi che esportano e importano i pool.

gcloud

  1. Esporta le variabili dal tuo ambiente Airflow 1.10.*:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         variables -- -e /home/airflow/gcs/data/variables.json
    

    Sostituisci:

    • AIRFLOW_1_ENV con il nome del tuo ambiente Airflow 1.10.*
    • AIRFLOW_1_LOCATION con la regione in cui si trova l'ambiente.
  2. Esporta i pool dal tuo ambiente Airflow 1.10.*:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         pool -- -e /home/airflow/gcs/data/pools.json
    
  3. Recupera l'URI del bucket di ambiente Airflow 2.

    1. Esegui questo comando:

      gcloud composer environments describe AIRFLOW_2_ENV \
          --location AIRFLOW_2_LOCATION \
           --format="value(config.dagGcsPrefix)"
      

      Sostituisci:

      • AIRFLOW_2_ENV con il nome del tuo ambiente Airflow 2.
      • AIRFLOW_2_LOCATION con la regione in cui si trova l'ambiente.
    2. Nell'output, rimuovi la cartella /dags. Il risultato sarà l'URI del bucket dell'ambiente Airflow 2.

      Ad esempio, cambia gs://us-central1-example-916807e1-bucket/dags in gs://us-central1-example-916807e1-bucket.

  4. Trasferisci i file JSON con variabili e pool nel tuo ambiente Airflow 2:

    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=variables.json
    
    gcloud composer environments storage data export \
        --destination=AIRFLOW_2_BUCKET/data \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION \
        --source=pools.json
    

    Sostituisci AIRFLOW_2_BUCKET con l'URI del bucket dell'ambiente Airflow 2, ottenuto nel passaggio precedente.

  5. Importa variabili e pool in Airflow 2:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables import \
        -- /home/airflow/gcs/data/variables.json
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools import \
        -- /home/airflow/gcs/data/pools.json
    
  6. Verifica che le variabili e i pool siano importati:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        variables list
    
    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        pools list
    
  7. Rimuovi i file JSON dai bucket:

    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
    gcloud composer environments storage data delete \
        variables.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
    gcloud composer environments storage data delete \
        pools.json \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    

Passaggio 5: Trasferisci altri dati dal bucket di ambiente Airflow 1.10.*

gcloud

  1. Trasferisci i plug-in nel tuo ambiente Airflow 2. A tale scopo, esporta i plug-in dal tuo bucket di ambiente Airflow 1.10.* nella cartella /plugins del bucket di ambiente Airflow 2:

    gcloud composer environments storage plugins export \
        --destination=AIRFLOW_2_BUCKET/plugins \
        --environment=AIRFLOW_1_ENV \
        --location=AIRFLOW_1_LOCATION
    
  2. Verifica che la cartella /plugins sia stata importata correttamente:

    gcloud composer environments storage plugins list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    
  3. Esporta la cartella /data dal tuo ambiente Airflow 1.10.* nell'ambiente Airflow 2:

        gcloud composer environments storage data export \
            --destination=AIRFLOW_2_BUCKET/data \
            --environment=AIRFLOW_1_ENV \
            --location=AIRFLOW_1_LOCATION
    
  4. Verifica che la cartella /data sia stata importata correttamente:

    gcloud composer environments storage data list \
        --environment=AIRFLOW_2_ENV \
        --location=AIRFLOW_2_LOCATION
    

Passaggio 6: trasferisci connessioni e utenti

Airflow 1.10.* non supporta l'esportazione di utenti e connessioni. Per trasferire utenti e connessioni, crea manualmente nuovi account utente e nuove connessioni nel tuo ambiente Airflow 2.

gcloud

  1. Per ottenere un elenco delle connessioni nel tuo ambiente Airflow 1.10.*, esegui:

    gcloud composer environments run AIRFLOW_1_ENV \
        --location AIRFLOW_1_LOCATION \
         connections -- --list
    
  2. Per creare una nuova connessione nel tuo ambiente Airflow 2, esegui il comando connections dell'interfaccia a riga di comando di Airflow tramite gcloud. Ad esempio:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        connections add \
        -- --conn-host postgres.example.com \
        --conn-port 5432 \
        --conn-type postgres \
        --conn-login example_user \
        --conn-password example_password \
        --conn-description "Example connection" \
        example_connection
    
  3. Per visualizzare un elenco di utenti nel tuo ambiente Airflow 1.10.*:

    1. Apri l'interfaccia web di Airflow per il tuo ambiente Airflow 1.10.*

    2. Vai ad Amministratore > Utenti.

  4. Per creare un nuovo account utente nel tuo ambiente Airflow 2, esegui il comando users create dell'interfaccia a riga di comando di Airflow tramite gcloud. Ad esempio:

    gcloud composer environments run \
        AIRFLOW_2_ENV \
        --location AIRFLOW_2_LOCATION \
        users create \
        -- --username example_username \
        --firstname Example-Name \
        --lastname Example-Surname \
        --email example-user@example.com \
        --use-random-password \
        --role Admin
    

Passaggio 7: assicurati che i DAG siano pronti per Airflow 2

Prima di trasferire i DAG nel tuo ambiente Airflow 2, assicurati che:

  1. Gli script dei controlli dell'upgrade per i DAG vengono eseguiti correttamente e non ci sono altri problemi di compatibilità.

  2. I DAG utilizzano istruzioni di importazione corrette.

    Ad esempio, la nuova istruzione di importazione per BigQueryCreateDataTransferOperator può avere il seguente aspetto:

    from airflow.providers.google.cloud.operators.bigquery_dts \
        import BigQueryCreateDataTransferOperator
    
  3. Viene eseguito l'upgrade dei DAG per Airflow 2. Questa modifica è compatibile con Airflow 1.10.14 e versioni successive.

Passaggio 8: trasferisci i DAG all'ambiente Airflow 2

Quando trasferisci i DAG tra gli ambienti, potrebbero verificarsi i seguenti potenziali problemi:

  • Se un DAG è abilitato (non in pausa) in entrambi gli ambienti, ogni ambiente esegue la propria copia del DAG, come programmato. Ciò potrebbe causare esecuzioni simultanee dei DAG per gli stessi dati e lo stesso tempo di esecuzione.

  • A causa del recupero DAG, Airflow pianifica esecuzioni di DAG aggiuntive, a partire dalla data di inizio specificata nei DAG. Questo accade perché la nuova istanza Airflow non tiene conto della cronologia delle esecuzioni dei DAG dall'ambiente 1.10.*. Ciò potrebbe comportare un numero elevato di esecuzioni di DAG pianificate a partire dalla data di inizio specificata.

Impedisci le esecuzioni simultanee di DAG

Nel tuo ambiente Airflow 2, esegui l'override dell'opzione di configurazione di Airflow 2 dags_are_paused_at_creation. Dopo aver apportato questa modifica, tutti i nuovi DAG vengono messi in pausa per impostazione predefinita.

Sezione Chiave Valore
core dags_are_paused_at_creation True

Impedisci esecuzioni di DAG aggiuntive o mancanti

Specifica una nuova data di inizio statica nei DAG che trasferisci nell'ambiente Airflow 2.

Per evitare lacune e sovrapposizioni nelle date di esecuzione, la prima esecuzione del DAG deve essere eseguita nell'ambiente Airflow 2 alla successiva occorrenza dell'intervallo di pianificazione. Per farlo, imposta la nuova data di inizio nel DAG in modo che sia precedente alla data dell'ultima esecuzione nell'ambiente Airflow 1.10.*.

Ad esempio, se il DAG viene eseguito ogni giorno alle 15:00, alle 17:00 e alle 21:00 nell'ambiente Airflow 1.10.*, l'ultima esecuzione del DAG è avvenuta alle 15:00 e prevedi di trasferire il DAG alle 15:15, la data di inizio dell'ambiente Airflow 2 può essere oggi alle 14:45. Dopo aver abilitato il DAG nell'ambiente Airflow 2, Airflow programma l'esecuzione di un DAG per le 17:00.

Per fare un altro esempio, se il DAG viene eseguito ogni giorno alle 00:00 nell'ambiente Airflow 1.10.*, l'ultima esecuzione del DAG è avvenuta alle 00:00 il 26 aprile 2021 e prevedi di trasferire il DAG alle 13:00 del 26 aprile 2021, quindi la data di inizio dell'ambiente Airflow 252020202020 aprile 20202020 Dopo aver abilitato il DAG nell'ambiente Airflow 2, Airflow pianifica l'esecuzione di un DAG per le 00:00 del 27 aprile 2021.

Trasferisci i tuoi DAG uno alla volta nell'ambiente Airflow 2

Per ogni DAG, segui questa procedura per trasferirlo:

  1. Assicurati che la nuova data di inizio nel DAG sia impostata come descritto nella sezione precedente.

  2. Carica il DAG aggiornato nell'ambiente Airflow 2. Questo DAG è stato messo in pausa nell'ambiente Airflow 2 a causa dell'override della configurazione, quindi non sono ancora pianificate esecuzioni di DAG.

  3. Nell'interfaccia web di Airflow, vai a DAG e controlla se sono stati segnalati errori di sintassi dei DAG.

  4. Nel momento in cui prevedi di trasferire il DAG:

    1. Metti in pausa il DAG nel tuo ambiente Airflow 1.10.*.

    2. Riattiva il DAG nel tuo ambiente Airflow 2.

    3. Verifica che la nuova esecuzione del DAG sia pianificata all'ora corretta.

    4. Attendi che l'esecuzione del DAG venga eseguita nell'ambiente Airflow 2 e controlla se l'esecuzione è riuscita.

  5. A seconda che l'esecuzione del DAG abbia esito positivo:

    • Se l'esecuzione del DAG ha esito positivo, puoi procedere e utilizzare il DAG dal tuo ambiente Airflow 2. Infine, valuta la possibilità di eliminare la versione Airflow 1.10.* del DAG.

    • Se l'esecuzione del DAG non è riuscita, prova a risolvere i problemi del DAG fino a quando non viene eseguito correttamente in Airflow 2.

      Se necessario, puoi sempre utilizzare la versione Airflow 1.10.* del DAG:

      1. Metti in pausa il DAG nel tuo ambiente Airflow 2.

      2. Riattiva il DAG nel tuo ambiente Airflow 1.10.*. In questo modo pianifica una nuova esecuzione di DAG per la stessa data e ora dell'esecuzione del DAG non riuscita.

      3. Quando è tutto pronto per continuare con la versione Airflow 2 del DAG, regola la data di inizio, carica la nuova versione del DAG nell'ambiente Airflow 2 e ripeti la procedura.

Passaggio 9: monitora l'ambiente Airflow 2

Dopo aver trasferito tutti i DAG e la configurazione nell'ambiente Airflow 2, monitoralo per individuare potenziali problemi, esecuzioni di DAG non riuscite e integrità complessiva dell'ambiente. Se l'ambiente Airflow 2 viene eseguito senza problemi per un periodo di tempo sufficiente, puoi rimuovere l'ambiente Airflow 1.10.*.

Passaggi successivi