Esegui un DAG Apache Airflow in Cloud Composer 3 (Google Cloud CLI)
Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Questa guida rapida mostra come creare un ambiente Cloud Composer e eseguire un DAG Apache Airflow in Cloud Composer 3.
Se non hai mai utilizzato Airflow, consulta il tutorial sui concetti di Airflow nella documentazione di Apache Airflow per saperne di più su concetti, oggetti e utilizzo di Airflow.
Se vuoi utilizzare la console Google Cloud, consulta Esegui un DAG Apache Airflow in Cloud Composer.
Se vuoi creare un ambiente utilizzando Terraform, consulta Creare ambienti (Terraform).
Prima di iniziare
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Cloud Composer API:
gcloud services enable composer.googleapis.com
-
Per ottenere le autorizzazioni necessarie per completare questa guida introduttiva, chiedi all'amministratore di concederti i seguenti ruoli IAM nel progetto:
-
Per visualizzare, creare e gestire l'ambiente Cloud Composer:
-
Amministratore ambienti e oggetti Storage (
roles/composer.environmentAndStorageObjectAdmin
) -
Utente account di servizio (
roles/iam.serviceAccountUser
)
-
Amministratore ambienti e oggetti Storage (
-
Per visualizzare i log:
Visualizzatore log (
roles/logging.viewer
)
Per saperne di più sulla concessione dei ruoli, consulta Gestire l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
-
Per visualizzare, creare e gestire l'ambiente Cloud Composer:
Creare l'account di servizio di un ambiente
Quando crei un ambiente, specifichi un account di servizio. Questo account di servizio è chiamato account di servizio dell'ambiente. Il tuo ambiente utilizza questo account di servizio per eseguire la maggior parte delle operazioni.
L'account di servizio per il tuo ambiente non è un account utente. Un account di servizio è un particolare tipo di account utilizzato da un'applicazione o da un'istanza di macchina virtuale (VM), non da una persona fisica.
Per creare un account di servizio per il tuo ambiente:
Crea un nuovo account di servizio, come descritto nella documentazione di Identity and Access Management.
Concedere un ruolo, come descritto nella documentazione di Identity and Access Management. Il ruolo richiesto è Worker Composer (
composer.worker
).
Creazione di un ambiente
Crea un nuovo ambiente denominato example-environment
nella regione us-central1
con l'ultima versione di Cloud Composer 3.
gcloud composer environments create example-environment \
--location us-central1 \
--image-version composer-3-airflow-2.10.2-build.7
Creare un file DAG
Un DAG Airflow è una raccolta di attività organizzate che vuoi pianificare ed eseguire. I DAG sono definiti in file Python standard.
Questa guida utilizza un DAG Airflow di esempio definito nel file quickstart.py
.
Il codice Python in questo file esegue le seguenti operazioni:
- Crea un DAG,
composer_sample_dag
. Questo DAG viene eseguito ogni giorno. - Esegue un'attività,
print_dag_run_conf
. L'attività stampa la configurazione della esecuzione del DAG utilizzando l'operatore bash.
Salva una copia del file quickstart.py
sulla tua macchina locale:
Carica il file DAG nel bucket dell'ambiente
A ogni ambiente Cloud Composer è associato un bucket Cloud Storage. Airflow in Cloud Composer pianifica solo i DAG che si trovano nella cartella /dags
di questo bucket.
Per pianificare il DAG, carica quickstart.py
dalla tua macchina locale nella cartella /dags
dell'ambiente:
Per caricare quickstart.py
con Google Cloud CLI, esegui il seguente comando nella
cartella in cui si trova il file quickstart.py
:
gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py
Visualizza il DAG
Dopo aver caricato il file DAG, Airflow esegue le seguenti operazioni:
- Analizza il file DAG che hai caricato. Potrebbero essere necessari alcuni minuti prima che il DAG sia disponibile per Airflow.
- Aggiunge il DAG all'elenco dei DAG disponibili.
- Esegue il DAG in base alla pianificazione specificata nel file DAG.
Controlla che il DAG venga elaborato senza errori e che sia disponibile in Airflow visualizzandolo nell'interfaccia utente del DAG. L'interfaccia utente DAG è l'interfaccia di Cloud Composer per visualizzare le informazioni sul DAG nella console Google Cloud. Cloud Composer fornisce inoltre accesso all'interfaccia utente di Airflow, un'interfaccia web nativa di Airflow.
Attendi circa cinque minuti per dare ad Airflow il tempo di elaborare il file DAG caricato in precedenza e di completare la prima esecuzione del DAG (spiegato di seguito).
Esegui il seguente comando nell'interfaccia a riga di comando Google Cloud. Questo comando esegue il comando dell'interfaccia a riga di comando Airflow
dags list
che elenca i DAG nel tuo ambiente.gcloud composer environments run example-environment \ --location us-central1 \ dags list
Verifica che il DAG
composer_quickstart
sia elencato nell'output del comando.Output di esempio:
Executing the command: [ airflow dags list ]... Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb Use ctrl-c to interrupt the command dag_id | filepath | owner | paused ====================+=======================+==================+======= airflow_monitoring | airflow_monitoring.py | airflow | False composer_quickstart | dag-quickstart-af2.py | Composer Example | False
Visualizzare i dettagli dell'esecuzione del DAG
Una singola esecuzione di un DAG è chiamata esecuzione del DAG. Airflow esegue immediatamente un'esecuzione del DAG di esempio perché la data di inizio nel file DAG è impostata su ieri. In questo modo, Airflow si allinea alla pianificazione del DAG specificato.
Il DAG di esempio contiene un'attività, print_dag_run_conf
, che esegue il comando echo
nella console. Questo comando genera metadati sul DAG
(identificatore numerico dell'esecuzione del DAG).
Esegui il seguente comando nell'interfaccia a riga di comando Google Cloud. Questo comando elenca le esecuzioni del DAG
per il DAG composer_quickstart
:
gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart
Output di esempio:
dag_id | run_id | state | execution_date | start_date | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
La CLI di Airflow non fornisce un comando per visualizzare i log delle attività. Puoi utilizzare altri metodi per visualizzare i log delle attività di Airflow: interfaccia utente DAG di Cloud Composer, interfaccia utente di Airflow o Cloud Logging. Questa guida illustra un modo per eseguire query su Cloud Logging per trovare i log di un'esecuzione DAG specifica.
Esegui il seguente comando nell'interfaccia a riga di comando Google Cloud. Questo comando legge i log da Cloud Logging per un'esecuzione specifica del DAG del DAG composer_quickstart
. L'argomento
--format
formatta l'output in modo da visualizzare solo il testo del messaggio di log.
gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"
Sostituisci:
RUN_ID
con il valorerun_id
dell'output del comandotasks states-for-dag-run
eseguito in precedenza. Ad esempio,2024-02-17T15:38:38.969307+00:00
.
Output di esempio:
...
Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task
...
Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746
...
Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check
Esegui la pulizia
Per evitare che al tuo Google Cloud account vengano addebitati costi per le risorse utilizzate su questa pagina, elimina il progetto Google Cloud con le risorse.
Elimina le risorse utilizzate in questo tutorial:
Elimina l'ambiente Cloud Composer:
Nella console Google Cloud, vai alla pagina Ambienti.
Seleziona
example-environment
e fai clic su Elimina.Attendi che l'ambiente venga eliminato.
Elimina il bucket dell'ambiente. L'eliminazione dell'ambiente Cloud Composer non comporta l'eliminazione del relativo bucket.
Nella console Google Cloud, vai alla pagina Storage > Browser.
Seleziona il bucket dell'ambiente e fai clic su Elimina. Ad esempio, questo bucket può essere denominato
us-central1-example-environ-c1616fe8-bucket
.
Passaggi successivi