GitHub에서 DAG 테스트, 동기화, 배포

Cloud Composer 1 | Cloud Composer 2

이 가이드에서는 CI/CD 파이프라인을 만들어 GitHub 저장소에서 DAG를 테스트, 동기화, Cloud Composer 환경에 배포하는 방법을 설명합니다.

다른 서비스의 데이터만 동기화하려면 다른 서비스에서 데이터 전송을 참조하세요.

CI/CD 파이프라인 개요

흐름 단계를 보여주는 아키텍처 다이어그램. 사전 제출 및 PR 검토는 GitHub 섹션에 있고 DAG 동기화 및 수동 DAG 확인은 Google Cloud 섹션에 있습니다.
그림 1. 흐름의 단계를 보여주는 아키텍처 다이어그램 (확대하려면 클릭)

DAG를 테스트, 동기화, 배포하는 CI/CD 파이프라인에는 다음 단계가 포함됩니다.

  1. DAG를 변경하고 해당 변경사항을 저장소의 개발 브랜치로 푸시합니다.

  2. 저장소의 기본 브랜치에 대한 pull 요청을 엽니다.

  3. Cloud Build가 단위 테스트를 실행하여 DAG가 유효한지 검사합니다.

  4. pull 요청이 승인되고 저장소의 기본 브랜치에 병합됩니다.

  5. Cloud Build가 개발 Cloud Composer 환경을 이러한 새로운 변경사항과 동기화합니다.

  6. DAG가 개발 환경에서 예상대로 작동하는지 확인합니다.

  7. DAG가 예상대로 작동하는 경우 DAG를 프로덕션 Cloud Composer 환경에 업로드합니다.

목표

시작하기 전에

  • 이 가이드에서는 개발 환경과 프로덕션 환경이라는 동일한 두 가지 Cloud Composer 환경을 사용한다고 가정합니다.

    이 가이드에서는 개발 환경에만 CI/CD 파이프라인을 구성합니다. 사용하는 환경이 프로덕션 환경이 아닌지 확인합니다.

  • 이 가이드에서는 DAG 및 해당 테스트가 GitHub 저장소에 저장되어 있다고 가정합니다.

    예시 CI/CD 파이프라인은 예시 저장소의 콘텐츠를 보여줍니다. DAG 및 테스트는 요구사항 파일, 제약조건 파일, Cloud Build 구성 파일과 함께 dags/ 디렉터리 최상위에 저장됩니다. DAG 동기화 유틸리티 및 요구사항은 utils 디렉터리에 있습니다.

    이 구조는 Airflow 1, Airflow 2, Cloud Composer 1, Cloud Composer 2 환경에 사용할 수 있습니다.

사전 제출 검사 작업 및 단위 테스트 만들기

첫 번째 Cloud Build 작업은 DAG의 단위 테스트를 실행하는 사전 제출 검사를 실행합니다.

단위 테스트 추가

아직 작성하지 않은 경우 DAG의 단위 테스트를 작성합니다. 이러한 테스트를 각각 _test 서픽스를 붙여서 DAG와 함께 저장소에 저장합니다. 예를 들어 example_dag.py의 DAG에 대한 테스트 파일은 example_dag_test.py입니다. 이 테스트가 저장소에서 사전 제출 검사로 실행됩니다.

사전 제출 검사에 대한 Cloud Build YAML 구성 만들기

저장소에서 사전 제출 검사에 사용되는 Cloud Build 작업을 구성하는 test-dags.cloudbuild.yaml이라는 YAML 파일을 만듭니다. 이 절차는 3단계로 구성됩니다.

  1. DAG에 필요한 종속 항목을 설치합니다.
  2. 단위 테스트에 필요한 종속 항목을 설치합니다.
  3. DAG 테스트 실행

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

사전 제출 검사에 대한 Cloud Build 트리거 만들기

GitHub에서 저장소 빌드 가이드에 따라 다음 구성으로 GitHub 앱 기반 트리거를 만듭니다.

  • 이름: test-dags

  • 이벤트: Pull 요청

  • 소스 - 저장소: 저장소 선택

  • 소스 - 기본 브랜치: ^main$(필요한 경우 main을 저장소의 기본 브랜치 이름으로 변경)

  • 소스 - 주석 제어: 필수 아님

  • 빌드 구성 - Cloud 빌드 구성 파일: /test-dags.cloudbuild.yaml(빌드 파일의 경로)

DAG 동기화 작업 만들기 및 DAG 유틸리티 스크립트 추가

다음으로 DAG 유틸리티 스크립트를 실행하는 Cloud Build 작업을 구성합니다. 이 작업의 유틸리티 스크립트는 DAG가 저장소의 기본 브랜치에 병합된 후 DAG와 Cloud Composer 환경을 동기화합니다.

DAG 유틸리티 스크립트 추가

DAG 유틸리티 스크립트를 저장소에 추가합니다. 이 유틸리티 스크립트는 DAG가 아닌 Python 파일은 무시하면서 저장소의 dags/ 디렉터리에 있는 모든 DAG 파일을 임시 디렉터리에 복사합니다. 그런 다음 스크립트는 Cloud Storage 클라이언트 라이브러리를 사용하여 해당 임시 디렉터리의 모든 파일을 Cloud Composer 환경 버킷의 dags/ 디렉터리에 업로드합니다.

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage

def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)

def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

DAG 동기화를 위한 Cloud Build YAML 구성 만들기

저장소에서 DAG 동기화를 위한 Cloud Build 작업을 구성하는 add-dags-to-composer.cloudbuild.yaml이라는 YAML 파일을 만듭니다. 이는 다음과 같은 두 단계로 이루어집니다.

  1. DAG 유틸리티 스크립트에 필요한 종속 항목 설치하기

  2. 유틸리티 스크립트를 실행하여 저장소의 DAG를 Cloud Composer 환경과 동기화하기

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Cloud Build 트리거 만들기

GitHub에서 저장소 빌드 가이드에 따라 다음 구성으로 GitHub 앱 기반 트리거를 만듭니다.

  • 이름: add-dags-to-composer

  • 이벤트: 브랜치로 푸시

  • 소스 - 저장소: 저장소 선택

  • 소스 - 기본 브랜치: ^main$(필요한 경우 main을 저장소의 기본 브랜치 이름으로 변경)

  • 소스 - 포함된 파일 필터(glob): dags/**

  • 빌드 구성 - Cloud 빌드 구성 파일: /add-dags-to-composer.cloudbuild.yaml(빌드 파일의 경로)

고급 구성에서 대체 변수 2개를 추가합니다.

  • _DAGS_DIRECTORY - DAG가 저장소에서 위치하는 디렉터리. 이 가이드의 예시 저장소를 사용하는 경우 dags/입니다.

  • _DAGS_BUCKET - 개발 Cloud Composer 환경에서 dags/ 디렉터리를 포함하는 Cloud Storage 버킷입니다. gs:// 프리픽스를 생략합니다. 예를 들면 다음과 같습니다. us-central1-example-env-1234ab56-bucket.

CI/CD 파이프라인 테스트

이 섹션에서는 새로 생성된 Cloud Build 트리거를 활용하는 DAG 개발 흐름을 따릅니다.

사전 제출 작업 실행

기본 브랜치에 대한 pull 요청을 만들어 빌드를 테스트하세요. 페이지에서 사전 제출 검사를 찾습니다. Google Cloud 콘솔에서 빌드 로그를 보려면 세부정보를 클릭하고 Google Cloud Build에 대한 자세한 내용 보기를 선택하세요.

빨간색 화살표가 괄호 안의 프로젝트 이름을 가리키는 테스트 DAG라는 GitHub 검사 스크린샷
그림 2. GitHub의 Cloud Build 사전 제출 검사 상태 스크린샷(확대하려면 클릭)

사전 제출 검사가 실패하는 경우 빌드 실패 해결을 참조하세요.

개발 Cloud Composer 환경에서 DAG가 작동하는지 검증

pull 요청이 승인되면 기본 브랜치에 병합합니다. Google Cloud 콘솔을 사용하여 빌드 결과를 확인합니다. Cloud Build 트리거가 많은 경우 트리거 이름 add-dags-to-composer로 빌드를 필터링할 수 있습니다.

Cloud Build 동기화 작업이 성공하면 동기화된 DAG가 개발 Cloud Composer 환경에 표시됩니다. 여기에서 DAG가 예상대로 작동하는지 확인할 수 있습니다.

프로덕션 환경에 DAG 배포

DAG가 예상대로 작동하면 프로덕션 환경에 수동으로 추가합니다. 이렇게 하려면 프로덕션 Cloud Composer 환경의 버킷의 dags/ 디렉터리에 DAG 파일을 업로드합니다.

DAG 동기화 작업이 실패했거나 개발 Cloud Composer 환경에서 DAG가 제대로 작동하지 않는 경우 빌드 실패 해결을 참조하세요.

빌드 실패 해결

이 섹션에서는 일반적인 빌드 실패 시나리오를 해결하는 방법을 설명합니다.

사전 제출 검사에 실패하면 어떻게 되나요?

pull 요청에서 세부정보를 클릭하고 Google Cloud Build에 대한 추가 세부정보 보기를 선택하여 Google Cloud 콘솔에서 빌드 로그를 확인합니다. 이러한 로그를 사용하여 DAG의 문제를 디버깅할 수 있습니다. 문제가 해결되었으면 수정사항을 커밋하고 브랜치로 푸시합니다. 사전 제출 검사가 다시 실행되고 로그를 디버깅 도구로 사용하여 작업을 계속 반복할 수 있습니다.

DAG 동기화 작업이 실패하면 어떻게 되나요?

Google Cloud 콘솔을 사용하여 빌드 결과를 확인합니다. Cloud Build 트리거가 많은 경우 트리거 이름 add-dags-to-composer로 빌드를 필터링할 수 있습니다. 빌드 작업의 로그를 검토하고 오류를 해결합니다. 오류를 해결하는 데 도움이 더 필요한 경우 지원 채널을 활용하세요.

DAG가 Cloud Composer 환경에서 제대로 작동하지 않는 경우에는 어떻게 하나요?

DAG가 개발 Cloud Composer 환경에서 예상대로 작동하지 않는 경우 수동으로 DAG를 프로덕션 Cloud Composer 환경으로 승격하지 마세요. 그 대신, 다음 방법 중 하나를 따르세요.

  • DAG를 중단시킨 변경사항으로 pull 요청을 되돌리기하여 변경 직전의 상태로 복원합니다(이렇게 하면 해당 pull 요청 안의 다른 모든 파일도 되돌리기됩니다).
  • 새로운 pull 요청을 만들어 손상된 DAG에 대한 변경사항을 수동으로 되돌립니다.
  • 새로운 pull 요청을 만들어 DAG의 오류를 수정합니다.

이 단계를 수행하면 새 사전 제출 검사가 트리거되고 병합 시 DAG 동기화 작업이 트리거됩니다.

다음 단계