Ejecuta un DAG de Apache Airflow en Cloud Composer 1

Cloud Composer 1 | Cloud Composer 2

En esta página, se muestra cómo crear un entorno de Cloud Composer y ejecutar un DAG de Apache Airflow en Cloud Composer.

Si eres nuevo en Airflow, consulta este instructivo para obtener más información sobre los conceptos, los objetos y su uso de Airflow.

Antes de comenzar

  1. Accede a tu cuenta de Google Cloud. Si eres nuevo en Google Cloud, crea una cuenta para evaluar el rendimiento de nuestros productos en situaciones reales. Los clientes nuevos también obtienen $300 en créditos gratuitos para ejecutar, probar y, además, implementar cargas de trabajo.
  2. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  3. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  4. En la página del selector de proyectos de Google Cloud Console, selecciona o crea un proyecto de Google Cloud.

    Ir al selector de proyectos

  5. Comprueba que la facturación esté habilitada en tu proyecto.

    Descubre cómo puedes habilitar la facturación

  6. Habilita la API de Cloud Composer.

    Habilita la API

Crear un entorno

Console

  1. En Google Cloud Console, ve a la página Crear entorno.

    Ir a Crear entorno

  2. En el campo Nombre, ingresa example-environment.

  3. En la lista desplegable Ubicación, selecciona una región para el entorno de Cloud Composer. Consulta Regiones disponibles para obtener información sobre cómo seleccionar una región.

  4. Para otras opciones de configuración del entorno, usa los valores predeterminados proporcionados.

  5. Para crear el entorno, haz clic en Crear.

  6. Espera hasta que se cree el entorno. Cuando termine, aparecerá una marca de verificación verde junto al nombre del entorno.

gcloud

gcloud composer environments create ENVIRONMENT_NAME \
    --location LOCATION \
    --image-version IMAGE_VERSION

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno. En esta guía de inicio rápido, se usa example-environment.
  • LOCATION por una región para el entorno de Cloud Composer. Consulta Regiones disponibles para obtener información sobre cómo seleccionar una región.
  • IMAGE_VERSION por el nombre de la imagen de Cloud Composer En esta guía, se usa composer-1.19.6-airflow-1.10.15 para crear un entorno con la última imagen de Cloud Composer.

Ejemplo:

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-1.19.6-airflow-1.10.15

Terraform

Para configurar este entorno mediante Terraform, agrega el siguiente bloque de recursos a la configuración de Terraform y ejecuta terraform apply.

Si deseas obtener más información sobre el uso de Terraform a fin de crear un entorno de Cloud Composer, consulta la documentación de Terraform.

resource "google_composer_environment" "example" {
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}
  • ENVIRONMENT_NAME por el nombre del entorno. En esta guía de inicio rápido, se usa example-environment.
  • LOCATION por una región para el entorno de Cloud Composer. Consulta Regiones disponibles para obtener información sobre cómo seleccionar una región.
  • IMAGE_VERSION por el nombre de la imagen de Cloud Composer En esta guía, se usa composer-1.19.6-airflow-1.10.15 para crear un entorno con la última imagen de Cloud Composer.

Ejemplo:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-1.19.6-airflow-1.10.15"
    }
  }

}

Consulta los detalles del entorno

Una vez finalizada la creación del entorno, puedes ver la información del entorno, como la versión de Cloud Composer, la URL de la interfaz web de Airflow y la carpeta de los DAG en Cloud Storage.

Para ver la información del entorno, sigue estos pasos:

  1. En Google Cloud Console, ve a la página Entornos.

    Ir a Entornos

  2. Para ver la página Detalles del entorno, haz clic en el nombre de tu entorno, example-environment.

Crea un DAG

Un DAG de Airflow es una colección de tareas organizadas que deseas programar y ejecutar. Los DAG se definen en archivos estándares de Python.

El código de Python en quickstart.py realiza estas tareas:

  1. Crea un DAG composer_sample_dag. El DAG se ejecuta una vez al día.
  2. Ejecuta una tarea print_dag_run_conf. La tarea imprime la configuración de ejecución del DAG con el operador bash.

Para crear un DAG, crea una copia del archivo quickstart.py en tu máquina local.

Airflow 1

import datetime

import airflow
from airflow.operators import bash_operator

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'composer_sample_dag',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

Airflow 2

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with models.DAG(
        'composer_quickstart',
        catchup=False,
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

Sube el DAG a Cloud Storage

Cloud Composer programa solo los DAG que se encuentran en la carpeta /dags del bucket de Cloud Storage del entorno.

Para programar tu DAG, sube quickstart.py desde tu máquina local a la carpeta /dags del entorno.

Console

  1. En Google Cloud Console, ve a la página Entornos.

    Ir a Entornos

  2. Para abrir la carpeta /dags, haz clic en el vínculo Carpeta DAG de example-environment.

  3. En la página de detalles del depósito, haz clic en Subir archivos y selecciona tu copia local de quickstart.py.

  4. Para subir el archivo, haz clic en Abrir.

    Después de subir el DAG, Cloud Composer agrega el DAG a Airflow y programa una ejecución del DAG de inmediato. Es posible que el DAG tarde unos minutos en aparecer en la interfaz web de Airflow.

gcloud

Para subir quickstart.py con gcloud, ejecuta el siguiente comando:

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

Vea el DAG en la IU de Airflow

Cada entorno de Cloud Composer tiene un servidor web que ejecuta la interfaz web de Airflow. Puedes administrar los DAG desde la interfaz web de Airflow.

Para ver el DAG en la interfaz web de Airflow, sigue estos pasos:

Airflow 1

  1. En Google Cloud Console, ve a la página Entornos.

    Ir a Entornos

  2. Para abrir la interfaz web de Airflow, haz clic en el vínculo Airflow para example-environment. Se abrirá la IU de Airflow en una nueva ventana del navegador.

  3. En la barra de herramientas de Airflow, ve a la página DAG.

  4. Para abrir la página de detalles del DAG, haz clic en composer_sample_dag.

    Página de DAG en la IU de Airflow
    Figure 1. Página DAG en la IU de Airflow (haz clic para ampliar)

    La página del DAG muestra la Vista de árbol (Tree View), una representación gráfica de las tareas y dependencias del flujo de trabajo.

    Vista de árbol para el DAG de composer_sample_dags
    Figure 2. Vista de árbol para el DAG de composer_sample_dags

Airflow 2

  1. En Google Cloud Console, ve a la página Entornos.

    Ir a Entornos

  2. Para abrir la interfaz web de Airflow, haz clic en el vínculo Airflow para example-environment. Se abrirá la IU de Airflow en una nueva ventana del navegador.

  3. En la barra de herramientas de Airflow, ve a la página DAG.

  4. Para abrir la página de detalles del DAG, haz clic en composer_sample_dag.

    Página de DAG en la IU de Airflow
    Figure 1. Página DAG en la IU de Airflow (haz clic para ampliar)

    La página del DAG muestra la Vista de árbol (Tree View), una representación gráfica de las tareas y dependencias del flujo de trabajo.

    Vista de árbol para el DAG de composer_sample_dags
    Figure 2. Vista de árbol para el DAG de composer_sample_dags

Visualiza los detalles de la instancia de la tarea en los registros de Airflow

El DAG que programaste incluye la tarea print_dag_run_conf. Esta tarea imprime la configuración de ejecución del DAG, que se puede ver en los registros de Airflow para la instancia de la tarea.

Para ver los detalles de la instancia de la tarea, sigue estos pasos:

Airflow 1

  1. En la Vista de árbol (Tree View) del DAG, en la interfaz web de Airflow, haz clic en Vista de gráfico (Graph View).

    Si mantienes el puntero sobre la tarea print_dag_run_conf, se muestra su estado.

    Vista de árbol para el DAG de composer_sample_dags
    Figura 3. Estado de la tarea print_dag_run_conf
  2. Haz clic en la tarea print_dag_run_conf.

    En el menú contextual Instancia de la tarea, puedes obtener metadatos y realizar algunas acciones.

    Menú contextual de la instancia de la tarea para composer_sample_dags
    Figura 4: Menú contextual de la instancia de la tarea de composer_sample_dags
  3. En el menú contextual Instancia de tarea, haz clic en Ver registro.

  4. En el registro, busca Running: ['bash' para ver el resultado del operador bash.

    Resultado del registro del operador Bash
    Figura 5. Resultado del registro del operador Bash

Airflow 2

  1. En la Vista de árbol (Tree View) del DAG, en la interfaz web de Airflow, haz clic en Vista de gráfico (Graph View).

    Si mantienes el puntero sobre la tarea print_dag_run_conf, se muestra su estado.

    Vista de árbol para el DAG de composer_sample_dags
    Figura 3. Estado de la tarea print_dag_run_conf
  2. Haz clic en la tarea print_dag_run_conf.

    En el menú contextual Instancia de la tarea, puedes obtener metadatos y realizar algunas acciones.

    Menú contextual de la instancia de la tarea para composer_sample_dags
    Figura 4: Menú contextual de la instancia de la tarea de composer_sample_dags
  3. En el menú contextual de la instancia de la tarea, haz clic en Ver registro (View Log).

  4. En el registro, busca Running command: ['bash' para ver el resultado del operador bash.

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

Realiza una limpieza

Sigue estos pasos para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos que usaste en esta página.

Borra los recursos usados en este instructivo:

  1. Borra el entorno de Cloud Composer:

    1. En Google Cloud Console, ve a la página Entornos.

      Ir a Entornos

    2. Selecciona example-environment y haz clic en Borrar.

    3. Espera hasta que se borre el entorno.

  2. Borra el bucket de tu entorno. Aunque borres el entorno de Cloud Composer, no se borra el bucket.

    1. En Google Cloud Console, ve a la página Storage Navegador.

      Ir a Almacenamiento > Navegador

    2. Selecciona el bucket del entorno y haz clic en Borrar. Por ejemplo, este bucket puede llamarse us-central1-example-environ-c1616fe8-bucket.

  3. Borra el disco persistente de la cola de Redis de tu entorno. Aunque borres el entorno de Cloud Composer, no se borrará el disco persistente.

    1. En Google Cloud Console, ve a Compute Engine > Disks.

      Ir a Discos

    2. Selecciona el disco persistente de la cola de Redis del entorno y haz clic en Borrar.

      Por ejemplo, este disco puede llamarse gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee. Los discos para Cloud Composer 1 siempre tienen el tipo Standard persistent disk y el tamaño de 2 GB.

¿Qué sigue?