Aggiunta e aggiornamento dei DAG

Cloud Composer 1 | Cloud Composer 2

Questa pagina descrive come gestire i DAG nel tuo ambiente Cloud Composer.

Cloud Composer utilizza un bucket Cloud Storage per archiviare i DAG del tuo ambiente Cloud Composer. Il tuo ambiente sincronizza i DAG da questo bucket ai componenti Airflow, ad esempio i worker e gli scheduler di Airflow.

Prima di iniziare

  • Poiché Apache Airflow non fornisce un livello di isolamento DAG elevato, ti consigliamo di mantenere ambienti di produzione e di test separati per evitare interferenze DAG. Per maggiori informazioni, consulta la pagina Test dei DAG.
  • Assicurati che il tuo account disponga di autorizzazioni sufficienti per gestire i DAG.
  • Le modifiche al DAG si propagano a Airflow entro 3-5 minuti. Puoi visualizzare lo stato dell'attività nell'interfaccia web di Airflow.

Accedi al bucket del tuo ambiente

Per accedere al bucket associato al tuo ambiente:

Console

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, individua una riga con il nome dell'ambiente e fai clic sul link DAG nella colonna Cartella DAG. Viene visualizzata la pagina Dettagli bucket. Mostra i contenuti della cartella /dags nel bucket del tuo ambiente.

gcloud

L'interfaccia a riga di comando gcloud ha comandi separati per aggiungere ed eliminare i DAG nel bucket del tuo ambiente.

Se vuoi interagire con il bucket del tuo ambiente, puoi anche utilizzare lo strumento a riga di comando gsutil. Per ottenere l'indirizzo del bucket del tuo ambiente, esegui il seguente comando gcloud CLI:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.

Esempio:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Crea una richiesta API environments.get. Nella risorsa Ambiente, all'interno della risorsa EnvironmentConfig, la risorsa dagGcsPrefix è l'indirizzo del bucket del tuo ambiente.

Esempio:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Utilizza la libreria google-auth per ottenere le credenziali e utilizzare la libreria requests per chiamare l'API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Aggiungere o aggiornare un DAG

Per aggiungere o aggiornare un DAG, sposta il file Python .py per il DAG nella cartella /dags del bucket dell'ambiente.

Console

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, individua una riga con il nome dell'ambiente e fai clic sul link DAG nella colonna Cartella DAG. Viene visualizzata la pagina Dettagli bucket. Mostra i contenuti della cartella /dags nel bucket del tuo ambiente.

  3. Fai clic su Carica file. Quindi seleziona il file Python .py per il DAG utilizzando la finestra di dialogo del browser e conferma.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.
  • LOCAL_FILE_TO_UPLOAD è il file Python .py per il DAG.

Esempio:

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Aggiorna un DAG con esecuzioni di DAG attive

Se aggiorni un DAG con esecuzioni di DAG attive:

  • Tutte le attività attualmente in esecuzione vengono terminate utilizzando il file DAG originale.
  • Tutte le attività pianificate ma non attualmente in esecuzione utilizzano il file DAG aggiornato.
  • Tutte le attività che non sono più presenti nel file DAG aggiornato sono contrassegnate come rimosse.

Aggiornamento dei DAG eseguiti in base a una pianificazione frequente

Dopo aver caricato un file DAG, è necessario un po' di tempo prima che Airflow carichi il file e aggiorni il DAG. Se il DAG viene eseguito in base a una pianificazione frequente, ti conviene assicurarti che utilizzi la versione aggiornata del file DAG. Ecco come fare:

  1. Metti in pausa il DAG nella UI di Airflow.
  2. Carica un file DAG aggiornato.
  3. Attendi fino a quando non vengono visualizzati gli aggiornamenti nella UI di Airflow. Ciò significa che il DAG è stato analizzato correttamente dallo scheduler e aggiornato nel database Airflow.

    Se la UI di Airflow mostra i DAG aggiornati, non è garantito che i worker Airflow dispongano della versione aggiornata del file DAG. Questo accade perché i file DAG vengono sincronizzati in modo indipendente per gli scheduler e i worker.

  4. Potresti prolungare il tempo di attesa per assicurarti che il file DAG sia sincronizzato con tutti i worker nel tuo ambiente. La sincronizzazione avviene diversi volte al minuto. In un ambiente integro, attendere circa 20-30 secondi è sufficiente per la sincronizzazione di tutti i worker.

  5. (Facoltativo) Se vuoi assicurarti che tutti i worker abbiano la nuova versione del file DAG, controlla i log per ogni singolo worker. Ecco come fare:

    1. Apri la scheda Log per il tuo ambiente nella console Google Cloud.

    2. Vai a Log di Composer > Infrastruttura > elemento Sincronizzazione Cloud Storage e ispeziona i log per ogni worker nel tuo ambiente. Cerca l'elemento di log Syncing dags directory più recente con un timestamp successivo al caricamento del nuovo file DAG. Se vedi un elemento Finished syncing che lo segue, i DAG sono sincronizzati correttamente su questo worker.

  6. Riattiva il DAG.

Eliminazione di un DAG

Questa sezione descrive come eliminare un DAG.

Eliminazione di un DAG nel tuo ambiente

Per eliminare un DAG, rimuovi il file Python .py per il DAG dalla cartella /dags dell'ambiente nel bucket dell'ambiente.

Console

  1. Nella console Google Cloud, vai alla pagina Ambienti.

    Vai ad Ambienti

  2. Nell'elenco degli ambienti, individua una riga con il nome dell'ambiente e fai clic sul link DAG nella colonna Cartella DAG. Viene visualizzata la pagina Dettagli bucket. Mostra i contenuti della cartella /dags nel bucket del tuo ambiente.

  3. Seleziona il file DAG, fai clic su Elimina e conferma l'operazione.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.
  • DAG_FILE con il file Python .py per il DAG.

Esempio:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Rimozione di un DAG dall'interfaccia utente di Airflow

Puoi rimuovere un DAG dalla UI di Airflow in Airflow 1.10.0 o versioni successive.

Per rimuovere i metadati per un DAG dall'interfaccia web di Airflow:

UI di Airflow

  1. Vai alla UI di Airflow per il tuo ambiente.
  2. Per il DAG, fai clic su Elimina DAG.

gcloud

Nelle versioni di Airflow 1 precedenti alla 1.14.0, esegui questo comando nellgcloud CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

In Airflow 2, Airflow 1.14.0 e versioni successive, esegui questo comando nellgcloud CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Sostituisci:

  • ENVIRONMENT_NAME con il nome dell'ambiente.
  • LOCATION con la regione in cui si trova l'ambiente.
  • DAG_NAME è il nome del DAG da eliminare.

Passaggi successivi