Testar, sincronizar e implantar seus DAGs do GitHub

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Neste guia, explicamos como criar um pipeline de CI/CD para testar, sincronizar e implantar DAGs do GitHub no ambiente do Cloud Composer repositório de dados.

Se você quiser sincronizar apenas dados de outros serviços, consulte Transferir dados de outros serviços.

Visão geral do pipeline de CI/CD

Diagrama de arquitetura mostrando as etapas do fluxo. A pré-submissão e a revisão de PR estão na seção do GitHub, e a sincronização e a verificação manual de DAG estão na seção do Google Cloud.
Figura 1. Diagrama de arquitetura mostrando as etapas do fluxo (clique para ampliar)

O pipeline de CI/CD que testa, sincroniza e implanta DAGs tem as seguintes etapas:

  1. Você faz uma alteração em um DAG e envia essa mudança para uma ramificação de desenvolvimento no repositório.

  2. Você abre uma solicitação de envio na ramificação principal do repositório.

  3. O Cloud Build executa testes de unidade para verificar se o DAG é válido.

  4. Sua solicitação de envio foi aprovada e mesclada à ramificação principal do repositório.

  5. O Cloud Build sincroniza o desenvolvimento ambiente do Cloud Composer com essas novas alterações.

  6. Você verifica se o DAG se comporta conforme o esperado no ambiente de desenvolvimento.

  7. Se o DAG funcionar conforme o esperado, faça o upload dele para a produção ambiente do Cloud Composer.

Objetivos

Antes de começar

  • Este guia pressupõe que você está trabalhando com dois ambientes idênticos do Cloud Composer: um de desenvolvimento e um de produção.

    Neste guia, você configura apenas um pipeline de CI/CD para seu ambiente de desenvolvimento. Verifique se o ambiente que você usa não é de produção.

  • Este guia pressupõe que você tenha seus DAGs e seus testes armazenados em um GitHub repositório de dados.

    O exemplo de pipeline de CI/CD demonstra o conteúdo de um repositório de exemplo. Os DAGs e testes são armazenados no diretório dags/, com arquivos de requisitos, o arquivo de restrições e os arquivos de configuração do Cloud Build armazenados no nível superior. O utilitário de sincronização do DAG e os requisitos dele estão localizados no utils.

    Essa estrutura pode ser usada para os ambientes do Airflow 1, Airflow 2, Cloud Composer 1 e Cloud Composer 2.

Criar um job de verificação pré-envio e testes de unidade

O primeiro job do Cloud Build executa uma verificação de pré-envio, que executa testes de unidade para seus DAGs.

Adicionar testes de unidade

Se ainda não fez isso, crie testes de unidade para seus DAGs. Salve esses testes com os DAGs no repositório, cada um com o sufixo _test. Por exemplo, o teste para o DAG em example_dag.py é example_dag_test.py. Estes são os testes executados como uma verificação de pré-envio no seu repositório.

Criar configuração YAML do Cloud Build para a verificação de pré-envio

No repositório, crie um arquivo YAML chamado test-dags.cloudbuild.yaml que configure o job do Cloud Build para verificações de pré-envio. Nele, há três etapas:

  1. Instale as dependências necessárias para seus DAGs.
  2. Instale as dependências necessárias para seus testes de unidade.
  3. Execute os testes de DAG.

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

Criar o gatilho do Cloud Build para a verificação de pré-envio

Siga as instruções em Como criar repositórios do GitHub. guia para criar um gatilho baseado em app do GitHub com as seguintes configurações:

  • Nome: test-dags

  • Evento: Solicitação de envio

  • Origem - Repositório: escolha o repositório.

  • Origem: ramificação base: ^main$ (mude main para o nome da ramificação base do repositório, se necessário)

  • Origem — Controle de comentários: não obrigatório

  • Configuração do build: arquivo de configuração do Cloud Build: /test-dags.cloudbuild.yaml (o caminho para o arquivo de build)

Criar um job de sincronização do DAG e adicionar o script utilitário deles

Em seguida, configure um job do Cloud Build que execute um script de utilitário de DAGs. O script de utilitário neste job sincroniza seus DAGs com o ambiente do Cloud Composer depois que eles são mesclados à ramificação principal no repositório.

Adicionar o script de utilitário dos DAGs

Adicione o script de utilitário DAG ao repositório. Esse script utilitário copia todos os arquivos DAG no diretório dags/ do em um diretório temporário, ignorando todos os arquivos Python que não sejam DAG. Em seguida, o script usa a biblioteca de cliente do Cloud Storage para fazer o upload de todos os arquivos desse diretório temporário para o diretório dags/ no bucket do ambiente do Cloud Composer.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

Criar configuração YAML do Cloud Build para sincronizar DAGs

Em seu repositório, crie um arquivo YAML chamado add-dags-to-composer.cloudbuild.yaml que configura o Cloud Build para sincronizar os DAGs. Nele, há duas etapas:

  1. Instale as dependências necessárias para o script de utilitário dos DAGs.

  2. Execute o script utilitário para sincronizar os DAGs no repositório com o ambiente do Cloud Composer.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Crie o gatilho do Cloud Build

Siga as instruções em Como criar repositórios do GitHub. guia para criar um gatilho baseado em app do GitHub com as seguintes configurações:

  • Nome: add-dags-to-composer

  • Evento: enviar para uma ramificação

  • Origem - Repositório: escolha o repositório.

  • Origem: ramificação de base: ^main$ (mude main para o nome da na ramificação de base do repositório, se necessário)

  • Origem - Filtro de arquivos incluídos (glob): dags/**

  • Configuração do build: arquivo de configuração do Cloud Build: /add-dags-to-composer.cloudbuild.yaml (o caminho para o arquivo de build)

Na configuração avançada, adicione duas variáveis de substituição:

  • _DAGS_DIRECTORY: o diretório em que os DAGs estão localizados no repositório. Se você estiver usando o repositório de exemplo deste guia, ele será dags/.

  • _DAGS_BUCKET: o bucket do Cloud Storage que contém o diretório dags/ no ambiente de desenvolvimento do Cloud Composer. Omita o prefixo gs://. Por exemplo: us-central1-example-env-1234ab56-bucket.

Testar seu pipeline de CI/CD

Nesta seção, siga um fluxo de desenvolvimento do DAG que utiliza seu novo criou gatilhos do Cloud Build.

Executar um job de pré-envio

Crie uma solicitação de envio para a ramificação principal e teste o build. Localize seu verificação de pré-envio na página. Clique em Detalhes e escolha Confira mais detalhes no Google Cloud Build para acessar os registros de build na console do Google Cloud.

Captura de tela de uma verificação do GitHub chamada "test-dags" com uma seta vermelha apontando para o nome do projeto entre parênteses
Figura 2. Captura de tela do status de verificação de pré-envio do Cloud Build no GitHub (clique para ampliar)

Se a verificação de pré-envio falhar, consulte Como resolver falhas no build.

Validar se o DAG funciona no ambiente de desenvolvimento do Cloud Composer

Depois que a solicitação de envio for aprovada, mescle-a à ramificação principal. Use o console do Google Cloud para acessar os resultados da build. Se você tiver muitos gatilhos do Cloud Build, é possível filtrar os builds pelo nome do gatilho add-dags-to-composer:

Depois que o job de sincronização do Cloud Build for bem-sucedido, o DAG sincronizado será exibido no ambiente de desenvolvimento do Cloud Composer. Lá, é possível validar se o DAG funciona como esperado.

Adicionar o DAG ao ambiente de produção

Depois que o DAG funcionar conforme o esperado, adicione-o manualmente ao ambiente de produção. Para isso, faça upload do arquivo DAG para o diretório dags/ no bucket do ambiente de produção do Cloud Composer.

Se o job de sincronização do DAG tiver falhado ou se o DAG não estiver se comportando como esperado no ambiente de desenvolvimento do Cloud Composer, consulte Como resolver falhas de build.

Como resolver falhas de build

Esta seção explica como resolver cenários comuns de falhas de build.

E se a verificação de pré-envio falhar?

Na sua solicitação de envio, clique em Detalhes e escolha Confira mais detalhes no Google Cloud Build para acessar os registros de build na console do Google Cloud. Use esses registros para depurar o problema da DAG. Depois de resolver os problemas, confirme a correção e envie para a ramificação. A verificação de pré-envio é executada novamente, e é possível continuar iterando usando os registros como uma ferramenta de depuração.

E se o job de sincronização do DAG falhar?

Use o console do Google Cloud para ver os resultados da versão. Se você tiver muitos gatilhos do Cloud Build, poderá filtrar seus builds pelo nome do gatilho add-dags-to-composer. Examine os registros do job de build e resolva os erros. Se precisar de mais ajuda para resolver os erros, utilize canais de suporte.

E se o DAG não funcionar corretamente no meu ambiente do Cloud Composer?

Se o DAG não funcionar como esperado no seu ambiente de desenvolvimento ambiente do Cloud Composer, não promova manualmente o DAG para de produção do Cloud Composer. Em vez disso, escolha uma das opções a seguir:

  • Reverta a solicitação de pull com as mudanças que quebraram seu DAG para restaurar o estado imediatamente anterior às mudanças. Isso também reverte todos os outros arquivos nessa solicitação de pull.
  • Crie uma nova solicitação de envio para reverter manualmente as mudanças no DAG corrompido.
  • Crie uma nova solicitação de pull para corrigir os erros no DAG.

Seguir qualquer uma dessas etapas aciona uma nova verificação de pré-envio e, após a mesclagem, o job de sincronização do DAG.

A seguir