Ejecutar un DAG de Apache Airflow en Cloud Composer 1 (CLI de Google Cloud)

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta guía de inicio rápido se explica cómo crear un entorno de Cloud Composer y ejecutar un DAG de Apache Airflow en Cloud Composer 1.

Antes de empezar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  4. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Install the Google Cloud CLI.

  8. Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

  9. Para inicializar gcloud CLI, ejecuta el siguiente comando:

    gcloud init
  10. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  11. Verify that billing is enabled for your Google Cloud project.

  12. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  13. Para obtener los permisos que necesitas para completar esta guía de inicio rápido, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos en tu proyecto:

    Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

    También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.

  14. Crear una cuenta de servicio de un entorno

    Cuando creas un entorno, especificas una cuenta de servicio. Esta cuenta de servicio se llama cuenta de servicio del entorno. Tu entorno usa esta cuenta de servicio para realizar la mayoría de las operaciones.

    La cuenta de servicio de tu entorno no es una cuenta de usuario. Una cuenta de servicio es un tipo especial de cuenta que usan las aplicaciones o las instancias de máquinas virtuales (VMs), no las personas.

    Para crear una cuenta de servicio para tu entorno, sigue estos pasos:

    1. Crea una cuenta de servicio, tal como se describe en la documentación de gestión de identidades y accesos.

    2. Asigna un rol, tal como se describe en la documentación de Gestión de Identidades y Accesos. El rol necesario es Trabajador de Composer (composer.worker).

    Crear entorno

    Crea un entorno llamado example-environment en la región us-central1 con la versión más reciente de Cloud Composer 1.

    gcloud composer environments create example-environment \
        --location us-central1 \
        --image-version composer-1.20.12-airflow-1.10.15
    

    Crear un archivo DAG

    Un DAG de Airflow es una colección de tareas organizadas que quieres programar y ejecutar. Los DAGs se definen en archivos Python estándar.

    En esta guía se usa un ejemplo de DAG de Airflow definido en el archivo quickstart.py. El código Python de este archivo hace lo siguiente:

    1. Crea un DAG, composer_sample_dag. Este DAG se ejecuta todos los días.
    2. Ejecuta una tarea, print_dag_run_conf. La tarea imprime la configuración de la ejecución del DAG mediante el operador bash.

    Guarda una copia del archivo quickstart.py en tu máquina local:

    import datetime
    
    from airflow import models
    from airflow.operators import bash
    
    # If you are running Airflow in more than one time zone
    # see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
    # for best practices
    YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
    
    default_args = {
        "owner": "Composer Example",
        "depends_on_past": False,
        "email": [""],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": datetime.timedelta(minutes=5),
        "start_date": YESTERDAY,
    }
    
    with models.DAG(
        "composer_quickstart",
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1),
    ) as dag:
        # Print the dag_run id from the Airflow logs
        print_dag_run_conf = bash.BashOperator(
            task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
        )

    Subir el archivo DAG al segmento de tu entorno

    Cada entorno de Cloud Composer tiene un bucket de Cloud Storage asociado. Airflow en Cloud Composer solo programa los DAGs que se encuentran en la carpeta /dags de este bucket.

    Para programar tu DAG, sube quickstart.py desde tu máquina local a la carpeta /dags de tu entorno:

    Para subir quickstart.py con la CLI de Google Cloud, ejecuta el siguiente comando en la carpeta donde se encuentra el archivo quickstart.py:

    gcloud composer environments storage dags import \
    --environment example-environment --location us-central1 \
    --source quickstart.py
    

    Ver el DAG

    Después de subir el archivo DAG, Airflow hace lo siguiente:

    1. Analiza el archivo DAG que has subido. El DAG puede tardar unos minutos en estar disponible en Airflow.
    2. Añade el DAG a la lista de DAGs disponibles.
    3. Ejecuta el DAG según la programación que hayas indicado en el archivo DAG.

    Comprueba que tu DAG se procese sin errores y que esté disponible en Airflow. Para ello, míralo en la interfaz de usuario de DAG. La interfaz de usuario de DAGs es la interfaz de Cloud Composer para ver información de los DAGs en la consola. Google Cloud Cloud Composer también proporciona acceso a la interfaz de usuario de Airflow, que es una interfaz web nativa de Airflow.

    1. Espera unos cinco minutos para que Airflow procese el archivo DAG que has subido anteriormente y para que se complete la primera ejecución del DAG (se explica más adelante).

    2. Ejecuta el siguiente comando en la CLI de Google Cloud. Este comando ejecuta el dags list comando de la CLI de Airflow que muestra los DAGs de tu entorno.

      gcloud composer environments run example-environment \
      --location us-central1 \
      dags list
      
    3. Comprueba que el composer_quickstart DAG aparece en la salida del comando.

      Ejemplo:

      Executing the command: [ airflow dags list ]...
      Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
      Use ctrl-c to interrupt the command
      dag_id              | filepath              | owner            | paused
      ====================+=======================+==================+=======
      airflow_monitoring  | airflow_monitoring.py | airflow          | False
      composer_quickstart | dag-quickstart-af2.py | Composer Example | False
      

    Ver los detalles de una ejecución de DAG

    Una sola ejecución de un DAG se denomina ejecución de DAG. Airflow ejecuta inmediatamente una ejecución de DAG para el DAG de ejemplo porque la fecha de inicio del archivo DAG es ayer. De esta forma, Airflow se pone al día con la programación del DAG especificado.

    El DAG de ejemplo contiene una tarea, print_dag_run_conf, que ejecuta el comando echo en la consola. Este comando muestra metainformación sobre el DAG (identificador numérico de la ejecución del DAG).

    Ejecuta el siguiente comando en la CLI de Google Cloud. Este comando muestra las ejecuciones del DAG composer_quickstart:

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list-runs -- --dag-id composer_quickstart
    

    Ejemplo:

    dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
    ====================+=============================================+=========+==================================+==================================+=================================
    composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00
    

    La CLI de Airflow no proporciona ningún comando para ver los registros de tareas. Puedes usar otros métodos para ver los registros de tareas de Airflow: la interfaz de usuario de DAG de Cloud Composer, la interfaz de usuario de Airflow o Cloud Logging. En esta guía se muestra cómo consultar registros de Cloud Logging de una ejecución de DAG específica.

    Ejecuta el siguiente comando en la CLI de Google Cloud. Este comando lee los registros de Cloud Logging de una ejecución específica del DAG composer_quickstart. El argumento --format da formato a la salida para que solo se muestre el texto del mensaje de registro.

    gcloud logging read \
    --format="value(textPayload)" \
    --order=asc \
    "resource.type=cloud_composer_environment \
    resource.labels.location=us-central1 \
    resource.labels.environment_name=example-environment \
    labels.workflow=composer_quickstart \
    (labels.\"execution-date\"=\"RUN_ID\")"
    

    Sustituye:

    • RUN_ID con el valor run_id de la salida del comando tasks states-for-dag-run que has ejecutado anteriormente. Por ejemplo, 2024-02-17T15:38:38.969307+00:00.

    Ejemplo:

    ...
    
    Starting attempt 1 of 2
    Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
    15:38:38.969307+00:00
    Started process 22544 to run task
    
    ...
    
    Running command: ['/usr/bin/bash', '-c', 'echo 115746']
    Output:
    115746
    
    ...
    
    Command exited with return code 0
    Marking task as SUCCESS. dag_id=composer_quickstart,
    task_id=print_dag_run_conf, execution_date=20240217T153838,
    start_date=20240218T153841, end_date=20240218T153841
    Task exited with return code 0
    0 downstream tasks scheduled from follow-on schedule check
    

    Limpieza

    Para evitar que se apliquen cargos en tu Google Cloud cuenta por los recursos utilizados en esta página, elimina el Google Cloud proyecto con los recursos.

    Elimina los recursos que has usado en este tutorial:

    1. Elimina el entorno de Cloud Composer:

      1. En la Google Cloud consola, ve a la página Entornos.

        Ir a Entornos

      2. Selecciona example-environment y haz clic en Eliminar.

      3. Espera a que se elimine el entorno.

    2. Elimina el segmento de tu entorno. Si eliminas el entorno de Cloud Composer, no se elimina su segmento.

      1. En la Google Cloud consola, ve a la página Almacenamiento > Navegador.

        Ve a Almacenamiento > Explorador.

      2. Selecciona el contenedor del entorno y haz clic en Eliminar. Por ejemplo, este contenedor se puede llamar us-central1-example-environ-c1616fe8-bucket.

    Siguientes pasos