DAG(ワークフロー)の追加と更新

このページでは、ご使用の環境に応じたストレージ バケットの決定方法と、環境から DAG を追加、更新、および削除する方法について説明します。

Cloud Composer は Cloud Storage を使用して、Apache Airflow DAG(ワークフローとも呼ばれる)を保存します。各環境には、対応する Cloud Storage バケットがあります。Cloud Composer では、Cloud Storage バケット内の DAG のみがスケジュール設定されます。

始める前に

  • Apache Airflow では強固な DAG 分離が行われないため、DAG の干渉を回避するために本番環境とテスト環境を個別に管理することをおすすめします。詳細については、DAG のテストをご覧ください。
  • Cloud Composer 環境の Cloud Storage バケットで、プラグインを追加、更新するには次の権限が必要です。
    • storage.objectAdmin は、ファイルをアップロードするために使用します。
    • composer.environments.get は、DAG 宛先バケットを検索するために使用します。Cloud Storage API または gsutil を使用する場合、この権限は必要ありません。
  • DAG の変更が 3~5 分で行われます。タスクのステータスは、Airflow ウェブ インターフェースで確認できます。

ストレージ バケット名の決定

環境に関連付けられたストレージ バケットの名前を確認するには:

コンソール

  1. Cloud Console の [環境] ページを開きます。
    [環境] ページを開く
  2. [名前] 列で、環境の名前をクリックして環境の詳細ページを開きます。

    [構成] タブに、Cloud Storage バケットの名前が DAGs フォルダの右側に表示されます。

  3. (省略可)Cloud Storage でバケットを表示するには、バケット名をクリックします。

gcloud

次のコマンドを入力します。

gcloud composer environments describe ENVIRONMENT_NAME \
  --location LOCATION \
  --format="get(config.dagGcsPrefix)"

ここで

  • ENVIRONMENT_NAME は、環境の名前です。
  • LOCATION は、環境が配置される Compute Engine のリージョンです。
  • --format は、すべての環境の詳細ではなく、dagGcsPrefix プロパティのみを指定するためのオプションです。

dagGcsPrefix プロパティにバケット名が表示されます。

gs://region-environment_name-random_id-bucket/

REST

認証情報の詳細については、Cloud Composer の認証をご覧ください。

  1. projects.locations.environments.get メソッドを呼び出します。
  2. config.dagGcsPrefixEnvironment のレスポンスから読み取ります。

rpc

認証情報の詳細については、Cloud Composer の認証をご覧ください。

  1. Environments.GetEnvironment メソッドを呼び出します。
  2. config.dag_gcs_prefixEnvironment のレスポンスから読み取ります。

python

google-auth ライブラリを使用して認証情報を取得し、requests ライブラリを使用して REST API を呼び出します。

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])

DAG の追加または更新

DAG を追加または更新するには、DAG の Python .pyファイルを Cloud Storage の環境の dags フォルダに移動します。

コンソール

  1. Cloud Console の [環境] ページを開きます。
    [環境] ページを開く
  2. [名前] 列で、環境の名前をクリックして環境の詳細ページを開きます。

    [構成] タブに、Cloud Storage バケットの名前が DAGs フォルダの右側に表示されます。

  3. Cloud Storage でバケットを表示するには、バケット名をクリックします。

    デフォルトで、dags フォルダが開きます。

  4. [ファイルをアップロード] をクリックし、アップロードする DAG のローカルコピーを選択します。

  5. dags フォルダにファイルをアップロードするには、[開く] をクリックします。

gcloud

DAG を追加または更新するには:

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source LOCAL_FILE_TO_UPLOAD

ここで

  • ENVIRONMENT_NAME は、環境の名前です。
  • LOCATION は、環境が配置される Compute Engine のリージョンです。
  • LOCAL_FILE_TO_UPLOAD は、アップロードする DAG です。

DAG の削除

環境内で DAG を削除する

DAG を削除するには、DAG の Python .py ファイルを Cloud Storage の環境の dags フォルダから削除します。DAG を削除しても、DAG メタデータは Airflow ウェブ インターフェースから削除されません。

コンソール

  1. Cloud Console の [環境] ページに移動します。
    [環境] ページを開く
  2. [名前] 列で、環境の名前をクリックして環境の詳細ページを開きます。

    [構成] タブに、Cloud Storage バケットの名前が DAGs フォルダの右側に表示されます。

  3. Cloud Storage でバケットを表示するには、バケット名をクリックします。

    デフォルトで、dags フォルダが開きます。

  4. 削除する DAG の名前の横にあるチェックボックスをクリックします。

  5. Cloud Console の上部にある [削除] をクリックします。

  6. 表示されるダイアログで [OK] をクリックします。

gcloud

gcloud composer environments storage dags delete 
--environment ENVIRONMENT_NAME
--location LOCATION
DAG_NAME.py

ここで

  • ENVIRONMENT_NAME は、環境の名前です。
  • LOCATION は、環境が配置される Compute Engine のリージョンです。
  • DAG_NAME.py は、削除する DAG です。
  • Airflow 1.9.0: 削除された DAG のメタデータは Airflow ウェブ インターフェースで表示されたままになります。

  • Airflow 1.10.0 以降: gcloud ツールを使用して、DAG メタデータを削除できます。

Airflow ウェブ インターフェースからの DAG の削除

Airflow ウェブ インターフェースから DAG のメタデータを削除するには、次のように入力します。

  gcloud composer environments run --location LOCATION \
  ENVIRONMENT_NAME delete_dag -- DAG_NAME

ここで

  • ENVIRONMENT_NAME は、環境の名前です。
  • LOCATION は、環境が配置される Compute Engine のリージョンです。
  • DAG_NAME は、削除する DAG の名前です。

次のステップ