Auf Airflow-Befehlszeile zugreifen

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Apache Airflow bietet eine Befehlszeile, mit der Sie Aufgaben wie das Auslösen und Verwalten von DAGs, das Abrufen von Informationen zu DAG-Ausführungen und -Aufgaben, das Hinzufügen und Löschen von Verbindungen und Nutzern ausführen können.

Über CLI-Syntaxversionen

Airflow in Cloud Composer 2 verwendet die Airflow 2-Befehlszeilensyntax.

Unterstützte Befehle der Airflow-Befehlszeile

Eine vollständige Liste der unterstützten Airflow-Befehlszeilenbefehle finden Sie in der Referenz zu gcloud composer environments run.

Hinweise

  • Sie müssen ausreichende Berechtigungen zum Verwenden der gcloud-Befehlszeile haben mit Cloud Composer erstellen und Airflow-Befehlszeilenbefehle ausführen. Weitere Informationen finden Sie unter Zugriffssteuerung.

  • In Cloud Composer-Versionen vor 2.4.0 benötigen Sie Zugriff auf die Steuerungsebene des Umgebungsclusters um Befehle der Airflow-Befehlszeile auszuführen.

Befehle der Airflow-Befehlszeile ausführen

Zum Ausführen von Befehlen der Airflow-Befehlszeile in Ihren Umgebungen verwenden Sie gcloud:

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung.
  • LOCATION durch die Region, in der sich die Umgebung befindet.
  • SUBCOMMAND durch einen der unterstützten Airflow-Befehlszeilenbefehle.
  • SUBCOMMAND_ARGUMENTS durch Argumente für den Airflow-Befehlszeilenbefehl.

Trennzeichen für Unterbefehlsargumente

Trennen Sie die Argumente für den angegebenen Airflow-Befehlszeilenbefehl durch --:

Airflow 2

Für die Syntax der Airflow-2-Befehlszeile:

  • Geben Sie zusammengesetzte CLI-Befehle als Unterbefehl an.
  • Geben Sie alle Argumente für zusammengesetzte Befehle als Unterbefehlargumente an. nach einem ---Trennzeichen ein.
gcloud composer environments run example-environment \
    dags list -- --output=json

Airflow 1

Cloud Composer 2 unterstützt nur Airflow 2.

Standardort

Für die meisten gcloud composer-Befehle ist ein Speicherort erforderlich. Sie können den Speicherort mit dem Flag --location oder durch Festlegen des Standardspeicherorts angeben.

Beispiel

Beispielsweise haben Sie die Möglichkeit, mit dem folgenden Befehl einen DAG namens sample_quickstart mit der ID 5077 in Ihrer Cloud Composer-Umgebung auszulösen:

Airflow 2

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

Airflow 1

Cloud Composer 2 unterstützt nur Airflow 2.

Befehle in einer privaten IP-Umgebung ausführen

Ab Cloud Composer-Version 2.4.0 können Sie die Airflow-Befehlszeile ausführen in einer privaten IP-Umgebung ohne zusätzliche Konfiguration ausführen. Ihr Maschine benötigt keinen Zugriff auf die Cluster-Steuerungsebene der Umgebung für die Ausführung dieser Befehle.

In Cloud Composer-Versionen vor 2.4.0:

So führen Sie Befehle der Airflow-Befehlszeile in einer privaten IP-Umgebung aus: auf einer Maschine ausführen, die auf den GKE-Cluster Endpunkt der Steuerungsebene. Die Optionen variieren dabei abhängig von Ihrer privaten Clusterkonfiguration.

Wenn der Zugriff auf öffentliche Endpunkte im Cluster Ihrer Umgebung deaktiviert ist, kann die Airflow-Befehlszeile nicht mit gcloud composer-Befehlen ausgeführt werden. Führen Sie die folgenden Schritte aus, um Befehle der Airflow-Befehlszeile ausführen zu können:

  1. VM in Ihrem VPC-Netzwerk erstellen
  2. Rufen Sie Clusteranmeldedaten ab. Führen Sie den folgenden Befehl aus: bash gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip
  • Führen Sie den Airflow-Befehl mit kubectl aus. Beispiel:
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
  --container airflow-scheduler -- airflow dags list

Ersetzen Sie COMPOSER_NAMESPACE durch einen Namespace wie diesen: composer-2-0-28-airflow-2-3-394zxc12411. Sie können Ihren Cloud Composer suchen in der Liste der Arbeitslasten oder mit dem Befehl kubectl get namespaces.

Wenn im Cluster Ihrer Umgebung der Zugriff auf öffentliche Endpunkte aktiviert ist, haben Sie folgende Möglichkeiten: Airflow-Kommandozeilenbefehle auf einer Maschine mit einer externen IP-Adresse, zu autorisierten Netzwerken hinzugefügt. Um den Zugriff von Ihrem Computer aus zu ermöglichen, fügen Sie den der externen Adresse Ihres Computers an die Liste der autorisierten Netzwerke.

Befehle der Airflow-Befehlszeile über die Cloud Composer API ausführen

Ab Cloud Composer-Version 2.4.0 können Sie die Airflow-Befehlszeile ausführen über die Cloud Composer API ausführen.

Befehl ausführen

Erstellen Sie eine environments.executeAirflowCommand-API-Anfrage:

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

Ersetzen Sie Folgendes:

  • PROJECT_ID: die Projekt-ID.
  • LOCATION: Region, in der sich die Umgebung befindet.
  • ENVIRONMENT_NAME: der Name Ihrer Umgebung
  • AIRFLOW_COMMAND: Befehl der Airflow-Befehlszeile, den Sie ausführen möchten, z. B. dags.
  • AIRFLOW_SUBCOMMAND: Unterbefehl für den Befehl der Airflow-Befehlszeile, der den Sie ausführen möchten, z. B. list.
  • (Optional) SUBCOMMAND_PARAMETER: Parameter für den Unterbefehl. Wenn Sie mehrere Parameter verwenden möchten, fügen Sie der Liste weitere Elemente hinzu.

Beispiel:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

Status des Abfragebefehls

Prüfen Sie nach dem Ausführen eines Befehls der Airflow-Befehlszeile über die Cloud Composer API, ob der Befehl erfolgreich ausgeführt wurde, indem eine PollAirflowCommand-Anfrage und untersucht die in exitInfo für Fehler und Statuscodes. Das Feld output enthält Protokollzeilen ein.

Geben Sie executionId an, um den Ausführungsstatus des Befehls und die Logs abzurufen. pod- und podNamespace-Werte, die von ExecuteAirflowCommandRequest zurückgegeben werden:

Beispiel:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
  "pod": "airflow-scheduler-1327d8cd68-hblpd",
  "podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
  "nextLineNumber": 1
}

Fehlerbehebung

Keine Verbindung zur Steuerungsebene des Clusters

Bei Ausführung von gcloud composer environments run oder kubectl-Befehle verwenden, wird möglicherweise der folgende Fehler angezeigt:

Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"

Symptom: Diese Fehlermeldung zeigt an, dass kein Netzwerk vorhanden ist. eine Verbindung von einem Computer herstellen, auf dem Sie diese Befehle ausführen.

Lösung: Folgen Sie den Richtlinien im Befehle in einer privaten IP-Umgebung ausführen oder folgen Sie der Anleitung im Abschnitt zum Zeitüberschreitung beim kubectl-Befehl.