Auf die Airflow REST API zugreifen

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Apache Airflow bietet eine REST API-Schnittstelle, mit der Sie Aufgaben wie das Abrufen von Informationen zu DAG-Ausführungen und -Aufgaben, das Aktualisieren von DAGs, das Abrufen von Airflow-Konfigurationen, das Hinzufügen und Löschen von Verbindungen und das Auflisten von Nutzern ausführen können.

Ein Beispiel für die Verwendung der Airflow REST API mit Cloud Run-Funktionen finden Sie unter DAGs mit Cloud Functions auslösen.

Airflow REST API-Versionen

Die folgenden Airflow REST API-Versionen sind in Cloud Composer 1 verfügbar:

  • Airflow 2 verwendet die stabile REST API. Die experimentelle REST API wurde von Airflow verworfen.

  • Sie können die experimentelle REST API in Airflow 2 weiterhin verwenden, wenn Sie sie über eine Airflow-Konfigurationsüberschreibung aktivieren, wie unten beschrieben.

Hinweis

Enable the Cloud Composer API.

Enable the API

Stabile Airflow REST API aktivieren

Airflow 2

Die stabile REST API ist in Airflow 2 bereits standardmäßig aktiviert.

Cloud Composer verwendet ein eigenes API-Authentifizierungs-Backend, in das der Identity-Aware Proxy eingebunden ist.

Die Autorisierung funktioniert wie gewohnt von Airflow. Wenn ein neuer Nutzer über die API autorisiert wird, erhält das Nutzerkonto standardmäßig die Rolle Op.

Sie können die stabile REST API aktivieren oder deaktivieren oder die Standardnutzerrolle ändern. Dazu überschreiben die folgenden Airflow-Konfigurationsoptionen:

Bereich Schlüssel Wert Hinweise
api (Airflow 2.2.5 und niedriger) auth_backend
(Airflow 2.3.0 und höher) auth_backends
airflow.composer.api.backend.composer_auth Ändern Sie zum Deaktivieren der stabilen REST API auf airflow.api.auth.backend.deny_all.
api composer_auth_user_registration_role Op Sie können eine beliebige andere Rolle angeben

Airflow 1

Die stabile REST API ist in Airflow 1 nicht verfügbar. Sie können stattdessen die experimentelle REST API verwenden.

Experimentelle Airflow REST API aktivieren

Airflow 2

Standardmäßig ist das API-Authentifizierungsfeature in der experimentellen API deaktiviert. Der Airflow-Webserver lehnt alle von Ihnen gestellten Anfragen ab.

Überschreiben Sie die folgende Airflow-Konfigurationsoption, um die API-Authentifizierungsfunktion und die experimentelle API von Airflow 2 zu aktivieren:

Bereich Schlüssel Wert Hinweise
api (Airflow 2.2.5 und niedriger) auth_backend
(Airflow 2.3.0 und höher) auth_backends
airflow.api.auth.backend.default Der Standardwert ist airflow.composer.api.backend.composer_auth.
api enable_experimental_api True Der Standardwert ist False.

Airflow 1

Standardmäßig ist das API-Authentifizierungsfeature in Airflow 1.10.11 und höheren Versionen deaktiviert. Der Airflow-Webserver lehnt alle von Ihnen gestellten Anfragen ab. Zum Auslösen von DAGs verwenden Sie Anfragen. Aktivieren Sie daher diese Funktion.

Zum Aktivieren der API-Authentifizierungsfunktion in Airflow 1 überschreiben Sie die folgende Airflow-Konfigurationsoption:

Bereich Schlüssel Wert Hinweise
api auth_backend airflow.api.auth.backend.default Der Standardwert ist airflow.api.auth.backend.deny_all.

Nachdem Sie diese Konfigurationsoption auf airflow.api.auth.backend.default gesetzt haben, akzeptiert der Airflow-Webserver alle API-Anfragen ohne Authentifizierung. Auch wenn der Airflow-Webserver selbst keine Authentifizierung erfordert, ist er durch Identity-Aware Proxy mit einer eigenen Authentifizierungsebene geschützt.

API-Aufrufe an Airflow REST API mit Webserver-Zugriffssteuerung zulassen

Abhängig von der Methode, mit der die Airflow REST API aufgerufen wird, kann entweder eine IPv4- oder IPv6-Adresse verwenden. Denken Sie daran, den IP-Traffic zur Airflow REST API mithilfe der Webserver-Zugriffssteuerung zu entsperren.

Verwenden Sie die Standardkonfigurationsoption: All IP addresses have access (default). Wenn Sie sich nicht sicher sind, von welcher IP-Adresse aus Ihre Aufrufe an die Airflow REST API gesendet werden gesendet.

Aufrufe an Airflow REST API senden

client_id des IAM-Proxys abrufen

Um eine Anfrage an den Airflow REST API-Endpunkt zu senden, benötigt die Funktion die Client-ID des IAM-Proxys, der den Airflow-Webserver schützt.

Cloud Composer stellt diese Informationen nicht direkt zur Verfügung. Stattdessen eine nicht authentifizierte Anfrage an den Airflow-Webserver senden und Client-ID aus der Weiterleitungs-URL:

cURL

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"

Ersetzen Sie AIRFLOW_URL durch die URL der Airflow-Weboberfläche.

Suchen Sie in der Ausgabe nach dem String nach client_id. Beispiel:

client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com

Python

Speichern Sie den folgenden Code in einer Datei mit dem Namen get_client_id.py. Geben Sie die Werte für project_id, location und composer_environment ein und führen Sie den Code dann in Cloud Shell oder in Ihrer lokalen Umgebung aus.

# This script is intended to be used with Composer 1 environments
# In Composer 2, the Airflow Webserver is not in the tenant project
# so there is no tenant client ID
# See https://cloud.google.com/composer/docs/composer-2/environment-architecture
# for more details
import google.auth
import google.auth.transport.requests
import requests
import six.moves.urllib.parse

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
composer_response = authed_session.request("GET", environment_url)
environment_data = composer_response.json()
composer_version = environment_data["config"]["softwareConfig"]["imageVersion"]
if "composer-1" not in composer_version:
    version_error = (
        "This script is intended to be used with Composer 1 environments. "
        "In Composer 2, the Airflow Webserver is not in the tenant project, "
        "so there is no tenant client ID. "
        "See https://cloud.google.com/composer/docs/composer-2/environment-architecture for more details."
    )
    raise (RuntimeError(version_error))
airflow_uri = environment_data["config"]["airflowUri"]

# The Composer environment response does not include the IAP client ID.
# Make a second, unauthenticated HTTP request to the web server to get the
# redirect URI.
redirect_response = requests.get(airflow_uri, allow_redirects=False)
redirect_location = redirect_response.headers["location"]

# Extract the client_id query parameter from the redirect.
parsed = six.moves.urllib.parse.urlparse(redirect_location)
query_string = six.moves.urllib.parse.parse_qs(parsed.query)
print(query_string["client_id"][0])

Airflow REST API mit client_id aufrufen

Ersetzen Sie die folgenden Werte:

  • Ersetzen Sie den Wert der Variablen client_id durch den Wert von client_id die im vorherigen Schritt abgerufen wurden.
  • Ersetzen Sie den Wert der Variable webserver_id durch die Mandantenprojekt-ID, die Teil der URL der Airflow-Weboberfläche vor .appspot.com ist. Sie haben die URL der Airflow-Weboberfläche in einem vorherigen Schritt abgerufen.
  • Geben Sie die von Ihnen verwendete Airflow REST API-Version an:

    • Wenn Sie die stabile Airflow API verwenden, legen Sie die Variable USE_EXPERIMENTAL_API auf False fest.
    • Wenn Sie die experimentelle Airflow REST API verwenden, sind keine Änderungen erforderlich. Die Variable USE_EXPERIMENTAL_API ist bereits auf True festgelegt.

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests


IAM_SCOPE = "https://www.googleapis.com/auth/iam"
OAUTH_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
# If you are using the stable API, set this value to False
# For more info about Airflow APIs see https://cloud.google.com/composer/docs/access-airflow-api
USE_EXPERIMENTAL_API = True


def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.

    This function is currently only compatible with Composer v1 environments.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/composer/rest/get_client_id.py
    client_id = "YOUR-CLIENT-ID"
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = "YOUR-TENANT-PROJECT"
    # The name of the DAG you wish to trigger
    dag_name = "composer_sample_trigger_response_dag"

    if USE_EXPERIMENTAL_API:
        endpoint = f"api/experimental/dags/{dag_name}/dag_runs"
        json_data = {"conf": data, "replace_microseconds": "false"}
    else:
        endpoint = f"api/v1/dags/{dag_name}/dagRuns"
        json_data = {"conf": data}
    webserver_url = "https://" + webserver_id + ".appspot.com/" + endpoint
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(webserver_url, client_id, method="POST", json=json_data)


# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method="GET", **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method,
        url,
        headers={"Authorization": "Bearer {}".format(google_open_id_connect_token)},
        **kwargs,
    )
    if resp.status_code == 403:
        raise Exception(
            "Service account does not have permission to "
            "access the IAP-protected application."
        )
    elif resp.status_code != 200:
        raise Exception(
            "Bad response from application: {!r} / {!r} / {!r}".format(
                resp.status_code, resp.headers, resp.text
            )
        )
    else:
        return resp.text


# END COPIED IAP CODE

Mit einem Dienstkonto auf die Airflow REST API zugreifen

Die Airflow-Datenbank beschränkt die Länge des E-Mail-Felds auf 64 Zeichen. Dienstkonten haben manchmal E-Mail-Adressen, die länger als 64 Zeichen sind. Es ist nicht möglich, Airflow-Nutzer für einen solchen Dienst zu erstellen wie gewohnt zu verwalten. Wenn es für einen solchen Dienst keinen Airflow-Nutzer gibt und der Zugriff auf die Airflow REST API führt zu HTTP-Fehlern 401 und 403.

Als Problemumgehung können Sie einen Airflow-Nutzer vorab für ein Dienstkonto registrieren. Bis Verwenden Sie dabei accounts.google.com:NUMERIC_USER_ID als Nutzernamen und eindeutigen String wie die E-Mail-Adresse an.

  1. Führen Sie folgenden Befehl aus, um NUMERIC_USER_ID für ein Dienstkonto abzurufen:

    gcloud iam service-accounts describe \
      SA_NAME@PROJECT_ID.iam.gserviceaccount.com \
      --format="value(oauth2ClientId)"
    

    Ersetzen Sie:

    • SA_NAME durch den Namen des Dienstkontos.
    • PROJECT_ID durch die Projekt-ID.
  2. Erstellen Sie einen Airflow-Nutzer mit der Rolle Op für das Dienstkonto:

    Airflow-UI

    1. Rufen Sie die Airflow-UI auf.

    2. Klicken Sie auf Verwaltung > Nutzer und dann auf Erstellen. Ihr Airflow-Nutzer muss die Rolle Admin haben, um diese Seite zu öffnen.

    3. Geben Sie accounts.google.com:NUMERIC_USER_ID als Nutzernamen an. Ersetzen Sie NUMERIC_USER_ID durch die Nutzer-ID, die Sie im vorherigen Schritt ermittelt haben.

    4. Geben Sie eine eindeutige Kennung als E-Mail-Adresse an. Sie können einen beliebigen eindeutigen String verwenden.

    5. Geben Sie die Rolle für den Nutzer an. Beispiel: Op.

    6. Das Kästchen Ist aktiv? muss angeklickt sein.

    7. Geben Sie den Vor- und Nachnamen des Nutzers an. Sie können beliebige .

    8. Klicken Sie auf Speichern.

    gcloud

    Führen Sie in Airflow 2 den folgenden Befehl der Airflow-Befehlszeile aus:

    gcloud composer environments run ENVIRONMENT_NAME \
        --location LOCATION \
        users create -- \
        -u accounts.google.com:NUMERIC_USER_ID \
        -e UNIQUE_ID  \
        -f UNIQUE_ID \
        -l - -r Op --use-random-password
    

    Ersetzen Sie:

    • ENVIRONMENT_NAME durch den Namen der Umgebung.
    • LOCATION durch die Region, in der sich die Umgebung befindet.
    • NUMERIC_USER_ID durch die Nutzer-ID, die Sie im vorherigen Schritt ermittelt haben.
    • UNIQUE_ID durch die Kennung für den Airflow-Nutzer. Sie können beliebige eindeutigen String hinzu.
  3. Nachdem Sie einen Airflow-Nutzer für ein Dienstkonto erstellt haben, da das Dienstkonto als vorregistrierter Nutzer erkannt wird, und bei Airflow angemeldet.

Airflow REST API-Komponente skalieren

Die Endpunkte der Airflow REST API und der Airflow-Benutzeroberfläche werden innerhalb der Komponente, also des Airflow-Webservers, ausgeführt. Wenn Sie die REST API intensiv nutzen, sollten Sie CPU- und Speicherparameter werden erhöht, um Airflow Webserver-Ressourcen anzupassen auf die erwartete Auslastung.

Nächste Schritte