Ejecuta un DAG de Apache Airflow en Cloud Composer 2

Cloud Composer 1 | Cloud Composer 2

En esta página, se muestra cómo crear un entorno de Cloud Composer y cómo ejecutar un DAG de Apache Airflow en Cloud Composer 2.

Si recién comienzas a usar Airflow, consulta este instructivo para obtener más información sobre los conceptos, los objetos y su uso de Airflow.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de la consola de Google Cloud, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  3. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  4. En la página del selector de proyectos de la consola de Google Cloud, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  5. Asegúrate de que la facturación esté habilitada para tu proyecto de Google Cloud.

  6. Habilita la API de Cloud Composer.

    Habilita la API

Crear un entorno

Console

  1. En la consola de Google Cloud, ve a la página Crear entorno.

    Ir a Crear entorno

  2. Cuando creas un entorno en tu proyecto, si el agente de servicio de Cloud Composer no tiene los permisos necesarios en la cuenta de servicio del entorno, aparecerá la sección Otorga los permisos necesarios a la cuenta de servicio de Cloud Composer.

    Agrega la cuenta del agente de servicio de Cloud Composer como una principal nueva a la cuenta de servicio de tu entorno y le otorgas el rol Extensión del agente de servicio de la API de Cloud Composer v2 (roles/composer.ServiceAgentV2Ext).

    Confirma que usas la cuenta de servicio prevista para tu entorno y haz clic en Otorgar.

  3. En el campo Nombre, ingresa example-environment.

  4. En la lista desplegable Ubicación, selecciona una región para el entorno de Cloud Composer. Consulta Regiones disponibles para obtener información sobre cómo seleccionar una región.

  5. Para otras opciones de configuración del entorno, usa los valores predeterminados proporcionados.

  6. Para crear el entorno, haz clic en Crear.

  7. Espera hasta que se cree el entorno. Cuando termine, aparecerá una marca de verificación verde junto al nombre del entorno.

gcloud

Agrega la cuenta de agente de servicio de Cloud Composer como una cuenta principal nueva en la cuenta de servicio de tu entorno y otórgale el rol Extensión del agente de servicio de la API de Cloud Composer v2 (roles/composer.ServiceAgentV2Ext).

De forma predeterminada, tu entorno usa la cuenta de servicio predeterminada de Compute Engine.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

Crea un entorno nuevo:

gcloud composer environments create ENVIRONMENT_NAME \
  --location LOCATION \
  --image-version IMAGE_VERSION

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno. En esta guía de inicio rápido, se usa example-environment.
  • LOCATION por una región para el entorno de Cloud Composer. Consulta Regiones disponibles para obtener información sobre cómo seleccionar una región.
  • IMAGE_VERSION por el nombre de la imagen de Cloud Composer En esta guía, se usa composer-2.6.6-airflow-2.6.3 para crear un entorno con la imagen más reciente de Cloud Composer 2.

Ejemplo:

gcloud composer environments create example-environment \
  --location us-central1 \
  --image-version composer-2.6.6-airflow-2.6.3

Terraform

Para configurar este entorno mediante Terraform, agrega el siguiente bloque de recursos a la configuración de Terraform y ejecuta terraform apply.

Para usar este bloque de recursos, la cuenta de servicio que usa Terraform debe tener un rol con el permiso composer.environments.create habilitado. Si quieres obtener más información sobre la cuenta de servicio para Terraform, consulta la Referencia de configuración de proveedores de Google.

Si deseas obtener más información sobre el uso de Terraform a fin de crear un entorno de Cloud Composer, consulta la documentación de Terraform.

resource "google_composer_environment" "example" {
  provider = google-beta
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}
  • ENVIRONMENT_NAME por el nombre del entorno. En esta guía de inicio rápido, se usa example-environment.

  • LOCATION por una región para el entorno de Cloud Composer. Consulta Regiones disponibles para obtener información sobre cómo seleccionar una región.

  • IMAGE_VERSION por el nombre de la imagen de Cloud Composer En esta guía, se usa composer-2.6.6-airflow-2.6.3 para crear un entorno con la imagen más reciente de Cloud Composer 2.

Ejemplo:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-2.6.6-airflow-2.6.3"
    }
  }

}

Consulta los detalles del entorno

Una vez finalizada la creación del entorno, puedes ver la información del entorno, como la versión de Cloud Composer, la URL de la interfaz web de Airflow y la carpeta de los DAG en Cloud Storage.

Para ver la información del entorno, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. Para ver la página Detalles del entorno, haz clic en el nombre de tu entorno, example-environment.

Crea un DAG

Un DAG de Airflow es una colección de tareas organizadas que deseas programar y ejecutar. Los DAG se definen en archivos estándares de Python.

El código de Python en quickstart.py realiza estas tareas:

  1. Crea un DAG composer_sample_dag. El DAG se ejecuta una vez al día.
  2. Ejecuta una tarea print_dag_run_conf. La tarea imprime la configuración de ejecución del DAG con el operador bash.

Para crear un DAG, crea una copia del archivo quickstart.py en tu máquina local.

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Sube el DAG a Cloud Storage

Cloud Composer programa solo los DAG que se encuentran en la carpeta /dags del bucket de Cloud Storage del entorno.

Para programar tu DAG, sube quickstart.py desde tu máquina local a la carpeta /dags del entorno.

Console

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. Para abrir la carpeta /dags, haz clic en el vínculo Carpeta DAG de example-environment.

  3. En la página de detalles del depósito, haz clic en Subir archivos y selecciona tu copia local de quickstart.py.

  4. Para subir el archivo, haz clic en Abrir.

    Después de subir el DAG, Cloud Composer agrega el DAG a Airflow y programa una ejecución del DAG de inmediato. Es posible que el DAG tarde unos minutos en aparecer en la interfaz web de Airflow.

gcloud

Para subir quickstart.py con gcloud, ejecuta el siguiente comando:

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

Visualiza el DAG en la IU de Airflow

Cada entorno de Cloud Composer tiene un servidor web que ejecuta la interfaz web de Airflow. Puedes administrar los DAG desde la interfaz web de Airflow.

Para ver el DAG en la interfaz web de Airflow, sigue estos pasos:

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. Para abrir la interfaz web de Airflow, haz clic en el vínculo Airflow para example-environment. Se abrirá la IU de Airflow en una nueva ventana del navegador.

  3. En la barra de herramientas de Airflow, ve a la página DAG.

  4. Para abrir la página de detalles del DAG, haz clic en composer_sample_dag.

    Página de DAG en la IU de Airflow
    Figure 1. Página de DAG en la IU de Airflow (haz clic para ampliar)

    La página del DAG muestra la Vista de árbol (Tree View), una representación gráfica de las tareas y dependencias del flujo de trabajo.

    Vista de árbol del DAG composer_sample_dags
    Figure 2. Vista de árbol del DAG composer_sample_dags

Visualiza los detalles de la instancia de la tarea en los registros de Airflow

El DAG que programaste incluye la tarea print_dag_run_conf. Esta tarea imprime la configuración de ejecución del DAG, que se puede ver en los registros de Airflow para la instancia de la tarea.

Para ver los detalles de la instancia de la tarea, sigue estos pasos:

  1. En la Vista de árbol (Tree View) del DAG, en la interfaz web de Airflow, haz clic en Vista de gráfico (Graph View).

    Si mantienes el puntero sobre la tarea print_dag_run_conf, se muestra su estado.

    Vista de árbol del DAG composer_sample_dags
    Figura 3: Estado de la tarea print_dag_run_conf
  2. Haz clic en la tarea print_dag_run_conf.

    En el menú contextual Instancia de la tarea, puedes obtener metadatos y realizar algunas acciones.

    Menú contextual Instancia de tarea para la tarea composer_sample_dags
    Figura 4: Menú contextual Instancia de tarea para la tarea composer_sample_dags
  3. En el menú contextual de la instancia de la tarea, haz clic en Ver registro (View Log).

  4. En el registro, busca Running command: ['bash' para ver el resultado del operador bash.

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

Limpia

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que se usaron en esta página.

Borra los recursos que se usaron en este instructivo:

  1. Borra el entorno de Cloud Composer:

    1. En la consola de Google Cloud, ve a la página Entornos.

      Ir a Entornos

    2. Selecciona example-environment y haz clic en Borrar.

    3. Espera hasta que se borre el entorno.

  2. Borra el bucket de tu entorno. Aunque borres el entorno de Cloud Composer, no se borra el bucket.

    1. En la consola de Google Cloud, vaya a la página Almacenamiento > Navegador.

      Ir a Almacenamiento > Navegador

    2. Selecciona el bucket del entorno y haz clic en Borrar. Por ejemplo, este bucket puede llamarse us-central1-example-environ-c1616fe8-bucket.

  3. Borra el disco persistente de la cola de Redis de tu entorno. Aunque borres el entorno de Cloud Composer, no se borrará el disco persistente.

    1. En la consola de Google Cloud, ve a Compute Engine > Discos.

      Ir a Discos

    2. Selecciona el disco persistente de la cola de Redis del entorno y haz clic en Borrar.

      Por ejemplo, este disco puede llamarse pvc-02bc4842-2312-4347-8519-d87bdcd31115. Los discos para Cloud Composer 2 siempre tienen el tipo Balanced persistent disk y el tamaño de 2 GB.

¿Qué sigue?