Esegui un DAG Apache Airflow in Cloud Composer 1 (Google Cloud CLI)

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Questa guida rapida mostra come creare un ambiente Cloud Composer e come eseguire un DAG Apache Airflow in Cloud Composer 1.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.

  4. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Install the Google Cloud CLI.

  8. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.

  9. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Verify that billing is enabled for your Google Cloud project.

  12. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  13. Per ottenere le autorizzazioni necessarie per completare questa guida rapida, chiedi all'amministratore di concederti i seguenti ruoli IAM nel tuo progetto:

    Per ulteriori informazioni sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

    Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

  14. Crea il account di servizio di un ambiente

    Quando crei un ambiente, specifichi un account di servizio. Questo service account è chiamato service account dell'ambiente. Il tuo ambiente utilizza questo account di servizio per eseguire la maggior parte delle operazioni.

    Il account di servizio per il tuo ambiente non è un account utente. Un account di serviziot è un particolare tipo di account utilizzato da un'applicazione o da un'istanza di macchina virtuale (VM), non da una persona fisica.

    Per creare un account di servizio per il tuo ambiente:

    1. Crea un nuovo service account, come descritto nella documentazione di Identity and Access Management.

    2. Assegna un ruolo, come descritto nella documentazione di Identity and Access Management. Il ruolo richiesto è Worker Composer (composer.worker).

    Creazione di un ambiente

    Crea un nuovo ambiente denominato example-environment nella regione us-central1 con l'ultima versione di Cloud Composer 1.

    gcloud composer environments create example-environment \
        --location us-central1 \
        --image-version composer-1.20.12-airflow-1.10.15
    

    Creare un file DAG

    Un DAG di Airflow è una raccolta di attività organizzate che vuoi pianificare ed eseguire. I DAG sono definiti in file Python standard.

    Questa guida utilizza un DAG di Airflow di esempio definito nel file quickstart.py. Il codice Python in questo file esegue le seguenti operazioni:

    1. Crea un DAG, composer_sample_dag. Questo DAG viene eseguito ogni giorno.
    2. Esegue un'attività, print_dag_run_conf. L'attività stampa la configurazione dell'esecuzione del DAG utilizzando l'operatore bash.

    Salva una copia del file quickstart.py sulla tua macchina locale:

    import datetime
    
    from airflow import models
    from airflow.operators import bash
    
    # If you are running Airflow in more than one time zone
    # see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
    # for best practices
    YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
    
    default_args = {
        "owner": "Composer Example",
        "depends_on_past": False,
        "email": [""],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": datetime.timedelta(minutes=5),
        "start_date": YESTERDAY,
    }
    
    with models.DAG(
        "composer_quickstart",
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:
        # Print the dag_run id from the Airflow logs
        print_dag_run_conf = bash.BashOperator(
            task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
        )

    Carica il file DAG nel bucket dell'ambiente

    Ogni ambiente Cloud Composer ha un bucket Cloud Storage associato. Airflow in Cloud Composer pianifica solo i DAG che si trovano nella cartella /dags di questo bucket.

    Per pianificare il DAG, carica quickstart.py dalla tua macchina locale nella cartella /dags del tuo ambiente:

    Per caricare quickstart.py con Google Cloud CLI, esegui questo comando nella cartella in cui si trova il file quickstart.py:

    gcloud composer environments storage dags import \
    --environment example-environment --location us-central1 \
    --source quickstart.py
    

    Visualizzare il DAG

    Dopo aver caricato il file DAG, Airflow esegue le seguenti operazioni:

    1. Analizza il file DAG che hai caricato. Potrebbero essere necessari alcuni minuti prima che il DAG diventi disponibile per Airflow.
    2. Aggiunge il DAG all'elenco dei DAG disponibili.
    3. Esegue il DAG in base alla pianificazione fornita nel file DAG.

    Verifica che il DAG venga elaborato senza errori e sia disponibile in Airflow visualizzandolo nell'interfaccia utente DAG. La UI DAG è l'interfaccia di Cloud Composer per visualizzare le informazioni sui DAG nella console Google Cloud . Cloud Composer fornisce anche l'accesso alla UI di Airflow, un'interfaccia web nativa di Airflow.

    1. Attendi circa cinque minuti per dare ad Airflow il tempo di elaborare il file DAG che hai caricato in precedenza e di completare la prima esecuzione del DAG (spiegata in seguito).

    2. Esegui questo comando in Google Cloud CLI. Questo comando esegue il dags list comando dell'interfaccia a riga di comando Airflow che elenca i DAG nel tuo ambiente.

      gcloud composer environments run example-environment \
      --location us-central1 \
      dags list
      
    3. Verifica che il DAG composer_quickstart sia elencato nell'output del comando.

      Output di esempio:

      Executing the command: [ airflow dags list ]...
      Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
      Use ctrl-c to interrupt the command
      dag_id              | filepath              | owner            | paused
      ====================+=======================+==================+=======
      airflow_monitoring  | airflow_monitoring.py | airflow          | False
      composer_quickstart | dag-quickstart-af2.py | Composer Example | False
      

    Visualizzare i dettagli dell'esecuzione del DAG

    Una singola esecuzione di un DAG è detta esecuzione di DAG. Airflow esegue immediatamente un'esecuzione DAG per il DAG di esempio perché la data di inizio nel file DAG è impostata su ieri. In questo modo, Airflow recupera la pianificazione del DAG specificato.

    Il DAG di esempio contiene un'attività, print_dag_run_conf, che esegue il comando echo nella console. Questo comando restituisce i metadati del DAG (l'identificatore numerico dell'esecuzione del DAG).

    Esegui questo comando in Google Cloud CLI. Questo comando elenca le esecuzioni di DAG per il DAG composer_quickstart:

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list-runs -- --dag-id composer_quickstart
    

    Output di esempio:

    dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
    ====================+=============================================+=========+==================================+==================================+=================================
    composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
    

    La CLI di Airflow non fornisce un comando per visualizzare i log delle attività. Puoi utilizzare altri metodi per visualizzare i log delle attività Airflow: interfaccia utente DAG di Cloud Composer, interfaccia utente Airflow o Cloud Logging. Questa guida mostra un modo per eseguire query su Cloud Logging per i log di un'esecuzione DAG specifica.

    Esegui questo comando in Google Cloud CLI. Questo comando legge i log di Cloud Logging per una specifica esecuzione del DAG composer_quickstart. L'argomento --format formatta l'output in modo che venga visualizzato solo il testo del messaggio di log.

    gcloud logging read \
    --format="value(textPayload)" \
    --order=asc \
    "resource.type=cloud_composer_environment \
    resource.labels.location=us-central1 \
    resource.labels.environment_name=example-environment \
    labels.workflow=composer_quickstart \
    (labels.\"execution-date\"=\"RUN_ID\")"
    

    Sostituisci:

    • RUN_ID con il valore run_id dell'output del comando tasks states-for-dag-run eseguito in precedenza. Ad esempio, 2024-02-17T15:38:38.969307+00:00.

    Output di esempio:

    ...
    
    Starting attempt 1 of 2
    Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
    15:38:38.969307+00:00
    Started process 22544 to run task
    
    ...
    
    Running command: ['/usr/bin/bash', '-c', 'echo 115746']
    Output:
    115746
    
    ...
    
    Command exited with return code 0
    Marking task as SUCCESS. dag_id=composer_quickstart,
    task_id=print_dag_run_conf, execution_date=20240217T153838,
    start_date=20240218T153841, end_date=20240218T153841
    Task exited with return code 0
    0 downstream tasks scheduled from follow-on schedule check
    

    Esegui la pulizia

    Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il progetto Google Cloud con le risorse.

    Elimina le risorse utilizzate in questo tutorial:

    1. Elimina l'ambiente Cloud Composer:

      1. Nella console Google Cloud , vai alla pagina Ambienti.

        Vai ad Ambienti

      2. Seleziona example-environment e fai clic su Elimina.

      3. Attendi l'eliminazione dell'ambiente.

    2. Elimina il bucket del tuo ambiente. L'eliminazione dell'ambiente Cloud Composer non elimina il bucket.

      1. Nella console Google Cloud , vai alla pagina Storage > Browser.

        Vai ad Archiviazione > Browser.

      2. Seleziona il bucket dell'ambiente e fai clic su Elimina. Ad esempio, questo bucket può essere denominato us-central1-example-environ-c1616fe8-bucket.

    Passaggi successivi