Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Apache Airflow には、DAG のトリガーと管理、DAG 実行とタスクに関する情報の取得、接続とユーザーの追加と削除など、タスクの実行に使用できるコマンドライン インターフェース(CLI)があります。
CLI 構文のバージョンについて
Cloud Composer 2 の Airflow では、Airflow 2 CLI 構文を使用します
サポートされている Airflow CLI コマンド
サポートされている Airflow コマンドの完全な一覧については、gcloud composer environments run
リファレンスをご覧ください。
始める前に
Cloud Composer で Google Cloud CLI を使用し、Airflow CLI コマンドを実行するための十分な権限が必要です。
2.4.0 より前のバージョンの Cloud Composer では、Airflow CLI コマンドを実行するために環境のクラスタのコントロール プレーンへのアクセス権が必要です。
Airflow CLI コマンドを実行
Airflow CLI コマンドをお使いの環境で実行するには、gcloud
を使用します。
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
SUBCOMMAND \
-- SUBCOMMAND_ARGUMENTS
以下のように置き換えます。
ENVIRONMENT_NAME
を環境の名前にする。LOCATION
は、環境が配置されているリージョン。SUBCOMMAND
を、サポートされている Airflow CLI コマンドのいずれかに置き換えます。SUBCOMMAND_ARGUMENTS
は、Airflow CLI コマンドの引数に置き換えます。
サブコマンド引数の区切り文字
--
で、指定された Airflow CLI コマンドの引数を区切ります。
Airflow 2
Airflow 2 CLI の構文:
- 複合 CLI コマンドをサブコマンドとして指定します。
- 複合コマンドの引数は、
--
区切り文字の後にサブコマンド引数として指定します。
gcloud composer environments run example-environment \
dags list -- --output=json
Airflow 1
Cloud Composer 2 は Airflow 2 のみをサポートしています。
デフォルトのロケーション
大部分の gcloud composer
コマンドでは、ロケーションを指定する必要があります。ロケーションを指定するには、--location
フラグを使用するか、デフォルトのロケーションを設定します。
例
たとえば、Cloud Composer 環境で ID 5077
を指定し、sample_quickstart
という名前の DAG をトリガーするには、次のようにします。
Airflow 2
gcloud composer environments run example-environment \
--location us-central1 dags trigger -- sample_quickstart \
--run-id=5077
Airflow 1
Cloud Composer 2 は Airflow 2 のみをサポートしています。
プライベート IP 環境でのコマンドの実行
Cloud Composer バージョン 2.4.0 以降では、追加の構成を行わずにプライベート IP 環境で Airflow CLI コマンドを実行できます。これらのコマンドを実行するために、マシンが環境のクラスタ コントロール プレーン エンドポイントへのアクセス権は必要はありません。
2.4.0 より前のバージョンの Cloud Composer の場合:
プライベート IP 環境で Airflow CLI コマンドを実行するには、GKE クラスタのコントロール プレーン エンドポイントにアクセスできるマシンで実行する必要があります。使用可能なオプションは、ご使用のプライベート クラスタの構成によって異なる場合があります。
環境のクラスタでパブリック エンドポイント アクセスが無効になっている場合、gcloud composer
コマンドを使用して Airflow CLI を実行することはできません。Airflow CLI コマンドを実行できるようにするには、次の操作を行います。
- VPC ネットワークに VM を作成します。
- クラスタの認証情報を取得します。次のコマンドを実行します。
bash gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip
kubectl
を使用して Airflow コマンドを実行します。次に例を示します。
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
--container airflow-scheduler -- airflow dags list
COMPOSER_NAMESPACE
は、composer-2-0-28-airflow-2-3-394zxc12411
のような名前空間に置き換えます。Cloud Composer は、ワークロードのリストで確認できます。また、kubectl get namespaces
コマンドを使用して確認することもできます。
環境のクラスタでパブリック エンドポイント アクセスが有効になっている場合は、承認済みネットワークに追加された外部 IP アドレスを持つマシンから Airflow CLI コマンドを実行することもできます。マシンからのアクセスを有効にするには、マシンの外部アドレスを環境の承認済みネットワークのリストに追加します。
Cloud Composer API を使用して Airflow CLI コマンドを実行する
Cloud Composer バージョン 2.4.0 以降では、Cloud Composer API を使用して Airflow CLI コマンドを実行できます。
コマンドの実行
environments.executeAirflowCommand
API リクエストを作成します。
{
"environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
"command": "AIRFLOW_COMMAND",
"subcommand": "AIRFLOW_SUBCOMMAND",
"parameters": [
"SUBCOMMAND_PARAMETER"
]
}
以下を置き換えます。
PROJECT_ID
: プロジェクト IDLOCATION
: 環境が配置されているリージョン。ENVIRONMENT_NAME
: 環境の名前。AIRFLOW_COMMAND
: 実行する Airflow CLI コマンド(dags
など)。AIRFLOW_SUBCOMMAND
: 実行する Airflow CLI コマンドのサブコマンド(list
など)。- (省略可)
SUBCOMMAND_PARAMETER
: サブコマンドのパラメータ。複数のパラメータを使用する場合は、リストにアイテムを追加します。
例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
"environment": "projects/example-project/locations/us-central1/environments/example-environment",
"command": "dags",
"subcommand": "list",
"parameters": [
"-o json",
"--verbose"
]
}
ポーリング コマンドのステータス
Cloud Composer API を使用して Airflow CLI コマンドを実行した後、PollAirflowCommand リクエストを発行し、エラーとステータス コード用の exitInfo
のフィールドを調べて、コマンドが正常に完了したかどうかを確認します。output
フィールドにはログ行が含まれます。
コマンドの実行ステータスを取得してログを取得するには、ExecuteAirflowCommandRequest
から返された executionId
、pod
、podNamespace
の値を指定します。
例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
"executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
"pod": "airflow-scheduler-1327d8cd68-hblpd",
"podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
"nextLineNumber": 1
}
トラブルシューティング
クラスタ コントロール プレーンに接続できない
gcloud composer environments run
コマンドまたは kubectl
コマンドを実行すると、次のエラーが発生することがあります。
Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"
症状: このエラー メッセージは、これらのコマンドを実行したパソコンからのネットワーク接続がないことを示しています。
解決策: プライベート IP 環境でのコマンドの実行に示されているガイドラインに従うか、kubectl
コマンドのタイムアウト セクションに記載されている指示に従ってください。