DAGs mit Cloud Functions auslösen

Auf dieser Seite wird gezeigt, wie Sie Cloud Functions für ereignisbasierte DAG-Trigger verwenden.

Airflow ist für das regelmäßige Ausführen von DAGs nach Plan konzipiert. Sie können DAGs aber auch als Reaktion auf Ereignisse auslösen. Eine Möglichkeit dafür ist die Verwendung von Cloud Functions, um Cloud Composer-DAGs auszulösen, wenn ein bestimmtes Ereignis auftritt. Sie können z. B. eine Funktion erstellen, die einen DAG auslöst, wenn sich ein Objekt in einem Cloud Storage-Bucket ändert, oder wenn eine Nachricht per Push an ein Pub/Sub-Thema übertragen wird.

Im vorliegenden Beispiel wird bei jeder Änderung an einem Cloud Storage-Bucket ein DAG ausgeführt. Änderungen an einem Objekt in einem Bucket lösen eine Funktion aus. Diese Funktion stellt eine Anfrage an die Airflow REST API Ihrer Cloud Composer-Umgebung. Airflow verarbeitet diese Anfrage und führt einen DAG aus. Der DAG gibt Informationen zur Änderung aus.

Hinweis

APIs für Ihr Projekt aktivieren

Cloud Composer and Cloud Functions APIs aktivieren.

Aktivieren Sie die APIs

REST API für Airflow-Webserver aktivieren

Standardmäßig ist die API-Authentifizierungsfunktion in Airflow 1.10.11 und höher deaktiviert. Der Airflow-Webserver lehnt alle Anfragen ab, die Sie stellen. Sie verwenden Anfragen, um DAGs auszulösen. Aktivieren Sie daher dieses Feature.

Zum Aktivieren der API-Authentifizierungsfunktion schreiben Sie die folgende Airflow-Konfigurationsoption:

Bereich Schlüssel Wert Hinweise
api auth_backend airflow.api.auth.backend.default Der Standardwert ist airflow.api.auth.backend.deny_all

Airflow-Webserver-URL abrufen

Die Funktion sendet Anfragen an den Airflow-Webserverendpunkt, sodass Sie die URL des Airflow-Webservers erhalten.

Console

So rufen Sie die Airflow-Webserver-URL ab:

  1. Öffnen Sie die Seite Umgebungen.

    Seite „Umgebungen“ öffnen

  2. Klicken Sie auf den Namen Ihrer Umgebung.
  3. Siehe Umgebungskonfiguration für das Element Airflow-Web-UI.

gcloud

Rufen Sie die Airflow-Webserver-URL mit dem folgenden Befehl ab:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

Ersetzen Sie:

  • ENVIRONMENT_NAME durch den Namen der Umgebung.
  • LOCATION durch die Compute Engine-Region, in der sich die Umgebung befindet.

Client-ID des IAM-Proxys abrufen

Für eine Anfrage an den Airflow REST API-Endpunkt benötigt die Funktion die Client-ID des IAM-Proxys, der den Airflow-Webserver schützt.

Cloud Composer stellt diese Informationen nicht direkt bereit. Stellen Sie stattdessen eine nicht authentifizierte Anfrage an den Airflow-Webserver und erfassen Sie die Client-ID aus der Weiterleitungs-URL:

curl -v AIRFLOW_URL 2>&1 >/dev/null | grep "location:"

Ersetzen Sie AIRFLOW_URL durch die URL der Airflow-Weboberfläche.

Suchen Sie in der Ausgabe nach dem String client_id, der auf apps.googleusercontent.com endet. Beispiel:

location: https://accounts.google.com/o/oauth2/v2/auth?
client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com&response_type= ...

Cloud Storage-Bucket erstellen

Da in diesem Beispiel ein DAG als Reaktion auf Änderungen in einem Cloud Storage-Bucket ausgelöst wird, erstellen Sie einen neuen Bucket für die Verwendung in diesem Beispiel.

DAG aus Cloud Functions auslösen

DAG in die Umgebung hochladen

Laden Sie einen DAG in Ihre Umgebung hoch. Im folgenden Beispiel-DAG wird die empfangene DAG-Ausführungskonfiguration ausgegeben. Sie lösen diesen DAG aus einer Funktion aus, die Sie später in dieser Anleitung erstellen.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id='composer_sample_trigger_response_dag',
    start_date=days_ago(1),
    schedule_interval=None) as dag:

    # Print the received dag_run configuration.
    # The DAG run configuration contains information about the
    # Cloud Storage object change.
    t1 = BashOperator(
        task_id='print_gcs_info',
        bash_command='echo Triggered from GCF: {{ dag_run.conf }}',
        dag=dag)

    t1

Cloud Functions-Funktion bereitstellen, die den DAG auslöst

Stellen Sie eine Cloud Functions-Funktion von Python bereit. Verwenden Sie dazu die folgenden Konfigurationsparameter und Inhalte.

Cloud Functions-Konfigurationsparameter festlegen

  • Trigger: Wählen Sie für dieses Beispiel einen Trigger aus, der funktioniert, wenn ein neues Objekt in einem Bucket erstellt wird, oder ein vorhandenes Objekt wird überschrieben.

    • Triggertyp: Cloud Storage

    • Ereignistyp: Abschließen / Erstellen

    • Bucket: Wählen Sie einen Bucket aus, der diese Funktion auslösen muss.

    • Bei Fehler noch einmal versuchen. Wir empfehlen, diese Option für das Beispiel zu deaktivieren. Wenn Sie in einer Produktionsumgebung Ihre eigene Funktion verwenden, aktivieren Sie diese Option, um vorübergehende Fehler zu behandeln.

  • Dienstkonto für die Laufzeit. Verwenden Sie je nach Ihren Optionen eine der folgenden Optionen:

    • Wählen Sie Compute Engine-Standarddienstkonto aus. Mit standardmäßigen IAM-Berechtigungen kann dieses Konto Funktionen ausführen, die auf Cloud Composer-Umgebungen zugreifen.

    • Erstellen Sie ein benutzerdefiniertes Dienstkonto mit der Rolle Composer-Nutzer und geben Sie es als Laufzeitdienstkonto für diese Funktion an. Diese Option entspricht dem Prinzip der geringsten Berechtigung.

  • Laufzeit und Einstiegspunkt. Wählen Sie beim Hinzufügen von Code für dieses Beispiel die Python 3.7-Laufzeit aus und geben Sie trigger_dag als Einstiegspunkt an.

Anforderungen hinzufügen

Geben Sie die Abhängigkeiten in der Datei requirements.txt an:

requests_toolbelt==0.9.1
google-auth==1.31.0

Funktionscode hinzufügen

Fügen Sie der Datei main.py den folgenden Code hinzu und ersetzen Sie den folgenden Wert:

  • Ersetzen Sie den Wert der Variablen client_id durch den Wert client_id, den Sie in einem vorherigen Schritt erhalten haben.
  • Ersetzen Sie den Wert der Variable webserver_id durch die Airflow-Weboberfläche in einem vorherigen Schritt.

from google.auth.transport.requests import Request
from google.oauth2 import id_token
import requests

IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'

def trigger_dag(data, context=None):
    """Makes a POST request to the Composer DAG Trigger API

    When called via Google Cloud Functions (GCF),
    data and context are Background function parameters.

    For more info, refer to
    https://cloud.google.com/functions/docs/writing/background#functions_background_parameters-python

    To call this function from a Python script, omit the ``context`` argument
    and pass in a non-null value for the ``data`` argument.
    """

    # Fill in with your Composer info here
    # Navigate to your webserver's login page and get this from the URL
    # Or use the script found at
    # https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/composer/rest/get_client_id.py
    client_id = 'YOUR-CLIENT-ID'
    # This should be part of your webserver's URL:
    # {tenant-project-id}.appspot.com
    webserver_id = 'YOUR-TENANT-PROJECT'
    # The name of the DAG you wish to trigger
    dag_name = 'composer_sample_trigger_response_dag'
    webserver_url = (
        'https://'
        + webserver_id
        + '.appspot.com/api/experimental/dags/'
        + dag_name
        + '/dag_runs'
    )
    # Make a POST request to IAP which then Triggers the DAG
    make_iap_request(
        webserver_url, client_id, method='POST', json={"conf": data, "replace_microseconds": 'false'})

# This code is copied from
# https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/iap/make_iap_request.py
# START COPIED IAP CODE
def make_iap_request(url, client_id, method='GET', **kwargs):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      method: The request method to use
              ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                If no timeout is provided, it is set to 90 by default.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Set the default timeout, if missing
    if 'timeout' not in kwargs:
        kwargs['timeout'] = 90

    # Obtain an OpenID Connect (OIDC) token from metadata server or using service
    # account.
    google_open_id_connect_token = id_token.fetch_id_token(Request(), client_id)

    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    resp = requests.request(
        method, url,
        headers={'Authorization': 'Bearer {}'.format(
            google_open_id_connect_token)}, **kwargs)
    if resp.status_code == 403:
        raise Exception('Service account does not have permission to '
                        'access the IAP-protected application.')
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text
# END COPIED IAP CODE

Funktion testen

So überprüfen Sie, ob die Funktion und der DAG ordnungsgemäß funktionieren:

  1. Warten Sie, bis die Funktion bereitgestellt wurde.
  2. Laden Sie eine Datei in Ihren Cloud Storage-Bucket hoch. Alternativ können Sie die Funktion manuell in der Google Cloud Console auswählen, indem Sie die Aktion Testfunktion auswählen.
  3. Prüfen Sie die DAG-Seite in der Airflow-Weboberfläche. Der DAG sollte einen aktiven oder bereits abgeschlossenen DAG haben.
  4. Prüfen Sie in der Airflow-Weboberfläche die Aufgabenlogs für diese Ausführung. Die Aufgabe print_gcs_info sollte die von der Funktion empfangenen Daten an die Logs ausgeben:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0

Nächste Schritte