GitHub から DAG をテスト、同期、デプロイする

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このガイドでは、CI / CD パイプラインを作成して、DAG のテスト、同期、および GitHub リポジトリから Cloud Composer 環境へのデプロイを行う方法について説明します。

他のサービスからのデータのみを同期する場合は、他のサービスからデータを転送するをご覧ください。

CI / CD パイプラインの概要

フロー手順を示す構成図。送信前レビューと PR レビューは GitHub セクションにあり、DAG 同期と手動 DAG 検証は Google Cloud セクションにあります。
図 1.フロー手順を示す構成図(クリックして拡大)

DAG をテスト、同期、デプロイする CI / CD パイプラインには、以下の手順があります。

  1. DAG に変更を加えて、その変更をリポジトリ内の開発ブランチに push します。

  2. リポジトリのメインブランチに対して pull リクエストを開きます。

  3. Cloud Build で単体テストが行われ、DAG が有効であることを確認します。

  4. pull リクエストが承認され、リポジトリのメインブランチに統合されます。

  5. Cloud Build で新しい Cloud Composer 開発環境とこれらの新しい変更が同期されます。

  6. DAG が開発環境で想定どおりに動作していることを確認します。

  7. DAG が想定どおりに機能する場合は、DAG を本番環境の Cloud Composer 環境にアップロードします。

目標

始める前に

  • このガイドでは、2 つの同じ Cloud Composer 環境(開発環境と本番環境)を使用していることを前提としています。

    このガイドでは、開発環境のみに CI / CD パイプラインを構成します。使用する環境が本番環境ではないことを確認してください。

  • このガイドでは、DAG とそのテストが GitHub リポジトリに保存されていることを前提としています。

    CI / CD パイプラインの例は、サンプル リポジトリの内容を示します。DAG とテストが dags/ ディレクトリに保存され、要件ファイル、制約ファイル、Cloud Build 構成ファイルが最上位に保存されています。DAG 同期ユーティリティとその要件は utils ディレクトリにあります。

    この構造は、Airflow 1、Airflow 2、Cloud Composer 1、Cloud Composer 2 の環境で使用できます。

送信前チェックジョブと単体テストを作成する

最初の Cloud Build ジョブは、DAG の単体テストを実行する送信前チェックを実行します。

単体テストを追加する

まだ行っていない場合は、DAG の単体テストを作成します。これらのテストは、DAG と一緒にリポジトリに _test 接尾辞を付けて保存します。たとえば、example_dag.py の DAG のテストファイルは example_dag_test.py です。これらは、リポジトリで送信前チェックとして実行されるテストです。

送信前チェック用の Cloud Build YAML 構成を作成する

リポジトリに、送信前チェック用の Cloud Build ジョブを構成する test-dags.cloudbuild.yaml という名前の YAML ファイルを作成します。このファイルには、以下の 3 つのステップがあります。

  1. DAG に必要な依存関係をインストールします。
  2. 単体テストに必要な依存関係をインストールします。
  3. DAG テストを実行します。

steps:
  # install dependencies
  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements.txt", "-c", "constraints.txt", "--user"]

  - name: python:3.8-slim
    entrypoint: pip
    args: ["install", "-r", "requirements-test.txt", "--user"]

  # run in python 3.8 which is latest version in Cloud Composer
  - name: python:3.8-slim
    entrypoint: python3.8
    args: ["-m", "pytest", "-s", "dags/"]

送信前チェック用の Cloud Build トリガーを作成する

GitHub からのリポジトリのビルドガイドに沿って、次の構成で GitHub アプリベースのトリガーを作成します。

  • 名前: test-dags

  • イベント: pull リクエスト

  • ソース - リポジトリ: 自分のリポジトリを選択

  • ソース - ベースブランチ: ^main$main は必要に応じてリポジトリのベースブランチの名前に変更します)

  • ソース - コメント制御: 不要

  • ビルド構成 - Cloud ビルド構成ファイル: /test-dags.cloudbuild.yaml(ビルドファイルのパス)

DAG 同期ジョブを作成し、DAG ユーティリティ スクリプトを追加する

次に、DAG ユーティリティ スクリプトを実行する Cloud Build ジョブを構成します。このジョブのユーティリティ スクリプトは、DAG がリポジトリのメインブランチにマージされた後、DAG を Cloud Composer 環境と同期します。

DAG ユーティリティ スクリプトを追加する

DAG ユーティリティ スクリプトをリポジトリに追加します。このユーティリティ スクリプトは、すべての非 DAG Paython ファイルを無視して、リポジトリの dags/ ディレクトリにあるすべての DAG ファイルを一時ディレクトリにコピーします。次に、スクリプトは、Cloud Storage クライアント ライブラリを使用して、この一時ディレクトリにあるすべてのファイルを Cloud Composer 環境のバケット内の dags/ ディレクトリにアップロードします。

from __future__ import annotations

import argparse
import glob
import os
from shutil import copytree, ignore_patterns
import tempfile

# Imports the Google Cloud client library
from google.cloud import storage


def _create_dags_list(dags_directory: str) -> tuple[str, list[str]]:
    temp_dir = tempfile.mkdtemp()

    # ignore non-DAG Python files
    files_to_ignore = ignore_patterns("__init__.py", "*_test.py")

    # Copy everything but the ignored files to a temp directory
    copytree(dags_directory, f"{temp_dir}/", ignore=files_to_ignore, dirs_exist_ok=True)

    # The only Python files left in our temp directory are DAG files
    # so we can exclude all non Python files
    dags = glob.glob(f"{temp_dir}/*.py")
    return (temp_dir, dags)


def upload_dags_to_composer(
    dags_directory: str, bucket_name: str, name_replacement: str = "dags/"
) -> None:
    """
    Given a directory, this function moves all DAG files from that directory
    to a temporary directory, then uploads all contents of the temporary directory
    to a given cloud storage bucket
    Args:
        dags_directory (str): a fully qualified path to a directory that contains a "dags/" subdirectory
        bucket_name (str): the GCS bucket of the Cloud Composer environment to upload DAGs to
        name_replacement (str, optional): the name of the "dags/" subdirectory that will be used when constructing the temporary directory path name Defaults to "dags/".
    """
    temp_dir, dags = _create_dags_list(dags_directory)

    if len(dags) > 0:
        # Note - the GCS client library does not currently support batch requests on uploads
        # if you have a large number of files, consider using
        # the Python subprocess module to run gsutil -m cp -r on your dags
        # See https://cloud.google.com/storage/docs/gsutil/commands/cp for more info
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)

        for dag in dags:
            # Remove path to temp dir
            dag = dag.replace(f"{temp_dir}/", name_replacement)

            try:
                # Upload to your bucket
                blob = bucket.blob(dag)
                blob.upload_from_filename(dag)
                print(f"File {dag} uploaded to {bucket_name}/{dag}.")
            except FileNotFoundError:
                current_directory = os.listdir()
                print(
                    f"{name_replacement} directory not found in {current_directory}, you may need to override the default value of name_replacement to point to a relative directory"
                )
                raise

    else:
        print("No DAGs to upload.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    parser.add_argument(
        "--dags_directory",
        help="Relative path to the source directory containing your DAGs",
    )
    parser.add_argument(
        "--dags_bucket",
        help="Name of the DAGs bucket of your Composer environment without the gs:// prefix",
    )

    args = parser.parse_args()

    upload_dags_to_composer(args.dags_directory, args.dags_bucket)

DAG を同期するための Cloud Build YAML 構成を作成する

リポジトリに、DAG の同期用の Cloud Build ジョブを構成する add-dags-to-composer.cloudbuild.yaml という名前の YAML ファイルを作成します。このファイルについては、以下の 2 つのステップがあります。

  1. DAG ユーティリティ スクリプトに必要な依存関係をインストールします。

  2. ユーティリティ スクリプトを実行して、リポジトリ内の DAG を Cloud Composer 環境と同期します。

steps:
  # install dependencies
  - name: python
    entrypoint: pip
    args: ["install", "-r", "utils/requirements.txt", "--user"]

  # run
  - name: python
    entrypoint: python
    args: ["utils/add_dags_to_composer.py", "--dags_directory=${_DAGS_DIRECTORY}", "--dags_bucket=${_DAGS_BUCKET}"]

Cloud Build トリガーを作成する

GitHub からのリポジトリのビルドガイドに沿って、次の構成で GitHub アプリベースのトリガーを作成します。

  • 名前: add-dags-to-composer

  • イベント: ブランチに push する

  • ソース - リポジトリ: 自分のリポジトリを選択

  • ソース - ベースブランチ: ^main$main は必要に応じてリポジトリのベースブランチの名前に変更します)

  • ソース - 含まれるファイル フィルタ(glob): dags/**

  • ビルド構成 - Cloud ビルド構成ファイル: /add-dags-to-composer.cloudbuild.yaml(ビルドファイルのパス)

詳細設定で 2 つの置換変数を追加します。

  • _DAGS_DIRECTORY - リポジトリに DAG が配置されているディレクトリ。 このガイドのサンプル リポジトリを使用している場合は、dags/ です。

  • _DAGS_BUCKET - 開発環境の Cloud Composer の dags/ ディレクトリを含む Cloud Storage バケット。gs:// 接頭辞は省略します。例: us-central1-example-env-1234ab56-bucket

CI / CD パイプラインをテストする

このセクションでは、新しく作成した Cloud Build トリガーを使用する DAG 開発フローに沿って作業します。

送信前ジョブを実行する

メインブランチへの pull リクエストを作成して、ビルドをテストします。ページで送信前チェックを見つけます。[詳細] をクリックして [View more details on Google Cloud Build] を選択し、Google Cloud コンソールでビルドログを表示します。

かっこ内にプロジェクト名を指す赤い矢印がある、test-dags という GitHub チェックのスクリーンショット
図 2. GitHub での Cloud Build に関する送信前チェック ステータスのスクリーンショット(クリックして拡大)

送信前チェックが失敗した場合は、ビルドの失敗に対処するをご覧ください。

DAG が開発環境の Cloud Composer で機能することを検証する

pull リクエストが承認されたら、メインブランチに統合します。Google Cloud コンソールを使用してビルド結果を表示します。Cloud Build トリガーが多数ある場合は、トリガー名 add-dags-to-composer でビルドをフィルタできます。

Cloud Build 同期ジョブが成功すると、同期された DAG が開発環境の Cloud Composer に表示されます。そこで、DAG が想定どおりに機能していることを確認できます。

DAG を本番環境にデプロイします。

DAG が想定どおりに動作したら、本番環境に手動で追加します。これを行うには、Cloud Composer の本番環境のバケットの dags/ ディレクトリに DAG ファイルをアップロードします。

DAG 同期ジョブが失敗した場合や、開発環境の Cloud Composer で DAG が想定どおりに動作しない場合は、ビルドの失敗に対処するをご覧ください。

ビルドの失敗に対処する

このセクションでは、一般的なビルドの失敗シナリオに対処する方法について説明します。

送信前チェックが失敗した場合はどうなりますか?

pull リクエストで [詳細] をクリックし、[View more details on Google Cloud Build] を選択すると、Google Cloud コンソールにビルドログが表示されます。これらのログを使用して、DAG の問題をデバッグします。問題を解決したら、修正を commit してブランチに push します。送信前チェックが再度実行され、ログをデバッグツールとして使用してイテレーションを続けることができます。

DAG 同期ジョブが失敗した場合はどうなりますか?

Google Cloud コンソールを使用してビルド結果を表示します。Cloud Build トリガーが多数ある場合は、トリガー名 add-dags-to-composer でビルドをフィルタできます。ビルドジョブのログを調べて、エラーを解決します。エラーの解決にサポートが必要な場合は、サポート チャネルをご利用ください。

DAG が Cloud Composer 環境で適切に機能しない場合はどうなりますか?

DAG が開発環境の Cloud Composer で想定どおりに機能しない場合は、DAG を本番環境の Cloud Composer に手動で昇格しないでください。代わりに、次のいずれかを行います。

  • DAG を破損した変更を使用して pull リクエストを元に戻し、変更の直前の状態に復元します(これにより、pull リクエスト内の他のすべてのファイルも元に戻されます)。
  • 新しい pull リクエストを作成して、破損した DAG への変更を手動で元に戻します。
  • 新しい pull リクエストを作成して、DAG 内のエラーを修正します。

これらの手順のいずれかを実行すると、新しい送信前チェックがトリガーされ、統合時に DAG 同期ジョブがトリガーされます。

次のステップ