Agregar y actualizar DAG

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

En esta página, se describe cómo administrar los DAG en Cloud Composer en un entorno de nube.

Cloud Composer usa un bucket de Cloud Storage para almacenar los DAG de tu entorno de Cloud Composer. El entorno se sincroniza DAG de este bucket a los componentes de Airflow, como los trabajadores de Airflow y programadores.

Antes de comenzar

  • Debido a que Apache Airflow no proporciona un aislamiento eficaz del DAG, recomendamos que mantengas entornos de prueba y producción separados para evitar la interferencia del DAG. Para obtener más información, consulta Probar DAG.
  • Asegúrate de que tu cuenta tenga los permisos suficientes para: y administrar DAG.
  • Los cambios en el DAG se propagan a Airflow en un plazo de 3 a 5 minutos. Puedes ver las tareas estado en la interfaz web de Airflow.

Accede al bucket de tu entorno

Sigue estos pasos para acceder al bucket asociado con tu entorno:

Console

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, busca una fila con el nombre de tu entorno. y, en la columna Carpeta de DAG, haz clic en el vínculo DAG. El Detalles del bucket. Muestra el contenido de la carpeta /dags en el bucket de tu entorno.

gcloud

gcloud CLI tiene comandos independientes para agregar y borrar DAG en el bucket de tu entorno.

Si quieres interactuar con el bucket de tu entorno, también puedes usar la herramienta de línea de comandos de gsutil. Cómo obtener la dirección del bucket de tu entorno ejecuta la siguiente gcloud CLI :

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno ubicado.

Ejemplo:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Realiza una solicitud a la API de environments.get. En el recurso Environment en la recurso EnvironmentConfig, en El recurso dagGcsPrefix es la dirección del bucket de tu entorno.

Ejemplo:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Usa la biblioteca google-auth para obtén credenciales y usa la biblioteca requests para llamar a la API de REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Agrega o actualiza un DAG

Para agregar o actualizar un DAG, mueve el archivo .py de Python del DAG a la carpeta /dags en el bucket del entorno.

Console

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, busca una fila con el nombre de tu entorno. y, en la columna Carpeta de DAG, haz clic en el vínculo DAG. El Detalles del bucket. Muestra el contenido de la carpeta /dags en el bucket de tu entorno.

  3. Haz clic en Subir archivos. Luego, selecciona el archivo .py de Python para el DAG. usando el cuadro de diálogo del navegador y confirma tu decisión.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno ubicado.
  • LOCAL_FILE_TO_UPLOAD es el archivo .py de Python para el DAG.

Ejemplo:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Actualiza un DAG que tenga ejecuciones activas del DAG

Si actualizas un DAG que tiene ejecuciones activas del DAG:

  • Todas las tareas que se ejecutan actualmente finalizan mediante el archivo DAG original.
  • Todas las tareas que están programadas, pero que en este momento no se están ejecutando, usan el archivo DAG actualizado.
  • Todas las tareas que ya no están presentes en el archivo DAG actualizado se marcan como o quitarse.

Actualiza los DAG que se ejecutan con frecuencia

Después de que subas un archivo DAG, Airflow demora un tiempo en cargar este archivo y actualiza el DAG. Si el DAG se ejecuta con frecuencia, asegurarte de que el DAG use la versión actualizada del archivo. Para ello, deberás hacer lo siguiente:

  1. Pausa el DAG en la IU de Airflow.

  2. Sube un archivo DAG actualizado.

  3. Espera hasta ver las actualizaciones en la IU de Airflow. Esto significa que el programador analizó correctamente el DAG y lo actualizó en la base de datos de Airflow.

    Si la IU de Airflow muestra los DAG actualizados, esto no garantiza que los trabajadores de Airflow tengan la versión actualizada del archivo de DAG. Esto sucede porque los archivos de DAG se sincronizan de forma independiente para los programadores y trabajadores.

  4. Es posible que desees extender el tiempo de espera para asegurarte de que el archivo DAG esté sincronizado con todos los trabajadores de tu entorno. La sincronización se realiza varias veces por minuto. En un entorno en buen estado, esperar entre 20 y 30 segundos es suficiente para que todos los trabajadores se sincronicen.

  5. Si deseas asegurarte de que todos los trabajadores tengan la versión nueva del archivo de DAG, inspecciona los registros de cada trabajador individual (opcional). Para ello, deberás hacer lo siguiente:

    1. Abre la pestaña Registros de tu entorno en la consola de Google Cloud.

    2. Ve a Registros de Composer > Infraestructura > elemento de sincronización de Cloud Storage e inspecciona los registros de cada trabajador en tu entorno. Busca el registro de Syncing dags directory más reciente que tiene una marca de tiempo después de que subiste el nuevo archivo DAG. Si verás un elemento Finished syncing a continuación, los DAG se sincronizó correctamente en este trabajador.

  6. Reanuda el DAG.

Borra un DAG en tu entorno

Para borrar un DAG, quita el archivo Python .py del DAG del la carpeta /dags de tu entorno en el bucket de tu entorno.

Console

  1. En la consola de Google Cloud, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, busca una fila con el nombre de tu entorno. y, en la columna Carpeta de DAG, haz clic en el vínculo DAG. El Detalles del bucket. Muestra el contenido de la carpeta /dags en en el bucket de tu entorno.

  3. Selecciona el archivo DAG, haz clic en Borrar y confirma la operación.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno ubicado.
  • DAG_FILE con el archivo .py de Python para el DAG.

Ejemplo:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Quita un DAG de la IU de Airflow

Sigue estos pasos para quitar los metadatos de un DAG de la interfaz web de Airflow:

IU de Airflow

  1. Ve a la IU de Airflow de tu entorno.
  2. Para el DAG, haz clic en Borrar DAG.

gcloud

Ejecuta el siguiente comando en gcloud CLI:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Reemplaza lo siguiente:

  • ENVIRONMENT_NAME por el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno
  • DAG_NAME es el nombre del DAG que se debe borrar.

¿Qué sigue?