Esegui un DAG Apache Airflow in Cloud Composer 1 (Google Cloud CLI)

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa guida rapida mostra come creare un ambiente Cloud Composer ed eseguire un DAG Apache Airflow in Cloud Composer 1.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Install the Google Cloud CLI.
  7. To initialize the gcloud CLI, run the following command:

    gcloud init
  8. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  9. Make sure that billing is enabled for your Google Cloud project.

  10. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  11. Per ottenere le autorizzazioni necessarie per completare questa guida introduttiva, chiedi all'amministratore di concederti i seguenti ruoli IAM nel progetto:

    Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.

    Potresti anche riuscire a ottenere le autorizzazioni richieste tramite la ruoli o altri ruoli predefiniti ruoli.

Creazione di un ambiente

Crea un nuovo ambiente denominato example-environment nella regione us-central1 con l'ultima versione di Cloud Composer 1.

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-1.20.12-airflow-1.10.15

Creare un file DAG

Un DAG Airflow è una raccolta di attività organizzate che che vuoi pianificare ed eseguire. I DAG sono definiti in file Python standard.

Questa guida utilizza un esempio di DAG Airflow definito nel file quickstart.py. Il codice Python in questo file esegue le seguenti operazioni:

  1. Crea un DAG, composer_sample_dag. Questo DAG viene eseguito ogni giorno.
  2. Esegue un'attività, print_dag_run_conf. L'attività stampa la configurazione della esecuzione del DAG utilizzando l'operatore bash.

Salva una copia del file quickstart.py sul computer locale:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Carica il file DAG nel bucket dell'ambiente

A ogni ambiente Cloud Composer è associato un bucket Cloud Storage. Airflow in Cloud Composer pianifica solo i DAG che si trovano nella cartella /dags di questo bucket.

Per pianificare il DAG, carica quickstart.py dalla tua macchina locale nella cartella /dags dell'ambiente:

Per caricare quickstart.py con Google Cloud CLI, esegui questo comando in la cartella in cui si trova il file quickstart.py:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

Visualizza il DAG

Dopo aver caricato il file DAG, Airflow fa quanto segue:

  1. Analizza il file DAG che hai caricato. Potrebbero essere necessari alcuni minuti i DAG per renderli disponibili per Airflow.
  2. Aggiunge il DAG all'elenco dei DAG disponibili.
  3. Esegue il DAG in base alla pianificazione specificata nel file DAG.

Controlla che il DAG venga elaborato senza errori e che sia disponibile in Airflow visualizzandolo nell'interfaccia utente del DAG. La UI di DAG è l'interfaccia di Cloud Composer per la visualizzazione Informazioni sui DAG nella console Google Cloud. Cloud Composer offre inoltre accesso alla UI di Airflow, che è un ambiente web di Airflow nativo a riga di comando.

  1. Attendi circa cinque minuti per concedere ad Airflow il tempo di elaborare il file DAG caricato in precedenza e per completare la prima esecuzione del DAG (come spiegato più avanti).

  2. Esegui il seguente comando nell'interfaccia a riga di comando Google Cloud. Questo comando esegue dags list Comando dell'interfaccia a riga di comando di Airflow che elenca i DAG completamente gestito di Google Cloud.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Verifica che il DAG composer_quickstart sia elencato nell'output del comando.

    Output di esempio:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

Visualizzare i dettagli dell'esecuzione del DAG

Una singola esecuzione di un DAG è chiamata esecuzione di DAG. Airflow esegue immediatamente un'esecuzione del DAG di esempio perché la data di inizio nel file DAG è impostata su ieri. In questo modo, Airflow raggiunge i DAG specificati programmazione.

Il DAG di esempio contiene un'attività, print_dag_run_conf, che esegue il comando echo nella console. Questo comando restituisce metadati sul DAG (Identificatore numerico dell'esecuzione di DAG).

Esegui questo comando in Google Cloud CLI. Questo comando elenca le esecuzioni del DAG per il DAG composer_quickstart:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Output di esempio:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

L'interfaccia a riga di comando di Airflow non fornisce un comando per visualizzare i log delle attività. Puoi utilizzare la modalità Altri metodi per visualizzare i log delle attività di Airflow: UI DAG di Cloud Composer, UI di Airflow o Cloud Logging. Questa guida mostra un modo per eseguire query su Cloud Logging per i log di una specifica esecuzione del DAG.

Esegui il seguente comando nell'interfaccia a riga di comando Google Cloud. Questo comando legge i log da Cloud Logging per un'esecuzione specifica del DAG del DAG composer_quickstart. La L'argomento --format formatta l'output in modo che solo il testo del messaggio di log .

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Sostituisci:

  • RUN_ID con il valore run_id dell'output del comando tasks states-for-dag-run eseguito in precedenza. Per ad esempio 2024-02-17T15:38:38.969307+00:00.

Output di esempio:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Esegui la pulizia

Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate su questa pagina, elimina il progetto Google Cloud con le risorse.

Elimina le risorse utilizzate in questo tutorial:

  1. Elimina l'ambiente Cloud Composer:

    1. Nella console Google Cloud, vai alla pagina Ambienti.

      Vai ad Ambienti

    2. Seleziona example-environment e fai clic su Elimina.

    3. Attendi che l'ambiente venga eliminato.

  2. Elimina il bucket dell'ambiente. L'eliminazione dell'ambiente Cloud Composer non comporta l'eliminazione del relativo bucket.

    1. Nella console Google Cloud, vai a Storage > Pagina Browser.

      Vai ad Spazio di archiviazione > Browser

    2. Seleziona il bucket dell'ambiente e fai clic su Elimina. Ad esempio, questo bucket può essere denominato us-central1-example-environ-c1616fe8-bucket.

  3. Elimina il disco permanente dalla coda Redis del tuo ambiente. L'eliminazione del L'ambiente Cloud Composer non elimina il disco permanente.

    1. Nella console Google Cloud, vai a Compute Engine > Dischi.

      Vai a dischi

    2. Seleziona il disco permanente della coda Redis dell'ambiente e fai clic su Elimina.

      Ad esempio, questo disco può essere denominato gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee. I dischi per Cloud Composer 1 hanno sempre il Standard persistent disk tipo e le dimensioni di 2 GB.

Passaggi successivi