Cloud Composer 3 で Apache Airflow DAG を実行する

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このクイックスタート ガイドでは、Cloud Composer 環境を作成し、Apache Airflow DAG を Cloud Composer 3 で実行する方法について説明します。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Cloud Composer API.

    Enable the API

  7. このクイックスタートを完了するために必要な権限を取得するには、プロジェクトに対する次の IAM ロールを付与するよう管理者に依頼してください。

    ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

    必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

環境の作成

  1. Google Cloud コンソールで、[環境の作成] ページに移動します。

    [環境の作成] に移動

  2. [名前] フィールドに「example-environment」と入力します。

  3. [ロケーション] プルダウン リストで、Cloud Composer 環境のリージョンを選択します。このガイドでは、us-central1 リージョンを使用します。

  4. その他の環境構成オプションには、指定されたデフォルト値を使用します。

  5. [作成] をクリックして環境が作成されるまで待ちます。

  6. 処理が完了すると、環境名の横に緑色のチェックマークが表示されます。

DAG ファイルを作成する

Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。

このガイドでは、quickstart.py ファイルで定義された Airflow DAG の例を使用します。このファイルの Python コードは、次の処理を行います。

  1. DAG(composer_sample_dag)を作成します。この DAG は毎日実行されます。
  2. タスク(print_dag_run_conf)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。

ローカルマシンに quickstart.py ファイルのコピーを保存します。

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

DAG ファイルを環境のバケットにアップロードする

すべての Cloud Composer 環境には、Cloud Storage バケットが関連付けられています。Cloud Composer の Airflow は、このバケットの /dags フォルダにある DAG のみをスケジュール設定します。

DAG のスケジュールを設定するには、quickstart.py をローカルマシンから使用中の環境の /dags フォルダにアップロードします。

  1. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、ご利用の環境の名前 example-environment をクリックします。[環境の詳細] ページが開きます。

  3. [DAG フォルダを開く] をクリックします。[バケットの詳細] ページが開きます。

  4. [ファイルをアップロード] をクリックし、quickstart.py のコピーを選択します。

  5. ファイルをアップロードするには、[開く] をクリックします。

DAG を表示する

DAG ファイルをアップロードすると、Airflow によって次の処理が行われます。

  1. アップロードした DAG ファイルを解析します。DAG が Airflow で使用可能になるまでに数分かかる場合があります。
  2. DAG を使用可能な DAG のリストに追加します。
  3. DAG ファイルで指定したスケジュールに沿って DAG を実行します。

DAG UI で DAG を表示して、DAG がエラーなしで処理され、Airflow で使用できることを確認します。DAG UI は、Google Cloud コンソールで DAG 情報を表示するための Cloud Composer インターフェースです。Cloud Composer は、ネイティブの Airflow ウェブ インターフェースである Airflow UI にもアクセスできます。

  1. 以前にアップロードした DAG ファイルを Airflow が処理し、最初の DAG 実行(後述)を完了するまで、約 5 分間待ちます。

  2. Google Cloud コンソールで [環境] ページに移動します。

    [環境] に移動

  3. 環境のリストで、ご利用の環境の名前 example-environment をクリックします。[環境の詳細] ページが開きます。

  4. [DAG] タブに移動します。

  5. composer_quickstart DAG が DAG のリストに含まれていることを確認します。

    DAG のリストに、状態やスケジュールなどの追加情報とともに composer_quickstart DAG が表示されます。
    図 1.DAG のリストに composer_quickstart DAG が表示されます(クリックして拡大)

DAG 実行の詳細を表示する

DAG の 1 回の実行は DAG 実行と呼ばれます。DAG ファイルの開始日が昨日に設定されているため、Airflow はサンプル DAG の DAG 実行をすぐに実行します。このようにして、Airflow は指定された DAG のスケジュールに追いつきます。

サンプル DAG には、コンソールで echo コマンドを実行する 1 つのタスク print_dag_run_conf が含まれています。このコマンドは、DAG に関するメタ情報(DAG 実行の数値識別子)を出力します。

  1. [DAG] タブで composer_quickstart をクリックします。DAG の [実行] タブが開きます。

  2. DAG 実行のリストで、最初のエントリをクリックします。

    DAG 実行のリストには、最近の DAG 実行(実行日とステータス)が表示されます。
    図 2.composer_quickstart DAG の DAG 実行のリスト(クリックして拡大)
  3. DAG 実行の詳細が表示され、サンプル DAG の個々のタスクに関する詳細情報が表示されます。

    print_dag_run_conf というエントリを含むタスクのリスト、開始時間、終了時間、所要時間
    図 3.DAG 実行で実行されたタスクのリスト(クリックして拡大)
  4. [Logs for DAG run] セクションには、DAG 実行のすべてのタスクのログが一覧表示されます。echo コマンドの出力はログで確認できます。

    タスクのログエントリ。1 つは出力で、もう 1 つは識別子を一覧表示します
    図 4.print_dag_run_conf タスクのログ(クリックして拡大)

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。

このチュートリアルで使用したリソースを削除します。

  1. Cloud Composer 環境を削除します。

    1. Google Cloud Console で [環境] ページに移動します。

      [環境] に移動

    2. [example-environment] を選択し、[削除] をクリックします。

    3. 環境が削除されるまで待ちます。

  2. 環境のバケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。

    1. Google Cloud Console で、[ストレージ] > [ブラウザ] ページに移動します。

      [ストレージ] > [ブラウザ] に移動します。

    2. 環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を us-central1-example-environ-c1616fe8-bucket にします。

  3. 環境の Redis のキューの永続ディスクを削除します。Cloud Composer 環境を削除しても、永続ディスクは削除されません。

    1. Google Cloud Console で、[Compute Engine] > [ディスク] に移動します。

      [ディスク] に移動

    2. 環境の Redis のキューの永続ディスクを選択し、[削除] をクリックします。

      たとえば、このディスクの名前は gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee です。Cloud Composer 1 のディスクは常に Standard persistent disk タイプで、サイズは 2 GB です。

次のステップ