Cloud Composer 2 で Apache Airflow DAG を実行する

Cloud Composer 1 | Cloud Composer 2

このページでは、Cloud Composer 環境を作成し、Cloud Composer 2 で Apache Airflow DAG を実行する方法について説明します。

Airflow を初めてご利用の場合は、Airflow のコンセプト、オブジェクト、使用方法の詳細を示したこちらのチュートリアルをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Cloud Composer API を有効にします。

    API を有効にする

環境の作成

コンソール

  1. Google Cloud Console で、[環境の作成] ページに移動します。

    [環境の作成] に移動

  2. プロジェクトで環境を作成するときに、Cloud Composer サービス エージェントに環境のサービス アカウントに対する必要な権限がない場合は、Cloud Composer サービス アカウントに必要な権限を付与するセクションが表示されます。

    環境のサービス アカウントの新しいプリンシパルとして Cloud Composer サービス エージェント アカウントを追加し、Cloud Composer v2 API サービス エージェント拡張機能の(roles/composer.ServiceAgentV2Ext)ロールを付与します。

    環境で使用するサービス アカウントを使用していることを確認し、[付与] をクリックします。

  3. [名前] フィールドに「example-environment」と入力します。

  4. [ロケーション] プルダウン リストで、Cloud Composer 環境のリージョンを選択します。リージョンの選択については、利用可能なリージョンをご覧ください。

  5. その他の環境構成オプションには、指定されたデフォルト値を使用します。

  6. 環境を作成するには、[作成] をクリックします。

  7. 環境が作成されるまで待ちます。処理が完了すると、緑色のチェックマークが環境名の横に表示されます。

gcloud

環境のサービス アカウントに新しいプリンシパルとして Cloud Composer サービス エージェント アカウントを追加し、Cloud Composer v2 API サービス エージェント拡張機能roles/composer.ServiceAgentV2Ext)のロールを付与します。

デフォルトでは、環境はデフォルトの Compute Engine サービス アカウントを使用します。

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

新しい環境を作成する:

gcloud composer environments create ENVIRONMENT_NAME \
  --location LOCATION \
  --image-version IMAGE_VERSION

次のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。このクイックスタートでは example-environment を使用します。
  • LOCATION を Cloud Composer 環境のリージョンに置き換えます。リージョンの選択については、利用可能なリージョンをご覧ください。
  • IMAGE_VERSION は、Cloud Composer イメージの名前に置き換えます。このガイドでは、composer-2.6.6-airflow-2.6.3 を使用して、最新の Cloud Composer 2 イメージがある環境を作成します。

例:

gcloud composer environments create example-environment \
  --location us-central1 \
  --image-version composer-2.6.6-airflow-2.6.3

Terraform

Terraform を使用してこの環境を構成するには、Terraform 構成に次のリソース ブロックを追加して terraform apply を実行します。

このリソース ブロックを利用するには、Terraform が使用するサービス アカウントに、composer.environments.create 権限が有効になっているロールが割り当てられている必要があります。Terraform のサービス アカウントの詳細については、Google プロバイダの構成リファレンスをご覧ください。

Terraform を使用した Cloud Composer 環境の作成の詳細については、Terraform のドキュメントをご覧ください。

resource "google_composer_environment" "example" {
  provider = google-beta
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}
  • ENVIRONMENT_NAME を環境の名前にする。このクイックスタートでは example-environment を使用します。

  • LOCATION を Cloud Composer 環境のリージョンに置き換えます。リージョンの選択については、利用可能なリージョンをご覧ください。

  • IMAGE_VERSION は、Cloud Composer イメージの名前に置き換えます。このガイドでは、composer-2.6.6-airflow-2.6.3 を使用して、最新の Cloud Composer 2 イメージがある環境を作成します。

例:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-2.6.6-airflow-2.6.3"
    }
  }

}

環境の詳細を表示する

環境の作成が完了した後は、環境の情報(Cloud Composer のバージョン、Airflow ウェブ インターフェースの URL、Cloud Storage の DAG フォルダなど)を表示できます。

環境情報を表示するには、次のようにします。

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. [環境の詳細] ページを表示するには、環境の名前 example-environment をクリックします。

DAG を作成する

Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。

quickstart.py の Python コードは、次のとおりです。

  1. DAG(composer_sample_dag)を作成します。DAG は 1 日 1 回実行されます。
  2. タスク(print_dag_run_conf)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。

DAG を作成するには、ローカルマシンに quickstart.py ファイルのコピーを作成します。

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

DAG を Cloud Storage にアップロードする

Cloud Composer がスケジュールを設定するのは、環境の Cloud Storage バケット内の /dags フォルダにある DAG のみです。

DAG のスケジュールを設定するには、quickstart.py をローカルマシンから環境の /dags フォルダにアップロードします。

コンソール

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. example-environmentDAG フォルダのリンクをクリックして、/dags フォルダを開きます。

  3. バケットの詳細ページで、[ファイルをアップロード] をクリックしてから、quickstart.py のローカルコピーを選択します。

  4. ファイルをアップロードするには、[開く] をクリックします。

    DAG をアップロードすると、Cloud Composer によって DAG が Airflow に追加され、その後すぐに DAG の実行がスケジュールされます。DAG が Airflow ウェブ インターフェースに表示されるまで数分かかる場合があります。

gcloud

gcloudquickstart.py をアップロードするには、次のコマンドを実行します。

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

DAG を Airflow UI に表示する

Cloud Composer 環境ごとに、Airflow ウェブ インターフェースを実行するウェブサーバーを持ちます。Airflow ウェブ インターフェースから DAG を管理できます。

Airflow ウェブ インターフェースで DAG を表示するには、次の手順を行います。

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. Airflow ウェブ インターフェースを開くには、example-environment についての [Airflow] リンクをクリックします。新しいブラウザ ウィンドウで Airflow ウェブ UI が開きます。

  3. Airflow ツールバーで、[DAGs] ページに移動します。

  4. DAG の詳細ページを開くには、composer_sample_dag をクリックします。

    Airflow UI の [DAG] ページ
    図 1: Airflow UI の [DAG] ページ(クリックして拡大)

    DAG のページには、ワークフローのタスクと依存関係をグラフィカルに表現するツリービューが表示されます。

    composer_sample_dags DAG のツリービュー
    図 2: composer_sample_dags DAG のツリービュー

Airflow ログのタスク インスタンスの詳細を表示する

スケジュール設定した DAG には、print_dag_run_conf タスクが含まれています。タスクは、タスク インスタンスの Airflow ログにある DAG 実行の構成を出力します。

タスク インスタンスの詳細を表示するには、次の手順を行います。

  1. Airflow ウェブ インターフェースの DAG のツリービューで、[Graph View] をクリックします。

    ポインタを print_dag_run_conf タスクの上に合わせると、ステータスが表示されます。

    composer_sample_dags DAG のツリービュー
    図 3.print_dag_run_conf タスクのステータス
  2. print_dag_run_conf タスクをクリックします。

    タスク インスタンスのコンテキスト メニューで、メタデータを取得して、いくつかの操作を実行できます。

    composer_sample_dags タスクのタスク インスタンスのコンテキスト メニュー
    図 4. composer_sample_dags タスクのタスク インスタンスのコンテキスト メニュー
  3. タスク インスタンスのコンテキスト メニューで、[Log] をクリックします。

  4. ログ内で Running command: ['bash' を見つけて、bash 演算子からの出力を確認します。

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。

このチュートリアルで使用したリソースを削除します。

  1. Cloud Composer 環境を削除します。

    1. Google Cloud Console で [環境] ページに移動します。

      [環境] に移動

    2. [example-environment] を選択し、[削除] をクリックします。

    3. 環境が削除されるまで待ちます。

  2. 環境のバケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。

    1. Google Cloud Console で、[ストレージ] > [ブラウザ] ページに移動します。

      [ストレージ] > [ブラウザ] に移動します。

    2. 環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を us-central1-example-environ-c1616fe8-bucket にします。

  3. 環境の Redis のキューの永続ディスクを削除します。Cloud Composer 環境を削除しても、永続ディスクは削除されません。

    1. Google Cloud Console で、[Compute Engine] > [ディスク] に移動します。

      [ディスク] に移動

    2. 環境の Redis のキューの永続ディスクを選択し、[削除] をクリックします。

      たとえば、このディスクの名前は pvc-02bc4842-2312-4347-8519-d87bdcd31115 です。Cloud Composer 2 のディスクは常に Balanced persistent disk タイプで、サイズ 2 GB です。

次のステップ