クイックスタート

Cloud Composer 1 | Cloud Composer 2

このページでは、Cloud Composer 環境を作成し、Cloud Composer 2 で Apache Airflow DAG を実行する方法について説明します。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  5. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  6. Cloud Composer API を有効にします。

    API を有効にする

環境の作成

Console

  1. Google Cloud Console で、[環境の作成] ページに移動します。

    [環境の作成] に移動

  2. [名前] フィールドに「example-environment」と入力します。

  3. [ロケーション] プルダウン リストで、Cloud Composer 環境のリージョンを選択します。リージョンの選択については、利用可能なリージョンをご覧ください。

  4. その他の環境構成オプションには、指定されたデフォルト値を使用します。

  5. 環境を作成するには、[作成] をクリックします。

  6. 環境が作成されるまで待ちます。処理が完了すると、緑色のチェックマークが環境名の横に表示されます。

gcloud

gcloud composer environments create ENVIRONMENT_NAME \
    --location LOCATION \
    --image-version IMAGE_VERSION

次のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。 このクイックスタートでは example-environment を使用します。

  • LOCATION を Cloud Composer 環境のリージョンに置き換えます。リージョンの選択については、利用可能なリージョンをご覧ください。

  • IMAGE_VERSION は、Cloud Composer イメージの名前に置き換えます。

例:

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.0.0-preview.3-airflow-2.1.2

Terraform

Terraform を使用してこの環境を構成するには、Terraform 構成に次のリソース ブロックを追加して terraform apply を実行します。

Terraform を使用した Cloud Composer 環境の作成の詳細については、Terraform のドキュメントをご覧ください。

resource "google_composer_environment" "example" {
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}

次のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。

  • LOCATION は、環境のリージョンに置き換えます。

    ロケーションは Cloud Composer 環境のリージョンです。リージョンの選択については、利用可能なリージョンをご覧ください。

  • IMAGE_VERSION は、Cloud Composer イメージの名前に置き換えます。

例:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-2.0.0-preview.3-airflow-2.1.2"
    }
  }

}

環境の詳細を表示する

環境の作成が完了した後は、環境の情報(Cloud Composer のバージョン、Airflow ウェブ インターフェースの URL、Cloud Storage の DAG フォルダなど)を表示できます。

環境情報を表示するには、次のようにします。

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. [環境の詳細] ページを表示するには、環境の名前 example-environment をクリックします。

DAG を作成する

Airflow DAG は、スケジュールを設定して実行する体系的なタスクの集まりです。DAG は、標準の Python ファイルで定義されます。

quickstart.py の Python コードは、次のとおりです。

  1. DAG(composer_sample_dag)を作成します。DAG は 1 日 1 回実行されます。
  2. タスク(print_dag_run_conf)を実行します。このタスクは、bash 演算子を使用して DAG 実行の構成を出力します。

DAG を作成するには、ローカルマシンに quickstart.py ファイルのコピーを作成します。

import datetime

import airflow
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    'owner': 'Composer Example',
    'depends_on_past': False,
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': YESTERDAY,
}

with airflow.DAG(
        'composer_sample_dag',
        'catchup=False',
        default_args=default_args,
        schedule_interval=datetime.timedelta(days=1)) as dag:

    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id='print_dag_run_conf', bash_command='echo {{ dag_run.id }}')

DAG を Cloud Storage にアップロードする

Cloud Composer がスケジュールを設定するのは、環境の Cloud Storage バケット内の /dags フォルダにある DAG のみです。

DAG のスケジュールを設定するには、quickstart.py をローカルマシンから環境の /dags フォルダにアップロードします。

Console

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. example-environmentDAG フォルダのリンクをクリックして、/dags フォルダを開きます。

  3. バケットの詳細ページで、[ファイルをアップロード] をクリックしてから、quickstart.py のローカルコピーを選択します。

  4. ファイルをアップロードするには、[開く] をクリックします。

    DAG をアップロードすると、Cloud Composer によって DAG が Airflow に追加され、その後すぐに DAG の実行がスケジュールされます。DAG が Airflow ウェブ インターフェースに表示されるまで数分かかる場合があります。

gcloud

gcloudquickstart.py をアップロードするには、次のコマンドを実行します。

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

Airflow ウェブ インターフェースで DAG を表示する

Cloud Composer 環境ごとに、Airflow ウェブ インターフェースを実行するウェブサーバーを持ちます。Airflow ウェブ インターフェースから DAG を管理できます。

Airflow ウェブ インターフェースで DAG を表示するには、次の手順を行います。

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. Airflow ウェブ インターフェースを開くには、example-environment についての [Airflow] リンクをクリックします。新しいブラウザ ウィンドウで Airflow ウェブ UI が開きます。

  3. Airflow ツールバーで、[DAGs] ページに移動します。

  4. DAG の詳細ページを開くには、composer_sample_dag をクリックします。

    DAG のページには、ワークフローのタスクと依存関係をグラフィカルに表現するツリービューが表示されます。

Airflow ログのタスク インスタンスの詳細を表示する

スケジュール設定した DAG には、print_dag_run_conf タスクが含まれています。タスクは、タスク インスタンスの Airflow ログにある DAG 実行の構成を出力します。

タスク インスタンスの詳細を表示するには、次の手順を行います。

  1. Airflow ウェブ インターフェースの DAG のツリービューで、[Graph View] をクリックします。

    ポインタを print_dag_run_conf タスクの上に合わせると、ステータスが表示されます。

  2. print_dag_run_conf タスクをクリックします。

    タスク インスタンスのコンテキスト メニューで、メタデータを取得して、いくつかの操作を実行できます。

  3. タスク インスタンスのコンテキスト メニューで、[Log] をクリックします。

  4. ログ内で Running command: ['bash' を見つけて、bash 演算子からの出力を確認します。

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。

次のいずれかを選択します。

  • 最も簡単にクリーンアップするには、クイックスタート用に作成したプロジェクトを削除します。
  • 代わりに、個々のリソースを削除することもできます。

プロジェクトの削除

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. 削除するプロジェクトが組織に関連付けられている場合は、[名前] 列の [組織] リストを展開します。
  3. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  4. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

リソースを個別に削除する

プロジェクトを削除する代わりに、このチュートリアルで使用したリソースを削除できます。

  1. Cloud Composer 環境を削除します。

    1. Google Cloud Console で [環境] ページに移動します。

      [環境] に移動

    2. [example-environment] を選択し、[削除] をクリックします。

    3. 環境が削除されるまで待ちます。

  2. 環境のバケットを削除します。Cloud Composer 環境を削除しても、バケットは削除されません。

    1. Google Cloud Console で、[ストレージ] > [ブラウザ] ページに移動します。

      [ストレージ] > [ブラウザ] に移動します。

    2. 環境のバケットを選択して、[削除] をクリックします。たとえば、このバケットの名前を us-central1-example-environ-c1616fe8-bucket にします。

  3. 環境の Redis のキューの永続ディスクを削除します。Cloud Composer 環境を削除しても、永続ディスクは削除されません。

    1. Google Cloud Console で、[Compute Engine] > [ディスク] に移動します。

      [ディスク] に移動

    2. 環境の Redis のキューの永続ディスクを選択し、[削除] をクリックします。たとえば、このディスクの名前は pvc-02bc4842-2312-4347-8519-d87bdcd31115 です。そのようなディスクは常に Balanced persistent disk タイプで、サイズは 2 GB です。

次のステップ