Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
本頁說明如何使用 KubernetesPodOperator,將 Cloud Composer 中的 Kubernetes Pod 部署到 Cloud Composer 環境的 Google Kubernetes Engine 叢集。
KubernetesPodOperator 會在環境的叢集中啟動 Kubernetes Pod。相較之下,Google Kubernetes Engine 運算子會在指定叢集中執行 Kubernetes Pod,這個叢集可能與您的環境無關。您也可以使用 Google Kubernetes Engine 運算子建立及刪除叢集。
如果您需要下列功能,KubernetesPodOperator 會是不錯的選擇:
- 無法透過公開 PyPI 存放區取得的自訂 Python 依附元件。
- Cloud Composer 工作站映像檔中沒有的二進位依附元件。
事前準備
如果使用 CNCF Kubernetes Provider 5.0.0 版,請按照這節的操作說明進行。
Cloud Composer 2 不提供 Pod 親和性設定。如要使用 Pod 親和性,請改用 GKE 運算子在其他叢集中啟動 Pod。
Cloud Composer 2 中的 KubernetesPodOperator 簡介
本節說明 KubernetesPodOperator 在 Cloud Composer 2 中的運作方式。
資源使用情況
在 Cloud Composer 2 中,環境的叢集會自動調度資源。您使用 KubernetesPodOperator 執行的額外工作負載,會與環境分開擴充。
環境不會受到資源需求增加的影響,但環境的叢集會根據資源需求擴大或縮小。
在環境叢集中執行的額外工作負載,其定價會採用 Cloud Composer 2 定價模式,並使用 Cloud Composer 運算 SKU。
Cloud Composer 2 使用 Autopilot 叢集,並導入運算級別的概念:
Cloud Composer 僅支援
general-purpose
運算類別。根據預設,如果未選取任何類別,使用 KubernetesPodOperator 建立 Pod 時,系統會假設為
general-purpose
類別。每個類別都與特定屬性和資源限制相關聯,詳情請參閱 Autopilot 說明文件。舉例來說,在
general-purpose
類別中執行的 Pod 最多可使用 110 GiB 的記憶體。
專案資源的存取權
Cloud Composer 2 使用 GKE 叢集,並搭配 Workload Identity Federation for GKE。在 composer-user-workloads
命名空間中執行的 Pod 可以存取專案中的資源, Google Cloud 不必進行額外設定。環境的服務帳戶會用於存取這些資源。
如要使用自訂命名空間,與這個命名空間相關聯的 Kubernetes 服務帳戶必須對應至 Google Cloud 服務帳戶,才能為傳送至 Google API 和其他服務的要求啟用服務身分授權。如果您在環境的叢集中,以自訂命名空間執行 Pod,系統就不會建立 Kubernetes 與Google Cloud 服務帳戶之間的 IAM 繫結,這些 Pod 也無法存取專案的 Google Cloud 資源。
如果您使用自訂命名空間,且希望 Pod 存取Google Cloud 資源,請按照「適用於 GKE 的工作負載身分聯盟」中的指引,為自訂命名空間設定繫結:
- 在環境的叢集中建立獨立的命名空間。
- 在自訂命名空間 Kubernetes 服務帳戶和環境的服務帳戶之間建立繫結。
- 將環境的服務帳戶註解新增至 Kubernetes 服務帳戶。
- 使用 KubernetesPodOperator 時,請在
namespace
和service_account_name
參數中指定命名空間和 Kubernetes 服務帳戶。
希望將設定需求減至最低
如要建立 KubernetesPodOperator,只需要 Pod 的 name
、要使用的 image
和 task_id
參數。/home/airflow/composer_kube_config
包含向 GKE 進行驗證的憑證。
額外設定
這個範例顯示您可以在 KubernetesPodOperator 中設定的其他參數。
詳情請參閱下列資源:
如要瞭解如何使用 Kubernetes Secret 和 ConfigMap,請參閱「使用 Kubernetes Secret 和 ConfigMap」。
如要瞭解如何搭配 KubernetesPodOperator 使用 Jinja 範本,請參閱「使用 Jinja 範本」。
如要瞭解 KubernetesPodOperator 參數,請參閱 Airflow 說明文件中的運算子參考資料。
使用 Jinja 範本
Airflow 支援 DAG 中的 Jinja 範本。
您必須使用運算子宣告必要的 Airflow 參數 (task_id
、name
和 image
)。如下列範例所示,您可以使用 Jinja 為所有其他參數建立範本,包括 cmds
、arguments
、env_vars
和 config_file
。
範例中的 env_vars
參數是從名為 my_value
的 Airflow 變數設定。範例 DAG 會從 Airflow 的 vars
範本變數取得值。Airflow 具有更多變數,可存取不同類型的資訊。舉例來說,您可以使用 conf
範本變數存取 Airflow 設定選項的值。如要進一步瞭解 Airflow 中可用的變數清單,請參閱 Airflow 說明文件中的「範本參考資料」。
在不變更 DAG 或建立 env_vars
變數的情況下,範例中的 ex-kube-templates
工作會失敗,因為變數不存在。在 Airflow 使用者介面或使用 Google Cloud CLI 建立這個變數:
Airflow UI
前往 Airflow UI。
在工具列中,依序選取「管理」>「變數」。
在「List Variable」(清單變數) 頁面中,按一下「Add a new record」(新增記錄)。
在「新增變數」頁面中輸入下列資訊:
- 按鍵:
my_value
- Val:
example_value
- 按鍵:
按一下 [儲存]。
gcloud
輸入下列指令:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
取代:
- 將
ENVIRONMENT
替換為環境的名稱。 LOCATION
改成環境所在的地區。
以下範例說明如何搭配 KubernetesPodOperator 使用 Jinja 範本:
使用 Kubernetes Secret 和 ConfigMap
Kubernetes Secret 是包含機密資料的物件。Kubernetes ConfigMap 是一個物件,包含鍵/值組合形式的非機密資料。
在 Cloud Composer 2 中,您可以使用 Google Cloud CLI、API 或 Terraform 建立 Secret 和 ConfigMap,然後從 KubernetesPodOperator 存取這些項目。
關於 YAML 設定檔
使用 Google Cloud CLI 和 API 建立 Kubernetes Secret 或 ConfigMap 時,您會提供 YAML 格式的檔案。這個檔案的格式必須與 Kubernetes Secret 和 ConfigMap 相同。Kubernetes 說明文件提供許多 ConfigMap 和 Secret 的程式碼範例。如要開始使用,請參閱「使用 Secret 安全地發布憑證」頁面和 ConfigMaps。
與 Kubernetes Secret 相同,在 Secret 中定義值時,請使用 base64 表示法。
如要編碼值,可以使用下列指令 (這是取得 Base64 編碼值的其中一種方式):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
輸出:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
本指南稍後的範例會使用下列兩個 YAML 檔案。 Kubernetes Secret 的 YAML 設定檔範例:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
另一個範例,說明如何納入檔案。與上一個範例相同,請先編碼檔案內容 (cat ./key.json | base64
),然後在 YAML 檔案中提供這個值:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
ConfigMap 的 YAML 設定檔範例。您不需要在 ConfigMap 中使用 base64 表示法:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
管理 Kubernetes Secret
在 Cloud Composer 2 中,您可以使用 Google Cloud CLI 和 kubectl
建立密鑰:
取得環境叢集的相關資訊:
執行下列指令:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
取代:
ENVIRONMENT
改為您的環境名稱。LOCATION
,並將其替換為 Cloud Composer 環境所在的地區。
這個指令的輸出格式如下:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
。如要取得 GKE 叢集 ID,請複製
/clusters/
後的輸出內容 (結尾為-gke
)。
使用下列指令連線至 GKE 叢集:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
更改下列內容:
CLUSTER_ID
:環境的叢集 ID。PROJECT_ID
:專案 ID。LOCATION
:環境所在的區域。
建立 Kubernetes Secret:
下列指令示範了建立 Kubernetes Secret 的兩種不同方法。
--from-literal
方法會使用鍵/值組合。--from-file
方法會使用檔案內容。如要提供鍵/值組合來建立 Kubernetes Secret,請執行下列指令。這個範例會建立名為
airflow-secrets
的 Secret,其中包含sql_alchemy_conn
欄位,值為test_value
。kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
如要提供檔案內容來建立 Kubernetes Secret,請執行下列指令。這個範例會建立名為
service-account
的 Secret,其中的service-account.json
欄位值取自本機./key.json
檔案的內容。kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
在 DAG 中使用 Kubernetes Secret
這個範例說明 Kubernetes Secret 的兩種用法:做為環境變數,以及做為 Pod 掛接的磁碟區。
第一個密鑰 airflow-secrets
會設為名為 SQL_CONN
的 Kubernetes 環境變數 (而非 Airflow 或 Cloud Composer 環境變數)。
第二個 Secret service-account
會將含有服務帳戶權杖的檔案 service-account.json
掛載至 /var/secrets/google
。
Secret 物件如下所示:
第一個 Kubernetes Secret 的名稱是在 secret_env
變數中定義。這個 Secret 名稱為 airflow-secrets
。deploy_type
參數會指定必須以環境變數形式公開。環境變數的名稱為 SQL_CONN
,如 deploy_target
參數所指定。最後,SQL_CONN
環境變數的值會設為 sql_alchemy_conn
鍵的值。
第二個 Kubernetes Secret 的名稱定義於 secret_volume
變數中。這個 Secret 名稱為 service-account
。如 deploy_type
參數所指定,這會公開為磁碟區。要掛接的檔案路徑 deploy_target
為 /var/secrets/google
。最後,儲存在 deploy_target
中的 Secret 的 key
為 service-account.json
。
運算子設定如下所示:
CNCF Kubernetes 供應商相關資訊
KubernetesPodOperator 會在 apache-airflow-providers-cncf-kubernetes
provider 中實作。
如需 CNCF Kubernetes 供應商的詳細版本資訊,請參閱 CNCF Kubernetes 供應商網站。
6.0.0 版
在 CNCF Kubernetes Provider 套件 6.0.0 版中,KubernetesPodOperator 預設會使用 kubernetes_default
連線。
如果您在 5.0.0 版中指定了自訂連線,運算子仍會使用這個自訂連線。如要改回使用 kubernetes_default
連線,可能需要相應調整 DAG。
5.0.0 版
與 4.4.0 版相比,這個版本導入了幾項不相容的變更。其中最重要的項目與 5.0.0 版未使用的 kubernetes_default
連線有關。
kubernetes_default
連線需要修改。Kubernetes 設定路徑必須設為/home/airflow/composer_kube_config
(如下圖所示)。或者,您必須將config_file
新增至 KubernetesPodOperator 設定 (如下列程式碼範例所示)。

- 以 KubernetesPodOperator 修改工作程式碼的方式如下:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
如要進一步瞭解 5.0.0 版,請參閱 CNCF Kubernetes 供應商版本資訊。
疑難排解
本節提供建議,協助您排解常見的 KubernetesPodOperator 問題:
查看記錄
排解問題時,您可以依下列順序檢查記錄:
Airflow 工作記錄:
前往 Google Cloud 控制台的「Environments」頁面。
在環境清單中,按一下環境名稱。 「環境詳細資料」頁面隨即開啟。
前往「DAG」分頁。
按一下 DAG 名稱,然後按一下 DAG 執行作業,即可查看詳細資料和記錄。
Airflow 排程器記錄:
前往「環境詳細資料」頁面。
前往「記錄」分頁。
檢查 Airflow 排程器記錄。
Google Cloud 控制台中的 Pod 記錄檔 (位於 GKE 工作負載下方)。這些記錄包括 Pod 定義 YAML 檔案、Pod 事件和 Pod 詳細資料。
非零的傳回代碼
使用 KubernetesPodOperator (和 GKEStartPodOperator) 時,容器進入點的回傳代碼會決定工作是否視為成功。非零回傳代碼表示失敗。
常見模式是將殼層指令碼做為容器進入點執行,以便在容器內將多項作業歸為一組。
如果您要編寫這類指令碼,建議在指令碼頂端加入 set -e
指令,這樣一來,指令碼中失敗的指令就會終止指令碼,並將失敗傳播至 Airflow 工作執行個體。
Pod 逾時
KubernetesPodOperator 的預設逾時時間為 120 秒,這可能會導致系統在下載較大的映像檔前逾時。建立 KubernetesPodOperator 時,您可以變更 startup_timeout_seconds
參數來延長逾時時間。
Pod 超時時,您可以在 Airflow UI 中查看特定工作記錄。例如:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
如果 Cloud Composer 服務帳戶缺少執行目前工作所需的 IAM 權限,也可能發生 Pod 超時問題。如要驗證這點,請使用 GKE 資訊主頁查看 Pod 層級的錯誤,找出特定工作負載的記錄,或使用 Cloud Logging。
無法建立新連線
GKE 叢集預設會啟用自動升級功能。 如果節點集區位於升級中的叢集,您可能會看到下列錯誤:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
如要檢查叢集是否正在升級,請在 Google Cloud 控制台前往「Kubernetes clusters」(Kubernetes 叢集) 頁面,然後查看環境叢集名稱旁邊是否有載入圖示。