DAGs aus GitHub testen, synchronisieren und bereitstellen

Cloud Composer 1 Cloud Composer 2

In dieser Anleitung wird erläutert, wie Sie eine CI/CD-Pipeline erstellen, um DAGs aus Ihrem GitHub-Repository in Ihrer Cloud Composer-Umgebung zu testen, zu synchronisieren und bereitzustellen.

Wenn Sie nur Daten aus anderen Diensten synchronisieren möchten, lesen Sie die Informationen unter Daten aus anderen Diensten übertragen.

CI/CD-Pipeline – Übersicht

Architekturdiagramm, das die Schritte des Ablaufs zeigt. Das Vorabsenden und die PR-Überprüfung befinden sich im GitHub-Abschnitt, die DAG-Synchronisierung und die manuelle DAG-Bestätigung im Bereich „Google Cloud“.
Abbildung 1. Architekturdiagramm mit den Schritten des Ablaufs (zum Vergrößern klicken)

Die CI/CD-Pipeline zum Testen, Synchronisieren und Bereitstellen von DAGs umfasst die folgenden Schritte:

  1. Sie nehmen eine Änderung an einem DAG vor und übertragen diese Änderung in einen Entwicklungszweig in Ihrem Repository.

  2. Sie öffnen eine Pull-Anfrage für den Hauptzweig Ihres Repositorys.

  3. Cloud Build führt Einheitentests aus, um die Gültigkeit des DAG zu prüfen.

  4. Ihre Pull-Anfrage wurde genehmigt und mit dem Hauptzweig Ihres Repositorys zusammengeführt.

  5. Cloud Build synchronisiert Ihre Cloud Composer-Entwicklungsumgebung mit diesen neuen Änderungen.

  6. Sie prüfen, ob sich der DAG in Ihrer Entwicklungsumgebung wie erwartet verhält.

  7. Wenn der DAG wie erwartet funktioniert, laden Sie ihn in Ihre Cloud Composer-Produktionsumgebung hoch.

Lernziele

Hinweise

  • In diesem Leitfaden wird davon ausgegangen, dass Sie mit zwei identischen Cloud Composer-Umgebungen arbeiten: mit einer Entwicklungsumgebung und einer Produktionsumgebung.

    In dieser Anleitung konfigurieren Sie eine CI/CD-Pipeline nur für Ihre Entwicklungsumgebung. Achten Sie darauf, dass die von Ihnen verwendete Umgebung keine Produktionsumgebung ist.

  • In diesem Leitfaden wird davon ausgegangen, dass Ihre DAGs und deren Tests in einem GitHub-Repository gespeichert sind.

    In der CI/CD-Beispielpipeline wird der Inhalt eines Beispiel-Repositorys veranschaulicht. DAGs und Tests werden im Verzeichnis dags/ mit Anforderungsdateien, der Datei mit den Einschränkungen und den Cloud Build-Konfigurationsdateien auf oberster Ebene gespeichert. Das DAG-Synchronisierungsdienstprogramm und seine Anforderungen befinden sich im Verzeichnis utils.

    Diese Struktur kann für Airflow 1-, Airflow 2-, Cloud Composer 1- und Cloud Composer 2-Umgebungen verwendet werden.

Prüfjob und Einheitentests für Vorabprüfung erstellen

Der erste Cloud Build-Job führt eine Prüfung zur Vorabübermittlung aus, die Einheitentests für Ihre DAGs ausführt.

Unittests hinzufügen

Erstellen Sie Unittests für Ihre DAGs, falls noch nicht geschehen. Speichern Sie diese Tests zusammen mit den DAGs in Ihrem Repository jeweils mit dem Suffix _test. Die Testdatei für den DAG in example_dag.py ist beispielsweise example_dag_test.py. Dies sind die Tests, die als Vorabprüfung in Ihrem Repository ausgeführt werden.

Cloud Build-YAML-Konfiguration für die Prüfung vor dem Senden erstellen

Erstellen Sie in Ihrem Repository eine YAML-Datei mit dem Namen test-dags.cloudbuild.yaml, die Ihren Cloud Build-Job für Vorabprüfungen konfiguriert. Sie umfasst drei Schritte:

  1. Installieren Sie die für Ihre DAGs erforderlichen Abhängigkeiten.
  2. Installieren Sie die für Ihre Einheitentests erforderlichen Abhängigkeiten.
  3. Führen Sie die DAG-Tests aus.

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

Cloud Build-Trigger für die Vorabprüfung erstellen

Folgen Sie der Anleitung Repositories auf GitHub erstellen, um einen auf GitHub basierenden Trigger mit den folgenden Konfigurationen zu erstellen:

  • Name: test-dags

  • Ereignis: Pull-Anfrage

  • Quelle – Repository: Wählen Sie Ihr Repository aus.

  • Quelle: Basiszweig: ^main$ (ändern Sie main in den Namen des Basiszweigs Ihres Repositorys, falls erforderlich)

  • Quelle: Kommentarsteuerung: nicht erforderlich

  • Build-Konfiguration – Cloud-Build-Konfigurationsdatei: /test-dags.cloudbuild.yaml (der Pfad zu Ihrer Build-Datei)

DAG-Synchronisierungsjob erstellen und DAGs-Dienstprogrammskript hinzufügen

Konfigurieren Sie als Nächstes einen Cloud Build-Job, der ein DAGs-Dienstprogrammskript ausführt. Das Dienstprogrammskript in diesem Job synchronisiert die DAGs mit Ihrer Cloud Composer-Umgebung, nachdem sie mit dem Hauptzweig in Ihrem Repository zusammengeführt wurden.

DAGs-Dienstprogrammskript hinzufügen

Fügen Sie Ihrem Repository das Skript des DAG-Dienstprogramms hinzu. Dieses Dienstprogrammskript kopiert alle DAG-Dateien im Verzeichnis dags/ Ihres Repositorys in ein temporäres Verzeichnis und ignoriert alle Nicht-DAG-Python-Dateien. Das Skript verwendet dann die Cloud Storage-Clientbibliothek, um alle Dateien aus diesem temporären Verzeichnis in das Verzeichnis dags/ im Bucket Ihrer Cloud Composer-Umgebung hochzuladen.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage

def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)

def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

Cloud Build-YAML-Konfiguration zum Synchronisieren von DAGs erstellen

Erstellen Sie in Ihrem Repository eine YAML-Datei mit dem Namen add-dags-to-composer.cloudbuild.yaml, die Ihren Cloud Build-Job für die Synchronisierung von DAGs konfiguriert. Sie umfasst zwei Schritte:

  1. Installieren Sie die vom DAGs-Dienstprogramm benötigten Abhängigkeiten.

  2. Führen Sie das Dienstprogrammskript aus, um die DAGs in Ihrem Repository mit Ihrer Cloud Composer-Umgebung zu synchronisieren.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Cloud Build-Trigger erstellen

Folgen Sie der Anleitung Repositories auf GitHub erstellen, um einen auf GitHub basierenden Trigger mit den folgenden Konfigurationen zu erstellen:

  • Name: add-dags-to-composer

  • Ereignis: Push zu einem Zweig

  • Quelle – Repository: Wählen Sie Ihr Repository aus.

  • Quelle: Basiszweig: ^main$ (ändern Sie main in den Namen des Basiszweigs Ihres Repositorys, falls erforderlich)

  • Quelle – Filter für enthaltene Dateien (glob): dags/**

  • Build-Konfiguration – Cloud-Build-Konfigurationsdatei: /add-dags-to-composer.cloudbuild.yaml (der Pfad zu Ihrer Build-Datei)

Fügen Sie in der erweiterten Konfiguration zwei Substitutionsvariablen hinzu:

  • _DAGS_DIRECTORY ist das Verzeichnis, in dem sich die DAgs in Ihrem Repository befinden. Wenn Sie das Beispiel-Repository aus dieser Anleitung verwenden, ist es dags/.

  • _DAGS_BUCKET ist der Cloud Storage-Bucket, der das Verzeichnis dags/ in Ihrer Cloud Composer-Entwicklungsumgebung enthält. Lassen Sie das Präfix gs:// weg. Beispiel: us-central1-example-env-1234ab56-bucket.

CI/CD-Pipeline testen

Folgen Sie in diesem Abschnitt einem DAG-Entwicklungsablauf, der Ihre neu erstellten Cloud Build-Trigger verwendet.

Vorabübermittlungsjob ausführen

Erstellen Sie eine Pull-Anfrage an Ihren Hauptzweig, um Ihren Build zu testen. Suchen Sie auf der Seite nach Ihrer Prüfung für das Vorabsenden. Klicken Sie auf Details und wählen Sie Weitere Details in Google Cloud Build ansehen aus, um Ihre Build-Logs in der Google Cloud Console aufzurufen.

Screenshot der GitHub-Prüfung namens „test-dags“ mit einem roten Pfeil, der auf den Projektnamen in Klammern zeigt
Abbildung 2. Screenshot des Cloud Build-Prüfstatus vor dem Senden auf GitHub (zum Vergrößern klicken)

Wenn das Vorabprüfungsverfahren nicht erfolgreich war, lesen Sie Build-Fehler beheben.

Prüfen, ob der DAG in Ihrer Cloud Composer-Entwicklungsumgebung funktioniert

Nachdem Ihre Pull-Anfrage genehmigt wurde, führen Sie sie mit dem Hauptzweig zusammen. Build-Ergebnisse in der Google Cloud Console ansehen Wenn Sie viele Cloud Build-Trigger haben, können Sie Ihre Builds nach dem Triggernamen add-dags-to-composer filtern.

Wenn der Cloud Build-Synchronisierungsjob erfolgreich war, wird der synchronisierte DAG in der Cloud Composer-Entwicklungsumgebung angezeigt. Dort können Sie prüfen, ob der DAG wie erwartet funktioniert.

DAG zur Produktionsumgebung hinzufügen

Wenn der DAG wie erwartet funktioniert, fügen Sie ihn manuell der Produktionsumgebung hinzu. Laden Sie dazu die DAG-Datei in das Verzeichnis dags/ im Bucket Ihrer Cloud Composer-Produktionsumgebung hoch.

Wenn der DAG-Synchronisierungsjob fehlgeschlagen ist oder sich der DAG in Ihrer Cloud Composer-Entwicklungsumgebung nicht wie erwartet verhält, lesen Sie Build-Fehler beheben.

Build-Fehler beheben

In diesem Abschnitt wird erläutert, wie Sie mit gängigen Build-Fehlerszenarien umgehen.

Was kann ich tun, wenn meine Vorabprüfung fehlgeschlagen ist?

Klicken Sie in der Pull-Anfrage auf Details und wählen Sie Weitere Details in Google Cloud Build ansehen aus, um Ihre Build-Logs in der Google Cloud Console aufzurufen. Verwenden Sie diese Logs, um das Problem mit Ihrem DAG zu beheben. Sobald Sie die Probleme behoben haben, führen Sie die Fehlerbehebung aus und wenden Sie sich an Ihren Branch. Die Prüfung des Vorabeinreichs wird noch einmal ausgeführt und Sie können die Iteration mithilfe der Logs als Debugging-Tool fortsetzen.

Was passiert, wenn mein DAG-Synchronisierungsjob fehlgeschlagen ist?

Build-Ergebnisse in der Google Cloud Console ansehen Wenn Sie viele Cloud Build-Trigger haben, können Sie Ihre Builds nach dem Triggernamen add-dags-to-composer filtern. Prüfen Sie die Logs des Build-Jobs und beheben Sie die Fehler. Falls Sie weitere Hilfe beim Beheben der Fehler benötigen, nutzen Sie die Supportkanäle.

Was kann ich tun, wenn mein DAG in meiner Cloud Composer-Umgebung nicht ordnungsgemäß funktioniert?

Wenn der DAG in Ihrer Cloud Composer-Entwicklungsumgebung nicht wie erwartet funktioniert, stufen Sie den DAG nicht manuell in Ihre Cloud Composer-Produktionsumgebung hoch. Führen Sie stattdessen einen der folgenden Schritte aus:

  • Setzen Sie die Pull-Anfrage zurück und verwenden Sie die Änderungen, durch die der DAG unterbrochen wurde. Dadurch wird der Status direkt vor den Änderungen wiederhergestellt. Dadurch werden auch alle anderen Dateien in dieser Pull-Anfrage zurückgesetzt.
  • Erstellen Sie eine neue Pull-Anfrage, um Änderungen am fehlerhaften DAG manuell rückgängig zu machen.
  • Erstellen Sie eine neue Pull-Anfrage, um die Fehler in Ihrem DAG zu beheben.

Wenn Sie einen dieser Schritte ausführen, wird eine neue Prüfung des Vorabsendens und nach der Zusammenführung der DAG-Synchronisierungsjob ausgelöst.

Nächste Schritte