Agrega y actualiza DAG (flujos de trabajo)

En esta página, se describe cómo determinar el bucket de almacenamiento de tu entorno y cómo agregar, actualizar y borrar un DAG del entorno.

Cloud Composer usa Cloud Storage para almacenar los DAG de Apache Airflow, también conocidos como flujos de trabajo. Cada entorno está asociado a un bucket de Cloud Storage. Cloud Composer programa solo los DAG del bucket de Cloud Storage.

Antes de comenzar

  • Debido a que Apache Airflow no proporciona un aislamiento eficaz del DAG, recomendamos que mantengas entornos de prueba y producción separados para evitar la interferencia del DAG. Para obtener más información, consulta Probar DAG.
  • Estos son los permisos necesarios para agregar y actualizar complementos en el depósito de Cloud Storage del entorno de Cloud Composer:
    • storage.objectAdmin para subir archivos
    • composer.environments.get para buscar el depósito de destino del DAG Este permiso no es obligatorio cuando se usa la API de Cloud Storage o gsutil.
  • Los cambios en el DAG tardan de 3 a 5 minutos. Puedes ver el estado de la tarea en la interfaz web de Airflow.

Determina el nombre del bucket de almacenamiento

Para determinar el nombre del bucket de almacenamiento asociado con tu entorno, haz lo siguiente:

Console

  1. Abre la página Entornos en Cloud Console.
    Abrir la página Entornos
  2. En la columna Nombre, haz clic en el nombre del entorno para abrir la página de detalles correspondiente.

    En la pestaña Configuración, el nombre del depósito de Cloud Storage se muestra a la derecha de la carpetaDAG.

  3. Para ver el bucket en Cloud Storage, haz clic en su nombre (opcional).

gcloud

Ingresa el siguiente comando:

gcloud composer environments describe ENVIRONMENT_NAME \
  --location LOCATION \
  --format="get(config.dagGcsPrefix)"

Donde:

  • ENVIRONMENT_NAME es el nombre del entorno.
  • LOCATION es la región de Compute Engine donde se encuentra el entorno.
  • --format es una opción para especificar solo la propiedad dagGcsPrefix, en lugar de todos los detalles del entorno.

La propiedad dagGcsPrefix muestra el nombre del depósito:

gs://region-environment_name-random_id-bucket/

rest

Para obtener información sobre las credenciales, consulta Cloud Composer Authentication.

  1. Llama al método projects.locations.environments.get.
  2. Lee el config.dagGcsPrefix de la respuesta del Entorno.

rpc

Para obtener información sobre las credenciales, consulta Cloud Composer Authentication.

  1. Llama al método Environments.GetEnvironment.
  2. Lee el config.dag_gcs_prefix de la respuesta del Entorno.

Python

Usa la biblioteca google-auth a fin de obtener credenciales y la biblioteca requests para llamar a la API de REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])

Agrega o actualiza un DAG

Si deseas agregar o actualizar un DAG, mueve el archivo .py de Python para el DAG a la carpeta dags del entorno en Cloud Storage.

Console

  1. Abre la página Entornos en Cloud Console.
    Abrir la página Entornos
  2. En la columna Nombre, haz clic en el nombre del entorno para abrir la página de detalles correspondiente.

    En la pestaña Configuración, el nombre del depósito de Cloud Storage se muestra a la derecha de la carpeta DAG.

  3. Para ver el bucket de Cloud Storage, haz clic en el nombre del bucket.

    De forma predeterminada, se abre la carpeta dags.

  4. Haz clic en Subir archivos y selecciona la copia local del DAG que deseas subir.

  5. Para subir el archivo a la carpeta dags, haz clic en Abrir.

gcloud

Para agregar o actualizar un DAG, usa el siguiente comando:

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source LOCAL_FILE_TO_UPLOAD

Donde:

  • ENVIRONMENT_NAME es el nombre del entorno.
  • LOCATION es la región de Compute Engine en la que se encuentra el entorno.
  • LOCAL_FILE_TO_UPLOAD es el DAG que se debe subir.

Actualiza un DAG que tenga ejecuciones de DAG activas

Si actualizas un DAG que tiene ejecuciones activas de DAG:

  • Todas las tareas que se ejecutan actualmente terminan de usar el archivo DAG original.
  • Todas las tareas programadas, pero que no se ejecutan actualmente, usan el archivo DAG actualizado.
  • Todas las tareas que ya no estén presentes en el archivo DAG actualizado se marcan como quitadas.

Actualiza los DAG que se ejecutan con frecuencia

Después de subir un archivo DAG, Airflow demorará en cargar y actualizar el DAG. Si su DAG se ejecuta con frecuencia, puede asegurarse de que el DAG use la versión actualizada del archivo DAG. Para ello, deberás hacer lo siguiente:

  1. Pausa el DAG en la IU de Airflow.
  2. Sube un archivo DAG actualizado.
  3. Espere hasta ver las actualizaciones en la IU de Airflow. Esto significa que el programador analizó correctamente el DAG y que se actualizó en la base de datos de Airflow.

    Si la IU de Airflow muestra los DAG actualizados, esto no garantiza que los trabajadores de Airflow tengan la versión actualizada del archivo DAG. Esto sucede porque los archivos DAG se sincronizan de forma independiente para los programadores y los trabajadores.

  4. Te recomendamos extender el tiempo de espera para asegurarte de que el archivo DAG esté sincronizado con todos los trabajadores de tu entorno. La sincronización se realiza varias veces por minuto. En un entorno en buen estado, esperar entre 20 y 30 segundos es suficiente para que todos los trabajadores se sincronicen.

  5. (Opcional) Si quieres asegurarte de que todos los trabajadores tengan la versión nueva del archivo DAG, inspecciona los registros para cada trabajador individual. Para ello, deberás hacer lo siguiente:

    1. En Console, abra la pestaña Registros para su entorno.
    2. Ve a Registros de Composer > Infraestructura > elemento de sincronización de Cloud Storage e inspecciona los registros para cada trabajador de tu entorno. Busca el elemento de registro Syncing dags directory más reciente que tiene una marca de tiempo después de que subiste el archivo DAG nuevo. Si ves un elemento Finished syncing que lo siga, los DAG se sincronizaron de forma correcta en este trabajador.
  6. Reanuda el DAG.

Borra un DAG

En esta sección, se describe cómo borrar un DAG.

Borra un DAG en tu entorno

Para borrar un DAG, quita el archivo .py de Python correspondiente al DAG de la carpeta dags del entorno de Cloud Storage. Cuando se borra un DAG, sus metadatos no se quitan de la interfaz web de Airflow.

Console

  1. Ve a la página Entornos en Cloud Console.
    Abrir la página Entornos
  2. En la columna Nombre, haz clic en el nombre del entorno para abrir la página de detalles del entorno.

    En la pestaña Configuración, el nombre del depósito de Cloud Storage se muestra a la derecha de la carpeta DAGs.

  3. Para ver el bucket de Cloud Storage, haz clic en el nombre del bucket.

    De forma predeterminada, se abre la carpeta dags.

  4. Haz clic en la casilla de verificación junto al DAG que deseas borrar.

  5. En la parte superior de Cloud Console, haz clic en Borrar.

  6. En el cuadro de diálogo que aparece, haz clic en Aceptar.

gcloud

gcloud composer environments storage dags delete 
--environment ENVIRONMENT_NAME
--location LOCATION
DAG_NAME.py

Donde:

  • ENVIRONMENT_NAME es el nombre del entorno.
  • LOCATION es la región de Compute Engine en la que se encuentra el entorno.
  • DAG_NAME.py es el DAG que se debe borrar.
  • Airflow 1.9.0: los metadatos de los DAG borrados permanecen visibles en la interfaz web de Airflow.

  • Airflow 1.10.0 o posterior: puedes usar la herramienta de gcloud para quitar los metadatos del DAG.

Quita un DAG de la interfaz web de Airflow

Para quitar los metadatos de un DAG de la interfaz web de Airflow, ingresa el siguiente código:

CLI de Airflow 1.10

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

Airflow 2.0 CLI

  gcloud beta composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Donde:

  • ENVIRONMENT_NAME es el nombre del entorno.
  • LOCATION es la región de Compute Engine en la que se encuentra el entorno.
  • DAG_NAME es el nombre del DAG que se debe borrar.

¿Qué sigue?