Aggiungi e aggiorna i DAG

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa pagina descrive come gestire i DAG nell'ambiente Cloud Composer.

Cloud Composer utilizza un bucket Cloud Storage per archiviare i DAG del tuo ambiente Cloud Composer. Il tuo ambiente sincronizza i DAG di questo bucket con i componenti Airflow, come i worker e gli scheduler di Airflow.

Prima di iniziare

  • Poiché Apache Airflow non fornisce un isolamento DAG efficace, consigliamo di mantenere ambienti di produzione e di test separati per evitare interferenze con il DAG. Per ulteriori informazioni, consulta la sezione Testare i DAG.
  • Assicurati che il tuo account disponga di autorizzazioni sufficienti per: e gestire i DAG.
  • Le modifiche al DAG vengono propagate ad Airflow entro 3-5 minuti. Puoi visualizzare l'attività nell'interfaccia web di Airflow.

Accedi al bucket del tuo ambiente

Per accedere al bucket associato al tuo ambiente:

Console

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, trova una riga con il nome del tuo ambiente e nella colonna Cartella DAG fai clic sul link DAG. Viene visualizzata la pagina Dettagli bucket. Mostra i contenuti della cartella /dags nel bucket del tuo ambiente.

gcloud

gcloud CLI ha comandi separati aggiungi ed elimina i DAG nel bucket del tuo ambiente.

Se vuoi interagire con il bucket del tuo ambiente, puoi anche utilizzare Google Cloud CLI. Per ottenere l'indirizzo del bucket del tuo ambiente, esegui la seguente gcloud CLI :

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente individuarlo.

Esempio:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Creare una richiesta API environments.get. Nella nella risorsa Ambiente, Risorsa EnvironmentConfig, in la risorsa dagGcsPrefix è l'indirizzo del bucket dell'ambiente.

Esempio:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Utilizza la libreria google-auth per recuperare le credenziali e la libreria requests per chiamare l'API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Aggiungere o aggiornare un DAG

Per aggiungere o aggiornare un DAG, sposta il file .py Python del DAG nella cartella /dags del bucket dell'ambiente.

Console

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, trova una riga con il nome del tuo ambiente e nella colonna Cartella DAG fai clic sul link DAG. La Si apre la pagina Dettagli bucket. Mostra i contenuti della cartella /dags nel bucket del tuo ambiente.

  3. Fai clic su Carica file. Quindi seleziona il file .py Python per il DAG utilizzando la finestra di dialogo del browser e conferma.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente individuarlo.
  • LOCAL_FILE_TO_UPLOAD è il file .py Python per il DAG.

Esempio:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Aggiorna un DAG con esecuzioni di DAG attive

Se aggiorni un DAG con esecuzioni di DAG attive:

  • Tutte le attività attualmente in esecuzione terminano utilizzando il file DAG originale.
  • Tutte le attività pianificate ma non attualmente in esecuzione utilizzano il modello del file DAG.
  • Tutte le attività che non sono più presenti nel file DAG aggiornato sono contrassegnate come rimosso.

Aggiornare i DAG che vengono eseguiti con una frequenza elevata

Dopo aver caricato un file DAG, Airflow impiega un po' di tempo per caricarlo e aggiornare il DAG. Se il DAG viene eseguito con una pianificazione frequente, potresti assicurati che il DAG utilizzi la versione aggiornata del file DAG. Ecco come fare:

  1. Metti in pausa il DAG nella UI di Airflow.

  2. Carica un file DAG aggiornato.

  3. Attendi finché non vedi gli aggiornamenti nella UI di Airflow. Ciò significa che il DAG è stato analizzato correttamente dallo scheduler e aggiornato nel database Airflow.

    Se la UI di Airflow mostra i DAG aggiornati, ciò non garantisce che I worker di Airflow hanno la versione aggiornata del file DAG. Questo accade perché i file DAG vengono sincronizzati in modo indipendente per gli scheduler e i worker.

  4. Ti consigliamo di estendere il tempo di attesa per assicurarti che il file DAG sia sincronizzato con tutti i worker del tuo ambiente. La sincronizzazione avviene diverse volte al minuto. In un ambiente sano, aspettare circa 20-30 secondi sono sufficienti per la sincronizzazione di tutti i worker.

  5. (Facoltativo) Se vuoi avere la certezza che tutti i worker dispongano della nuova versione del file DAG, controlla i log di ogni singolo worker. Ecco come fare:

    1. Apri la scheda Log per il tuo ambiente nella console Google Cloud.

    2. Vai a Log di Composer > Infrastruttura > Sincronizza Cloud Storage e ispeziona i log per ogni worker in del tuo ambiente. Cerca il log Syncing dags directory più recente elemento con timestamp successivo al caricamento del nuovo file DAG. Se un elemento Finished syncing che lo segue, i DAG vengono sincronizzati con successo su questo worker.

  6. Riattiva il DAG.

Elimina un DAG nel tuo ambiente

Per eliminare un DAG, rimuovi il file .py Python del DAG dalla cartella /dags dell'ambiente nel bucket dell'ambiente.

Console

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, trova una riga con il nome del tuo ambiente e nella colonna Cartella DAG fai clic sul link DAG. Viene visualizzata la pagina Dettagli bucket. Mostra i contenuti della cartella /dags in del bucket dell'ambiente.

  3. Seleziona il file DAG, fai clic su Elimina e conferma l'operazione.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente individuarlo.
  • DAG_FILE con il file .py Python per il DAG.

Esempio:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Rimuovere un DAG dall'interfaccia utente di Airflow

Per rimuovere i metadati di un DAG dall'interfaccia web di Airflow:

Interfaccia utente di Airflow

  1. Vai all'interfaccia utente di Airflow per il tuo ambiente.
  2. Per il DAG, fai clic su Elimina DAG.

gcloud

Esegui il seguente comando in gcloud CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.
  • DAG_NAME è il nome del DAG da eliminare.

Passaggi successivi