Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
本页面介绍了如何使用 KubernetesPodOperator 来部署 Kubernetes Pod 从 Cloud Composer 迁移到 Google Kubernetes Engine 属于您的 Cloud Composer 环境
KubernetesPodOperator 会在您环境的集群中启动 Kubernetes pod。相比之下, Google Kubernetes Engine 操作器在您的 集群可以是与您的集群无关的单独集群 环境您还可以使用 Google Kubernetes Engine 运算符。
如果您需要以下内容,KubernetesPodOperator 是一个不错的选择:
- 无法通过公共 PyPI 代码库获得的自定义 Python 依赖项。
- 无法通过现有 Cloud Composer 工作器映像获得的二进制依赖项。
准备工作
如果使用的是 CNCF Kubernetes Provider 版本 5.0.0,请按照“CNCF Kubernetes Provider”部分中记录的说明操作。
Cloud Composer 2 不提供 Pod 亲和性配置。如果您想 要使用 Pod 亲和性,请使用 GKE 操作器来启动 Pod 其他集群中
关于 Cloud Composer 2 中的 KubernetesPodOperator
本部分介绍 KubernetesPodOperator 在 Cloud Composer 2 中的工作原理。
资源使用情况
在 Cloud Composer 2 中,您的环境的集群会自动扩缩。使用 KubernetesPodOperator 运行的额外工作负载会独立于您的环境进行扩缩。
您的环境不受资源需求增加的影响,但环境的集群会根据资源需求进行扩缩。
您在环境的集群中运行的额外工作负载的价格遵循 Cloud Composer 2 价格模式并使用 Cloud Composer 计算 SKU。
Cloud Composer 2 使用的 Autopilot 集群引入了这一概念 包括以下类别的计算类:
Cloud Composer 仅支持
general-purpose
计算类。默认情况下,如果未选择任何类,则在您使用 KubernetesPodOperator 创建 Pod 时,系统会假定
general-purpose
类。每个类都与特定的属性和资源限制相关联,您可以在 Autopilot 文档中了解它们。对于 例如,在
general-purpose
类中运行的 Pod 最多可以使用 GiB 内存。
对项目资源的访问权限
Cloud Composer 2 使用 GKE 集群,
适用于 GKE 的工作负载身份联合。在 composer-user-workloads
中运行的 Pod
命名空间可以访问您项目中的 Google Cloud 资源,
额外配置您的环境的服务账号
用于访问这些资源。
如果您想使用自定义命名空间,则必须将与此命名空间关联的 Kubernetes 服务账号映射到 Google Cloud 服务账号,以便为对 Google API 和其他服务的请求启用服务身份授权。如果您在环境集群的自定义命名空间中运行 Pod,则系统不会在 Kubernetes 和 Google Cloud 服务账号之间创建 IAM 绑定,并且这些 Pod 无法访问您的 Google Cloud 项目的资源。
如果您使用自定义命名空间,并且希望您的 Pod 能够访问 Google Cloud 资源,请按照适用于 GKE 的工作负载身份联合中的指南操作,为自定义命名空间设置绑定:
- 在您的环境集群中创建单独的命名空间。
- 在自定义命名空间 Kubernetes 服务账号之间建立绑定 和环境的服务账号。
- 将您的环境的服务账号注释添加到 Kubernetes 服务账号。
- 在使用 KubernetesPodOperator 时,请指定命名空间和
namespace
和service_account_name
中的 Kubernetes 服务账号 参数。
最少的配置工作量
如需创建 KubernetesPodOperator,只能使用 Pod 的 name
、image
和
task_id
参数为必填项。/home/airflow/composer_kube_config
包含用于向 GKE 进行身份验证的凭据。
其他配置
此示例展示了您可以在 KubernetesPodOperator 中配置的其他参数。
如需详细了解参数,请参阅 KubernetesPodOperator 的 Airflow 参考文档。如需了解如何使用 Kubernetes Secret 和 ConfigMap,请参阅使用 Kubernetes Secret 和 ConfigMap。如需了解如何将 Jinja 模板与 KubernetesPodOperator 搭配使用,请参阅使用 Jinja 模板。
使用 Jinja 模板
Airflow 支持在 DAG 中使用 Jinja 模板。
您必须声明必需的 Airflow 参数(task_id
、name
和
image
)。如以下示例所示,您可以搭配使用 Jinja 和所有其他参数生成模板,包括 cmds
、arguments
、env_vars
和 config_file
。
示例中的 env_vars
参数是从
名为 my_value
的 Airflow 变量。示例 DAG
从 Airflow 中的 vars
模板变量获取值。Airflow 提供了更多可用于访问不同类型信息的变量。例如,您可以使用 conf
模板变量来访问 Airflow 配置选项的值。如需了解详情和
Airflow 中可用的变量列表,请参阅
Airflow 中的模板参考文档
文档。
在不更改 DAG 或创建 env_vars
变量的情况下,
示例中的 ex-kube-templates
任务失败,因为该变量
存在。在 Airflow 界面中或使用 Google Cloud CLI 创建以下变量:
Airflow 界面
转到 Airflow 界面。
在工具栏中,依次选择 Admin > Variables。
在 List Variable 页面上,点击 Add a new record。
在 Add Variable 页面上,输入以下信息:
- Key:
my_value
- Val:
example_value
- Key:
点击保存。
gcloud
输入以下命令:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
您需要将其中的:
ENVIRONMENT
替换为环境的名称。LOCATION
替换为环境所在的区域。
以下示例演示了如何将 Jinja 模板与 KubernetesPodOperator:
使用 Kubernetes Secret 和 ConfigMap
Kubernetes Secret 是包含敏感数据的对象。Kubernetes ConfigMap 对象包含 键值对中的非机密数据。
在 Cloud Composer 2 中,您可以使用 Google Cloud CLI、API 或 Terraform,然后通过 KubernetesPodOperator 即可实现。
YAML 配置文件简介
当您使用 Google Cloud CLI 创建 Kubernetes Secret 或 ConfigMap 时, API 时,您需要提供 YAML 格式的文件。此文件必须采用与 Kubernetes Secret 和 ConfigMap 相同的格式。Kubernetes 文档提供了许多 ConfigMap 和 Secret 的代码示例。首先,您可以 请参阅 使用 Secret 安全地分发凭据 页面和 ConfigMaps。
与 Kubernetes Secret 中相同,在 Secret 中定义值时,请使用 base64 表示法。
要对某个值进行编码,您可以使用以下命令(这也是多种方法之一, 以获取 base64 编码的值):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
输出:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
以下两个 YAML 文件示例将在本指南后面的示例中使用。 Kubernetes Secret 的 YAML 配置文件示例:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
另一个示例演示了如何包含文件。与上一个示例一样,先对文件内容 (cat ./key.json | base64
) 进行编码,然后在 YAML 文件中提供此值:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
ConfigMap 的 YAML 配置文件示例。您无需在 ConfigMap 中使用 base64 表示法:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
管理 Kubernetes Secret
在 Cloud Composer 2 中,您可以使用 Google Cloud CLI 和 kubectl
创建 Secret:
获取有关环境集群的信息:
运行以下命令:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
替换:
- 将
ENVIRONMENT
替换为您的环境名称。 - 将
LOCATION
替换为 Cloud Composer 环境所在的区域。
此命令的输出使用以下格式:
projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id>
。- 将
为了获取 GKE 集群 ID,请复制
/clusters/
后面的输出内容(以-gke
结尾)。
使用以下命令连接到您的 GKE 集群:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --region LOCATION
替换以下内容:
CLUSTER_ID
:环境的集群 ID。PROJECT_ID
:项目 ID。LOCATION
:环境所在的区域。
创建 Kubernetes Secret:
以下命令演示了创建 Kubernetes Secret 的两种不同方法。
--from-literal
方法使用键值对。--from-file
方法使用文件内容。如需通过提供键值对来创建 Kubernetes Secret,请运行 。此示例创建了一个名为
airflow-secrets
的 Secret,其中包含一个值为test_value
的sql_alchemy_conn
字段。kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
如需通过提供文件内容创建 Kubernetes Secret,请运行 。本示例将创建一个名为 包含
service-account.json
字段的service-account
值取自本地./key.json
文件的内容。kubectl create secret generic service-account \ --from-file service-account.json=./key.json -n composer-user-workloads
在 DAG 中使用 Kubernetes Secret
此示例展示了使用 Kubernetes Secret 的两种方法:作为环境变量,以及由 pod 装载的卷。
第一个 Secret (airflow-secrets
) 已设置
名为 SQL_CONN
的 Kubernetes 环境变量(而不是
Airflow 或 Cloud Composer 环境变量)。
第二个 Secret service-account
将包含服务账号令牌的文件 service-account.json
装载到 /var/secrets/google
。
Secret 对象如下所示:
第一个 Kubernetes Secret 的名称在 secret_env
变量中定义。
此 Secret 名为 airflow-secrets
。deploy_type
参数指定它必须公开为环境变量。环境变量的名称为 SQL_CONN
,如 deploy_target
参数中所指定。最后,
SQL_CONN
环境变量的值已设置为
sql_alchemy_conn
键。
第二个 Kubernetes Secret 的名称在 secret_volume
变量中定义。此 Secret 的名称为 service-account
。它显示为卷,如 deploy_type
参数中所指定。目标文件路径
装载项 deploy_target
的状态为 /var/secrets/google
。最后,key
存储在 deploy_target
中的 Secret 为 service-account.json
。
操作节点配置类似于如下所示:
关于 CNCF Kubernetes 提供程序的信息
KubernetesPodOperator 在 apache-airflow-providers-cncf-kubernetes
提供程序中实现。
如需查看 CNCF Kubernetes 提供商的详细版本说明,请访问 CNCF Kubernetes 提供商网站。
版本 6.0.0
在 CNCF Kubernetes Provider 软件包 6.0.0 版中,
KubernetesPodOperator 中默认使用 kubernetes_default
连接。
如果您在版本 5.0.0 中指定了自定义连接,运营商仍会使用此自定义连接。切换回使用kubernetes_default
连接,您可能需要相应地调整 DAG。
版本 5.0.0
与版本 4.4.0 相比,此版本引入了一些向后不兼容的更改。最重要的指标
kubernetes_default
连接(未在 5.0.0 版中使用)。
- 需要修改
kubernetes_default
连接。Kubernetes 配置路径必须设置为/home/airflow/composer_kube_config
(如以下图所示)。或者,您也可以将config_file
添加到 KubernetesPodOperator 配置中(如以下代码示例所示)。
- 通过以下方式使用 KubernetesPodOperator 修改任务的代码:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
如需详细了解版本 5.0.0,请参阅 CNCF Kubernetes 提供程序版本说明。
问题排查
本部分提供了有关排查常见 KubernetesPodOperator 问题的建议:
查看日志
排查问题时,您可以按以下顺序检查日志:
Airflow 任务日志:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
转到 DAG 标签页。
点击 DAG 的名称,然后点击 DAG 运行作业以查看详细信息和日志。
Airflow 调度器日志:
前往环境详情页面。
前往日志标签页。
检查 Airflow 调度器日志。
Google Cloud 控制台中 GKE 工作负载下的 Pod 日志。这些日志包括 Pod 定义 YAML 文件、Pod 事件 Pod 详情。
非零返回代码
使用 KubernetesPodOperator(和 GKEStartPodOperator)时, 容器入口点的返回代码决定了任务 是否成功返回非零代码表示失败。
一种常见的模式是执行 Shell 脚本作为容器入口点 将容器中的多个操作组合在一起。
如果您要编写这样的脚本,建议您在脚本的顶部加入 set -e
命令,因而脚本中失败的命令会终止脚本并将故障传播至 Airflow 任务实例。
Pod 超时
KubernetesPodOperator 的默认超时时间为 120 秒,
可能会导致在下载较大图片之前发生超时。您可以
在以下情况下,可通过更改 startup_timeout_seconds
参数来延长超时时间:
您需要创建 KubernetesPodOperator
当 Pod 超时时,可通过 Airflow 界面例如:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
在以下情况下,也会出现 Pod 超时: Cloud Composer 服务账号 缺少必要的 IAM 权限,无法在 。如需验证是否属于这种情况,请使用 GKE 信息中心查看 Pod 级错误,找到特定工作负载的日志;或者使用 Cloud Logging。
未能建立新连接
GKE 集群默认启用自动升级。 如果集群中的某个节点池正在升级,您可能会看到以下错误:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
如需检查您的集群是否正在升级,请在 Google Cloud 控制台中前往 Kubernetes 集群页面,查找您的 环境的集群名称。