Cloud Composer 1 で Apache Airflow DAG を実行する

Cloud Composer 1 | Cloud Composer 2

このページでは、Cloud Composer 環境を作成し、Cloud Composer で Apache Airflow DAG を実行する方法について説明します。

Airflow を初めてご利用の場合は、Airflow のコンセプト、オブジェクト、使用方法の詳細を示したこちらのチュートリアルをご覧ください。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Cloud Composer API を有効にします。

    API を有効にする

環境の作成

コンソール

  1. Google Cloud Console で、[環境の作成] ページに移動します。

    [環境の作成] に移動

  2. [名前] フィールドに「example-environment」と入力します。

  3. [ロケーション] プルダウン リストで、Cloud Composer 環境のリージョンを選択します。リージョンの選択については、利用可能なリージョンをご覧ください。

  4. その他の環境構成オプションには、指定されたデフォルト値を使用します。

  5. 環境を作成するには、[作成] をクリックします。

  6. 環境が作成されるまで待ちます。処理が完了すると、緑色のチェックマークが環境名の横に表示されます。

gcloud

環境のサービス アカウントに新しいプリンシパルとして Cloud Composer サービス エージェント アカウントを追加し、Cloud Composer v2 API サービス エージェント拡張機能roles/composer.ServiceAgentV2Ext)のロールを付与します。

デフォルトでは、環境ではデフォルトの Compute Engine サービス アカウントを使用します。

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

新しい環境を作成する:

gcloud composer environments create ENVIRONMENT_NAME \
  --location LOCATION \
  --image-version IMAGE_VERSION

次のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。このクイックスタートでは example-environment を使用します。
  • LOCATION を Cloud Composer 環境のリージョンに置き換えます。リージョンの選択については、利用可能なリージョンをご覧ください。
  • IMAGE_VERSION は、Cloud Composer イメージの名前に置き換えます。このガイドでは、composer-1.20.12-airflow-1.10.15 を使用して、最新の Cloud Composer イメージがある環境を作成します。

例:

gcloud composer environments create example-environment \
  --location us-central1 \
  --image-version composer-1.20.12-airflow-1.10.15

Terraform

Terraform を使用してこの環境を構成するには、Terraform 構成に次のリソース ブロックを追加して terraform apply を実行します。

このリソース ブロックを利用するには、Terraform が使用するサービス アカウントに、composer.environments.create 権限が有効になっているロールが割り当てられている必要があります。Terraform のサービス アカウントの詳細については、Google プロバイダの構成リファレンスをご覧ください。

Terraform を使用した Cloud Composer 環境の作成の詳細については、Terraform のドキュメントをご覧ください。

resource "google_composer_environment" "example" {
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}
  • ENVIRONMENT_NAME を環境の名前にする。このクイックスタートでは example-environment を使用します。

  • LOCATION を Cloud Composer 環境のリージョンに置き換えます。リージョンの選択については、利用可能なリージョンをご覧ください。

  • IMAGE_VERSION は、Cloud Composer イメージの名前に置き換えます。このガイドでは、composer-1.20.12-airflow-1.10.15 を使用して、最新の Cloud Composer イメージがある環境を作成します。

例:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-1.20.12-airflow-1.10.15"
    }
  }

}

環境の詳細を表示する

環境の作成が完了した後は、環境の情報(Cloud Composer のバージョン、Airflow ウェブ インターフェースの URL、Cloud Storage の DAG フォルダなど)を表示できます。

環境情報を表示するには、次のようにします。

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. [環境の詳細] ページを表示するには、環境の名前 example-environment をクリックします。

DAG を作成する

Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。

quickstart.py の Python コードは、次のとおりです。

  1. DAG(composer_sample_dag)を作成します。DAG は 1 日 1 回実行されます。
  2. タスク(print_dag_run_conf)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。

DAG を作成するには、ローカルマシンに quickstart.py ファイルのコピーを作成します。

Airflow 1

import datetime

import airflow
from airflow.operators import bash_operator

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with airflow.DAG(
    "composer_sample_dag",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Airflow 2

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

DAG を Cloud Storage にアップロードする

Cloud Composer がスケジュールを設定するのは、環境の Cloud Storage バケット内の /dags フォルダにある DAG のみです。

DAG のスケジュールを設定するには、quickstart.py をローカルマシンから環境の /dags フォルダにアップロードします。

コンソール

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. example-environmentDAG フォルダのリンクをクリックして、/dags フォルダを開きます。

  3. バケットの詳細ページで、[ファイルをアップロード] をクリックしてから、quickstart.py のローカルコピーを選択します。

  4. ファイルをアップロードするには、[開く] をクリックします。

    DAG をアップロードすると、Cloud Composer によって DAG が Airflow に追加され、その後すぐに DAG の実行がスケジュールされます。DAG が Airflow ウェブ インターフェースに表示されるまで数分かかる場合があります。

gcloud

gcloudquickstart.py をアップロードするには、次のコマンドを実行します。

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

DAG を Airflow UI に表示する

Cloud Composer 環境ごとに、Airflow ウェブ インターフェースを実行するウェブサーバーを持ちます。Airflow ウェブ インターフェースから DAG を管理できます。

Airflow ウェブ インターフェースで DAG を表示するには、次の手順を行います。

Airflow 1

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. Airflow ウェブ インターフェースを開くには、example-environment についての [Airflow] リンクをクリックします。新しいブラウザ ウィンドウで Airflow ウェブ UI が開きます。

  3. Airflow ツールバーで、[DAGs] ページに移動します。

  4. DAG の詳細ページを開くには、composer_sample_dag をクリックします。

    Airflow UI の DAG ページ
    図 1: Airflow UI の DAG ページ(クリックして拡大)

    DAG のページには、ワークフローのタスクと依存関係をグラフィカルに表現するツリービューが表示されます。

    composer_sample_dags DAG のツリービュー
    図 2: composer_sample_dags DAG のツリービュー

Airflow 2

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. Airflow ウェブ インターフェースを開くには、example-environment についての [Airflow] リンクをクリックします。新しいブラウザ ウィンドウで Airflow ウェブ UI が開きます。

  3. Airflow ツールバーで、[DAGs] ページに移動します。

  4. DAG の詳細ページを開くには、composer_sample_dag をクリックします。

    Airflow UI の DAG ページ
    図 1: Airflow UI の DAG ページ(クリックして拡大)

    DAG のページには、ワークフローのタスクと依存関係をグラフィカルに表現するツリービューが表示されます。

    composer_sample_dags DAG のツリービュー
    図 2: composer_sample_dags DAG のツリービュー

Airflow ログのタスク インスタンスの詳細を表示する

スケジュール設定した DAG には、print_dag_run_conf タスクが含まれています。タスクは、タスク インスタンスの Airflow ログにある DAG 実行の構成を出力します。

タスク インスタンスの詳細を表示するには、次の手順を行います。

Airflow 1

  1. Airflow ウェブ インターフェースの DAG のツリービューで、[Graph View] をクリックします。

    ポインタを print_dag_run_conf タスクの上に合わせると、ステータスが表示されます。

    composer_sample_dags DAG のツリービュー
    図 3.print_dag_run_conf タスクのステータス
  2. print_dag_run_conf タスクをクリックします。

    タスク インスタンスのコンテキスト メニューで、メタデータを取得して、いくつかの操作を実行できます。

    composer_sample_dags タスクのタスク インスタンスのコンテキスト メニュー
    図 4. composer_sample_dags タスクのタスク インスタンスのコンテキスト メニュー
  3. タスク インスタンスのコンテキスト メニューで、[View Log] をクリックします。

  4. ログ内で Running: ['bash' を見つけて、bash 演算子からの出力を確認します。

    bash 演算子のログ出力
    図 5. bash 演算子のログ出力

Airflow 2

  1. Airflow ウェブ インターフェースの DAG のツリービューで、[Graph View] をクリックします。

    ポインタを print_dag_run_conf タスクの上に合わせると、ステータスが表示されます。

    composer_sample_dags DAG のツリービュー
    図 3.print_dag_run_conf タスクのステータス
  2. print_dag_run_conf タスクをクリックします。

    タスク インスタンスのコンテキスト メニューで、メタデータを取得して、いくつかの操作を実行できます。

    composer_sample_dags タスクのタスク インスタンスのコンテキスト メニュー
    図 4. composer_sample_dags タスクのタスク インスタンスのコンテキスト メニュー
  3. タスク インスタンスのコンテキスト メニューで、[Log] をクリックします。

  4. ログ内で Running command: ['bash' を見つけて、bash 演算子からの出力を確認します。

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。

このチュートリアルで使用したリソースを削除します。

  1. Cloud Composer 環境を削除します。

    1. Google Cloud Console で [環境] ページに移動します。

      [環境] に移動

    2. [example-environment] を選択し、[削除] をクリックします。

    3. 環境が削除されるまで待ちます。

  2. 環境のバケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。

    1. Google Cloud Console で、[ストレージ] > [ブラウザ] ページに移動します。

      [ストレージ] > [ブラウザ] に移動します。

    2. 環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を us-central1-example-environ-c1616fe8-bucket にします。

  3. 環境の Redis のキューの永続ディスクを削除します。Cloud Composer 環境を削除しても、永続ディスクは削除されません。

    1. Google Cloud Console で、[Compute Engine] > [ディスク] に移動します。

      [ディスク] に移動

    2. 環境の Redis のキューの永続ディスクを選択し、[削除] をクリックします。

      たとえば、このディスクの名前は gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee です。Cloud Composer 1 のディスクは常に Standard persistent disk タイプで、サイズは 2 GB です。

次のステップ