Cloud Composer 1 | Cloud Composer 2
このページでは、KubernetesPodOperator
を使用して Cloud Composer から Kubernetes Pod を Cloud Composer 環境の一部である Google Kubernetes Engine クラスタにデプロイして、環境に適切なリソースが配置されるようにする方法を説明します。
KubernetesPodOperator
は、使用中の環境のクラスタで Kubernetes Pod を起動します。これに対し、Google Kubernetes Engine 演算子は、指定されたクラスタ内で Kubernetes Pod を実行します。このクラスタは、使用中の環境とは関係がない別のクラスタであっても問題ありません。Google Kubernetes Engine 演算子を使用してクラスタを作成、削除することもできます。
KubernetesPodOperator
は、以下を必要とする場合に適しています。
- 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
- Cloud Composer ワーカーのストック画像では使用できないバイナリ依存関係。
このページでは、次の KubernetesPodOperator
構成を含む Airflow DAG の例について説明します。
- 最小の構成: 必要なパラメータのみを設定します。
- テンプレートの構成: Jinja でテンプレートを作成できるパラメータを使用します。
Secret 変数の構成: Kubernetes Secret オブジェクトを Pod に渡します。
Cloud Composer 3 では、Pod アフィニティの構成は使用できません。代わりに、GKE 演算子を使用して別のクラスタで Pod を起動します。
すべての構成: すべての構成が含まれます。
始める前に
Cloud Composer 3 では、環境のクラスタが自動的にスケーリングされます。
KubernetesPodOperator
を使用して実行する追加のワークロードは、環境とは独立してスケーリングされます。 環境は、リソース需要の増加による影響を受けませんが、環境のクラスタは、リソース需要に応じてスケールアップおよびスケールダウンされます。環境のクラスタで実行する追加のワークロードの料金は Cloud Composer 2 料金モデルに従い、Cloud Composer Compute SKU を使用します。Cloud Composer 3 では、環境のクラスタはテナント プロジェクトに配置されています。ただし、Cloud Composer 2 とは異なり、KubernetesPodOperator はコード変更を必要とすることなく同じように動作します。Pod は、環境のクラスタ内で分離された名前空間で実行されますが、VPC ネットワークにアクセスできます(有効になっている場合)。
Cloud Composer 3 は、Workload Identity で GKE クラスタを使用します。デフォルトでは、新しく作成された名前空間または
composer-user-workloads
名前空間で実行される Pod は Google Cloud リソースにアクセスできません。Workload Identity を使用する場合は、名前空間に関連付けられた Kubernetes サービス アカウントを Google Cloud サービス アカウントにマッピングして、Google API やその他のサービスへのリクエストに対してサービス ID の承認を有効にする必要があります。このため、環境のクラスタ内で
composer-user-workloads
名前空間または新しく作成された名前空間で Pod を実行すると、Kubernetes と Google Cloud サービス アカウント間の適切な IAM バインディングが作成されず、これらの Pod は、Google Cloud プロジェクトのリソースにアクセスできません。Pod が Google Cloud リソースにアクセスできるようにするには、後で説明するように、
composer-user-workloads
名前空間を使用するか、独自の名前空間を作成します。プロジェクトのリソースへのアクセス権を付与するには、Workload Identity のガイダンスに沿って、次のようにバインディングを設定します。
- 環境のクラスタに別の名前空間を作成します。
composer-user-workloads/<namespace_name>
Kubernetes サービス アカウントと環境のサービス アカウント間のバインディングを作成します。- 環境のサービス アカウント アノテーションを Kubernetes サービス アカウントに追加します。
KubernetesPodOperator
を使用する場合は、namespace
パラメータとservice_account_name
パラメータで名前空間と Kubernetes サービス アカウントを指定します。
Cloud Composer 3 は、Workload Identity で GKE クラスタを使用します。GKE メタデータ サーバーが新しく作成された Pod でのリクエストの承認を開始するまでに数秒かかります。したがって、Pod の有効期間の最初の数秒以内に Workload Identity を使用して認証を試みると、失敗する可能性があります。この制限の詳細については、Workload Identity の制限事項をご覧ください。
Cloud Composer 2 では、コンピューティング クラスの概念が導入されている Autopilot クラスタを使用します。
デフォルトでは、クラスが選択されていない場合、
KubernetesPodOperator
を使用して Pod を作成する際にはgeneral-purpose
クラスが想定されます。各クラスは特定のプロパティとリソース制限に関連付けられています。それらの詳細については、Autopilot のドキュメントをご覧ください。たとえば、
general-purpose
クラス内で実行される Pod では、最大 110 GiB のメモリを使用できます。
KubernetesPodOperator の構成
この例の操作を行うためには、kubernetes_pod_operator.py
ファイル全体をご使用の環境の dags/
フォルダに配置するか、関連する KubernetesPodOperator
コードを DAG に追加します。
次のセクションでは、この例のそれぞれの KubernetesPodOperator
構成について説明します。各構成変数について詳しくは、Airflow リファレンスをご覧ください。
構成を最小限にしたい
KubernetesPodOperator
の作成に必要なものは、Pod の name
、Pod を実行する namespace
、使用する image
、task_id
のみです。
次のコード スニペットを DAG に配置すると、/home/airflow/composer_kube_config
のデフォルトが構成に使用されます。pod-ex-minimum
タスクを成功させるのに、コードを変更する必要はありません。
テンプレートの構成
Airflow は Jinja テンプレートの使用をサポートしています。必須変数(task_id
、name
、namespace
、image
など)を演算子を使用して宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmds
、arguments
、env_vars
、config_file
など)をテンプレート化できます。
DAG または環境を変更しない場合は、2 つのエラーが発生して ex-kube-templates
タスクが失敗します。ログによると、このタスクは適切な変数が存在しないため失敗しています(my_value
)。2 番目のエラーは最初のエラーの修正後に取得できますが、config
で core/kube_config
が見つからないためタスクが失敗していることを示しています。
両方のエラーを修正するには、以下で説明されている手順を行います。
my_value
と gcloud
または Airflow UI を設定するには、次の手順を行います。
Airflow UI
Airflow 2 UI で:
Airflow UI に移動します。
ツールバーで、[管理者] > [変数] を選択します。
[List Variable] ページで、[新しいレコードを追加する] をクリックします。
[Add Variable] ページで、次の情報を入力します。
- Key:
my_value
- Val:
example_value
- Key:
[保存] をクリックします。
gcloud
Airflow 2 の場合は、次のコマンドを入力します。
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
次のように置き換えます。
ENVIRONMENT
を環境の名前にする。LOCATION
は、環境が配置されているリージョン。
カスタムの config_file
(Kubernetes 構成ファイル)を参照するには、有効な Kubernetes 構成に kube_config
Airflow 構成オプションをオーバーライドします。
セクション | キー | 値 |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
環境が更新されるまで数分待ちます。次に、ex-kube-templates
タスクを再度実行し、ex-kube-templates
タスクが成功したことを確認します。
すべての構成
この例では、KubernetesPodOperator
で構成できるすべての変数が示されます。ex-all-configs
タスクを成功させるのに、コードを変更する必要はありません。
各変数について詳しくは、Airflow の KubernetesPodOperator
リファレンスをご覧ください。
CNCF Kubernetes プロバイダに関する情報
GKEStartPodOperator と KubernetesPodOperator は apache-airflow-providers-cncf-kubernetes
プロバイダ内に実装されます。
CNCF Kubernetes プロバイダ向けの詳細なリリースノートについては、CNCF Kubernetes プロバイダのウェブサイトをご覧ください。
バージョン 6.0.0
CNCF Kubernetes プロバイダ パッケージ バージョン 6.0.0 では、KubernetesPodOperator
で kubernetes_default
接続がデフォルトで使用されます。
バージョン 5.0.0 でカスタム接続を指定した場合も、このカスタム接続はオペレータによって引き続き使用されます。kubernetes_default
接続を使用するように戻すには、DAG を適宜調整しなければならない場合があります。
バージョン 5.0.0
このバージョンでは、バージョン 4.4.0 と比較して、下位互換性のないいくつかの変更が導入されています。最も重要な変更は、バージョン 5.0.0 で使用されない kubernetes_default
接続に関連しています。
kubernetes_default
接続を変更する必要があります。Kube 構成パスを/home/airflow/composer_kube_config
に設定(図 1 を参照)するか、config_file
をKubernetesPodOperator
構成に追加(次のコード例を参照)する必要があります。
- 次の方法で KubernetesPodOperator を使用してタスクのコードを変更します
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
バージョン 5.0.0 の詳細については、CNCF Kubernetes プロバイダ リリースノートをご覧ください。
トラブルシューティング
Pod の失敗をトラブルシューティングするヒント
Airflow UI でタスクログを確認するだけでなく、次のログも確認してください。
Airflow スケジューラと Airflow ワーカーの出力:
Google Cloud Console で [環境] ページに移動します。
環境の DAG リンクのリンク先に移動します。
環境のバケットで、1 つ上のレベルに移動します。
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
フォルダのログを確認します。
GKE ワークロードに表示される Google Cloud コンソールの詳細な Pod ログ。これらのログには、Pod 定義 YAML ファイル、Pod イベント、および Pod の詳細が含まれます。
GKEStartPodOperator
も使用する場合は、ゼロ以外のコードも返されます。
KubernetesPodOperator
と GKEStartPodOperator
を使用する場合、コンテナのエントリ ポイントの戻りコードによって、タスクが成功したかどうかを判断できます。ゼロ以外の戻りコードは失敗を示します。
KubernetesPodOperator
と GKEStartPodOperator
を使用する際には、一般的にコンテナのエントリ ポイントとしてシェル スクリプトを実行して、コンテナ内の複数のオペレーションをグループ化します。
このようなスクリプトを記述する場合は、スクリプト内のコマンドが失敗した場合にスクリプトを終了し Airflow タスク インスタンスに失敗を伝播させるために、スクリプトの先頭に set -e
コマンドを含めることをおすすめします。
Pod のタイムアウト
KubernetesPodOperator
のデフォルトのタイムアウトは 120 秒であるため、サイズの大きなイメージをダウンロードする前にタイムアウトが発生する場合があります。タイムアウトの時間を長くするには、KubernetesPodOperator
を作成する際に startup_timeout_seconds
パラメータを変更します。
Pod がタイムアウトすると、Airflow UI にタスク固有のログが表示されます。次に例を示します。
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
手元のタスクを実行するために Cloud Composer サービス アカウントに必要な IAM 権限がない場合でも、Pod のタイムアウトは発生する場合があります。この状態を確認するには、GKE ダッシュボードで特定のワークロードのログを調べて Pod レベルのエラーを確認するか、Cloud Logging を使用します。
新しい接続の確立に失敗した
GKE クラスタでは、自動アップグレードがデフォルトで有効になっています。 アップグレード中のクラスタにノードプールが存在する場合、次のエラーが表示される場合があります。
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
クラスタがアップグレードされているかどうかを確認するには、Google Cloud コンソールで [Kubernetes クラスタ] ページに移動し、環境のクラスタ名の横にある読み込みアイコンを確認します。