バージョン管理を使用して DAG をテスト、同期、デプロイする

このガイドでは、Cloud Composer 環境の DAG をバージョン管理システムと同期するための CI / CD フローを作成する方法について説明します。このフローの手順は次のとおりです。

  1. DAG に変更を加えて、その変更をリポジトリ内の開発ブランチに push する
  2. リポジトリのメインブランチに対して pull リクエストを開く
  3. Cloud Build で単体テストが行われ、DAG が有効であることが確認される
  4. pull リクエストが承認され、メインブランチに統合される
  5. Cloud Build で新しい Cloud Composer 開発環境とこれらの新しい変更が同期される
  6. DAG が開発環境で想定どおりに動作していることを確認する
  7. DAG が想定どおりに機能する場合は、DAG を本番環境の Cloud Composer 環境に手動で同期する
フロー手順を示す構成図。送信前レビューと PR レビューは GitHub セクションにあり、DAG 同期と手動 DAG 検証は GCP セクションにあります。
図 1. フロー手順を示す構成図(クリックして拡大)

目標

  • Cloud Build を使用して DAG 単体テストの自動送信前チェックを実行します。このチェックでは、DAG の単体テストを実行します。
  • Cloud Composer 開発環境の DAG とバージョン管理システムの DAG を同期する

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

このチュートリアルでは、DAG とそのテストが GitHub リポジトリに保存されていることを前提としています。composer/cicd_sample ディレクトリには、サンプル リポジトリのコンテンツが含まれており、DAG とテストは、最上位レベルに保存された要件ファイル、制約ファイル、Cloud Build 構成ファイルとともに dags フォルダに保存されます。DAG 同期ユーティリティとその要件は、utils フォルダにあります。この構造は、Airflow 1、Airflow 2、Cloud Composer 1、Cloud Composer 2 の環境で使用できます。

また、このチュートリアルでは、2 つの同一の Cloud Composer 環境(開発環境と本番環境)を使用していることを前提としています。

  1. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  2. Cloud プロジェクトに対して課金が有効になっていることを確認します。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。

  3. 開発環境と本番環境用の Cloud Composer 環境をまだ作成していない場合は、ここで作成します。このチュートリアルでは、Cloud Composer 2 環境または Cloud Composer 1 環境を作成できます。

環境の準備

このセクションでは、2 つの Cloud Build ジョブを設定します。最初のジョブは、DAG の単体テストを実行する送信前チェックを実行します。2 番目のジョブでは、リポジトリ内のメインブランチに DAG がマージされた後、DAG を Cloud Composer 環境と同期します。

送信前チェックを追加する

単体テストを追加する

まだ実行していない場合は、DAG の単体テストを作成します。これらのテストを _test 接尾辞付きのリポジトリ内の DAG と一緒に保存します。たとえば、example_dag.py の DAG のテストファイルは example_dag_test.py です。これらは、リポジトリで送信前チェックとして実行されるテストです。

Cloud Build の送信前チェックを追加する

このセクションでは、Cloud Build YAML 構成と、送信前チェック用の Cloud Build トリガーを作成します。

Cloud Build YAML 構成を作成する

YAML ファイルを作成して、test-dags.cloudbuild.yaml という名前の Cloud Build ジョブを構成します。次の 3 つのステップを行います。

  1. DAG に必要な依存関係をインストールする
  2. 単体テストに必要な依存関係をインストールする
  3. DAG テストを実行する

    
    steps:
      # install dependencies
      - name: python:3.8-slim
        entrypoint: pip
        args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]
    
      - name: python:3.8-slim
        entrypoint: pip
        args: ["install", "-r", "requirements-test.txt", "--user"]
    
      # run in python 3.8 which is latest version in Cloud Composer
      - name: python:3.8-slim
        entrypoint: python3.8
        args: ["-m", "pytest", "-s", "dags/"]
    
Cloud Build トリガーを作成する

GitHub からリポジトリを構築するためのガイドに沿って、次の構成で GitHub アプリベースのトリガーを作成します。

  • 名前: test-dags
  • イベント: pull リクエスト
  • ソース - リポジトリ: 自分のリポジトリを選択
  • ソース - ベースブランチ: ^main$(main は必要に応じてリポジトリのベースブランチの名前に変更します)
  • ソース - コメント制御: 不要
  • ビルド構成 - Cloud ビルド構成ファイル: /test-dags.cloudbuild.yaml(ビルドファイルへのパス)

DAG 同期ジョブを追加する

次に、ユーティリティ スクリプトを実行する Cloud Build ジョブを構成します。このジョブのユーティリティ スクリプトは、リポジトリ内のメインブランチにマージされた後、Cloud Composer 環境と DAG を同期します。

DAG ユーティリティ スクリプトを追加する

このユーティリティ スクリプトは、すべての非 DAG Paython ファイルを無視して、リポジトリの dags/ ディレクトリにあるすべての DAG ファイルを一時ディレクトリにコピーします。次に、スクリプトは、Cloud Storage クライアント ライブラリを使用して、この一時ディレクトリにあるすべてのファイルを Cloud Composer 環境の dags/ フォルダにアップロードします。

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile
from typing import List, Tuple

# Imports the Google Cloud client library
from google.cloud import storage

def _create_dags_list(dags_directory: str) -> Tuple[str, List[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)

def upload_dags_to_composer(dags_directory: str, bucket_name: str) -> None:
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            # if/else is a relative directory workaround for our tests
            current_directory = os.listdir()
            if "dags" in current_directory:
                dag = dag.replace(f"{temp_dir}/", "dags/")
            else:
                dag = dag.replace(f"{temp_dir}/", "../dags/")

            # Upload to your bucket
            blob = bucket.blob(dag)
            blob.upload_from_filename(dag)
            print(f"File {dag} uploaded to {bucket_name}/{dag}.")

    else:
        print("No DAGs to upload.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

DAG を同期する Cloud Build ジョブを追加する

このセクションでは、DAG 同期ジョブの Cloud Build YAML 構成と Cloud Build トリガーを作成します。

Cloud Build YAML 構成を作成する

YAML ファイルを作成して、add-dags-to-composer.cloudbuild.yaml という名前の Cloud Build ジョブを構成します。次の 2 つのステップを行います。

  1. DAG ユーティリティ スクリプトに必要な依存関係をインストールする
  2. ユーティリティ スクリプトを実行して、リポジトリ内の DAG を Cloud Composer 環境と同期する

    steps:
      # install dependencies
      - name: python
        entrypoint: pip
        args: ["install", "-r", "utils/requirements.txt", "--user"]
    
      # run
      - name: python
        entrypoint: python
        args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]
    
Cloud Build トリガーを作成する

こちらのガイドに沿って、以下の設定の GitHub アプリベースのトリガーを作成します。

  • 名前: add-dags-to-composer
  • イベント: ブランチに push する
  • ソース - リポジトリ: 自分のリポジトリを選択
  • ソース - ベースブランチ: ^main$(main は必要に応じてリポジトリのベースブランチの名前に変更します)
  • ソース - 含まれるファイル フィルタ(glob): dags/**
  • ビルド構成 - Cloud ビルド構成ファイル: /add-dags-to-composer.cloudbuild.yaml(ビルドファイルへのパス)

詳細設定で 2 つの置換変数を追加します。

  • _DAGS_DIRECTORY - リポジトリに DAG が配置されているディレクトリ。このチュートリアルの手順に沿った場合は dags/ です。
  • _DAGS_BUCKET - gs:// 接頭辞が付いていない開発環境の Cloud Composer の dags/ フォルダを含む Cloud Storage バケット。例: us-east1-my-env-1234ab56-bucket

すべてを組み合わせる

このセクションでは、新しく作成された Cloud Build トリガーを利用する DAG 開発フローに従います。

送信前ジョブを実行する

メインブランチに対する pull リクエストを作成し、ビルドをテストします。チェックはページの下部にあります。[詳細] をクリックして [Google Cloud Build の詳細を表示する] を選択すると、Google Cloud Console にビルドログが表示されます。

かっこで囲まれたプロジェクト名を指す赤い矢印の付いた test-dags と呼ばれる github チェックのスクリーンショット
図 2. GitHub での Cloud Build に関する送信前チェック ステータスのスクリーンショット(クリックして拡大)

送信前チェックが失敗した場合は、ビルドの失敗に対処するをご覧ください。

開発環境の Cloud Composer で DAG の成功を検証する

pull リクエストが承認されたら、メインブランチにマージします。Google Cloud Console を使用してビルド結果を表示します。Cloud Build トリガーが多数ある場合は、トリガー名 add-dags-to-composer でビルドをフィルタリングできます。

Cloud Build 同期ジョブが成功すると、開発環境の Cloud Composer に DAG が表示されます。そこで、DAG が想定どおりに機能していることを確認できます。DAG が想定どおりに動作するようになったら、本番環境の Cloud Composer の dags/ フォルダに手動で DAG ファイルを追加して、DAG を本番環境に昇格させます。

DAG 同期ジョブが失敗した場合、または開発環境の Cloud Composer で DAG が想定どおりに動作しない場合は、ビルドの失敗に対処するをご覧ください。

ビルドの失敗に対処する

このセクションでは、一般的なビルドの失敗シナリオに対処する方法について説明します。

送信前チェックが失敗した場合はどうなりますか?

pull リクエストで [詳細] をクリックして [Google Cloud Build の詳細を表示する] を選択すると、Google Cloud Console にビルドログが表示されます。これらのログを使用して、DAG の問題をデバッグしてください。問題を解決したら、修正を commit し、ブランチに push します。送信前チェックが再実行され、デバッグツールとしてログの使用を繰り返すことができます。

DAG 同期ジョブが失敗した場合はどうなりますか?

Google Cloud Console を使用してビルド結果を表示します。Cloud Build トリガーが多数ある場合は、トリガー名 add-dags-to-composer でビルドをフィルタリングできます。ビルドジョブのログを確認し、エラーを解決します。エラーの解決にサポートが必要な場合は、サポート チャネルをご利用ください。

DAG が Cloud Composer 環境で適切に機能しない場合はどうなりますか?

DAG が開発環境の Cloud Composer で想定どおりに機能しない場合は、DAG を本番環境の Cloud Composer に手動で昇格しないでください。代わりに、次のいずれかを行います。

  1. DAG を破損した変更を使用して pull リクエストを元に戻し、変更の直前の状態に復元する(これにより、pull リクエスト内の他のすべてのファイルも元に戻されます)
  2. 新しい pull リクエストを作成して、壊れた DAG の変更を手動で元に戻す
  3. DAG のエラーを修正する新しい pull リクエストを作成する

これらの手順のいずれかを実行すると、送信前チェックが再びトリガーされ、統合時に DAG 同期ジョブがトリガーされます。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトの削除

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

リソースを個別に削除する

Cloud Build リソースをクリーンアップする

Google Cloud Console または Google Cloud CLI を使用して、2 つのビルドトリガーを削除します。

  • test-dags
  • add-dags-to-composer

Cloud Composer リソースをクリーンアップする

Cloud Composer 環境を削除する

このチュートリアル用に作成した Cloud Composer 環境を引き続き使用しない場合は、削除して課金が発生しないようにしてください。環境を保持する場合は、DAG を削除します

DAG を Cloud Composer から削除する

開発環境と本番環境の Cloud Composer から DAG を削除します

次のステップ