Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
このページでは、KubernetesPodOperator を使用して、Cloud Composer から Cloud Composer 環境の一部である Google Kubernetes Engine クラスタに Kubernetes Pod をデプロイする方法について説明します。
KubernetesPodOperator は、使用中の環境のクラスタで Kubernetes Pod を起動します。これに対し、Google Kubernetes Engine 演算子は、指定されたクラスタ内で Kubernetes Pod を実行します。このクラスタは、使用中の環境とは関係がない別のクラスタであっても構いません。Google Kubernetes Engine 演算子を使用してクラスタを作成、削除することもできます。
KubernetesPodOperator は、以下を必要とする場合に適しています。
- 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
- Cloud Composer ワーカーのストック画像では使用できないバイナリ依存関係。
始める前に
- Cloud Composer の最新バージョンを使用することをおすすめします。このバージョンは、少なくとも非推奨とサポート ポリシーの一部としてサポートされている必要があります。
- 環境に十分なリソースが存在することを確認してください。リソースが不足している Pod を環境で起動すると、Airflow ワーカーと Airflow スケジューラでエラーが発生する可能性があります。
Cloud Composer 環境のリソースを設定する
Cloud Composer 環境を作成する際には、環境のクラスタのパフォーマンス パラメータなどのパフォーマンス パラメータを指定します。Kubernetes Pod を環境クラスタで起動すると、CPU やメモリなどのクラスタ リソースの競合が発生する可能性があります。Airflow スケジューラと Airflow ワーカーは同じ GKE クラスタにあるため、競合によってリソース不足になった場合は、スケジューラとワーカーは正常に動作しません。
リソースの不足を防ぐには、次の操作を行います。
ノードプールを作成
Cloud Composer 環境でリソースの不足を防ぐには、新しいノードプールを作成し、そのプールからのリソースのみを使用して実行するように Kubernetes Pod を構成します。
コンソール
gcloud
環境のクラスタの名前を決定します。
gcloud composer environments describe ENVIRONMENT_NAME \ --location LOCATION \ --format="value(config.gkeCluster)"
以下のように置き換えます。
ENVIRONMENT_NAME
を環境の名前にする。LOCATION
は、環境が配置されているリージョン。
出力には、環境のクラスタの名前が含まれます。例:
europe-west3-example-enviro-af810e25-gke
ノードプールの追加の説明に沿って、ノードプールを作成します。
環境内のノード数を増やす
Cloud Composer 環境でノード数を増やすと、ワークロードが使用できるコンピューティング能力が増加します。指定したマシンタイプに用意されているよりも多くの CPU または RAM を必要とするタスクには、この増加によって追加のリソースは割り当てられません。
ノード数を増やすには、環境を更新してください。
適切なマシンタイプを指定する
Cloud Composer 環境の作成の際に、マシンタイプを指定できます。使用可能なリソースを確保するには、Cloud Composer 環境で行うコンピューティングの種類のマシンタイプを指定します。
構成を最小限にしたい
KubernetesPodOperator の作成に必要になるのは、Pod の name
、使用する image
、task_id
パラメータのみです。/home/airflow/composer_kube_config
には、GKE の認証に使用する認証情報が含まれています。
Airflow 2
Airflow 1
Pod アフィニティの構成
KubernetesPodOperator で affinity
パラメータを構成する際に、Pod のスケジュールを設定する対象ノードを制御します(特定のノードプール内のノードなど)。この例では、演算子は pool-0
や pool-1
という名前のノードプールでのみ実行されます。Cloud Composer 1 環境ノードが default-pool
にあるため、Pod は環境内のノードで実行されません。
Airflow 2
Airflow 1
例のように構成されている場合、タスクは失敗します。ログを確認すると、ノードプール pool-0
と pool-1
が存在しないためタスクが失敗しています。
values
のノードプールが存在することを確認するために、次のいずれかの構成の変更を行います。
ノードプールを以前に作成した場合は、
pool-0
とpool-1
をノードプールの名前に置き換えてからもう一度 DAG をアップロードします。pool-0
またはpool-1
という名前のノードプールを作成します。両方のノードプールを作成できますが、タスクを成功させるために必要なのは 1 つのみです。pool-0
とpool-1
をdefault-pool
に置き換えます。これは、Airflow が使用するデフォルトのプールです。もう一度 DAG をアップロードします。
変更を加えた後、環境が更新されるまで数分待ちます。次に、ex-pod-affinity
タスクをもう一度実行し、ex-pod-affinity
タスクが成功したことを確認します。
追加構成
この例では、KubernetesPodOperator で構成できる追加のパラメータを示します。
パラメータの詳細については、KubernetesPodOperator の Airflow リファレンスをご覧ください。Kubernetes Secret と ConfigMap の使用方法については、Kubernetes Secret と ConfigMap を使用するをご覧ください。KubernetesPodOperator で Jinja テンプレートを使用する方法については、Jinja テンプレートを使用するをご覧ください。
Airflow 2
Airflow 1
Jinja テンプレートを使用する
Airflow は、DAG で Jinja テンプレートをサポートしています。
必要な Airflow パラメータ(task_id
、name
、image
)を演算子で宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmds
、arguments
、env_vars
、config_file
など)をテンプレート化できます。
この例の env_vars
パラメータは、my_value
という名前の Airflow 変数から設定されます。例の DAG は、Airflow の vars
テンプレート変数から値を取得します。Airflow には、さまざまな種類の情報にアクセスできる変数が他にもあります。たとえば、conf
テンプレート変数を使用して、Airflow 構成オプションの値にアクセスできます。詳細と Airflow で使用可能な変数のリストについては、Airflow ドキュメントのテンプレート リファレンスをご覧ください。
DAG を変更するか env_vars
変数を作成しないと、変数が存在しないため、例の ex-kube-templates
タスクは失敗します。この変数は、Airflow UI または Google Cloud CLI で作成します。
Airflow UI
Airflow UI に移動します。
ツールバーで、[管理者] > [変数] を選択します。
[List Variable] ページで、[新しいレコードを追加する] をクリックします。
[Add Variable] ページで、次の情報を入力します。
- Key:
my_value
- Val:
example_value
- Key:
[保存] をクリックします。
環境で Airflow 1 を使用している場合は、代わりに次のコマンドを実行します。
Airflow UI に移動します。
ツールバーで、[管理者] > [変数] を選択します。
[Variables] ページで、[Create] タブをクリックします。
[Variable] ページで、次の情報を入力します。
- Key:
my_value
- Val:
example_value
- Key:
[保存] をクリックします。
gcloud
次のコマンドを入力します。
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
環境で Airflow 1 を使用している場合は、代わりに次のコマンドを実行します。
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables -- \
--set my_value example_value
以下のように置き換えます。
ENVIRONMENT
を環境の名前にする。LOCATION
は、環境が配置されているリージョン。
次の例では、KubernetesPodOperator で Jinja テンプレートを使用する方法を示します。
Airflow 2
Airflow 1
Kubernetes Secret と ConfigMap を使用する
Kubernetes Secret は、センシティブ データを含むオブジェクトです。Kubernetes ConfigMap は、Key-Value ペアに非機密データを含むオブジェクトです。
Cloud Composer 2 では、Google Cloud CLI、API、Terraform を使用して Secret と ConfigMap を作成し、KubernetesPodOperator からそれらにアクセスできます。
YAML 構成ファイルについて
Google Cloud CLI と API を使用して Kubernetes Secret または ConfigMap を作成する場合は、YAML 形式のファイルを指定します。このファイルは、Kubernetes Secret と ConfigMap で使用される形式に従っている必要があります。Kubernetes ドキュメントには、ConfigMap と Secret の多くのコードサンプルが用意されています。使用を開始するには、Secret を使用して安全に認証情報を配布するページと ConfigMaps をご覧ください。
Kubernetes Secret の場合と同じように、Secret で値を定義する場合は base64 表現を使用します。
値をエンコードするには、次のコマンドを使用します(これは、Base64 でエンコードされた値を取得する多くの方法の一つです)。
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
出力:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
次の 2 つの YAML ファイルの例は、このガイドの後半のサンプルで使用します。Kubernetes Secret の YAML 構成ファイルの例:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
ファイルを含める方法を示す別の例。前の例と同様に、まずファイルの内容(cat ./key.json | base64
)をエンコードしてから、YAML ファイルでこの値を指定します。
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
ConfigMap の YAML 構成ファイルの例。ConfigMap で base64 表現を使用する必要はありません。
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
Kubernetes Secret を管理する
Cloud Composer 2 では、Google Cloud CLI と kubectl
を使用して Secret を作成します。
環境のクラスタに関する情報を取得します。
次のコマンドを実行します。
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
以下のように置き換えます。
ENVIRONMENT
は、環境の名前です。LOCATION
は、Cloud Composer 環境が配置されているリージョンです。
このコマンドの出力では、
projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>
の形式が使用されます。GKE クラスタ ID を取得するには、
/clusters/
の後の出力(末尾は-gke
)をコピーします。ゾーンを取得するには、
/zones/
の後の出力をコピーします。
次のコマンドを使用して GKE クラスタに接続します。
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --zone ZONE
以下のように置き換えます。
CLUSTER_ID
: 環境のクラスタ ID。PROJECT_ID
: プロジェクト IDZONE
は、環境のクラスタが配置されているゾーンに置き換えます。
Kubernetes Secret を作成します。
次のコマンドは、Kubernetes Secret を作成するための 2 つの方法を示しています。
--from-literal
の方法では、Key-Value ペアを使用します。--from-file
の方法では、ファイルの内容を使用します。Key-Value ペアを指定して Kubernetes Secret を作成するには、次のコマンドを実行します。この例では、
test_value
という値のsql_alchemy_conn
フィールドを持つairflow-secrets
という名前の Secret を作成します。kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value
ファイルの内容を指定して Kubernetes Secret を作成するには、次のコマンドを実行します。この例では、ローカル
./key.json
ファイルの内容から取得した値を持つservice-account.json
フィールドを持つservice-account
という名前の Secret を作成します。kubectl create secret generic service-account \ --from-file service-account.json=./key.json
DAG で Kubernetes Secret を使用する
この例では、Kubernetes Secret を使用する 2 つの方法を紹介します。環境変数としての方法と、Pod によってマウントされたボリュームによる方法です。
最初の Secret である airflow-secrets
は、SQL_CONN
という名前の Kubernetes の環境変数に設定されます(Airflow の環境変数や Cloud Composer の環境変数とは異なります)。
2 番目の Secret である service-account
は、サービス アカウント トークンを含むファイルである service-account.json
を /var/secrets/google
にマウントします。
Secret オブジェクトは次のようになります。
Airflow 2
Airflow 1
最初の Kubernetes Secret の名前は、secret_env
変数で定義されます。この Secret の名前は airflow-secrets
です。deploy_type
パラメータでは、環境変数として公開する必要があることを指定します。環境変数の名前は SQL_CONN
で、deploy_target
パラメータで指定されています。最後に、SQL_CONN
環境変数の値が sql_alchemy_conn
キーの値に設定されます。
2 番目の Kubernetes Secret の名前は、secret_volume
変数で定義されています。この Secret の名前は service-account
です。deploy_type
パラメータで指定されているように、ボリュームとして公開されます。マウントするファイルのパス deploy_target
は /var/secrets/google
です。最後に、deploy_target
に保管される Secret の key
は service-account.json
です。
演算子の構成は、次のようになります。
Airflow 2
Airflow 1
CNCF Kubernetes プロバイダに関する情報
KubernetesPodOperator は apache-airflow-providers-cncf-kubernetes
プロバイダに実装されています。
CNCF Kubernetes プロバイダ向けの詳細なリリースノートについては、CNCF Kubernetes プロバイダのウェブサイトをご覧ください。
バージョン 6.0.0
CNCF Kubernetes プロバイダ パッケージ バージョン 6.0.0 では、KubernetesPodOperator で kubernetes_default
接続がデフォルトで使用されます。
バージョン 5.0.0 でカスタム接続を指定した場合も、このカスタム接続はオペレータによって引き続き使用されます。kubernetes_default
接続を使用するように戻すには、DAG を適宜調整しなければならない場合があります。
バージョン 5.0.0
このバージョンでは、バージョン 4.4.0 と比較して下位互換性のない変更がいくつか導入されています。最も重要なのは、バージョン 5.0.0 で使用されない kubernetes_default
接続に関連した変更です。
kubernetes_default
接続を変更する必要があります。Kubernetes 構成パスは/home/airflow/composer_kube_config
に設定する必要があります(次の図を参照)。または、config_file
を KubernetesPodOperator 構成に追加する必要があります(次のコード例を参照)。
- 次の方法で KubernetesPodOperator を使用してタスクのコードを変更します。
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
バージョン 5.0.0 の詳細については、CNCF Kubernetes プロバイダ リリースノートをご覧ください。
トラブルシューティング
このセクションでは、KubernetesPodOperator の一般的な問題のトラブルシューティングに関するアドバイスを提供します。
ログを表示
問題のトラブルシューティング時には、次の順序でログを確認できます。
Airflow タスクログ:
Google Cloud コンソールで [環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[DAG] タブに移動します。
DAG の名前をクリックし、DAG 実行をクリックして詳細とログを表示します。
Airflow スケジューラ ログ:
[環境の詳細] ページに移動します。
[ログ] タブに移動します。
Airflow スケジューラ ログを調べます。
Google Cloud コンソールで GKE ワークロードの下にある Pod ログ。これらのログには、Pod 定義 YAML ファイル、Pod イベント、および Pod の詳細が含まれます。
ゼロ以外の戻りコード
KubernetesPodOperator(および GKEStartPodOperator)を使用する場合、コンテナのエントリ ポイントの戻りコードによって、タスクが成功したかどうかを判断できます。ゼロ以外の戻りコードは失敗を示します。
一般的なパターンは、コンテナのエントリ ポイントとしてシェル スクリプトを実行して、コンテナ内の複数のオペレーションをグループ化することです。
このようなスクリプトを記述する場合は、スクリプト内のコマンドが失敗した場合にスクリプトを終了し Airflow タスク インスタンスに失敗を伝播させるために、スクリプトの先頭に set -e
コマンドを含めることをおすすめします。
Pod のタイムアウト
KubernetesPodOperator のデフォルトのタイムアウトは 120 秒であるため、サイズの大きなイメージをダウンロードする前にタイムアウトが発生する場合があります。タイムアウトの時間を長くするには、KubernetesPodOperator を作成する際に startup_timeout_seconds
パラメータを変更します。
Pod がタイムアウトすると、Airflow UI にタスク固有のログが表示されます。次に例を示します。
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
手元のタスクを実行するために Cloud Composer サービス アカウントに必要な IAM 権限がない場合でも、Pod のタイムアウトは発生する場合があります。この状態を確認するには、GKE ダッシュボードで特定のワークロードのログを調べて Pod レベルのエラーを確認するか、Cloud Logging を使用します。
新しい接続の確立に失敗した
GKE クラスタでは、自動アップグレードがデフォルトで有効になっています。 アップグレード中のクラスタにノードプールが存在する場合、次のエラーが表示される場合があります。
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
クラスタがアップグレードされているかどうかを確認するには、Google Cloud コンソールで [Kubernetes クラスタ] ページに移動し、環境のクラスタ名の横にある読み込みアイコンを確認します。