Teste, sincronize e implemente os seus DAGs a partir do GitHub

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Este guia explica como criar um pipeline de CI/CD para testar, sincronizar e implementar DAGs no seu ambiente do Cloud Composer a partir do seu repositório do GitHub.

Se quiser sincronizar apenas dados de outros serviços, consulte o artigo Transfira dados de outros serviços.

Vista geral do pipeline de CI/CD

Diagrama de arquitetura que mostra os passos do fluxo. A pré-submissão e a revisão de PR estão na secção do GitHub, e a sincronização de DAG e a validação manual de DAG estão na secção Google Cloud .
Figura 1. Diagrama de arquitetura que mostra os passos do fluxo (clique para ampliar)

O pipeline de CI/CD para testar, sincronizar e implementar DAGs tem os seguintes passos:

  1. Faz uma alteração a um DAG e envia essa alteração para um ramo de programação no seu repositório.

  2. Abre um pedido de obtenção em relação ao ramo principal do seu repositório.

  3. O Cloud Build executa testes unitários para verificar se o seu DAG é válido.

  4. O seu pedido de envio é aprovado e integrado no ramo principal do seu repositório.

  5. O Cloud Build sincroniza o seu ambiente de desenvolvimento do Cloud Composer com estas novas alterações.

  6. Verifica se o DAG se comporta como esperado no seu ambiente de programação.

  7. Se o DAG funcionar como esperado, carregue-o para o seu ambiente de produção do Cloud Composer.

Objetivos

Antes de começar

  • Este guia pressupõe que está a trabalhar com dois ambientes do Cloud Composer idênticos: um ambiente de desenvolvimento e um ambiente de produção.

    Para os fins deste guia, está a configurar um pipeline de CI/CD apenas para o seu ambiente de desenvolvimento. Certifique-se de que o ambiente que usa não é um ambiente de produção.

  • Este guia pressupõe que tem os seus DAGs e respetivos testes armazenados num repositório do GitHub.

    O exemplo de pipeline de CI/CD demonstra o conteúdo de um repositório de exemplo. Os DAGs e os testes são armazenados no diretório dags/, com ficheiros de requisitos, o ficheiro de restrições e os ficheiros de configuração do Cloud Build armazenados ao nível superior. O utilitário de sincronização DAG e os respetivos requisitos encontram-se no diretório utils.

    Esta estrutura pode ser usada para ambientes do Airflow 1, Airflow 2, Cloud Composer 1 e Cloud Composer 2.

Crie uma tarefa de verificação de pré-envio e testes unitários

A primeira tarefa do Cloud Build executa uma verificação de pré-envio, que executa testes unitários para os seus DAGs.

Adicione testes de unidades

Se ainda não o fez, crie testes unitários para os seus DAGs. Guarde estes testes juntamente com os DAGs no seu repositório, cada um com o sufixo _test. Por exemplo, o ficheiro de teste do DAG em example_dag.py é example_dag_test.py. Estes são os testes que são executados como uma verificação de pré-envio no seu repositório.

Crie uma configuração YAML do Cloud Build para a verificação de pré-envio

No seu repositório, crie um ficheiro YAML com o nome test-dags.cloudbuild.yaml que configura a tarefa do Cloud Build para verificações de pré-envio. Neste, existem três passos:

  1. Instale as dependências necessárias para os seus DAGs.
  2. Instale as dependências necessárias para os testes unitários.
  3. Execute os testes de DAG.

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

Crie o acionador do Cloud Build para a verificação de pré-envio

Siga o guia Criar repositórios a partir do GitHub para criar um acionador baseado na app GitHub com as seguintes configurações:

  • Nome: test-dags

  • Evento: pedido de envio

  • Origem – Repositório: escolha o seu repositório

  • Origem: ramificação base: ^main$ (altere main para o nome da ramificação base do seu repositório, se necessário)

  • Fonte – Controlo de comentários: não obrigatório

  • Configuração da compilação: ficheiro de configuração da compilação na nuvem: /test-dags.cloudbuild.yaml (o caminho para o ficheiro de compilação)

Crie uma tarefa de sincronização de DAG e adicione o script de utilidade DAGs

Em seguida, configure uma tarefa do Cloud Build que execute um script de utilidade DAGs. O script de utilidade neste trabalho sincroniza os seus DAGs com o seu ambiente do Cloud Composer depois de serem unidos ao ramo principal no seu repositório.

Adicione o script de utilidade DAGs

Adicione o script de utilidade DAG ao seu repositório. Este script de utilidade copia todos os ficheiros DAG no diretório dags/ do seu repositório para um diretório temporário, ignorando todos os ficheiros Python que não sejam DAG. Em seguida, o script usa a biblioteca de cliente do Cloud Storage para carregar todos os ficheiros desse diretório temporário para o diretório dags/ no contentor do seu ambiente do Cloud Composer.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

Crie uma configuração YAML do Cloud Build para sincronizar DAGs

No seu repositório, crie um ficheiro YAML com o nome add-dags-to-composer.cloudbuild.yaml que configure a tarefa do Cloud Build para sincronizar DAGs. Este processo tem dois passos:

  1. Instale as dependências necessárias para o script de utilidade DAGs.

  2. Execute o script de utilidade para sincronizar os DAGs no seu repositório com o seu ambiente do Cloud Composer.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Crie o acionador do Cloud Build

Siga o guia Criar repositórios a partir do GitHub para criar um acionador baseado na app GitHub com as seguintes configurações:

  • Nome: add-dags-to-composer

  • Evento: enviar para um ramo

  • Origem – Repositório: escolha o seu repositório

  • Origem: ramificação base: ^main$ (altere main para o nome da ramificação base do seu repositório, se necessário)

  • Origem: filtro de ficheiros incluídos (glob): dags/**

  • Configuração da compilação: ficheiro de configuração da compilação na nuvem: /add-dags-to-composer.cloudbuild.yaml (o caminho para o ficheiro de compilação)

Na configuração avançada, adicione duas variáveis de substituição:

  • _DAGS_DIRECTORY: o diretório onde os DAGs estão localizados no seu repositório. Se estiver a usar o repositório de exemplo deste guia, é dags/.

  • _DAGS_BUCKET – o contentor do Cloud Storage que contém o diretório dags/ no seu ambiente de desenvolvimento do Cloud Composer. Omita o prefixo gs://. Por exemplo: us-central1-example-env-1234ab56-bucket.

Teste o pipeline de CI/CD

Nesta secção, siga um fluxo de desenvolvimento de DAG que usa os acionadores do Cloud Build criados recentemente.

Execute uma tarefa de pré-envio

Crie um pedido de obtenção para a ramificação principal para testar a compilação. Localize a verificação pré-envio na página. Clique em Detalhes e escolha Ver mais detalhes no Google Cloud Build para ver os registos de compilação na Google Cloud consola.

Captura de ecrã de uma verificação do GitHub denominada test-dags com uma seta vermelha a apontar para o nome do projeto entre parênteses
Figura 2. Captura de ecrã do estado da verificação de pré-envio do Cloud Build no GitHub (clique para aumentar)

Se a verificação pré-envio falhou, consulte o artigo Resolva falhas de compilação.

Valide se o seu DAG funciona no ambiente de desenvolvimento do Cloud Composer

Depois de o seu pedido de obtenção ser aprovado, faça a união com o ramo principal. Use a Google Cloud consola para ver os resultados da compilação. Se tiver muitos acionadores do Cloud Build, pode filtrar as compilações pelo nome do acionador add-dags-to-composer.

Depois de a tarefa de sincronização do Cloud Build ser bem-sucedida, o DAG sincronizado é apresentado no seu ambiente de desenvolvimento do Cloud Composer. Aí, pode validar se o DAG funciona como esperado.

Adicione o DAG ao seu ambiente de produção

Depois de o DAG ter o desempenho esperado, adicione-o manualmente ao seu ambiente de produção. Para tal, carregue o ficheiro DAG para o diretório dags/ no contentor do ambiente de produção do Cloud Composer.

Se a tarefa de sincronização do DAG falhou ou se o DAG não está a funcionar como esperado no seu ambiente de desenvolvimento do Cloud Composer, consulte o artigo Resolver falhas de compilação.

Resolver falhas de compilação

Esta secção explica como resolver cenários comuns de falha de compilação.

E se a minha verificação pré-envio falhar?

No pedido de obtenção, clique em Detalhes e escolha Ver mais detalhes no Google Cloud Build para ver os registos de compilação na Google Cloud consola. Use estes registos para ajudar a depurar o problema com o seu DAG. Depois de resolver os problemas, confirme a correção e envie-a para o seu ramo. A verificação de pré-envio é executada novamente e pode continuar a iterar através dos registos como ferramenta de depuração.

E se a minha tarefa de sincronização de DAG falhar?

Use a Google Cloud consola para ver os resultados da compilação. Se tiver muitos acionadores do Cloud Build, pode filtrar as compilações pelo nome do acionador add-dags-to-composer. Examine os registos da tarefa de compilação e resolva os erros. Se precisar de ajuda adicional para resolver os erros, use os canais de apoio técnico.

O que acontece se o meu DAG não funcionar corretamente no meu ambiente do Cloud Composer?

Se o seu DAG não funcionar como esperado no ambiente de desenvolvimento do Cloud Composer, não promova manualmente o DAG para o ambiente de produção do Cloud Composer. Em alternativa, efetue uma das seguintes ações:

  • Reverta o pedido de envio com as alterações que danificaram o seu DAG para o restaurar ao estado imediatamente anterior às suas alterações (esta ação também reverte todos os outros ficheiros nesse pedido de envio).
  • Crie um novo pedido de obtenção para reverter manualmente as alterações ao DAG danificado.
  • Crie um novo pedido de envio para corrigir os erros no seu DAG.

A execução de qualquer um destes passos aciona uma nova verificação de pré-envio e, após a união, a tarefa de sincronização do DAG.

O que se segue?