このページでは、KubernetesPodOperator
を使用して Cloud Composer から Kubernetes Pod を Cloud Composer 環境の一部である Google Kubernetes Engine クラスタに起動し、環境に適切なリソースを確保する方法について説明します。
KubernetesPodOperator
は、以下を必要とする場合に適しています。
- 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
- Cloud Composer ワーカー イメージでは使用できないバイナリ依存関係。
このページでは、次の KubernetesPodOperator
構成を含む DAG の例について説明します。
- 最小の構成: 必要なパラメータのみを設定します。
- テンプレートの構成: Jinja でテンプレートを作成できるパラメータを使用します。
- Secret 変数の構成: Kubernetes Secret オブジェクトを Pod に渡します。
- Pod アフィニティの構成: Pod のスケジュールに使用できるノードを制限します。
- すべての構成: すべての構成が含まれます。
始める前に
- Cloud Composer の最新バージョンを使用することをおすすめします。少なくとも、使用するバージョンは、Cloud Composer の非推奨のポリシーとサポート ポリシーの一部としてサポートされている必要があります。イメージのバージョンを確認するには、環境の詳細をご覧ください。
- 環境に十分なリソースが存在することを確認してください。 リソースが不足している Pod を環境で起動すると、Airflow ワーカーと Airflow スケジューラでエラーが発生する可能性があります。
環境に適したリソースの確保
Cloud Composer 環境を作成するときは、環境のコンピューティング能力を指定し、GKE クラスタに一定量を割り当てます。Kubernetes Pod を環境に起動すると、CPU やメモリなどのリソースに対して、プログラム同士が競合する可能性があります。Airflow スケジューラと Airflow ワーカーは同じ GKE クラスタにあるため、競合によってリソース不足になった場合は、スケジューラとワーカーは正常に動作しません。
リソースの不足を防ぐには、次の操作を行います。
ノードプールを作成する
Cloud Composer 環境でリソースの不足を回避するための推奨方法は、新しいノードプールを作成し、そのプールのリソースのみを使用して実行するように Kubernetes Pod を構成することです。
既存のクラスタにノードプールを作成するには、次の手順を行います。
コンソール
Cloud Console で、GKE メニューに移動します。
目的のクラスタを選択します。
[編集] をクリックします。
[ノードプール] で、[ノードプールを追加] をクリックします。
ノードプールを構成します。
(省略可)詳細オプション(自動アップグレードや自動スケーリングなど)を有効にします。
[保存] をクリックします。
gcloud
次のコマンドを入力します。
gcloud container node-pools create POOL_NAME \ --cluster CLUSTER_NAME \ --project PROJECT_ID \ --zone ZONE \ ADDITIONAL_FLAGS
ここで
POOL_NAME
は、ノードプールの名前です。CLUSTER
は、ノードプールを作成するクラスタの名前です。PROJECT_ID
は、Cloud Composer のプロジェクト名です。ZONE
は、GKE クラスタが配置されるゾーンです。オプションの一覧については、
gcloud container node-pools create
ドキュメントをご覧ください。node-pools create
リクエストが正しく処理されると、ノードプール情報が返されます。Creating node pool example-pool...done. Created [https://container.googleapis.com/v1/projects/kubernetes-engine-docs/zones/us-central1-f/clusters/example-cluster/nodePools/example-pool]. NAME MACHINE_TYPE DISK_SIZE_GB NODE_VERSION example-pool n1-standard-1 100 1.2.4
環境内のノード数を増やす
Cloud Composer 環境でノード数を増やすと、ワーカーが使用できるコンピューティング能力が増加します。指定したマシンタイプに用意されているよりも多くの CPU または RAM を必要とするタスクには、この増加によって追加のリソースは割り当てられません。
ノード数を増やすには、環境を更新してください。
適切なマシンタイプを指定する
Cloud Composer 環境の作成の際に、マシンタイプを指定できます。使用可能なリソースを確保するには、Cloud Composer 環境で行うコンピューティングの種類に適したマシンタイプを指定します。
KubernetesPodOperator の構成
この例に沿った操作を行うために、kubernetes_pod_operator.py
ファイル全体をご使用の環境の DAG フォルダに配置するか、関連する KubernetesPodOperator
コードを DAG に追加します。
次のセクションでは、この例のそれぞれの KubernetesPodOperator
構成について説明します。各構成変数について詳しくは、Airflow リファレンスをご覧ください。
最小の構成
KubernetesPodOperator の作成に必要になるのは、name
、namespace
、image
、および task_id
のみです。
次のコード スニペットを DAG に配置すると、/home/airflow/composer_kube_config
のデフォルトが構成に使用されます。pod-ex-minimum
タスクを成功させるのに、コードを変更する必要はありません。
テンプレートの構成
Airflow は Jinja テンプレートの使用をサポートしています。
必須変数(task_id
、name
、namespace
、image
)を演算子を使用して宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmds
、arguments
、env_vars
、config_file
など)をテンプレート化できます。
DAG または環境を変更しない場合は、2 つのエラーが発生して ex-kube-templates
タスクが失敗します。タスクをデバッグしてエラーを解決する方法は、次のとおりです。
ex-kube-templates
タスクの失敗を確認します。ex-kube-templates
タスクのログを確認します。ログによると、このタスクは適切な変数が存在しないため失敗しています(
my_value
)。my_value
とgcloud
または Airflow UI を設定するには、次の手順を行います。gcloud
次のコマンドを入力します。
gcloud composer environments run ENVIRONMENT \ --location LOCATION \ variables -- \ --set my_value example_value
ここで
ENVIRONMENT
は、Cloud Composer 環境の名前です。LOCATION
は、Cloud Composer 環境が配置されているリージョンです。
Airflow UI
- ツールバーで、[Admin] > [Variables] をクリックします。
- [作成] をクリックします。
- 次の情報を入力します。
- Key:
my_value
- Val:
example_value
- Key:
- [保存] をクリックします。
ex-kube-templates
タスクのステータスを確認します。ex-kube-templates
タスクはまだ失敗します。ログを確認すると、core/kubeconfig
がconfig
で見つからないためタスクが失敗しています。カスタムのconfig_file
(Kubernetes 構成ファイル)を参照するには、airflow.cfg
ファイルのkube_config
変数を有効な Kubernetes 構成に設定する必要があります。kube_config
変数を設定するには、次のコマンドを入力します。gcloud composer environments update ENVIRONMENT \ --location LOCATION \ --update-airflow-configs=core-kube_config=/home/airflow/composer_kube_config
ここで
ENVIRONMENT
は、Cloud Composer 環境の名前です。LOCATION
は、Cloud Composer 環境が配置されているリージョンです。
環境が更新されるまで数分待ちます。
ex-kube-templates
タスクの成功を確認します。
Secret 変数を構成する
Kubernetes Secret は、少量の機密データを含むオブジェクトです。Secret を Kubernetes Pod に渡すには、KubernetesPodOperator
を使用します。
Kubernetes で Secret を定義してください。Secret を定義しない場合、Pod の起動に失敗します。
この例では、Kubernetes Secret airflow-secrets
を SQL_CONN
という名前の Kubernetes の環境変数にデプロイしています(Airflow の環境変数や Cloud Composer の環境変数とは異なります)。
Secret は次のようになります。
Kubernetes Secret の名前は、secret
変数で定義されます。
この特定の Secret の名前は、airflow-secrets
です。この Secret は、環境変数として公開されるように deploy_type
で指定されます。この Secret で deploy_target
に設定される環境変数は、SQL_CONN
です。最後に、deploy_target
に保管される Secret の key
は sql_alchemy_conn
です。
演算子の構成は、次のようになります。
DAG または環境を変更しない場合、ex-kube-secrets
タスクは失敗します。タスクをデバッグしてエラーを解決する方法は、次のとおりです。
ex-kube-secrets
タスクの失敗を確認します。ex-kube-secrets
タスクのログを確認します。ログを確認すると、
Pod took too long to start
エラーのためタスクが失敗していることがわかります。このエラーは、構成(secret_env
)で指定された Secret が見つからない場合に発生します。gcloud
を使用して Secret を設定するには、次の手順を行います。次のコマンドを実行して、Cloud Composer 環境の詳細を表示します。
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
ここで
ENVIRONMENT
は、Cloud Composer 環境の名前です。LOCATION
は、Cloud Composer 環境が配置されているリージョンです。次のような出力が返されます。
projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>
GKE クラスタ ID を取得するには、後で取得できる場所に、
/clusters/
の後の出力(文末は-gke
)をコピーして貼り付けます。この出力がクラスタ ID になります。ゾーンを取得するには、後で取得できる場所に、
/zones/
の後の出力をコピーして貼り付けます。次のコマンドを実行して GKE クラスタに接続します。
gcloud container clusters get-credentials CLUSTER_ID \ --zone ZONE \ --project PROJECT
ここで
CLUSTER_ID
は、GKE クラスタ ID です。ZONE
は、GKE が配置されるゾーンです。PROJECT
は、Google Cloud プロジェクトの ID です。
次のコマンドを実行して、
sql_alchemy_conn
~test_value
の値を設定する Kubernetes Secret を作成します。kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value
Secret を設定した後、もう一度
ex-kube-secrets
タスクを実行します。ex-kube-secrets
タスクの成功を確認します。
Pod アフィニティの構成
KubernetesPodOperator
で affinity
パラメータを構成する際に、Pod のスケジュールを設定する対象ノードを制御します(特定のノードプール内のノードなど)。この例では、演算子は pool-0
や pool-1
という名前のノードプールでのみ実行されます。
例のように構成されている場合、タスクは失敗します。タスクをデバッグしてエラーを解決する方法は、次のとおりです。
ex-pod-affinity
タスクの失敗を確認します。ex-pod-affinity
タスクのログを確認します。ログを確認すると、
pool-0
とpool-1
のノードプールが存在しないためタスクが失敗していることがわかります。values
のノードプールが存在するようにするため、次のいずれかの構成変更を行います。- ノードプールを以前に作成した場合は、
pool-0
とpool-1
をノードプールの名前に置き換えてからもう一度 DAG をアップロードします。 pool-0
またはpool-1
という名前のノードプールを作成します。両方のノードプールを作成できますが、タスクを成功させるために必要なのは 1 つのみです。pool-0
とpool-1
をdefault-pool
に置き換えます。これは、Airflow が使用するデフォルトのプールです。もう一度 DAG をアップロードします。 注: デフォルトでは、Kubernetes Pod の予定はdefault-pool
に設定されます。 プールを後で追加すると、プールはdefault-pool
に制限されます。
- ノードプールを以前に作成した場合は、
環境が更新されるまで数分待ちます。
ex-pod-affinity
タスクの成功を確認します。
すべての構成
この例では、KubernetesPodOperator
で構成できるすべての変数が示されます。ex-all-configs
タスクを成功させるのに、コードを変更する必要はありません。
各変数について詳しくは、Airflow の KubernetesPodOperator
リファレンスをご覧ください。
DAG の管理
タスクのステータスの表示
- Airflow ウェブ インターフェースに移動します。
- DAG ページで、DAG 名(
composer_sample_kubernetes_pod
など)をクリックします。 - DAG の詳細ページで、[Graph View] をクリックします。
ステータスを確認します。
Failed: タスクの周囲に赤色のボックスが表示されます(例の
ex-kube-templates
)。ポインタをタスクの上に置いても、State: Failed と表示されます。Succeed: タスクの周囲に緑色のボックスが表示されます(例の
pod-ex-minimum
)。ポインタをタスクの上に置いても、State: Success と表示されます。
タスクログを確認する
- Airflow UI にタスクのステータスが表示されます。
- DAG のグラフビューで、タスク名をクリックします。
- タスク インスタンスのコンテキスト メニューで、[View Log] をクリックします。
タスクを再実行する
- DAG に戻るには、次の手順を行います。
- Airflow UI のツールバーで、[DAGs] をクリックします。
- DAG 名をクリックします(
composer_samples_kubernetes_pod
など)。
- タスクを再実行するには、次の手順を行います。
- タスク名をクリックします。
- [Clear] をクリックしてから、[OK] をクリックします。タスクが自動的に再実行されます。
トラブルシューティング
Pod の失敗をトラブルシューティングするヒント
タスクログの確認だけでなく、次のログも確認してください。
Airflow スケジューラと Airflow ワーカーの出力:
- Cloud Composer 環境の Cloud Storage バケットに移動します。 これは、DAG が配置されているバケットです。
logs/
DAG_NAME
/
TASK_ID
/
EXECUTION_DATE
のログを確認します。
GKE ワークロードに表示される Cloud Console の詳細な Pod ログ。これらのログには、Pod 定義 YAML ファイル、Pod イベント、および Pod の詳細が含まれます。
GKEPodOperator
も使用する場合は、ゼロ以外のコードも返されます。
KubernetesPodOperator
と GKEPodOperator
を使用する場合、コンテナのエントリ ポイントの戻りコードによって、タスクが成功したかどうかを判断できます。ゼロ以外の戻りコードは失敗を示します。
KubernetesPodOperator
と GKEPodOperator
を使用する際には、一般的にコンテナのエントリ ポイントとしてシェル スクリプトを実行して、コンテナ内の複数のオペレーションをグループ化します。
このようなスクリプトを記述する場合は、スクリプト内のコマンドが失敗した場合にスクリプトを終了し Airflow タスク インスタンスに失敗を伝播させるために、スクリプトの先頭に set -e
コマンドを含めることをおすすめします。
Pod が成功してもタスクが失敗する
composer-1.4.1-airflow-*
以前を実行中の Cloud Composer 環境の場合は、次のとおりです。
Airflow タスクが 1 時間実行され、タスクログが kubernetes.client.rest.ApiException: (401)
と Reason: Unauthorized
で終わる場合、基盤となっている Kubernetes Job はその時点以降も実行を継続し、成功する可能性もあります。ただし、そのタスクは失敗としてレポートされます。
この問題を解決するには、kubernetes>=8.0.1
に PyPI パッケージの依存関係を明示的に追加します。
Pod のタイムアウト
KubernetesPodOperator
のデフォルトのタイムアウトは 120 秒であるため、サイズの大きなイメージをダウンロードする前にタイムアウトが発生する場合があります。タイムアウトの時間を長くするには、KubernetesPodOperator
を作成する際に startup_timeout_seconds
パラメータを変更します。
Pod がタイムアウトすると、Airflow ウェブ UI にタスク固有のログが表示されます。例:
Executingon 2018-07-23 19:06:58.133811 Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py'] Event: pod-name-9a8e9d06 had an event of type Pending ... ... Event: pod-name-9a8e9d06 had an event of type Pending Traceback (most recent call last): File "/usr/local/bin/airflow", line 27, in args.func(args) File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run pool=args.pool, File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper result = func(*args, **kwargs) File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task result = task_copy.execute(context=context) File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute raise AirflowException('Pod Launching failed: {error}'.format(error=ex)) airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
手元のタスクを実行するために Composer サービス アカウントに必要な IAM 権限がない場合も、Pod のタイムアウトが発生する可能性があります。この状態を確認するには、GKE ダッシュボードで特定のワークロードのログを調べて Pod レベルのエラーを確認するか、Stackdriver Logging を使用します。
新しい接続の確立に失敗した
GKE クラスタでは、自動アップグレードがデフォルトで有効になっています。 アップグレード中のクラスタにノードプールが存在する場合、次のエラーが表示される場合があります。
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new connection: [Errno 111] Connection refuse
クラスタがアップグレードされているかどうかを確認するには、Cloud Console の GKE メニューに移動し、環境のクラスタ名の横にある読み込みアイコンを確認します。