Auf die Airflow REST API zugreifen

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Apache Airflow bietet eine REST API-Schnittstelle, mit der Sie Aufgaben wie das Abrufen von Informationen zu DAG-Ausführungen und -Aufgaben, das Aktualisieren von DAGs, das Abrufen von Airflow-Konfigurationen, das Hinzufügen und Löschen von Verbindungen und das Auflisten von Nutzern ausführen können.

Ein Beispiel für die Verwendung der Airflow REST API mit Cloud Run-Funktionen finden Sie unter DAGs mit Cloud Run-Funktionen auslösen.

Airflow REST API-Versionen

  • Airflow 2 verwendet die stabile REST API.
  • Die experimentelle REST API wurde von Airflow verworfen.

Stabile Airflow REST API konfigurieren

Die stabile REST API ist in Airflow 2 standardmäßig aktiviert. Cloud Composer verwendet ein eigenes API-Authentifizierungs-Backend.

Die Autorisierung funktioniert wie gewohnt von Airflow. Wenn ein neuer Nutzer über die API autorisiert wird, erhält das Nutzerkonto standardmäßig die Rolle Op.

Sie können die stabile REST API aktivieren oder deaktivieren oder die Standardnutzerrolle ändern. Dazu überschreiben die folgenden Airflow-Konfigurationsoptionen:

Bereich Schlüssel Wert Hinweise
api auth_backends airflow.composer.api.backend.composer_auth Ändern Sie zum Deaktivieren der stabilen REST API auf airflow.api.auth.backend.deny_all.
api composer_auth_user_registration_role Op Sie können eine beliebige andere Rolle angeben

API-Aufrufe an die Airflow REST API über die Webserver-Zugriffssteuerung zulassen

Je nach Methode, die zum Aufrufen der Airflow REST API verwendet wird, kann die Aufrufmethode entweder eine IPv4- oder IPv6-Adresse verwenden. Denken Sie daran, den IP-Traffic zur Airflow REST API mithilfe der Webserver-Zugriffssteuerung zu entsperren.

Verwenden Sie die Standardkonfigurationsoption All IP addresses have access (default), wenn Sie nicht sicher sind, von welchen IP-Adressen Ihre Aufrufe an die Airflow REST API gesendet werden.

Aufrufe an Airflow REST API senden

Dieser Abschnitt enthält ein Beispielskript in Python, mit dem Sie DAGs mit der stabilen Airflow REST API auslösen können.

Fügen Sie den Inhalt des folgenden Beispiels in eine Datei mit dem Namen composer2_airflow_rest_api.py ein und legen Sie dann die folgenden Variablen fest:

  • dag_id: Name eines DAG, wie in der DAG-Quelldatei definiert.
  • dag_config: Konfiguration für die DAG-Ausführung.
  • web_server_url: Die URL Ihres Airflow-Webservers. Das Format ist https://<web-server-id>.composer.googleusercontent.com.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text




if __name__ == "__main__":
    # TODO(developer): replace with your values
    dag_id = "your-dag-id"  # Replace with the ID of the DAG that you want to run.
    dag_config = {
        "your-key": "your-value"
    }  # Replace with configuration parameters for the DAG run.
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )

    response_text = trigger_dag(
        web_server_url=web_server_url, dag_id=dag_id, data=dag_config
    )

    print(response_text)

Mit einem Dienstkonto auf die Airflow REST API zugreifen

In der Airflow-Datenbank ist die Länge des E-Mail-Felds auf 64 Zeichen begrenzt. Dienstkonten haben manchmal E-Mail-Adressen, die länger als 64 Zeichen sind. Es ist nicht möglich, Airflow-Nutzer für solche Dienstkonten auf die übliche Weise zu erstellen. Wenn für ein solches Dienstkonto kein Airflow-Nutzer vorhanden ist, führt der Zugriff auf die Airflow REST API zu den HTTP-Fehlern 401 und 403.

Als Problemumgehung können Sie einen Airflow-Nutzer für ein Dienstkonto registrieren. Verwenden Sie dazu accounts.google.com:NUMERIC_USER_ID als Nutzernamen und einen beliebigen eindeutigen String als E-Mail-Adresse.

  1. Führen Sie Folgendes aus, um NUMERIC_USER_ID für ein Dienstkonto abzurufen:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Ersetzen Sie:

    • SA_NAME durch den Namen des Dienstkontos.
    • PROJECT_ID durch die Projekt-ID.
  2. Erstellen Sie einen Airflow-Nutzer mit der Rolle Op für das Dienstkonto:

    Airflow-UI

    1. Rufen Sie die Airflow-UI auf.

    2. Klicken Sie auf Sicherheit > Nutzer auflisten und dann auf Neuen Eintrag hinzufügen. Ihr Airflow-Nutzer muss die Rolle Admin haben, um diese Seite zu öffnen.

    3. Geben Sie accounts.google.com:NUMERIC_USER_ID als Nutzernamen an. Ersetzen Sie NUMERIC_USER_ID durch die Nutzer-ID, die Sie im vorherigen Schritt ermittelt haben.

    4. Geben Sie als E-Mail-Adresse eine eindeutige Kennung an. Sie können einen beliebigen eindeutigen String verwenden.

    5. Geben Sie die Rolle für den Nutzer an. Beispiel: Op.

    6. Das Kästchen Ist aktiv? muss angeklickt sein.

    7. Geben Sie den Vor- und Nachnamen des Nutzers an. Sie können jeden String verwenden.

    8. Klicken Sie auf Speichern.

    gcloud

    Führen Sie den folgenden Airflow-Befehl aus:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Ersetzen Sie:

    • ENVIRONMENT_NAME durch den Namen der Umgebung.
    • LOCATION durch die Region, in der sich die Umgebung befindet.
    • NUMERIC_USER_ID durch die Nutzer-ID, die Sie im vorherigen Schritt ermittelt haben.
    • UNIQUE_ID durch die Kennung für den Airflow-Nutzer. Sie können einen beliebigen eindeutigen String verwenden.
  3. Nachdem Sie einen Airflow-Nutzer für ein Dienstkonto erstellt haben, wird ein als Dienstkonto authentifizierter Aufrufer als vorab registrierter Nutzer erkannt und in Airflow angemeldet.

Airflow REST API-Komponente skalieren

Die Endpunkte der Airflow REST API und der Airflow-Benutzeroberfläche werden auf dem Airflow-Webserver ausgeführt. Wenn Sie die REST API intensiv verwenden, sollten Sie die für den Airflow-Webserver verfügbare CPU- und Arbeitsspeichermenge entsprechend der erwarteten Auslastung erhöhen.

Nächste Schritte