クイックスタート

このページでは、Google Cloud Console で Cloud Composer 環境を作成し、単純な Apache Airflow DAG(ワークフロー)を実行する方法を説明します。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. Cloud Console のプロジェクト セレクタページで、Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタのページに移動

  3. Google Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Cloud Composer API を有効にします。

    API を有効にする

環境の作成

Console

  1. Cloud Console で、[環境の作成] ページに移動します。

    [環境の作成] ページを開く

  2. [名前] フィールドに「example-environment」と入力します。

  3. [ロケーション] プルダウン リストで、Cloud Composer 環境のリージョンを選択します。リージョンの選択については、利用可能なリージョンをご覧ください。

  4. その他の環境構成オプションには、指定されたデフォルト値を使用します。

  5. 環境を作成するには、[作成] をクリックします。

  6. 環境の作成が完了するまで待ちます。処理が完了すると、緑色のチェックマークが環境名の左側に表示されます。

gcloud

gcloud composer environments create example-environment \
    --location LOCATION

LOCATION は、環境が配置される Compute Engine のリージョンに置き換えます。指定する場所が Composer を利用できる場所であることを確認します。

Terraform

Terraform を使用してこの環境を構成するには、Terraform 構成に次のリソース ブロックを追加して terraform apply を実行します。

resource "google_composer_environment" "composer-quickstart" {
    name   = "example-environment"
    region = "LOCATION"
}

LOCATION は、環境が配置される Compute Engine のリージョンに置き換えます。指定する場所が Composer を利用できる場所であることを確認します。

環境の詳細の表示

環境の作成が完了した後は、環境のデプロイ情報(Cloud Composer のバージョン、Airflow ウェブ インターフェースの URL、Cloud Storage の DAG フォルダなど)を表示できます。

デプロイ情報を表示するには、次の手順を行います。

  1. Cloud Console で、[環境] ページに移動します。

    [環境] ページを開く

  2. [環境の詳細] ページを表示するには、example-environment をクリックします。

DAG の作成

Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。

quickstart.py の Python コードは、次のとおりです。

  1. DAG(composer_sample_dag)を作成します。DAG は 1 日 1 回実行されます。
  2. タスク(print_dag_run_conf)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。

DAG を作成するには、ローカルマシンに quickstart.py ファイルのコピーを作成します。

import datetime

import airflow
from airflow.operators import bash_operator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'composer_sample_dag',
        'catchup=False',
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash_operator.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

DAG の Cloud Storage へのアップロード

Cloud Composer がスケジュールを設定するのは、環境の Cloud Storage バケット内の DAG フォルダにある DAG のみです。

DAG のスケジュールを設定するには、quickstart.py をローカルマシンから環境の DAG フォルダに移動します。

  1. Cloud Console で、[環境] ページに移動します。

    [環境] ページを開く

  2. example-environmentDAG フォルダのリンクをクリックして、/dags フォルダを開きます。

  3. バケットの詳細ページで、[ファイルをアップロード] をクリックしてから、quickstart.py のローカルコピーを選択します。

  4. ファイルをアップロードするには、[開く] をクリックします。

    DAG をアップロードすると、Cloud Composer によって DAG が Airflow に追加され、その後すぐに DAG のスケジュールが設定されます。DAG が Airflow ウェブ インターフェースに表示されるまでに数分かかる場合があります。

DAG の Airflow ウェブ インターフェースでの表示

各 Cloud Composer 環境には、DAG の管理に使用できる Airflow ウェブ インターフェースを実行するウェブサーバーが用意されています。

Airflow ウェブ インターフェースで DAG を表示するには、次の手順を行います。

  1. Cloud Console で、[環境] ページに移動します。

    [環境] ページを開く

  2. Airflow ウェブ インターフェースを開くには、example-environment についての [Airflow] リンクをクリックします。インターフェースが新しいブラウザ ウィンドウに表示されます。

  3. Airflow ツールバーで、[DAGs] をクリックします。

  4. DAG の詳細ページを開くには、composer_sample_dag をクリックします。

    DAG のページには、ワークフローのタスクと依存関係をグラフィカルに表現するツリービューが表示されます。

Airflow ログのタスク インスタンスの詳細表示

スケジュール設定した DAG には、print_dag_run_conf タスクが含まれています。タスクは、タスク インスタンスの Airflow ログにある DAG 実行の構成を出力します。

タスク インスタンスの詳細を表示するには、次の手順を行います。

  1. Airflow ウェブ インターフェースの DAG のツリービューで、[Graph View] をクリックします。

    print_dag_run_conf タスクのグラフィックにカーソルを合わせると、ステータスが表示されます。 各タスクを囲む枠線の色もステータスを表しています(黄緑色の枠線は実行中など)。

  2. [print_dag_run_confタスク] をクリックします。

    タスク インスタンスのコンテキスト メニューが表示されます。 この時点で、メタデータを取得していくつかの操作を実行できます。

  3. タスク インスタンスのコンテキスト メニューで、[View Log] をクリックします。

  4. ログ内で Running: ['bash' を見つけて、bash 演算子からの出力を確認します。

クリーンアップ

このクイックスタートで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] ページに移動

  2. 削除するプロジェクトが組織に関連付けられている場合、ページ上部の [組織] リストから組織を選択します。
  3. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  4. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

あるいは、このチュートリアルで使用したリソースを削除することもできます。

  1. Cloud Composer 環境を削除します
  2. Cloud Composer 環境の Cloud Storage バケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。
  3. Cloud Composer 環境の Pub/Sub トピックを削除しますcomposer-agentcomposer-backend))。Cloud Composer 環境を削除しても、これらのトピックは削除されません。

次のステップ