Airflow コマンドライン インターフェースにアクセスする

Cloud Composer 1 | Cloud Composer 2

Apache Airflow には、DAG のトリガーと管理、DAG 実行とタスクに関する情報の取得、接続とユーザーの追加と削除など、タスクの実行に使用できるコマンドライン インターフェース(CLI)があります。

CLI 構文のバージョンについて

Cloud Composer 2 の Airflow では、Airflow 2 CLI 構文を使用します

サポートされている Airflow CLI コマンド

サポートされている Airflow コマンドの完全な一覧については、gcloud composer environments run リファレンスをご覧ください。

準備

Airflow CLI コマンドを実行

Airflow CLI コマンドをお使いの環境で実行するには、gcloud を使用します。

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

以下のように置き換えます。

  • ENVIRONMENT_NAME を環境の名前にする。
  • LOCATION は、環境が配置されているリージョン。
  • SUBCOMMAND を、サポートされている Airflow CLI コマンドのいずれかに置き換えます。
  • SUBCOMMAND_ARGUMENTS は、Airflow CLI コマンドの引数に置き換えます。

サブコマンド引数の区切り文字

-- で、指定された Airflow CLI コマンドの引数を区切ります。

Airflow 2

Airflow 2 CLI の構文:

  • 複合 CLI コマンドをサブコマンドとして指定します。
  • 複合コマンドの引数は、-- 区切り文字の後にサブコマンド引数として指定します。
gcloud composer environments run example-environment \
    dags list -- --output=json

Airflow 1

Cloud Composer 2 は Airflow 2 のみをサポートしています。

デフォルトのロケーション

大部分の gcloud composer コマンドでは、ロケーションを指定する必要があります。ロケーションを指定するには、--location フラグを使用するか、デフォルトのロケーションを設定します

たとえば、Cloud Composer 環境で ID 5077 を指定し、sample_quickstart という名前の DAG をトリガーするには、次のようにします。

Airflow 2

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

Airflow 1

Cloud Composer 2 は Airflow 2 のみをサポートしています。

プライベート IP 環境でのコマンドの実行

Cloud Composer バージョン 2.4.0 以降では、構成を追加しなくても、プライベート IP 環境で Airflow CLI コマンドを実行できます。これらのコマンドを実行するために、マシンが環境のクラスタ コントロール プレーン エンドポイントへのアクセス権は必要はありません。

Cloud Composer バージョン 2.4.0 より前:

プライベート IP 環境で Airflow CLI コマンドを実行するには、GKE クラスタのコントロール プレーン エンドポイントにアクセスできるマシンで実行する必要があります。使用可能なオプションは、ご使用のプライベート クラスタの構成によって異なる場合があります。

環境のクラスタでパブリック エンドポイント アクセスが無効になっている場合、gcloud composer コマンドを使用して Airflow CLI を実行することはできません。Airflow CLI コマンドを実行できるようにするには、次の手順を行います。

  1. VPC ネットワークに VM を作成する
  2. クラスタ認証情報を取得します。次のコマンドを実行します。 bash gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip
  • kubectl を使用して Airflow コマンドを実行します。次に例を示します。
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
  --container airflow-scheduler -- airflow dags list

COMPOSER_NAMESPACEcomposer-2-0-28-airflow-2-3-394zxc12411 のような名前空間に置き換えます。Cloud Composer は、ワークロード リストで確認できます。また、kubectl get namespaces コマンドを使用して確認することもできます。

環境のクラスタでパブリック エンドポイント アクセスが有効になっている場合は、承認済みネットワークに追加された外部 IP アドレスを持つマシンから Airflow CLI コマンドを実行することもできます。マシンからのアクセスを有効にするには、環境の承認済みネットワークのリストに、マシンの外部アドレスを追加します。

Cloud Composer API を使用して Airflow CLI コマンドを実行する

Cloud Composer バージョン 2.4.0 以降では、Cloud Composer API を使用して Airflow CLI コマンドを実行できます。

コマンドの実行

environments.executeAirflowCommand API リクエストを作成します。

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

以下を置き換えます。

  • PROJECT_ID: プロジェクト ID
  • LOCATION: 環境が配置されているリージョン。
  • ENVIRONMENT_NAME: 環境の名前。
  • AIRFLOW_COMMAND: 実行する Airflow CLI コマンド(dags など)。
  • AIRFLOW_SUBCOMMAND: 実行する Airflow CLI コマンドのサブコマンド(list など)。
  • (省略可)SUBCOMMAND_PARAMETER: サブコマンドのパラメータ。複数のパラメータを使用する場合は、リストにアイテムを追加します。

例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

ポーリング コマンドのステータス

Cloud Composer API を使用して Airflow CLI コマンドを実行した後、PollAirflowCommand リクエストを発行し、エラーとステータス コード用の exitInfo のフィールドを調べて、コマンドが正常に完了したかどうかを確認します。output フィールドにはログ行が含まれます。

コマンド実行ステータスを取得してログを取得するには、ExecuteAirflowCommandRequest によって返される executionIdpodpodNamespace の値を指定します。

例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
  "pod": "airflow-scheduler-1327d8cd68-hblpd",
  "podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
  "nextLineNumber": 1
}

トラブルシューティング

クラスタ コントロール プレーンに接続できない

gcloud composer environments run コマンドまたは kubectl コマンドを実行すると、次のエラーが発生することがあります。

Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"

症状: このエラー メッセージは、これらのコマンドを実行したパソコンからネットワーク接続がないことを示しています。

解決策: プライベート IP 環境でのコマンドの実行に示されているガイドラインに従うか、kubectl コマンドのタイムアウト セクションに記載されている指示に従ってください。