Teste, sincronize e implante os DAGs pelo GitHub

Cloud Composer 1 | Cloud Composer 2

Neste guia, explicamos como criar um pipeline de CI/CD para testar, sincronizar e implantar DAGs no ambiente do Cloud Composer usando seu repositório do GitHub.

Visão geral do pipeline de CI/CD

Diagrama da arquitetura mostrando as etapas do fluxo. O pré-envio e a revisão de RP estão na seção do GitHub, e a sincronização e a verificação manual do DAG estão na seção do Google Cloud.
Figura 1. Diagrama da arquitetura mostrando as etapas do fluxo (clique para ampliar)

O pipeline de CI/CD para testar, sincronizar e implantar DAGs tem as seguintes etapas:

  1. Você faz uma alteração em um DAG e a envia para uma ramificação de desenvolvimento no seu repositório.

  2. Você vai abrir uma solicitação de envio na ramificação principal do repositório.

  3. O Cloud Build executa testes de unidade para verificar se o DAG é válido.

  4. A solicitação de envio foi aprovada e mesclada com a ramificação principal do repositório.

  5. O Cloud Build sincroniza seu ambiente de desenvolvimento do Cloud Composer com essas novas alterações.

  6. Você vai verificar se o DAG se comporta conforme o esperado no seu ambiente de desenvolvimento.

  7. Se o DAG funcionar conforme o esperado, faça upload dele para o ambiente de produção do Cloud Composer.

Objetivos

Antes de começar

  • Neste guia, presumimos que você esteja trabalhando com dois ambientes idênticos do Cloud Composer: um de desenvolvimento e outro de produção.

    Neste guia, você está configurando um pipeline de CI/CD apenas para seu ambiente de desenvolvimento. Verifique se o ambiente usado não é de produção.

  • Este guia pressupõe que seus DAGs e os testes deles estejam armazenados em um repositório do GitHub.

    O exemplo de pipeline de CI/CD demonstra o conteúdo de um repositório de exemplo. Os DAGs e testes são armazenados no diretório dags/, com arquivos de requisitos, o arquivo de restrições e os arquivos de configuração do Cloud Build armazenados no nível superior. O utilitário de sincronização de DAG e os respectivos requisitos estão localizados no diretório utils.

    Essa estrutura pode ser usada nos ambientes Airflow 1, Airflow 2, Cloud Composer 1 e Cloud Composer 2.

Criar um job de verificação de pré-envio e testes de unidade

O primeiro job do Cloud Build executa uma verificação de pré-envio, que executa testes de unidade para os DAGs.

Adicionar testes de unidade

Se ainda não o fez, crie testes de unidade para os DAGs. Salve esses testes com os DAGs no seu repositório, cada um com o sufixo _test. Por exemplo, o arquivo de teste para o DAG em example_dag.py é example_dag_test.py. Esses são os testes executados como verificação de pré-envio no seu repositório.

Criar a configuração do YAML do Cloud Build para a verificação de pré-envio

No seu repositório, crie um arquivo YAML chamado test-dags.cloudbuild.yaml que configure o job do Cloud Build para verificações de pré-envio. Nele, há três etapas:

  1. Instale as dependências necessárias para os DAGs.
  2. Instale as dependências necessárias para os testes de unidade.
  3. Execute os testes de DAG.

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

Criar o gatilho do Cloud Build para a verificação de pré-envio

Siga o guia Como criar repositórios no GitHub para criar um gatilho baseado em um app do GitHub com as seguintes configurações:

  • Name: test-dags

  • Evento: solicitação de envio

  • Origem - Repositório: escolha o repositório.

  • Origem: ramificação de base: ^main$. Mude main para o nome da ramificação de base do repositório, se necessário.

  • Origem: controle de comentários: não obrigatório.

  • Configuração do build: arquivo de configuração do Cloud build: /test-dags.cloudbuild.yaml (o caminho para o arquivo do build).

Criar um job de sincronização de DAGs e adicionar um script utilitário de DAGs

Em seguida, configure um job do Cloud Build que execute um script utilitário de DAGs. O script de utilitário neste job sincroniza os DAGs com o ambiente do Cloud Composer após eles serem mesclados à ramificação principal no repositório.

Adicionar o script utilitário de DAGs

Adicione o script utilitário do DAG ao seu repositório. Esse script utilitário copia todos os arquivos DAG no diretório dags/ do seu repositório para um diretório temporário, ignorando todos os arquivos Python não DAG. Em seguida, o script usa a biblioteca de cliente do Cloud Storage para fazer upload de todos os arquivos desse diretório temporário para o diretório dags/ no bucket do ambiente do Cloud Composer.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage

def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)

def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

Criar configuração YAML do Cloud Build para sincronizar DAGs

No seu repositório, crie um arquivo YAML chamado add-dags-to-composer.cloudbuild.yaml que configure o job do Cloud Build para sincronizar DAGs. Nele, há duas etapas:

  1. Instale as dependências necessárias para o script utilitário de DAGs.

  2. Execute o script utilitário para sincronizar os DAGs no seu repositório com o ambiente do Cloud Composer.

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Crie o gatilho do Cloud Build

Siga o guia Como criar repositórios no GitHub para criar um gatilho baseado em um app do GitHub com as seguintes configurações:

  • Name: add-dags-to-composer

  • Evento: enviar por push para uma ramificação

  • Origem - Repositório: escolha o repositório.

  • Origem: ramificação de base: ^main$. Mude main para o nome da ramificação de base do repositório, se necessário.

  • Origem - Filtro de arquivos incluídos (glob): dags/**

  • Configuração do build: arquivo de configuração do Cloud build: /add-dags-to-composer.cloudbuild.yaml (o caminho para o arquivo do build).

Na Configuração avançada, adicione duas variáveis de substituição:

  • _DAGS_DIRECTORY: o diretório em que os DAGs estão localizados no seu repositório. Se você estiver usando o repositório de exemplo deste guia, ele é dags/.

  • _DAGS_BUCKET: o bucket do Cloud Storage que contém o diretório dags/ no ambiente de desenvolvimento do Cloud Composer. Omita o prefixo gs://. Por exemplo: us-central1-example-env-1234ab56-bucket.

Testar o pipeline de CI/CD

Nesta seção, siga um fluxo de desenvolvimento de DAG que usa seus gatilhos do Cloud Build recém-criados.

Executar um job de pré-envio

Crie uma solicitação de envio para a ramificação principal e teste o build. Localize a verificação de pré-envio na página. Clique em Detalhes e escolha Ver mais detalhes no Google Cloud Build para ver os registros da versão no console do Google Cloud.

Captura de tela de uma verificação do GitHub chamada test-dags com uma seta vermelha apontando para o nome do projeto entre parênteses
Figura 2. Captura de tela do status da verificação de pré-envio do Cloud Build no GitHub (clique para ampliar)

Se a verificação de pré-envio falhou, consulte Como resolver falhas de build.

Verificar se o DAG funciona no ambiente de desenvolvimento do Cloud Composer

Depois que a solicitação de envio for aprovada, ela deve ser mesclada com a ramificação principal. Use o console do Google Cloud para ver os resultados da versão. Se você tiver muitos gatilhos do Cloud Build, será possível filtrar as versões pelo nome de gatilho add-dags-to-composer.

Depois que o job de sincronização do Cloud Build for bem-sucedido, o DAG sincronizado aparecerá no ambiente de desenvolvimento do Cloud Composer. Lá, é possível validar se o DAG funciona conforme esperado.

Adicionar o DAG ao ambiente de produção

Depois que o DAG for executado conforme esperado, adicione-o manualmente ao ambiente de produção. Para isso, faça upload do arquivo DAG para o diretório dags/ no bucket do ambiente de produção do Cloud Composer.

Se o job de sincronização do DAG falhou ou se o DAG não está se comportando conforme o esperado no ambiente de desenvolvimento do Cloud Composer, consulte Como resolver falhas de build.

Como corrigir falhas de build

Esta seção explica como resolver cenários de falha de build comuns.

E se minha verificação de pré-envio falhar?

Na solicitação de envio, clique em Detalhes e escolha Ver mais detalhes no Google Cloud Build para conferir os registros da versão no Console do Google Cloud. Use esses registros para depurar o problema com o DAG. Depois de resolver os problemas, confirme a correção e envie para a ramificação. A verificação de pré-envio é executada novamente e você pode continuar a iterar usando os registros como uma ferramenta de depuração.

E se meu job de sincronização do DAG falhar?

Use o console do Google Cloud para ver os resultados do build. Se você tiver muitos gatilhos do Cloud Build, será possível filtrar as versões pelo nome de gatilho add-dags-to-composer. Examine os registros do job de build e resolva os erros. Se precisar de mais ajuda para resolver os erros, use os canais de suporte.

E se o DAG não funcionar corretamente no ambiente do Cloud Composer?

Se o DAG não funcionar como esperado no ambiente de desenvolvimento do Cloud Composer, não o promova manualmente no ambiente de produção do Cloud Composer. Em vez disso, escolha uma das opções a seguir:

  • Reverta a solicitação de envio com as alterações que interromperam seu DAG para restaurá-lo ao estado imediatamente antes das alterações. Isso também reverterá todos os outros arquivos nessa solicitação de envio.
  • Crie uma nova solicitação de envio para reverter manualmente as alterações no DAG corrompido.
  • Crie uma nova solicitação de envio para corrigir os erros no seu DAG.

Seguir qualquer uma dessas etapas aciona uma nova verificação de pré-envio e, após a mesclagem, o job de sincronização do DAG.

A seguir