Accéder à la CLI Airflow

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Apache Airflow dispose d'une interface de ligne de commande (CLI) qui vous permet de déclencher et gérer des DAG, d'obtenir des informations sur les exécutions de DAG et sur les tâches, et d'ajouter ou supprimer des connexions et des utilisateurs.

À propos des versions de syntaxe de la CLI

Airflow dans Cloud Composer 2 utilise la syntaxe CLI Airflow 2.

Commandes de CLI Airflow compatibles

Pour obtenir la liste complète des commandes de CLI Airflow compatibles, consultez la documentation de référence gcloud composer environments run.

Avant de commencer

Exécuter des commandes de CLI Airflow

Pour exécuter les commandes de CLI Airflow dans vos environnements, utilisez gcloud :

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

Remplacez :

  • ENVIRONMENT_NAME par le nom de l'environnement.
  • LOCATION par la région où se trouve l'environnement.
  • SUBCOMMAND par l'une des commandes de CLI Airflow compatibles.
  • SUBCOMMAND_ARGUMENTS par les arguments de la commande Airflow CLI.

Séparateur d'arguments de sous-commande

Séparez les arguments de la commande de CLI Airflow spécifiée avec -- :

Airflow 2

Pour la syntaxe Airflow 2 :

  • Spécifiez les commandes CLI composées en tant que sous-commande.
  • Spécifiez les arguments des commandes composées en tant qu'arguments de sous-commande, après un séparateur --.
gcloud composer environments run example-environment \
    dags list -- --output=json

Airflow 1

Cloud Composer 2 n'est compatible qu'avec Airflow 2.

Emplacement par défaut

La plupart des commandes gcloud composer nécessitent un emplacement. Vous pouvez le spécifier à l'aide de l'option --location ou en définissant l'emplacement par défaut.

Exemple

Par exemple, pour déclencher un DAG nommé sample_quickstart ayant l'ID 5077 dans votre environnement Cloud Composer :

Airflow 2

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

Airflow 1

Cloud Composer 2 n'est compatible qu'avec Airflow 2.

Exécuter des commandes sur un environnement d'adresse IP privée

À partir de la version 2.4.0 de Cloud Composer, vous pouvez exécuter des commandes de CLI Airflow dans un environnement d'adresse IP privée sans configuration supplémentaire. Votre machine n'a pas besoin d'accéder au point de terminaison du plan de contrôle du cluster de l'environnement pour exécuter ces commandes.

Dans les versions de Cloud Composer antérieures à la version 2.4.0:

Pour exécuter des commandes de CLI Airflow sur un environnement d'adresse IP privée, exécutez-les sur une machine qui peut accéder au point de terminaison du plan de contrôle du cluster GKE. Les options peuvent varier en fonction de votre configuration de cluster privé.

Si l'accès public aux points de terminaison est désactivé dans le cluster de votre environnement, il n'est pas possible d'utiliser les commandes gcloud composer pour exécuter la CLI Airflow. Pour pouvoir exécuter des commandes de CLI Airflow, procédez comme suit:

  1. Créer une VM dans votre réseau VPC
  2. Procurez-vous les identifiants du cluster. Exécutez la commande suivante : bash gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip
  • Utilisez kubectl pour exécuter votre commande Airflow. Exemple :
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
  --container airflow-scheduler -- airflow dags list

Remplacez COMPOSER_NAMESPACE par un espace de noms semblable à : composer-2-0-28-airflow-2-3-394zxc12411. Vous pouvez trouver vos fichiers Cloud Composer dans la liste des charges de travail ou à l'aide de la commande kubectl get namespaces.

Si l'accès public aux points de terminaison est activé dans le cluster de votre environnement, vous pouvez également exécuter les commandes de la CLI Airflow depuis une machine dotée d'une adresse IP externe ajoutée aux réseaux autorisés. Pour activer l'accès depuis votre machine, ajoutez le externe de votre machine au réseau VPC liste des réseaux autorisés.

Exécuter des commandes de CLI Airflow via l'API Cloud Composer

À partir de la version 2.4.0 de Cloud Composer, vous pouvez exécuter la CLI Airflow via l'API Cloud Composer.

Exécuter une commande

Créez une requête API environments.executeAirflowCommand :

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet.
  • LOCATION : région où se trouve l'environnement.
  • ENVIRONMENT_NAME : nom de votre environnement
  • AIRFLOW_COMMAND : commande de la CLI Airflow que vous souhaitez exécuter, par exemple dags.
  • AIRFLOW_SUBCOMMAND : sous-commande de la commande de la CLI Airflow que vous souhaitez exécuter, par exemple list.
  • (facultatif) SUBCOMMAND_PARAMETER: paramètres de la sous-commande. Si vous si vous souhaitez utiliser plusieurs paramètres, ajoutez des éléments à la liste.

Exemple :

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

État de la commande d'interrogation

Une fois que vous avez exécuté une commande de CLI Airflow via l'API Cloud Composer, vérifiez si la commande a bien été exécutée en effectuant une requête PollAirflowCommand et en inspectant les champs de exitInfo pour détecter les erreurs et les codes d'état. Le champ output contient des lignes de journal.

Pour obtenir l'état d'exécution de la commande et récupérer les journaux, fournissez les valeurs executionId, pod et podNamespace renvoyées par ExecuteAirflowCommandRequest :

Exemple :

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
  "pod": "airflow-scheduler-1327d8cd68-hblpd",
  "podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
  "nextLineNumber": 1
}

Dépannage

Aucune connectivité au plan de contrôle du cluster

Lors de l'exécution de gcloud composer environments run ou kubectl, vous pouvez rencontrer l'erreur suivante:

Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"

Problème constaté: ce message d'erreur indique qu'il n'y a pas de réseau la connectivité à partir d'un ordinateur où vous exécutez ces commandes.

Solution : Suivez les consignes présentées dans la section Exécuter des commandes sur un environnement d'adresse IP privée ou utilisez les instructions disponibles dans la section Expiration de la commande kubectl.