Ejecuta un DAG de Apache Airflow en Cloud Composer 3 (Google Cloud CLI)

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta guía de inicio rápido, se muestra cómo crear un entorno de Cloud Composer y ejecutar un DAG de Apache Airflow en Cloud Composer 3.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  6. Install the Google Cloud CLI.
  7. To initialize the gcloud CLI, run the following command:

    gcloud init
  8. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  9. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  10. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  11. Si quieres obtener los permisos que necesitas para completar esta guía de inicio rápido, pídele a tu administrador que te otorgue los siguientes roles de IAM en tu proyecto:

    Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

    También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Crear un entorno

Crea un nuevo entorno llamado example-environment en us-central1 región con la versión más reciente de Cloud Composer 3.

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-3-airflow-2.9.1-build.8

Crea un archivo DAG

Un DAG de Airflow es una colección de tareas organizadas que que deseas programar y ejecutar. Los DAG se definen en archivos estándares de Python.

En esta guía, se usa un ejemplo de DAG de Airflow definido en el archivo quickstart.py. El código de Python de este archivo hace lo siguiente:

  1. Crea un DAG composer_sample_dag. Este DAG se ejecuta todos los días.
  2. Ejecuta una tarea print_dag_run_conf. La tarea imprime la configuración de ejecución del DAG con el operador bash.

Guarda una copia del archivo quickstart.py en tu máquina local:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Sube el archivo DAG al bucket de tu entorno

Cada entorno de Cloud Composer tiene un bucket de Cloud Storage asociado. Airflow solo en los programas de Cloud Composer DAG que se encuentran en la carpeta /dags de este bucket.

Para programar el DAG, sube quickstart.py desde tu máquina local a tu la carpeta /dags del entorno:

Para subir quickstart.py con Google Cloud CLI, ejecuta el siguiente comando en la carpeta en la que se encuentra el archivo quickstart.py:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

Ver el DAG

Después de subir el archivo DAG, Airflow hace lo siguiente:

  1. Analiza el archivo DAG que subiste. Es posible que el DAG tarde unos minutos en estar disponible para Airflow.
  2. Agrega el DAG a la lista de DAGs disponibles.
  3. Ejecuta el DAG según el programa que proporcionaste en el archivo DAG.

Para verificar que tu DAG se procese sin errores y esté disponible en Airflow, obsérvalo en la IU de DAG. La IU de DAG es la interfaz de Cloud Composer para ver información de DAG en la consola de Google Cloud. Cloud Composer también proporciona acceso a la IU de Airflow, que es una interfaz web nativa de Airflow.

  1. Espera unos cinco minutos para darle tiempo a Airflow de procesar el archivo DAG que subiste antes y completar la primera ejecución del DAG (se explica más adelante).

  2. Ejecuta el siguiente comando en Google Cloud CLI. Este comando ejecuta el dags list Comando de la CLI de Airflow que enumera los DAG en tu en un entorno de nube.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Verifica que el DAG de composer_quickstart aparezca en el resultado del comando.

    Salida de ejemplo:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

Ver detalles de ejecución del DAG

Una sola ejecución de un DAG se denomina ejecución de DAG. Airflow ejecuta de inmediato una ejecución de DAG para el DAG de ejemplo porque la fecha de inicio en el archivo DAG está establecida en ayer. De esta manera, Airflow se pone al día con la programación del DAG especificado.

El DAG de ejemplo contiene una tarea, print_dag_run_conf, que ejecuta el comando echo en la consola. Este comando genera información meta sobre el DAG (identificador numérico de la ejecución de DAG).

Ejecuta el siguiente comando en Google Cloud CLI. Este comando muestra una lista de las ejecuciones de DAG para el DAG composer_quickstart:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Salida de ejemplo:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

La CLI de Airflow no proporciona un comando para ver los registros de las tareas. Puedes usar otros métodos para ver los registros de tareas de Airflow: la IU de DAG de Cloud Composer, la IU de Airflow o Cloud Logging. Esta guía muestra una forma de consultar Cloud Logging en busca de registros de una ejecución de DAG específica.

Ejecuta el siguiente comando en Google Cloud CLI. Este comando lee los registros de Cloud Logging de una ejecución de DAG específica del DAG composer_quickstart. El El argumento --format formatea el resultado para que solo se muestre el texto del mensaje de registro. el código fuente.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Reemplaza lo siguiente:

  • RUN_ID con el valor run_id del resultado del comando tasks states-for-dag-run que ejecutaste antes. Por ejemplo, 2024-02-17T15:38:38.969307+00:00.

Salida de ejemplo:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que se usaron en esta página, borra el proyecto de Cloud que tiene los recursos.

Borra los recursos que se usaron en este instructivo:

  1. Borra el entorno de Cloud Composer:

    1. En la consola de Google Cloud, ve a la página Entornos.

      Ir a Entornos

    2. Selecciona example-environment y haz clic en Borrar.

    3. Espera hasta que se borre el entorno.

  2. Borra el bucket de tu entorno. Aunque borres el entorno de Cloud Composer, no se borra el bucket.

    1. En la consola de Google Cloud, ve a la página Almacenamiento > Navegador.

      Ir a Almacenamiento > Navegador

    2. Selecciona el bucket del entorno y haz clic en Borrar. Por ejemplo, este bucket puede llamarse us-central1-example-environ-c1616fe8-bucket.

  3. Borra el disco persistente de la cola de Redis de tu entorno. Aunque borres el entorno de Cloud Composer, no se borrará el disco persistente.

    1. En la consola de Google Cloud, ve a Compute Engine > Discos.

      Ir a Discos

    2. Selecciona el disco persistente de la cola de Redis del entorno y haz clic en Borrar.

      Por ejemplo, este disco puede llamarse gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee. Los discos de Cloud Composer 1 siempre tienen el tipo Standard persistent disk y un tamaño de 2 GB.

¿Qué sigue?