Testa, sincronizza ed esegui il deployment dei DAG da GitHub

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Questa guida spiega come creare una pipeline CI/CD per testare, sincronizzare esegui il deployment dei DAG nel tuo ambiente Cloud Composer dal tuo GitHub repository Git.

Se vuoi sincronizzare solo i dati di altri servizi, vedi Trasferire dati da altri servizi.

Panoramica della pipeline CI/CD

Diagramma dell'architettura che mostra i passaggi del flusso. Le operazioni preliminari e la revisione PR si trovano nella sezione GitHub, mentre la sincronizzazione dei DAG e la verifica manuale del DAG si trovano nella sezione Google Cloud.
Figura 1. Diagramma dell'architettura che mostra le fasi del flusso (fai clic per ingrandire)
.

La pipeline CI/CD che per testare, sincronizzare ed eseguire il deployment dei DAG ha seguenti passaggi:

  1. Apporta una modifica a un DAG ed esegui il push della modifica in un ramo di sviluppo nel tuo repository.

  2. Apri una richiesta di pull per il ramo principale del repository.

  3. Cloud Build esegue test delle unità per verificare che il DAG sia valido.

  4. La tua richiesta di pull è stata approvata e unita al ramo principale del tuo repository Git.

  5. Cloud Build sincronizza lo sviluppo nell'ambiente Cloud Composer con queste nuove modifiche.

  6. Verifichi che il DAG si comporti come previsto nel tuo sviluppo completamente gestito di Google Cloud.

  7. Se il DAG funziona come previsto, lo carichi nella tua produzione nell'ambiente Cloud Composer.

Obiettivi

Prima di iniziare

  • Questa guida presuppone che tu stia lavorando con due istanze Ambienti Cloud Composer: un ambiente di sviluppo nell'ambiente di produzione.

    Ai fini di questa guida, configuri solo una pipeline CI/CD per il tuo ambiente di sviluppo. Assicurati che l'ambiente che utilizzi non è un ambiente di produzione.

  • Questa guida presuppone che i DAG e i relativi test siano archiviati in un GitHub repository Git.

    La pipeline CI/CD di esempio che illustra i contenuti di un repository di esempio. I DAG e i test archiviati nella directory dags/, con i file dei requisiti, i vincoli e i file di configurazione di Cloud Build archiviati al livello più alto. L'utilità di sincronizzazione DAG e i suoi requisiti si trovano nella Directory utils.

Creare un job di controllo prima dell'invio e i test delle unità

Il primo job Cloud Build esegue un controllo pre-invio, che esegue l'unità per i tuoi DAG.

Aggiungi test delle unità

Se non l'hai già fatto, seleziona l'autore test delle unità per i DAG. Salva questi test insieme a DAG nel tuo repository, ciascuno con il suffisso _test. Ad esempio, il test del DAG in example_dag.py è example_dag_test.py. Queste sono le test eseguiti come controllo pre-invio nel repository.

Crea una configurazione YAML di Cloud Build per il controllo pre-invio

Nel tuo repository, crea un file YAML denominato test-dags.cloudbuild.yaml che configura il job Cloud Build per i controlli pre-invio. Al suo interno, tre passaggi:

  1. Installa le dipendenze necessarie dai DAG.
  2. Installa le dipendenze necessarie per i test delle unità.
  3. Esegui i test del DAG.

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

Crea il trigger di Cloud Build per il controllo pre-invio

Segui la pagina Creazione di repository da GitHub guida alla creazione di un trigger basato su app GitHub con le seguenti configurazioni:

  • Nome: test-dags

  • Evento: richiesta di pull

  • Origine - Repository: scegli il repository

  • Origine - Ramo di base: ^main$ (modifica main del nome del tuo ramo base del repository, se richiesto)

  • Origine - Controllo dei commenti: non obbligatorio

  • Configurazione build - File di configurazione di Cloud Build: /test-dags.cloudbuild.yaml (il percorso del file di build)

Crea un job di sincronizzazione dei DAG e aggiungi lo script di utilità dei DAG

Poi, configura un job Cloud Build che esegue uno script di utilità dei DAG. La lo script di utilità in questo job sincronizza i DAG con dell'ambiente Cloud Composer dopo l'unione nel ramo principale. nel tuo repository.

Aggiungi lo script di utilità dei DAG

Aggiungi lo script dell'utilità DAG al tuo repository. Questo script di utilità copia tutti i file DAG nella directory dags/ del tuo in una directory temporanea, ignorando tutti i file Python non DAG. La utilizza quindi la libreria client di Cloud Storage per caricare tutti i file da quella directory temporanea alla directory dags/ nel tuo del bucket dell'ambiente Cloud Composer.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

Crea una configurazione YAML di Cloud Build per la sincronizzazione dei DAG

Nel tuo repository, crea un file YAML denominato add-dags-to-composer.cloudbuild.yaml che configura Cloud Build per la sincronizzazione dei DAG. Al suo interno, sono previsti due passaggi:

  1. Installa le dipendenze necessarie dallo script dell'utilità dei DAG.

  2. Esegui lo script di utilità per sincronizzare i DAG nel tuo repository con nell'ambiente Cloud Composer.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

crea il trigger di Cloud Build

Segui la pagina Creazione di repository da GitHub guida alla creazione di un trigger basato su app GitHub con le seguenti configurazioni:

  • Nome: add-dags-to-composer

  • Evento: push a un ramo

  • Origine - Repository: scegli il repository

  • Origine - Ramo di base: ^main$ (modifica main del nome del tuo ramo base del repository, se richiesto)

  • Origine. Filtro dei file inclusi (glob): dags/**

  • Configurazione build - File di configurazione di Cloud Build: /add-dags-to-composer.cloudbuild.yaml (il percorso del file di build)

Nella configurazione avanzata, aggiungi due variabili di sostituzione:

  • _DAGS_DIRECTORY: la directory in cui si trovano i DAG nel tuo repository. Se utilizzi il repository di esempio di questa guida, si tratta di dags/.

  • _DAGS_BUCKET: il bucket Cloud Storage che contiene la classe Directory dags/ nel tuo Cloud Composer di sviluppo completamente gestito di Google Cloud. Ometti il prefisso gs://. Ad esempio: us-central1-example-env-1234ab56-bucket.

Testa la tua pipeline CI/CD

In questa sezione, segui un flusso di sviluppo DAG che utilizza le tue nuove i trigger di Cloud Build creati.

Esegui un job di preregistrazione

Crea una richiesta di pull al ramo principale per testare la build. Individua il tuo controllo prima dell'invio della pagina. Fai clic su Dettagli e scegli Visualizza altri dettagli su Google Cloud Build per vedere i log di build nella nella console Google Cloud.

Screenshot di un controllo GitHub chiamato test-dags con una freccia rossa che punta al nome del progetto tra parentesi
Figura 2. Screenshot dello stato del controllo pre-invio di Cloud Build su GitHub (fai clic per ingrandire)
.

Se il controllo prima dell'invio non è andato a buon fine, vedi Risolvere gli errori delle build.

Verifica che il DAG funzioni nel tuo ambiente Cloud Composer di sviluppo

Una volta approvata la richiesta di pull, uniscila al ramo principale. Utilizza la dalla console Google Cloud visualizzare i risultati della build. Se disponi di molti Puoi filtrare le build in base al nome del trigger add-dags-to-composer.

Una volta completato il job di sincronizzazione di Cloud Build, viene visualizzato il DAG sincronizzato nel tuo ambiente di sviluppo Cloud Composer. Lì puoi verificare che il DAG funzioni come previsto.

Aggiungi il DAG al tuo ambiente di produzione

Quando il DAG si comporta come previsto, aggiungilo manualmente alla tua produzione completamente gestito di Google Cloud. Per farlo, carica il file DAG alla directory dags/ nel Cloud Composer di produzione del bucket dell'ambiente.

Se il job di sincronizzazione DAG non è riuscito o se il DAG non si comporta come previsto dell'ambiente di sviluppo Cloud Composer, Gestire gli errori di build.

Gestione degli errori delle build

Questa sezione spiega come risolvere scenari comuni di errore delle build.

Che cosa succede se il controllo prima dell'invio non è andato a buon fine?

Nella richiesta di pull, fai clic su Dettagli e scegli Visualizza altri dettagli su Google Cloud Build per vedere i log di build nella nella console Google Cloud. Utilizza questi log per eseguire il debug del problema relativo al tuo con il DAG. Una volta risolti i problemi, esegui il commit della correzione ed eseguine il push al ramo. Il controllo prima dell'invio viene eseguito di nuovo e puoi continuare a eseguire l'iterazione utilizzando i log come strumento di debug.

Cosa succede se il mio job di sincronizzazione DAG non riesce?

Utilizzare la console Google Cloud per visualizzare i risultati della build. Se disponi di molti Puoi filtrare le build in base al nome del trigger add-dags-to-composer. Esamina i log del job di build e risolvi errori. Se hai bisogno di ulteriore assistenza per risolvere gli errori, utilizza canali di assistenza.

Cosa succede se il mio DAG non funziona correttamente nel mio ambiente Cloud Composer?

Se il DAG non funziona come previsto nel tuo sviluppo nell'ambiente Cloud Composer, non promuovere manualmente il DAG nell'ambiente di produzione Cloud Composer. Esegui invece una delle seguenti:

  • Ripristinare la richiesta di pull con le modifiche che hanno danneggiato il DAG per ripristinarlo allo stato immediatamente prima delle modifiche (verranno ripristinati anche tutti gli altri file nella richiesta di pull).
  • Crea una nuova richiesta di pull per ripristinare manualmente le modifiche al DAG non funzionante.
  • Crea una nuova richiesta di pull per correggere gli errori nel tuo DAG.

Seguendo uno di questi passaggi attivi un nuovo controllo prima dell'invio e, al momento dell'unione, il job di sincronizzazione dei DAG.

Passaggi successivi