DAGs aus GitHub testen, synchronisieren und bereitstellen

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

In diesem Leitfaden wird erläutert, wie Sie eine CI/CD-Pipeline erstellen, um DAGs aus Ihrem GitHub-Repository zu testen, zu synchronisieren und in Ihrer Cloud Composer-Umgebung bereitzustellen.

Wenn Sie nur Daten aus anderen Diensten synchronisieren möchten, lesen Sie den Hilfeartikel Daten aus anderen Diensten übertragen.

CI/CD-Pipeline – Übersicht

Architekturdiagramm mit den Schritten des Ablaufs Das Vorabsenden und die PR-Überprüfung befinden sich im GitHub-Bereich, die DAG-Synchronisierung und die manuelle DAG-Überprüfung im Bereich „Google Cloud“.
Abbildung 1. Architekturdiagramm mit den Schritten des Ablaufs (zum Vergrößern anklicken)

Die CI/CD-Pipeline zum Testen, Synchronisieren und Bereitstellen von DAGs umfasst die folgenden Schritte:

  1. Sie nehmen eine Änderung an einem DAG vor und übertragen diese Änderung in einen Entwicklungszweig in Ihrem Repository gespeichert.

  2. Sie öffnen eine Pull-Anfrage für den Hauptzweig Ihres Repositorys.

  3. Cloud Build führt Einheitentests aus, um die Gültigkeit des DAG zu prüfen.

  4. Ihre Pull-Anfrage wurde genehmigt und mit dem Hauptzweig Ihres zu erstellen.

  5. Cloud Build synchronisiert Ihre Entwicklung Cloud Composer-Umgebung mit diesen neuen Änderungen.

  6. Sie prüfen, ob sich der DAG in der Entwicklung wie erwartet verhält. zu verbessern.

  7. Wenn der DAG wie erwartet funktioniert, laden Sie ihn in Ihre Produktions-Cloud Composer-Umgebung hoch.

Lernziele

Hinweise

  • In diesem Leitfaden wird davon ausgegangen, dass Sie mit zwei identischen Cloud Composer-Umgebungen: Entwicklungsumgebung und der Produktionsumgebung.

    Für die Zwecke dieser Anleitung konfigurieren Sie nur eine CI/CD-Pipeline. für Ihre Entwicklungsumgebung. Achten Sie darauf, dass die Umgebung, ist keine Produktionsumgebung.

  • In dieser Anleitung wird davon ausgegangen, dass Sie Ihre DAGs und ihre Tests in einem GitHub-Repository gespeichert haben.

    Die Beispiel für eine CI/CD-Pipeline zeigt den Inhalt eines Beispiel-Repositorys. DAGs und Tests sind mit Anforderungsdateien, die im Verzeichnis dags/ gespeichert sind, und Cloud Build-Konfigurationsdateien, die auf oberster Ebene gespeichert sind. Das DAG-Synchronisierungstool und die zugehörigen Anforderungen befinden sich im Verzeichnis utils.

Job zur Vorabprüfung und Einheitentests erstellen

Der erste Cloud Build-Job führt eine Vorabprüfung durch, bei der Unit-Tests für Ihre DAGs ausgeführt werden.

Einheitentests hinzufügen

Falls noch nicht geschehen, erstellen Sie Einheitentests für Ihre DAGs. Speichern Sie diese Tests zusammen mit den DAGs in Ihrem Repository, jeweils mit dem Suffix _test. Der Test für den DAG in example_dag.py ist example_dag_test.py. Das sind die Tests, die in Ihrem Repository als Vorabprüfung ausgeführt werden.

Cloud Build-YAML-Konfiguration für die Vorabprüfung erstellen

Erstellen Sie in Ihrem Repository eine YAML-Datei mit dem Namen test-dags.cloudbuild.yaml, in der Ihr Cloud Build-Job für Vorabprüfungen konfiguriert wird. Es umfasst drei Schritte:

  1. Installieren Sie die Abhängigkeiten, die für Ihre DAGs erforderlich sind.
  2. Installieren Sie die Abhängigkeiten, die für Ihre Einheitentests erforderlich sind.
  3. Führen Sie die DAG-Tests aus.

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

Cloud Build-Trigger für die Vorabprüfung erstellen

Folgen Sie der Anleitung unter Repositories aus GitHub erstellen, um einen GitHub App-basierten Trigger mit den folgenden Konfigurationen zu erstellen:

  • Name: test-dags

  • Ereignis: Pull-Anfrage

  • Quelle: Repository: Wählen Sie Ihr Repository aus.

  • Quelle – Basiszweig: ^main$ (ändern Sie main in den Namen Ihres Basiszweig des Repositorys (falls erforderlich)

  • Quelle – Kommentarsteuerung: nicht erforderlich

  • Build-Konfiguration – Cloud Build-Konfigurationsdatei:/test-dags.cloudbuild.yaml (Pfad zur Build-Datei)

DAG-Synchronisierungsjob erstellen und DAG-Dienstprogrammscript hinzufügen

Konfigurieren Sie als Nächstes einen Cloud Build-Job, mit dem ein DAGs-Dienstprogrammskript ausgeführt wird. Die synchronisiert Ihre DAGs mit Ihrem Dienstprogrammskript Cloud Composer-Umgebung, nachdem sie mit dem Hauptzweig zusammengeführt wurden in Ihrem Repository gespeichert.

DAGs-Dienstprogrammskript hinzufügen

Fügen Sie Ihrem Repository das DAG-Dienstprogrammskript hinzu. Dieses Dienstprogrammskript kopiert alle DAG-Dateien im Verzeichnis dags/ Ihres Repositorys in ein temporäres Verzeichnis und ignoriert alle Python-Dateien, die keine DAG-Dateien sind. Die Dann verwendet das Skript die Cloud Storage-Clientbibliothek, um alle Dateien hochzuladen. von diesem temporären Verzeichnis in das Verzeichnis dags/ in Ihrem Bucket der Cloud Composer-Umgebung.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

Cloud Build-YAML-Konfiguration zum Synchronisieren von DAGs erstellen

Erstellen Sie in Ihrem Repository eine YAML-Datei mit dem Namen add-dags-to-composer.cloudbuild.yaml, der Cloud Build konfiguriert zum Synchronisieren von DAGs. Sie besteht aus zwei Schritten:

  1. Installieren Sie die Abhängigkeiten, die für das DAG-Dienstprogrammskript erforderlich sind.

  2. Führen Sie das Dienstprogrammskript aus, um die DAGs in Ihrem Repository mit Ihrem Cloud Composer-Umgebung.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Cloud Build-Trigger erstellen

Folgen Sie der Anleitung unter Repositories aus GitHub erstellen. Anleitung zum Erstellen eines auf GitHub-Anwendung basierenden Triggers mit den folgenden Konfigurationen:

  • Name: add-dags-to-composer

  • Ereignis: Per Push-Befehl an Zweig übertragen

  • Quelle: Repository: Wählen Sie Ihr Repository aus.

  • Quelle – Basiszweig: ^main$ (ändern Sie main in den Namen Ihres Basiszweig des Repositorys (falls erforderlich)

  • Quelle – Filter für enthaltene Dateien (glob): dags/**

  • Build-Konfiguration – Cloud Build-Konfigurationsdatei: /add-dags-to-composer.cloudbuild.yaml (Pfad zur Build-Datei)

Fügen Sie in der erweiterten Konfiguration zwei Ersatzvariablen hinzu:

  • _DAGS_DIRECTORY: das Verzeichnis, in dem sich die DAGs in Ihrem Repository befinden. Wenn Sie das Beispiel-Repository aus dieser Anleitung verwenden, lautet der Name dags/.

  • _DAGS_BUCKET: Der Cloud Storage-Bucket, der das Verzeichnis dags/ in Ihrer Cloud Composer-Entwicklungsumgebung enthält. Lassen Sie das Präfix gs:// weg. Beispiel: us-central1-example-env-1234ab56-bucket.

CI/CD-Pipeline testen

Folgen Sie in diesem Abschnitt einem DAG-Entwicklungsablauf, der Ihre neuen Cloud Build-Trigger erstellt.

Vorabsendejob ausführen

Erstellen Sie eine Pull-Anfrage an Ihren Hauptzweig, um den Build zu testen. Finden Sie Ihren auf der Seite prüfen. Klicken Sie auf Details und wählen Sie Sehen Sie sich weitere Details zu Google Cloud Build an, um Ihre Build-Logs in der Google Cloud Console

Screenshot einer GitHub-Prüfung namens „test-dags“ mit einem roten Pfeil, der auf den Projektnamen in Klammern zeigt
Abbildung 2. Screenshot des Cloud Build-Status vor dem Einreichen auf GitHub (zum Vergrößern anklicken)

Wenn die Prüfung vor dem Einreichen fehlgeschlagen ist, lesen Sie den Hilfeartikel Fehlerbehebung bei Build-Fehlern.

Prüfen, ob Ihr DAG in Ihrer Cloud Composer-Entwicklungsumgebung funktioniert

Nachdem Ihr Pull-Request genehmigt wurde, führen Sie ihn mit Ihrem Hauptzweig zusammen. Verwenden Sie die Methode Google Cloud Console Build-Ergebnisse ansehen. Wenn Sie viele Cloud Build-Trigger können Sie Ihre Builds nach dem Triggernamen filtern. add-dags-to-composer

Wenn der Cloud Build-Synchronisierungsjob erfolgreich war, wird der synchronisierte DAG angezeigt. in Ihrer Cloud Composer-Entwicklungsumgebung. Hier können Sie überprüfen, ob der DAG wie erwartet funktioniert.

DAG der Produktionsumgebung hinzufügen

Wenn der DAG wie erwartet funktioniert, fügen Sie ihn Ihrer Produktionsumgebung manuell hinzu. Laden Sie dazu die DAG-Datei in das Verzeichnis dags/ des Buckets Ihrer Produktions-Cloud Composer-Umgebung hoch.

Wenn der DAG-Synchronisierungsjob fehlgeschlagen ist oder der DAG in Ihrem Entwicklungsumgebung von Cloud Composer, siehe Build-Fehler beheben

Build-Fehler beheben

In diesem Abschnitt wird erläutert, wie Sie häufige Build-Fehlerszenarien beheben.

Was passiert, wenn die Vorabprüfung fehlgeschlagen ist?

Klicken Sie in Ihrer Pull-Anfrage auf Details und wählen Sie Sehen Sie sich weitere Details zu Google Cloud Build an, um Ihre Build-Logs in der Google Cloud Console Verwenden Sie diese Protokolle, um das Problem mit Ihrem DAG Nachdem Sie die Probleme behoben haben, committen Sie die Korrektur und pushen Sie sie an Ihren Branch. Die Prüfung vor dem Senden wird noch einmal ausgeführt. Logs als Debugging-Tool verwenden.

Was passiert, wenn der DAG-Synchronisierungsauftrag fehlschlägt?

In der Google Cloud Console können Sie sich die Build-Ergebnisse ansehen. Wenn Sie viele Cloud Build-Trigger können Sie Ihre Builds nach dem Triggernamen filtern. add-dags-to-composer Prüfen Sie die Logs des Build-Jobs und lösen Sie den Fehler Fehler. Wenn Sie weitere Unterstützung beim Beheben der Fehler benötigen, nutzen Sie die Supportkanäle.

Was kann ich tun, wenn mein DAG in meiner Cloud Composer-Umgebung nicht richtig funktioniert?

Wenn Ihr DAG in Ihrer Cloud Composer-Entwicklungsumgebung nicht wie erwartet funktioniert, sollten Sie ihn nicht manuell in Ihre Cloud Composer-Produktionsumgebung hochstufen. Führen Sie stattdessen einen der folgenden Schritte aus:

  • Stellen Sie den Pull-Request mit den Änderungen zurück, die Ihren DAG beschädigt haben, um ihn in den Zustand vor Ihren Änderungen wiederherzustellen. Dadurch werden auch alle anderen Dateien in diesem Pull-Request rückgängig gemacht.
  • Erstellen Sie einen neuen Pull-Request, um die Änderungen an der fehlerhaften DAG manuell rückgängig zu machen.
  • Erstellen Sie eine neue Pull-Anfrage, um die Fehler im DAG zu beheben.

Wenn Sie einen dieser Schritte ausführen, wird eine neue Vorabprüfung ausgelöst. Nach der Zusammenführung den DAG-Synchronisierungsjob.

Nächste Schritte