DAG を追加および更新する

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

このページでは、Cloud Composer 環境で DAG を管理する方法について説明します。

Cloud Composer は、Cloud Storage バケットを使用して Cloud Composer 環境の DAG を保存します。環境では、このバケットから Airflow ワーカーやスケジューラなどの Airflow コンポーネントに DAG が同期されます。

準備

  • Apache Airflow では強固な DAG 分離が行われないため、DAG の干渉を回避するために本番環境とテスト環境を個別に管理することをおすすめします。詳細については、DAG のテストをご覧ください。
  • アカウントに DAG の管理に十分な権限が付与されていることを確認してください。
  • DAG に対する変更は 3~5 分以内に Airflow に反映されます。タスクのステータスは、Airflow ウェブ インターフェースで確認できます。

環境のバケットにアクセスする

環境に関連付けられたバケットにアクセスするには:

コンソール

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、環境の名前を含む行を探し、[DAG フォルダ] 列で [DAG] リンクをクリックします。[バケットの詳細] ページが開きます。環境のバケット内の /dags フォルダの内容が表示されます。

gcloud

gcloud CLI には、環境のバケットに DAG を追加および削除するための個別のコマンドがあります。

環境のバケットを操作する場合は、Google Cloud CLI を使用することもできます。環境のバケットのアドレスを取得するには、次の gcloud CLI コマンドを実行します。

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

次のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。

例:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

environments.get API リクエストを作成します。Environment リソースでは、dagGcsPrefix リソースの EnvironmentConfig リソースは環境のバケットのアドレスです。

例:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

google-auth ライブラリを使用して認証情報を取得し、requests ライブラリを使用して REST API を呼び出します。

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

DAG を追加または更新する

DAG を追加または更新するには、DAG の Python .py ファイルを環境のバケット内の /dags フォルダに移動します。

コンソール

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、環境の名前を含む行を探し、[DAG フォルダ] 列で [DAG] リンクをクリックします。[バケットの詳細] ページが開きます。環境のバケット内の /dags フォルダの内容が表示されます。

  3. [ファイルをアップロード] をクリックします。次に、ブラウザのダイアログを使用して DAG の Python .py ファイルを選択し、確定します。

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

次のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。
  • LOCAL_FILE_TO_UPLOAD は、DAG の Python .py ファイルに置き換えます。

例:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

アクティブな DAG の実行を含む DAG を更新する

DAG 実行がアクティブな DAG を更新する場合は、次のようになります。

  • 現在実行中のすべてのタスクは、元の DAG ファイルを使用して終了します。
  • スケジュールされているが、現在実行されていないすべてのタスクでは、更新された DAG ファイルが使用されます。
  • 更新された DAG ファイルに存在しないタスクは、すべて削除済みとしてマークされます。

頻繁なスケジュールを実行する DAG を更新する

DAG ファイルをアップロードした後、Airflow がこのファイルを読み込んで DAG を更新するまでには時間がかかります。DAG が頻度の高いスケジュールで実行される場合は、DAG が更新されたバージョンの DAG ファイルを使用するようにしてください。手順は次のとおりです。

  1. DAG を Airflow UI で一時停止します。

  2. 更新した DAG ファイルをアップロードします。

  3. Airflow UI に更新が表示されるまで待ちます。これは、DAG がスケジューラによって正しく解析され、Airflow データベースで更新されたことを意味します。

    Airflow UI に更新された DAG が表示されても、Airflow ワーカーに更新された DAG ファイルがあることは保証されません。これは、DAG ファイルがスケジューラとワーカーに対して別々に同期されるためです。

  4. 待機時間を延長して、DAG ファイルが環境内のすべてのワーカーと同期されるようにすることをおすすめします。同期は毎分数回行われます。正常な環境では、約 20~30 秒待つとすべてのワーカーが同期します。

  5. (省略可)すべてのワーカーに DAG ファイルの新しいバージョンが含まれていることを確認するには、個々のワーカーのログを調べます。手順は次のとおりです。

    1. Google Cloud コンソールで、環境の [ログ] タブを開きます。

    2. [Composer のログ] > [インフラストラクチャ] > [Cloud Storage の同期] 項目に移動し、環境内のすべてのワーカーのログを検査します。新しい DAG ファイルをアップロードした後のタイムスタンプを持つ最新の Syncing dags directory ログアイテムを探します。その後に Finished syncing アイテムが表示されている場合は、このワーカーで DAG が正常に同期されています。

  6. DAG の一時停止を解除します。

環境内の DAG を削除する

DAG を削除するには、DAG の Python .py ファイルを環境のバケット内の環境の /dags フォルダから削除します。

コンソール

  1. Google Cloud Console で [環境] ページに移動します。

    [環境] に移動

  2. 環境のリストで、環境の名前を含む行を探し、[DAG フォルダ] 列で [DAG] リンクをクリックします。[バケットの詳細] ページが開きます。環境のバケット内の /dags フォルダの内容が表示されます。

  3. DAG ファイルを選択し、[削除] をクリックして、オペレーションを確定します。

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

次のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。
  • DAG_FILE は、DAG の Python .py ファイルに置き換えます。

例:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Airflow UI から DAG を削除する

Airflow ウェブ インターフェースから DAG のメタデータを削除するには、次のように操作します。

Airflow UI

  1. 環境の Airflow UI に移動します。
  2. DAG で、[DAG を削除] をクリックします。

gcloud

1.14.0 より前の Airflow 1 バージョンでは、gcloud CLI で次のコマンドを実行します。

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

Airflow 2、Airflow 1.14.0 以降のバージョンでは、gcloud CLI で次のコマンドを実行します。

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

次のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。
  • DAG_NAME は、削除する DAG の名前です。

次のステップ