Auf Airflow REST API zugreifen

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Apache Airflow bietet eine REST API-Schnittstelle, mit der Sie Aufgaben wie das Abrufen von Informationen zu DAG-Ausführungen und -Aufgaben, das Aktualisieren von DAGs, das Abrufen von Airflow-Konfigurationen, das Hinzufügen und Löschen von Verbindungen und das Auflisten von Nutzern ausführen können.

Ein Beispiel für die Verwendung der Airflow REST API mit Cloud Functions finden Sie unter DAGs mit Cloud Functions auslösen.

Airflow REST API-Versionen

Die folgenden Airflow REST API-Versionen sind in Cloud Composer 2 verfügbar:

  • Airflow 2 verwendet die stabile REST API. Die experimentelle REST API wurde von Airflow verworfen.

  • Sie können die experimentelle REST API in Airflow 2 weiterhin verwenden, wenn Sie sie über eine Airflow-Konfigurationsüberschreibung aktivieren, wie unten beschrieben.

Hinweis

Cloud Composer API aktivieren.

Aktivieren Sie die API

Stabile Airflow REST API aktivieren

Die stabile REST API ist in Airflow 2 bereits standardmäßig aktiviert.

Cloud Composer verwendet ein eigenes API-Authentifizierungs-Backend.

Die Autorisierung funktioniert wie gewohnt von Airflow. Wenn ein neuer Nutzer über die API autorisiert wird, erhält das Nutzerkonto standardmäßig die Rolle Op.

Sie können die stabile REST API aktivieren oder deaktivieren oder die Standardnutzerrolle ändern. Dazu überschreiben die folgenden Airflow-Konfigurationsoptionen:

Bereich Schlüssel Wert Hinweise
api (Airflow 2.2.5 und niedriger) auth_backend
(Airflow 2.3.0 und höher) auth_backends
airflow.composer.api.backend.composer_auth Um die stabile REST API zu deaktivieren, ändern Sie zu airflow.api.auth.backend.deny_all
api composer_auth_user_registration_role Op Sie können eine beliebige andere Rolle angeben

Experimentelle Airflow REST API aktivieren

Standardmäßig ist das API-Authentifizierungsfeature in der experimentellen API deaktiviert. Der Airflow-Webserver lehnt alle von Ihnen gestellten Anfragen ab.

Überschreiben Sie die folgende Airflow-Konfigurationsoption, um die API-Authentifizierungsfunktion und die experimentelle API von Airflow 2 zu aktivieren:

Bereich Schlüssel Wert Hinweise
api (Airflow 2.2.5 und niedriger) auth_backend
(Airflow 2.3.0 und höher) auth_backends
airflow.api.auth.backend.default Der Standardwert ist airflow.composer.api.backend.composer_auth.
api enable_experimental_api True Der Standardwert ist False.

Nachdem Sie diese Konfigurationsoption auf airflow.api.auth.backend.default, der Airflow-Webserver akzeptiert alle APIs -Anfragen ohne Authentifizierung. Auch wenn der Airflow-Webserver selbst keine Authentifizierung erfordert, ist er durch Identity-Aware Proxy mit einer eigenen Authentifizierungsebene geschützt.

API-Aufrufe an Airflow REST API mit Webserver-Zugriffssteuerung zulassen

Abhängig von der Methode, mit der die Airflow REST API aufgerufen wird, kann entweder eine IPv4- oder IPv6-Adresse verwenden. Blockierung aufheben IP-Traffic an die Airflow REST API mit der Webserver-Zugriffssteuerung

Verwenden Sie die Standardkonfigurationsoption: All IP addresses have access (default). Wenn Sie sich nicht sicher sind, von welcher IP-Adresse aus Ihre Aufrufe an die Airflow REST API gesendet werden gesendet.

Aufrufe an Airflow REST API senden

Dieser Abschnitt enthält ein Beispielskript in Python, mit dem Sie DAGs mit der stabilen Airflow REST API auslösen können.

Fügen Sie den Inhalt des folgenden Beispiels in eine Datei mit dem Namen composer2_airflow_rest_api.py ein und geben Sie dann Ihre Airflow-UI-URL, den Namen des DAG und die DAG-Ausführungskonfiguration in den Parametern an.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

Die folgende Konfiguration ist beispielsweise nicht korrekt

  web_server_url = (
    "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com/"
  )

Mit einem Dienstkonto auf die Airflow REST API zugreifen

Die Airflow-Datenbank beschränkt die Länge des E-Mail-Felds auf 64 Zeichen. Dienstkonten haben manchmal E-Mail-Adressen, die länger als 64 Zeichen sind Zeichen. Es ist nicht möglich, Airflow-Nutzer für einen solchen Dienst zu erstellen wie gewohnt zu verwalten. Wenn es für einen solchen Dienst keinen Airflow-Nutzer gibt und der Zugriff auf die Airflow REST API führt zu HTTP-Fehlern 401 und 403.

Als Behelfslösung können Sie einen Airflow-Nutzer für ein Dienstkonto vorregistrieren. Bis Verwenden Sie dabei accounts.google.com:NUMERIC_USER_ID als Nutzernamen und eindeutigen String wie die E-Mail-Adresse an.

  1. Führen Sie folgenden Befehl aus, um NUMERIC_USER_ID für ein Dienstkonto abzurufen:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Ersetzen Sie:

    • SA_NAME durch den Namen des Dienstkontos.
    • PROJECT_ID durch die Projekt-ID.
  2. Erstellen Sie einen Airflow-Nutzer mit der Rolle Op für das Dienstkonto:

    Airflow-UI

    1. Rufen Sie die Airflow-UI auf.

    2. Gehen Sie zu Sicherheit > Nutzer auflisten und klicken Sie auf Fügen Sie einen neuen Eintrag hinzu. Ihr Airflow-Nutzer muss die Rolle Admin haben, um um diese Seite zu öffnen.

    3. accounts.google.com:NUMERIC_USER_ID als Nutzer angeben Namen. Ersetzen Sie NUMERIC_USER_ID durch die Nutzer-ID, die auf der vorherigen Schritt.

    4. Geben Sie eine eindeutige Kennung als E-Mail-Adresse an. Sie können beliebige eindeutigen String hinzu.

    5. Geben Sie die Rolle für den Nutzer an. Beispiel: Op.

    6. Achten Sie darauf, dass das Kästchen Ist aktiv? angeklickt ist.

    7. Geben Sie den Vor- und Nachnamen des Nutzers an. Sie können beliebige .

    8. Klicken Sie auf Speichern.

    gcloud

    Führen Sie den folgenden Befehl der Airflow-Befehlszeile aus:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Ersetzen Sie:

    • ENVIRONMENT_NAME durch den Namen der Umgebung.
    • LOCATION durch die Region, in der sich die Umgebung befindet.
    • NUMERIC_USER_ID durch die im vorherigen Schritt abgerufene User-ID.
    • UNIQUE_ID durch die ID des Airflow-Nutzers. Sie können beliebige eindeutigen String hinzu.
  3. Nachdem Sie einen Airflow-Nutzer für ein Dienstkonto erstellt haben, da das Dienstkonto als vorregistrierter Nutzer erkannt wird, und bei Airflow angemeldet.

Airflow REST API-Komponente skalieren

Airflow REST API und Airflow-UI-Endpunkte werden innerhalb der Komponente ausgeführt. z.B. Airflow-Webserver. Wenn Sie die REST API intensiv nutzen, sollten Sie CPU- und Speicherparameter werden erhöht, um Airflow Webserver-Ressourcen anzupassen auf die erwartete Auslastung.

Nächste Schritte