Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página se describe cómo gestionar los DAGs en tu entorno de Cloud Composer.
Cloud Composer usa un segmento de Cloud Storage para almacenar los DAGs de tu entorno de Cloud Composer. Tu entorno sincroniza los DAGs de este bucket con los componentes de Airflow, como los trabajadores y los programadores de Airflow.
Antes de empezar
- Como Apache Airflow no proporciona un aislamiento sólido de los DAGs, te recomendamos que mantengas entornos de producción y de prueba independientes para evitar interferencias entre los DAGs. Para obtener más información, consulta Probar DAGs.
- Asegúrate de que tu cuenta tiene suficientes permisos para gestionar DAGs.
- Los cambios en el DAG se propagan a Airflow en un plazo de entre 3 y 5 minutos. Puedes ver el estado de las tareas en la interfaz web de Airflow.
Acceder al bucket de tu entorno
Para acceder al contenedor asociado a tu entorno, sigue estos pasos:
Consola
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, busca una fila con el nombre de tu entorno y, en la columna Carpeta DAGs, haz clic en el enlace DAGs. Se abrirá la página Detalles del segmento. Muestra el contenido de la carpeta
/dags
en el contenedor de tu entorno.
gcloud
gcloud CLI tiene comandos independientes para añadir y eliminar DAGs en el bucket de tu entorno.
Si quieres interactuar con el bucket de tu entorno, también puedes usar Google Cloud CLI. Para obtener la dirección del bucket de tu entorno, ejecuta el siguiente comando de gcloud CLI:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
Sustituye:
ENVIRONMENT_NAME
con el nombre del entorno.LOCATION
por la región en la que se encuentra el entorno.
Ejemplo:
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
Crea una solicitud de la API environments.get
. En el recurso Environment, en el recurso EnvironmentConfig, en el recurso dagGcsPrefix
se encuentra la dirección del bucket de tu entorno.
Ejemplo:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
Usa la biblioteca google-auth para obtener las credenciales y la biblioteca requests para llamar a la API REST.
Añadir o actualizar un DAG
Para añadir o actualizar un DAG, mueve el archivo Python .py
del DAG a la carpeta /dags
del bucket del entorno.
Consola
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, busca una fila con el nombre de tu entorno y, en la columna Carpeta DAGs, haz clic en el enlace DAGs. Se abrirá la página Detalles del segmento. Muestra el contenido de la carpeta
/dags
en el contenedor de tu entorno.Haz clic en Subir archivos. A continuación, selecciona el archivo
.py
de Python del DAG en el cuadro de diálogo del navegador y confirma la acción.
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
Sustituye:
ENVIRONMENT_NAME
con el nombre del entorno.LOCATION
por la región en la que se encuentra el entorno.LOCAL_FILE_TO_UPLOAD
es el archivo.py
de Python del DAG.
Ejemplo:
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
Actualizar un DAG que tenga ejecuciones activas
Si actualizas un DAG que tiene ejecuciones activas:
- Todas las tareas que se estén ejecutando en ese momento se completarán con el archivo DAG original.
- Todas las tareas programadas que no se estén ejecutando en ese momento usarán el archivo DAG actualizado.
- Todas las tareas que ya no estén en el archivo DAG actualizado se marcarán como eliminadas.
Actualizar los DAGs que se ejecutan con frecuencia
Después de subir un archivo DAG, Airflow tarda un tiempo en cargar el archivo y actualizar el DAG. Si tu DAG se ejecuta con una frecuencia alta, te recomendamos que te asegures de que usa la versión actualizada del archivo DAG. Para ello:
Pausa el DAG en la interfaz de Airflow.
Sube un archivo DAG actualizado.
Espera hasta que veas las actualizaciones en la interfaz de usuario de Airflow. Esto significa que el programador ha analizado correctamente el DAG y lo ha actualizado en la base de datos de Airflow.
Si la interfaz de usuario de Airflow muestra los DAGs actualizados, no se garantiza que los workers de Airflow tengan la versión actualizada del archivo DAG. Esto ocurre porque los archivos DAG se sincronizan de forma independiente para los programadores y los trabajadores.
Puede que quieras ampliar el tiempo de espera para asegurarte de que el archivo DAG se sincroniza con todos los trabajadores de tu entorno. La sincronización se produce varias veces por minuto. En un entorno correcto, esperar entre 20 y 30 segundos es suficiente para que se sincronicen todos los trabajadores.
(Opcional) Si quieres asegurarte de que todos los trabajadores tienen la nueva versión del archivo DAG, inspecciona los registros de cada trabajador. Para ello:
Abre la pestaña Registros de tu entorno en la Google Cloud consola.
Ve a Registros de Composer > Infraestructura > Sincronización de Cloud Storage e inspecciona los registros de cada trabajador de tu entorno. Busca el elemento de registro
Syncing dags directory
más reciente que tenga una marca de tiempo posterior a la subida del nuevo archivo DAG. Si ves un elementoFinished syncing
después, significa que los DAGs se han sincronizado correctamente en este trabajador.
Reanuda el DAG.
Eliminar un DAG de tu entorno
Para eliminar un DAG, quita el archivo .py
de Python del DAG de la carpeta /dags
del entorno en el bucket del entorno.
Consola
En la Google Cloud consola, ve a la página Entornos.
En la lista de entornos, busca una fila con el nombre de tu entorno y, en la columna Carpeta DAGs, haz clic en el enlace DAGs. Se abrirá la página Detalles del segmento. Muestra el contenido de la carpeta
/dags
en el contenedor de tu entorno.Selecciona el archivo DAG, haz clic en Eliminar y confirma la operación.
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
Sustituye:
ENVIRONMENT_NAME
con el nombre del entorno.LOCATION
por la región en la que se encuentra el entorno.DAG_FILE
con el archivo.py
de Python del DAG.
Ejemplo:
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
Quitar un DAG de la interfaz de usuario de Airflow
Para quitar los metadatos de un DAG de la interfaz web de Airflow, sigue estos pasos:
Interfaz de usuario de Airflow
- Ve a la interfaz de usuario de Airflow de tu entorno.
- En el DAG, haz clic en Eliminar DAG.
gcloud
En las versiones de Airflow 1 anteriores a la 1.14.0, ejecuta el siguiente comando en la CLI de gcloud:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
delete_dag -- DAG_NAME
En Airflow 2, Airflow 1.14.0 y versiones posteriores, ejecuta el siguiente comando en la CLI de gcloud:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
Sustituye:
ENVIRONMENT_NAME
con el nombre del entorno.LOCATION
con la región en la que se encuentra el entorno.DAG_NAME
es el nombre del DAG que se va a eliminar.