Agrega y actualiza DAG (flujos de trabajo)

En esta página, se describe cómo determinar el bucket de almacenamiento de tu entorno y cómo agregar, actualizar y borrar un DAG del entorno.

Cloud Composer usa Cloud Storage para almacenar los DAG de Apache Airflow, también conocidos como flujos de trabajo. Cada entorno está asociado a un bucket de Cloud Storage. Cloud Composer programa solo los DAG del bucket de Cloud Storage.

Antes de comenzar

  • Debido a que Apache Airflow no proporciona un aislamiento eficaz del DAG, recomendamos que mantengas entornos de prueba y producción separados para evitar la interferencia del DAG. Para obtener más información, consulta Probar DAG.
  • Estos son los permisos necesarios para agregar y actualizar complementos en el depósito de Cloud Storage del entorno de Cloud Composer:
    • storage.objectAdmin para subir archivos
    • composer.environments.get para buscar el depósito de destino del DAG Este permiso no es obligatorio cuando se usa la API de Cloud Storage o gsutil.
  • Los cambios en el DAG tardan de 3 a 5 minutos. Puedes ver el estado de la tarea en la interfaz web de Airflow.

Determina el nombre del bucket de almacenamiento

Para determinar el nombre del bucket de almacenamiento asociado con tu entorno, haz lo siguiente:

Console

  1. Abre la página Entornos en Cloud Console.
    Abrir la página Entornos
  2. En la columna Nombre, haz clic en el nombre del entorno para abrir la página de detalles correspondiente.

    En la pestaña Configuración, el nombre del depósito de Cloud Storage se muestra a la derecha de la carpetaDAG.

  3. Para ver el bucket en Cloud Storage, haz clic en su nombre (opcional).

gcloud

Ingresa el siguiente comando:

gcloud composer environments describe ENVIRONMENT_NAME \
  --location LOCATION \
  --format="get(config.dagGcsPrefix)"

Donde:

  • ENVIRONMENT_NAME es el nombre del entorno.
  • LOCATION es la región de Compute Engine donde se encuentra el entorno.
  • --format es una opción para especificar solo la propiedad dagGcsPrefix, en lugar de todos los detalles del entorno.

La propiedad dagGcsPrefix muestra el nombre del depósito:

gs://region-environment_name-random_id-bucket/

rest

Para obtener información sobre las credenciales, consulta Cloud Composer Authentication.

  1. Llama al método projects.locations.environments.get.
  2. Lee el config.dagGcsPrefix de la respuesta del Entorno.

rpc

Para obtener información sobre las credenciales, consulta Cloud Composer Authentication.

  1. Llama al método Environments.GetEnvironment.
  2. Lee el config.dag_gcs_prefix de la respuesta del Entorno.

Python

Usa la biblioteca google-auth a fin de obtener credenciales y la biblioteca requests para llamar a la API de REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])

Agrega o actualiza un DAG

Para agregar o actualizar un DAG, mueve el archivo .py de Python del DAG a la carpeta dags del entorno en Cloud Storage.

Console

  1. Abre la página Entornos en Cloud Console.
    Abrir la página Entornos
  2. En la columna Nombre, haz clic en el nombre del entorno para abrir la página de detalles correspondiente.

    En la pestaña Configuración, el nombre del depósito de Cloud Storage se muestra a la derecha de la carpeta DAG.

  3. Para ver el bucket de Cloud Storage, haz clic en el nombre del bucket.

    De forma predeterminada, se abre la carpeta dags.

  4. Haz clic en Subir archivos y selecciona la copia local del DAG que deseas subir.

  5. Para subir el archivo a la carpeta dags, haz clic en Abrir.

gcloud

Para agregar o actualizar un DAG, usa el siguiente comando:

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source LOCAL_FILE_TO_UPLOAD

Donde:

  • ENVIRONMENT_NAME es el nombre del entorno.
  • LOCATION es la región de Compute Engine en la que se encuentra el entorno.
  • LOCAL_FILE_TO_UPLOAD es el DAG que se debe subir.

Borra un DAG

Borra un DAG en tu entorno

Para borrar un DAG, quita el archivo .py de Python correspondiente al DAG de la carpeta dags del entorno de Cloud Storage. Cuando se borra un DAG, sus metadatos no se quitan de la interfaz web de Airflow.

Console

  1. Ve a la página Entornos en Cloud Console.
    Abrir la página Entornos
  2. En la columna Nombre, haz clic en el nombre del entorno para abrir la página de detalles del entorno.

    En la pestaña Configuración, el nombre del depósito de Cloud Storage se muestra a la derecha de la carpeta DAGs.

  3. Para ver el bucket de Cloud Storage, haz clic en el nombre del bucket.

    De forma predeterminada, se abre la carpeta dags.

  4. Haz clic en la casilla de verificación junto al DAG que deseas borrar.

  5. En la parte superior de Cloud Console, haz clic en Borrar.

  6. En el cuadro de diálogo que aparece, haz clic en Aceptar.

gcloud

gcloud composer environments storage dags delete 
--environment ENVIRONMENT_NAME
--location LOCATION
DAG_NAME.py

Donde:

  • ENVIRONMENT_NAME es el nombre del entorno.
  • LOCATION es la región de Compute Engine en la que se encuentra el entorno.
  • DAG_NAME.py es el DAG que se debe borrar.
  • Airflow 1.9.0: los metadatos de los DAG borrados permanecen visibles en la interfaz web de Airflow.

  • Airflow 1.10.0 o posterior: puedes usar la herramienta de gcloud para quitar los metadatos del DAG.

Quita un DAG de la interfaz web de Airflow

Para quitar los metadatos de un DAG de la interfaz web de Airflow, ingresa el siguiente código:

  gcloud composer environments run --location LOCATION \
  ENVIRONMENT_NAME delete_dag -- DAG_NAME

Donde:

  • ENVIRONMENT_NAME es el nombre del entorno.
  • LOCATION es la región de Compute Engine en la que se encuentra el entorno.
  • DAG_NAME es el nombre del DAG que se debe borrar.

¿Qué sigue?