Añadir y actualizar DAGs

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página se describe cómo gestionar los DAGs en tu entorno de Cloud Composer.

Cloud Composer usa un segmento de Cloud Storage para almacenar los DAGs de tu entorno de Cloud Composer. Tu entorno sincroniza los DAGs de este bucket con los componentes de Airflow, como los trabajadores y los programadores de Airflow.

Antes de empezar

  • Como Apache Airflow no proporciona un aislamiento sólido de los DAGs, te recomendamos que mantengas entornos de producción y de prueba independientes para evitar interferencias entre los DAGs. Para obtener más información, consulta Probar DAGs.
  • Asegúrate de que tu cuenta tiene suficientes permisos para gestionar DAGs.
  • Los cambios en el DAG se propagan a Airflow en un plazo de entre 3 y 5 minutos. Puedes ver el estado de las tareas en la interfaz web de Airflow.

Acceder al bucket de tu entorno

Para acceder al contenedor asociado a tu entorno, sigue estos pasos:

Consola

  1. En la Google Cloud consola, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, busca una fila con el nombre de tu entorno y, en la columna Carpeta DAGs, haz clic en el enlace DAGs. Se abrirá la página Detalles del segmento. Muestra el contenido de la carpeta /dags en el contenedor de tu entorno.

gcloud

gcloud CLI tiene comandos independientes para añadir y eliminar DAGs en el bucket de tu entorno.

Si quieres interactuar con el bucket de tu entorno, también puedes usar Google Cloud CLI. Para obtener la dirección del bucket de tu entorno, ejecuta el siguiente comando de gcloud CLI:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Sustituye:

  • ENVIRONMENT_NAME con el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno.

Ejemplo:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Crea una solicitud de la API environments.get. En el recurso Environment, en el recurso EnvironmentConfig, en el recurso dagGcsPrefix se encuentra la dirección del bucket de tu entorno.

Ejemplo:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Usa la biblioteca google-auth para obtener las credenciales y la biblioteca requests para llamar a la API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Añadir o actualizar un DAG

Para añadir o actualizar un DAG, mueve el archivo Python .py del DAG a la carpeta /dags del bucket del entorno.

Consola

  1. En la Google Cloud consola, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, busca una fila con el nombre de tu entorno y, en la columna Carpeta DAGs, haz clic en el enlace DAGs. Se abrirá la página Detalles del segmento. Muestra el contenido de la carpeta /dags en el contenedor de tu entorno.

  3. Haz clic en Subir archivos. A continuación, selecciona el archivo .py de Python del DAG en el cuadro de diálogo del navegador y confirma la acción.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Sustituye:

  • ENVIRONMENT_NAME con el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno.
  • LOCAL_FILE_TO_UPLOAD es el archivo .py de Python del DAG.

Ejemplo:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Actualizar un DAG que tenga ejecuciones activas

Si actualizas un DAG que tiene ejecuciones activas:

  • Todas las tareas que se estén ejecutando en ese momento se completarán con el archivo DAG original.
  • Todas las tareas programadas que no se estén ejecutando en ese momento usarán el archivo DAG actualizado.
  • Todas las tareas que ya no estén en el archivo DAG actualizado se marcarán como eliminadas.

Actualizar los DAGs que se ejecutan con frecuencia

Después de subir un archivo DAG, Airflow tarda un tiempo en cargar el archivo y actualizar el DAG. Si tu DAG se ejecuta con una frecuencia alta, te recomendamos que te asegures de que usa la versión actualizada del archivo DAG. Para ello:

  1. Pausa el DAG en la interfaz de Airflow.

  2. Sube un archivo DAG actualizado.

  3. Espera hasta que veas las actualizaciones en la interfaz de usuario de Airflow. Esto significa que el programador ha analizado correctamente el DAG y lo ha actualizado en la base de datos de Airflow.

    Si la interfaz de usuario de Airflow muestra los DAGs actualizados, no se garantiza que los workers de Airflow tengan la versión actualizada del archivo DAG. Esto ocurre porque los archivos DAG se sincronizan de forma independiente para los programadores y los trabajadores.

  4. Puede que quieras ampliar el tiempo de espera para asegurarte de que el archivo DAG se sincroniza con todos los trabajadores de tu entorno. La sincronización se produce varias veces por minuto. En un entorno correcto, esperar entre 20 y 30 segundos es suficiente para que se sincronicen todos los trabajadores.

  5. (Opcional) Si quieres asegurarte de que todos los trabajadores tienen la nueva versión del archivo DAG, inspecciona los registros de cada trabajador. Para ello:

    1. Abre la pestaña Registros de tu entorno en la Google Cloud consola.

    2. Ve a Registros de Composer > Infraestructura > Sincronización de Cloud Storage e inspecciona los registros de cada trabajador de tu entorno. Busca el elemento de registro Syncing dags directory más reciente que tenga una marca de tiempo posterior a la subida del nuevo archivo DAG. Si ves un elemento Finished syncing después, significa que los DAGs se han sincronizado correctamente en este trabajador.

  6. Reanuda el DAG.

Eliminar un DAG de tu entorno

Para eliminar un DAG, quita el archivo .py de Python del DAG de la carpeta /dags del entorno en el bucket del entorno.

Consola

  1. En la Google Cloud consola, ve a la página Entornos.

    Ir a Entornos

  2. En la lista de entornos, busca una fila con el nombre de tu entorno y, en la columna Carpeta DAGs, haz clic en el enlace DAGs. Se abrirá la página Detalles del segmento. Muestra el contenido de la carpeta /dags en el contenedor de tu entorno.

  3. Selecciona el archivo DAG, haz clic en Eliminar y confirma la operación.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Sustituye:

  • ENVIRONMENT_NAME con el nombre del entorno.
  • LOCATION por la región en la que se encuentra el entorno.
  • DAG_FILE con el archivo .py de Python del DAG.

Ejemplo:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Quitar un DAG de la interfaz de usuario de Airflow

Para quitar los metadatos de un DAG de la interfaz web de Airflow, sigue estos pasos:

Interfaz de usuario de Airflow

  1. Ve a la interfaz de usuario de Airflow de tu entorno.
  2. En el DAG, haz clic en Eliminar DAG.

gcloud

En las versiones de Airflow 1 anteriores a la 1.14.0, ejecuta el siguiente comando en la CLI de gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

En Airflow 2, Airflow 1.14.0 y versiones posteriores, ejecuta el siguiente comando en la CLI de gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Sustituye:

  • ENVIRONMENT_NAME con el nombre del entorno.
  • LOCATION con la región en la que se encuentra el entorno.
  • DAG_NAME es el nombre del DAG que se va a eliminar.

Siguientes pasos