Esta página se ha traducido con Cloud Translation API.
Switch to English

Prueba los DAG (flujos de trabajo)

Antes de implementar los DAG en producción, puedes ejecutar subcomandos de la CLI de Airflow para analizar el código del DAG en el mismo contexto en el que se ejecuta.

Realiza pruebas durante la creación del DAG

Puedes ejecutar una única instancia de tarea localmente y ver el resultado del registro. Ver el resultado te permite comprobar si hay errores de sintaxis y tarea. La prueba local no verifica las dependencias ni comunica el estado a la base de datos.

Te recomendamos que coloques los DAG en una carpeta data/test en tu entorno de prueba.

Verifica que el paquete de PyPI no tenga errores

Dado que las dependencias de PyPI podrían ocasionar conflictos con las dependencias que se necesitan para Airflow, recomendamos que instales los paquetes de Python deseados de forma local en un contenedor de trabajadores de Airflow y pruebes el paquete.

  1. Determina el clúster y la zona de GKE del entorno de Cloud Composer.

    a. Usa el comando gcloud composer para mostrar las propiedades de un entorno de Cloud Composer:

    gcloud composer environments describe ENVIRONMENT_NAME \
        --location ENVIRONMENT_LOCATION
    

    En el ejemplo anterior, se ilustra lo siguiente:

    • ENVIRONMENT_NAME es el nombre del entorno.
    • ENVIRONMENT_LOCATION es la región de Compute Engine donde se encuentra el entorno.

    b. En el resultado, el clúster aparece como gkeCluster.

    c. La zona en la que se implementa el clúster se muestra en la última parte de la propiedad location (config > nodeConfig > location). Por ejemplo, us-central1-b.

  2. Usa el comando gcloud composer para conectar el comando kubectl con el clúster.

    gcloud container clusters get-credentials GKE_CLUSTER --zone GKE_LOCATION
    

    En el ejemplo anterior, se ilustra lo siguiente:

    • GKE_CLUSTER es el clúster.
    • GKE_LOCATION es la zona en la que se implementa el clúster.
  3. Ver y elegir un pod de trabajador de Airflow

    kubectl get pods --all-namespaces
    

    Busca un pod con un nombre como airflow-worker-1a2b3c-x0yz.

  4. Conéctate a una shell remota en un contenedor de trabajadores de Airflow.

    kubectl -n composer-1-14-4-airflow-example-namespace \
    exec -it airflow-worker-1a2b3c-x0yz -c airflow-worker -- /bin/bash
    

    Mientras estás conectado a la shell remota, el símbolo del sistema muestra el nombre del pod trabajador de Airflow, por ejemplo, airflow-worker-1a2b3c-x0yz.

  5. Para la versión de Python que se ejecuta en tu entorno, instala el paquete de Python en el contenedor de trabajadores de Airflow, por ejemplo:

    sudo python3 -m pip install PACKAGE
    
  6. Prueba la compatibilidad en el contenedor de trabajadores de Airflow.

    • Comprueba si hay errores de sintaxis.
    airflow list_dags
    
    • Procesa la plantilla.
    airflow test --dry_run DAG_ID TASK_ID EXECUTION_DATE
    
    • Comprueba si hay errores en la tarea.
    airflow test DAG_ID TASK_ID EXECUTION_DATE
    
  7. Desinstala el paquete de Python del contenedor de trabajadores de Airflow, por ejemplo:

    sudo python3 -m pip uninstall PACKAGE
    

Comprueba errores de sintaxis

  1. En el depósito de Cloud Storage de tu entorno, crea un directorio de prueba.

  2. Para comprobar si hay errores de sintaxis, ingresa el siguiente comando de gcloud:

    gcloud composer environments run ENVIRONMENT_NAME \
     --location ENVIRONMENT_LOCATION \
     list_dags -- -sd /home/airflow/gcs/data/test
    

    Aquí:

    • ENVIRONMENT_NAME es el nombre del entorno.
    • ENVIRONMENT_LOCATION es la región de Compute Engine en la que se encuentra el entorno.

    Por ejemplo:

    gcloud composer environments run \
    test-environment --location us-central1 \
    list_dags -- -sd /home/airflow/gcs/data/test
    

Comprueba errores de tareas

Para comprobar si hay errores específicos de la tarea, ingresa el siguiente comando gcloud:

gcloud composer environments run ENVIRONMENT_NAME \
  --location ENVIRONMENT_LOCATION \
  test -- -sd /home/airflow/gcs/data/test DAG_ID \
  TASK_ID DAG_EXECUTION_DATE

Aquí:

  • ENVIRONMENT_NAME es el nombre del entorno.
  • ENVIRONMENT_LOCATION es la región de Compute Engine en la que se encuentra el entorno.
  • DAG_ID es el ID del DAG.
  • TASK_ID es el ID de la tarea.
  • DAG_EXECUTION_DATE es la fecha de ejecución del DAG. Esta fecha se usa para plantillas. Independientemente de la fecha que especifiques aquí, el DAG se ejecuta de inmediato.

Por ejemplo:

gcloud composer environments run test-environment --location us-central1 \
        -- -sd /home/airflow/gcs/data/test-dags hello_world print_date 2018-09-03

Actualiza y prueba un DAG implementado

Para probar las actualizaciones de tus DAG en el entorno de prueba, haz lo siguiente:

  1. Copia el DAG implementado que deseas actualizar en data/test.
  2. Actualiza el DAG.
  3. Prueba el DAG.
    1. Comprueba si hay errores de sintaxis.
    2. Verifica si hay errores específicos de la tarea.
  4. Asegúrate de que el DAG se ejecute correctamente.
  5. Desactiva el DAG en tu entorno de prueba.
    1. Ve a la página de IU de Airflow > DAG.
    2. Si el DAG que estás modificando se ejecuta constantemente, desactívalo.
    3. Para acelerar las tareas pendientes, haz clic en la tarea y en Marcar como correcta.
  6. Implementa el DAG en tu entorno de producción.
    1. Desactiva el DAG en tu entorno de producción.
    2. Sube el DAG actualizado a la carpeta dags/ en tu entorno de producción.

Preguntas frecuentes sobre la prueba de los flujos de trabajo

¿Qué sigue?