このページでは、ご使用の環境に応じたストレージ バケットの決定方法と、環境から DAG を追加、更新、および削除する方法について説明します。
Cloud Composer は Cloud Storage を使用して、Apache Airflow DAG(ワークフローとも呼ばれる)を保存します。各環境には、対応する Cloud Storage バケットがあります。Cloud Composer では、Cloud Storage バケット内の DAG のみがスケジュール設定されます。
始める前に
- Apache Airflow では強固な DAG 分離が行われないため、DAG の干渉を回避するために本番環境とテスト環境を個別に管理することをおすすめします。詳細については、DAG のテストをご覧ください。
- Cloud Composer 環境の Cloud Storage バケットで、プラグインを追加、更新するには次の権限が必要です。
storage.objectAdmin
は、ファイルをアップロードするために使用します。composer.environments.get
は、DAG 宛先バケットを検索するために使用します。Cloud Storage API またはgsutil
を使用する場合、この権限は必要ありません。
- DAG の変更が 3~5 分で行われます。タスクのステータスは、Airflow ウェブ インターフェースで確認できます。
ストレージ バケット名の決定
環境に関連付けられたストレージ バケットの名前を確認するには:
コンソール
- Cloud Console の [環境] ページを開きます。
[環境] ページを開く [名前] 列で、環境の名前をクリックして環境の詳細ページを開きます。
[構成] タブに、Cloud Storage バケットの名前が DAGs フォルダの右側に表示されます。
(省略可)Cloud Storage でバケットを表示するには、バケット名をクリックします。
gcloud
次のコマンドを入力します。
gcloud composer environments describe ENVIRONMENT_NAME \ --location LOCATION \ --format="get(config.dagGcsPrefix)"
ここで
ENVIRONMENT_NAME
は、環境の名前です。LOCATION
は、環境が配置される Compute Engine のリージョンです。--format
は、すべての環境の詳細ではなく、dagGcsPrefix
プロパティのみを指定するためのオプションです。
dagGcsPrefix
プロパティにバケット名が表示されます。
gs://region-environment_name-random_id-bucket/
REST
認証情報の詳細については、Cloud Composer の認証をご覧ください。
- projects.locations.environments.get メソッドを呼び出します。
- config.dagGcsPrefix を Environment のレスポンスから読み取ります。
rpc
認証情報の詳細については、Cloud Composer の認証をご覧ください。
- Environments.GetEnvironment メソッドを呼び出します。
- config.dag_gcs_prefix を Environment のレスポンスから読み取ります。
python
google-auth ライブラリを使用して認証情報を取得し、requests ライブラリを使用して REST API を呼び出します。
DAG の追加または更新
DAG を追加または更新するには、DAG の Python .py
ファイルを Cloud Storage の環境の dags
フォルダに移動します。
コンソール
- Cloud Console の [環境] ページを開きます。
[環境] ページを開く [名前] 列で、環境の名前をクリックして環境の詳細ページを開きます。
[構成] タブに、Cloud Storage バケットの名前が DAGs フォルダの右側に表示されます。
Cloud Storage でバケットを表示するには、バケット名をクリックします。
デフォルトで、
dags
フォルダが開きます。[ファイルをアップロード] をクリックし、アップロードする DAG のローカルコピーを選択します。
dags
フォルダにファイルをアップロードするには、[開く] をクリックします。
gcloud
DAG を追加または更新するには:
gcloud composer environments storage dags import \ --environment ENVIRONMENT_NAME \ --location LOCATION \ --source LOCAL_FILE_TO_UPLOAD
ここで
ENVIRONMENT_NAME
は、環境の名前です。LOCATION
は、環境が配置される Compute Engine のリージョンです。LOCAL_FILE_TO_UPLOAD
は、アップロードする DAG です。
DAG の削除
環境内で DAG を削除する
DAG を削除するには、DAG の Python .py
ファイルを Cloud Storage の環境の dags
フォルダから削除します。DAG を削除しても、DAG メタデータは Airflow ウェブ インターフェースから削除されません。
コンソール
- Cloud Console の [環境] ページに移動します。
[環境] ページを開く [名前] 列で、環境の名前をクリックして環境の詳細ページを開きます。
[構成] タブに、Cloud Storage バケットの名前が
DAGs
フォルダの右側に表示されます。Cloud Storage でバケットを表示するには、バケット名をクリックします。
デフォルトで、
dags
フォルダが開きます。削除する DAG の名前の横にあるチェックボックスをクリックします。
Cloud Console の上部にある [削除] をクリックします。
表示されるダイアログで [OK] をクリックします。
gcloud
gcloud composer environments storage dags delete
--environment ENVIRONMENT_NAME
--location LOCATION
DAG_NAME.py
ここで
ENVIRONMENT_NAME
は、環境の名前です。LOCATION
は、環境が配置される Compute Engine のリージョンです。DAG_NAME.py
は、削除する DAG です。
Airflow 1.9.0: 削除された DAG のメタデータは Airflow ウェブ インターフェースで表示されたままになります。
Airflow 1.10.0 以降:
gcloud
ツールを使用して、DAG メタデータを削除できます。
Airflow ウェブ インターフェースからの DAG の削除
Airflow ウェブ インターフェースから DAG のメタデータを削除するには、次のように入力します。
gcloud composer environments run --location LOCATION \ ENVIRONMENT_NAME delete_dag -- DAG_NAME
ここで
ENVIRONMENT_NAME
は、環境の名前です。LOCATION
は、環境が配置される Compute Engine のリージョンです。DAG_NAME
は、削除する DAG の名前です。