Acceder a la interfaz de línea de comandos de Airflow

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Apache Airflow tiene una interfaz de línea de comandos (CLI) que puedes usar para realizar tareas como activar y gestionar DAGs, obtener información sobre ejecuciones de DAGs y tareas, y añadir y eliminar conexiones y usuarios.

Comandos de la CLI de Airflow admitidos

Airflow usa la sintaxis de la CLI de Airflow 2, que se describe en la documentación de Airflow.

Para ver una lista completa de los comandos de la CLI de Airflow compatibles, consulta la referencia del comando gcloud composer environments run.

Antes de empezar

  • Debes tener permisos para usar la CLI de Google Cloud con Cloud Composer y ejecutar comandos de la CLI de Airflow.

  • Los comandos de la CLI de Airflow consumen la environments.executeAirflowCommand cuota.

  • En las versiones de Cloud Composer anteriores a la 2.4.0, necesitas acceso al plano de control del clúster de tu entorno para ejecutar comandos de la CLI de Airflow.

Ejecutar comandos de la CLI de Airflow

Para ejecutar comandos de la CLI de Airflow en tus entornos, usa la CLI de gcloud:

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

Sustituye lo siguiente :

  • ENVIRONMENT_NAME: el nombre de tu entorno.
  • LOCATION: la región en la que se encuentra el entorno.
  • SUBCOMMAND: uno de los comandos de la CLI de Airflow admitidos.
  • SUBCOMMAND_ARGUMENTS con argumentos para el comando de la CLI de Airflow.

Separador de argumentos de subcomando

Separa los argumentos del comando de la CLI de Airflow especificado con --:

  • Especifica comandos de la CLI compuestos como subcomandos.
  • Especifica los argumentos de los comandos compuestos como argumentos de subcomando después de un separador --.

Ejemplo:

gcloud composer environments run example-environment \
    dags list -- --output=json

Ubicación predeterminada

La mayoría de los comandos de gcloud composer requieren una ubicación. Puedes especificar la ubicación con la marca --location o configurar la ubicación predeterminada.

Por ejemplo, para activar un DAG llamado sample_quickstart con el ID 5077 en tu entorno de Cloud Composer, haz lo siguiente:

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

Ejecutar comandos en un entorno de IP privada

En las versiones de Cloud Composer anteriores a la 2.4.0:

Para ejecutar comandos de la CLI de Airflow en un entorno de IP privada, ejecútalos en una máquina que pueda acceder al endpoint del plano de control del clúster de GKE. Las opciones disponibles pueden variar en función de la configuración de tu clúster privado.

Si el acceso al endpoint público está inhabilitado en el clúster de tu entorno, no podrás usar los comandos gcloud composer para ejecutar la CLI de Airflow. Para poder ejecutar comandos de la CLI de Airflow, sigue estos pasos:

  1. Crear una VM en tu red de VPC
  2. Obtén las credenciales del clúster. Ejecuta el siguiente comando:

    gcloud container clusters get-credentials CLUSTER_NAME \
      --region REGION \
      --project PROJECT \
      --internal-ip
    

Usa kubectl para ejecutar el comando de Airflow. Por ejemplo:

kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
  --container airflow-scheduler -- airflow dags list

Sustituye COMPOSER_NAMESPACE por un espacio de nombres similar a este: composer-2-0-28-airflow-2-3-394zxc12411. Puedes encontrar tu Cloud Composer en la lista de cargas de trabajo o usando el comando kubectl get namespaces.

Si el acceso a los endpoints públicos está habilitado en el clúster de tu entorno, también puedes ejecutar comandos de la CLI de Airflow desde una máquina con una dirección IP externa que se haya añadido a las redes autorizadas. Para habilitar el acceso desde tu máquina, añade la dirección externa de tu máquina a la lista de redes autorizadas de tu entorno.

Al ejecutar los comandos gcloud composer environments run o kubectl, puede que se produzca el siguiente error:

Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"

Síntoma: Este mensaje de error indica que no hay conectividad de red desde un ordenador en el que ejecutas estos comandos.

Solución: Sigue las directrices que se indican en la sección Ejecutar comandos en un entorno de IP privada o las instrucciones disponibles en la sección El comando kubectl agota el tiempo de espera.

Ejecutar comandos de la CLI de Airflow a través de la API de Cloud Composer

A partir de la versión 2.4.0 de Cloud Composer, puedes ejecutar comandos de la CLI de Airflow a través de la API de Cloud Composer.

Ejecutar un comando

Crea una solicitud de API environments.executeAirflowCommand:

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

Haz los cambios siguientes:

  • PROJECT_ID: el ID de proyecto.
  • LOCATION: la región en la que se encuentra el entorno.
  • ENVIRONMENT_NAME: el nombre de tu entorno.
  • AIRFLOW_COMMAND: comando de la CLI de Airflow que quieras ejecutar, como dags.
  • AIRFLOW_SUBCOMMAND: subcomando del comando de la CLI de Airflow que quieras ejecutar, como list.
  • (Opcional) SUBCOMMAND_PARAMETER: parámetros del subcomando. Si quieres usar más de un parámetro, añade más elementos a la lista.

Ejemplo:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

Estado del comando de sondeo

Después de ejecutar un comando de la CLI de Airflow a través de la API de Cloud Composer, comprueba si el comando se ha completado correctamente haciendo una solicitud PollAirflowCommand e inspeccionando los campos de exitInfo para ver los errores y los códigos de estado. El campo output contiene líneas de registro.

Para obtener el estado de ejecución del comando y los registros, proporcione los valores executionId, pod y podNamespace devueltos por ExecuteAirflowCommandRequest:

Ejemplo:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
  "pod": "airflow-scheduler-1327d8cd68-hblpd",
  "podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
  "nextLineNumber": 1
}

Siguientes pasos