Cloud Composer 1 | Cloud Composer 2
このページでは、KubernetesPodOperator
を使用して Kubernetes Pod を Cloud Composer から Cloud Composer 環境の一部である Google Kubernetes Engine クラスタにデプロイし、環境に適切なリソースを確保する方法について説明します。
KubernetesPodOperator
は、使用中の環境のクラスタで Kubernetes Pod を起動します。これに対し、Google Kubernetes Engine 演算子は、指定されたクラスタ内で Kubernetes Pod を実行します。このクラスタは、使用中の環境とは関係がない別のクラスタであっても構いません。Google Kubernetes Engine 演算子を使用してクラスタを作成、削除することもできます。
KubernetesPodOperator
は、以下を必要とする場合に適しています。
- 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
- Cloud Composer ワーカー イメージでは使用できないバイナリ依存関係。
このページでは、次の KubernetesPodOperator
構成を含む Airflow DAG の例について説明します。
- 最小の構成: 必要なパラメータのみを設定します。
- テンプレートの構成: Jinja でテンプレートを作成できるパラメータを使用します。
Secret 変数の構成: Kubernetes Secret オブジェクトを Pod に渡します。
Pod アフィニティの構成は Cloud Composer 2 では使用できません。代わりに、GKE 演算子を使用して別のクラスタで Pod を起動します。
すべての構成: すべての構成が含まれます。
始める前に
Cloud Composer 2 では、環境のクラスタは自動的にスケーリングされます。
KubernetesPodOperator
を使用して実行する追加のワークロードは、環境とは独立してスケーリングされます。 環境は、リソース需要の増加による影響を受けませんが、環境のクラスタは、リソース需要に応じてスケールアップおよびスケールダウンされます。環境のクラスタで実行する追加のワークロードの料金は Cloud Composer 2 料金モデルに従い、Cloud Composer Compute SKU を使用します。Cloud Composer 2 は Workload Identity で GKE クラスタを使用します。デフォルトでは、新しく作成された名前空間または
composer-user-workloads
名前空間で実行される Pod は Google Cloud リソースにアクセスできません。Workload Identity を使用する場合は、名前空間に関連付けられた Kubernetes サービス アカウントを Google Cloud サービス アカウントにマッピングして、Google API やその他のサービスへのリクエストに対してサービス ID の承認を有効にする必要があります。このため、環境のクラスタ内で
composer-user-workloads
名前空間または新しく作成された名前空間で Pod を実行すると、Kubernetes と Google Cloud サービス アカウント間の適切な IAM バインディングが作成されず、これらの Pod は、Google Cloud プロジェクトのリソースにアクセスできません。Pod が Google Cloud リソースにアクセスできるようにするには、後で説明するように、
composer-user-workloads
名前空間を使用するか、独自の名前空間を作成します。プロジェクトのリソースへのアクセス権を付与するには、Workload Identity のガイダンスに沿って、次のようにバインディングを設定します。
- 環境のクラスタに別の名前空間を作成します。
composer-user-workloads/<namespace_name>
Kubernetes サービス アカウントと環境のサービス アカウントの間にバインディングを作成します。- 環境のサービス アカウント アノテーションを Kubernetes サービス アカウントに追加します。
KubernetesPodOperator
を使用する場合は、namespace
パラメータとservice_account_name
パラメータで名前空間と Kubernetes サービス アカウントを指定します。
Cloud Composer 2 は Workload Identity で GKE クラスタを使用します。GKE メタデータ サーバーが新しく作成された Pod でのリクエストの承認を開始するまでに数秒かかります。したがって、Pod の有効期間の最初の数秒以内に Workload Identity を使用して認証を試みると、失敗する可能性があります。この制限について詳しくは、Workload Identity の制限事項をご覧ください。
Cloud Composer 2 では、コンピューティング クラスの概念が導入されている Autopilot クラスタを使用します。
デフォルトでは、クラスが選択されていない場合、
KubernetesPodOperator
を使用して Pod を作成する際にはgeneral-purpose
クラスが想定されます。各クラスは特定のプロパティとリソース制限に関連付けられています。それらの詳細については、Autopilot のドキュメントをご覧ください。たとえば、
general-purpose
クラス内で実行される Pod では、最大 110 GiB のメモリを使用できます。
CNCF Kubernetes プロバイダのバージョン 5.0.0 を使用している場合は、CNCF Kubernetes プロバイダ セクションに記載されている手順に沿って操作します。
KubernetesPodOperator の構成
この例の操作を行うためには、kubernetes_pod_operator.py
ファイル全体をご使用の環境の dags/
フォルダに配置するか、関連する KubernetesPodOperator
コードを DAG に追加します。
次のセクションでは、この例のそれぞれの KubernetesPodOperator
構成について説明します。各構成変数について詳しくは、Airflow リファレンスをご覧ください。
構成を最小限にしたい
KubernetesPodOperator
の作成に必要なものは、Pod の name
、Pod を実行する namespace
、使用する image
、task_id
のみです。
次のコード スニペットを DAG に配置すると、/home/airflow/composer_kube_config
のデフォルトが構成に使用されます。pod-ex-minimum
タスクを成功させるのに、コードを変更する必要はありません。
テンプレートの構成
Airflow は Jinja テンプレートの使用をサポートしています。必須変数(task_id
、name
、namespace
、image
など)を演算子を使用して宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmds
、arguments
、env_vars
、config_file
など)をテンプレート化できます。
DAG または環境を変更しない場合は、2 つのエラーが発生して ex-kube-templates
タスクが失敗します。ログによると、このタスクは適切な変数が存在しないため失敗しています(my_value
)。2 番目のエラーは最初のエラーの修正後に取得できますが、config
で core/kube_config
が見つからないためタスクが失敗していることを示しています。
両方のエラーを修正するには、以下で説明されている手順を行います。
my_value
と gcloud
または Airflow UI を設定するには、次の手順を行います。
Airflow UI
Airflow 2 UI で:
Airflow UI に移動します。
ツールバーで、[管理者] > [変数] を選択します。
[List Variable] ページで、[新しいレコードを追加する] をクリックします。
[Add Variable] ページで、次の情報を入力します。
- Key:
my_value
- Val:
example_value
- Key:
[保存] をクリックします。
gcloud
Airflow 2 の場合は、次のコマンドを入力します。
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
次のように置き換えます。
ENVIRONMENT
を環境の名前にする。LOCATION
は、環境が配置されているリージョン。
カスタム config_file
(Kubernetes 構成ファイル)を参照するには、kube_config
Airflow 構成オプションを有効な Kubernetes 構成にオーバーライドします。
セクション | キー | 値 |
---|---|---|
core |
kube_config |
/home/airflow/composer_kube_config |
環境が更新されるまで数分待ちます。次に、ex-kube-templates
タスクを再度実行し、ex-kube-templates
タスクが成功したことを確認します。
Secret 変数の構成
Kubernetes Secret は、センシティブ データを含むオブジェクトです。Secret を Kubernetes Pod に渡すには、KubernetesPodOperator
を使用します。
Kubernetes で Secret を定義してください。Secret を定義しない場合、Pod の起動に失敗します。
この例では、Kubernetes Secret を使用する 2 つの方法を紹介します。環境変数としての方法と、Pod によってマウントされたボリュームによる方法です。
最初の Secret である airflow-secrets
は、SQL_CONN
という名前の Kubernetes の環境変数に設定されます(Airflow の環境変数や Cloud Composer の環境変数とは異なります)。
2 番目の Secret である service-account
は、サービス アカウント トークンを含むファイルである service-account.json
を /var/secrets/google
にマウントします。
Secret は次のようになります。
最初の Kubernetes Secret の名前は、secret
変数で定義されます。この特定の Secret の名前は、airflow-secrets
です。この Secret は、環境変数として公開されるように deploy_type
で指定されます。この Secret で deploy_target
に設定される環境変数は、SQL_CONN
です。最後に、deploy_target
に保管される Secret の key
は sql_alchemy_conn
です。
2 番目の Kubernetes Secret の名前は、secret
変数で定義されています。この特定の Secret の名前は、service-account
です。deploy_type
で指定されるように、ボリュームとして公開されます。マウントするファイルのパス deploy_target
は /var/secrets/google
です。最後に、deploy_target
に保管される Secret の key
は service-account.json
です。
演算子の構成は、次のようになります。
DAG または環境を変更しない場合、ex-kube-secrets
タスクは失敗します。ログを確認すると、Pod took too long to start
エラーのためタスクが失敗しています。このエラーは、構成(secret_env
)で指定された Secret が見つからない場合に発生します。
gcloud
gcloud
を使用して Secret を設定するには、次の手順を行います。
Cloud Composer 環境クラスタに関する情報を取得します。
次のコマンドを実行します。
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
次のように置き換えます。
ENVIRONMENT
は、環境の名前です。LOCATION
は、Cloud Composer 環境が配置されているリージョンです。
このコマンドの出力では、
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
の形式が使用されます。GKE クラスタ ID を取得するには、
/clusters/
の後に出力(-gke
で終わるもの)をコピーします。
次のコマンドを実行して GKE クラスタに接続します。
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
次のように置き換えます。
CLUSTER_ID
は GKE クラスタ ID です。PROJECT
は、Google Cloud プロジェクトの ID です。LOCATION
は、Cloud Composer 環境が配置されているリージョンです。
Kubernetes Secret を作成します。
次のコマンドを実行して、
sql_alchemy_conn
~test_value
の値を設定する Kubernetes Secret を作成します。kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
次のコマンドを実行して、
service-account.json
の値をkey.json
というサービス アカウント キーファイルのローカルパスに設定する Kubernetes Secret を作成します。kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
Secret を設定した後、Airflow UI でもう一度
ex-kube-secrets
タスクを実行します。ex-kube-secrets
タスクの成功を確認します。
すべての構成
この例では、KubernetesPodOperator
で構成できるすべての変数が示されます。ex-all-configs
タスクを成功させるのに、コードを変更する必要はありません。
各変数について詳しくは、Airflow の KubernetesPodOperator
リファレンスをご覧ください。
CNCF Kubernetes プロバイダに関する情報
GKEStartPodOperator と KubernetesPodOperator は apache-airflow-providers-cncf-kubernetes
プロバイダ内に実装されます。
CNCF Kubernetes プロバイダ向けの詳細なリリースノートについては、CNCF Kubernetes プロバイダのウェブサイトをご覧ください。
バージョン 6.0.0
CNCF Kubernetes プロバイダ パッケージ バージョン 6.0.0 では、KubernetesPodOperator
で kubernetes_default
接続がデフォルトで使用されます。
バージョン 5.0.0 でカスタム接続を指定した場合も、このカスタム接続はオペレータによって引き続き使用されます。kubernetes_default
接続を使用するように戻すには、DAG を適宜調整しなければならない場合があります。
バージョン 5.0.0
このバージョンでは、バージョン 4.4.0 と比較して、下位互換性のないいくつかの変更が導入されています。最も重要な点は、バージョン 5.0.0 で使用されない kubernetes_default
接続に関連したものです。
kubernetes_default
接続を変更する必要があります。Kube 構成パスを/home/airflow/composer_kube_config
に設定(図 1 を参照)するか、config_file
をKubernetesPodOperator
構成に追加(次のコード例を参照)する必要があります。
- 次の方法で KubernetesPodOperator を使用してタスクのコードを変更します
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
バージョン 5.0.0 の詳細については、CNCF Kubernetes プロバイダ リリースノートをご覧ください。
トラブルシューティング
Pod の失敗をトラブルシューティングするヒント
Airflow UI でタスクログを確認するだけでなく、次のログも確認してください。
Airflow スケジューラと Airflow ワーカーの出力:
Google Cloud Console で [環境] ページに移動します。
ご使用の環境の [DAG] リンクをクリックします。
環境のバケットで、1 つ上のレベルに移動します。
logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
フォルダのログを確認します。
GKE ワークロードに表示される Google Cloud コンソールの詳細な Pod ログ。これらのログには、Pod 定義 YAML ファイル、Pod イベント、および Pod の詳細が含まれます。
GKEStartPodOperator
も使用する場合は、ゼロ以外のコードも返されます。
KubernetesPodOperator
と GKEStartPodOperator
を使用する場合、コンテナのエントリ ポイントの戻りコードによって、タスクが成功したかどうかを判断できます。ゼロ以外の戻りコードは失敗を示します。
KubernetesPodOperator
と GKEStartPodOperator
を使用する際には、一般的にコンテナのエントリ ポイントとしてシェル スクリプトを実行して、コンテナ内の複数のオペレーションをグループ化します。
このようなスクリプトを記述する場合は、スクリプト内のコマンドが失敗した場合にスクリプトを終了し Airflow タスク インスタンスに失敗を伝播させるために、スクリプトの先頭に set -e
コマンドを含めることをおすすめします。
Pod のタイムアウト
KubernetesPodOperator
のデフォルトのタイムアウトは 120 秒であるため、サイズの大きなイメージをダウンロードする前にタイムアウトが発生する場合があります。タイムアウトの時間を長くするには、KubernetesPodOperator
を作成する際に startup_timeout_seconds
パラメータを変更します。
Pod がタイムアウトすると、Airflow UI にタスク固有のログが表示されます。次に例を示します。
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
手元のタスクを実行するために Cloud Composer サービス アカウントに必要な IAM 権限がない場合でも、Pod のタイムアウトは発生する場合があります。この状態を確認するには、GKE ダッシュボードで特定のワークロードのログを調べて Pod レベルのエラーを確認するか、Cloud Logging を使用します。
新しい接続の確立に失敗した
GKE クラスタでは、自動アップグレードがデフォルトで有効になっています。 アップグレード中のクラスタにノードプールが存在する場合、次のエラーが表示される場合があります。
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
クラスタがアップグレードされているかどうかを確認するには、Google Cloud コンソールで [Kubernetes クラスタ] ページに移動し、環境のクラスタ名の横にある読み込みアイコンを確認します。