Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
本页介绍如何使用 KubernetesPodOperator 将 Kubernetes pod 从 Cloud Composer 部署到作为 Cloud Composer 环境一部分的 Google Kubernetes Engine 集群。
KubernetesPodOperator 启动 您的环境的集群中的 Kubernetes Pod。相比之下,Google Kubernetes Engine Operator 在指定集群中运行 Kubernetes pod,该集群可以是与您的环境无关的独立集群。您还可以使用 Google Kubernetes Engine Operator 来创建和删除集群。
如果您需要以下内容,KubernetesPodOperator 是一个好的选项:
- 无法通过公共 PyPI 代码库获得的自定义 Python 依赖项。
- 无法通过现有 Cloud Composer 工作器映像获得的二进制依赖项。
准备工作
- 我们建议使用最新版本的 Cloud Composer。此版本必须至少作为弃用和支持政策的一部分受到支持。
- 请确保您的环境中有足够的资源。将 Pod 启动到资源耗尽的环境中可能会导致 Airflow 工作器和 Airflow 调度器错误
设置 Cloud Composer 环境资源
创建 Cloud Composer 环境时,您可以指定其性能参数,包括环境集群的性能参数。将 Kubernetes pod 启动到环境集群可能会导致集群资源(例如 CPU 或内存)争用。由于 Airflow 调度程序和工作器属于同一 GKE 集群,因此如果资源争用导致资源耗尽,则调度程序和工作器将无法正常工作。
为了防止资源耗尽,请采取以下一项或多项措施:
- (推荐)创建节点池
- 增加环境中的节点数量
- 指定适当的机器类型
创建节点池
防止 Cloud Composer 环境中的资源耗尽的首选方法是新建节点池并仅使用该池中的资源来配置 Kubernetes pod。
控制台
gcloud
确定您的环境的集群的名称:
gcloud composer environments describe ENVIRONMENT_NAME \ --location LOCATION \ --format="value(config.gkeCluster)"
替换:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。
输出会包含环境的集群名称。例如,此值可以是
europe-west3-example-enviro-af810e25-gke
。按照添加节点池中所述创建节点池。
增加环境中的节点数量
增加 Cloud Composer 环境中的节点数将提高工作负载可用的计算能力,但如果任务需要的 CPU 或 RAM 高于指定机器类型所能提供的,这种增加不会为其提供额外的资源。
如需增加节点数,请更新您的环境。
指定适当的机器类型
在 Cloud Composer 环境创建期间,您可以指定机器类型。为确保有可用的资源,请针对 Cloud Composer 环境中的计算类型指定机器类型。
最少的配置工作量
如需创建 KubernetesPodOperator,只有要使用的 Pod 的 name
、image
和 task_id
参数是必需的。/home/airflow/composer_kube_config
包含用于向 GKE 进行身份验证的凭据。
Airflow 2
Airflow 1
Pod 亲和性配置
在 KubernetesPodOperator 中配置 affinity
参数后,您可以控制要在哪些节点上安排 Pod,例如仅在特定节点池中的节点上安排。在此示例中,操作节点仅在名为 pool-0
和 pool-1
的节点池上运行。您的 Cloud Composer 1 环境节点在 default-pool
中,因此您的 Pod 不会在您的环境中的节点上运行。
Airflow 2
Airflow 1
根据示例的配置,任务将失败。查看日志时,
失败,因为节点池“pool-0
”和“pool-1
”不存在。
如需确保 values
中的节点池存在,请进行以下任一配置更改:
如果您之前创建了节点池,请将
pool-0
和pool-1
替换为您的节点池名称,然后重新上传 DAG。创建一个节点池并将其命名为
pool-0
或pool-1
。您可以同时创建两者,但任务只需一个即可成功。将
pool-0
和pool-1
替换为 Airflow 使用的默认池default-pool
,然后再次上传您的 DAG。
进行更改后,等待几分钟,让您的环境更新。然后再次运行 ex-pod-affinity
任务,并验证 ex-pod-affinity
任务是否成功。
其他配置
此示例展示了您可在 KubernetesPodOperator 即可实现。
有关参数的详细信息,请参阅 Airflow 参考 KubernetesPodOperator 即可实现。有关使用 Kubernetes Secret 和 ConfigMap,请参阅使用 Kubernetes Secret 和 ConfigMap。如需了解如何将 Jinja 模板与 KubernetesPodOperator 搭配使用,请参阅使用 Jinja 模板。
Airflow 2
Airflow 1
使用 Jinja 模板
Airflow 支持 DAG 中的 Jinja 模板。
您必须在操作节点 (operator) 中声明所需的 Airflow 参数(task_id
、name
和 image
)。如以下示例所示,您可以搭配使用 Jinja 和所有其他参数生成模板,包括 cmds
、arguments
、env_vars
和 config_file
。
示例中的 env_vars
参数是通过名为 my_value
的 Airflow 变量设置的。示例 DAG
从 Airflow 中的 vars
模板变量获取值。Airflow 提供了
变量,以便用户访问不同类型的信息。例如,您可以使用 conf
模板变量来访问 Airflow 配置选项的值。如需了解详情和
Airflow 中可用的变量列表,请参阅
Airflow 中的模板参考文档
文档。
如果不更改 DAG 或创建 env_vars
变量,示例中的 ex-kube-templates
任务会因变量不存在而失败。在 Airflow 界面中或使用 Google Cloud CLI 创建以下变量:
Airflow 界面
转到 Airflow 界面。
在工具栏中,依次选择 Admin > Variables。
在 List Variable 页面上,点击 Add a new record。
在 Add Variable 页面上,输入以下信息:
- Key:
my_value
- Val:
example_value
- Key:
点击保存。
如果您的环境使用 Airflow 1,请改为运行以下命令:
转到 Airflow 界面。
在工具栏中,选择 Admin > Variables。
在 Variables 页面上,点击 Create 标签页。
在 Variable 页面上,输入以下信息:
- Key:
my_value
- Val:
example_value
- Key:
点击保存。
gcloud
输入以下命令:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
如果您的环境使用 Airflow 1,请改为运行以下命令:
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables -- \
--set my_value example_value
您需要将其中的:
ENVIRONMENT
替换为环境的名称。LOCATION
替换为环境所在的区域。
以下示例演示了如何将 Jinja 模板与 KubernetesPodOperator:
Airflow 2
Airflow 1
使用 Kubernetes Secret 和 ConfigMap
Kubernetes Secret 是包含敏感数据的对象。Kubernetes ConfigMap 对象包含 键值对中的非机密数据。
在 Cloud Composer 2 中,您可以使用 Google Cloud CLI、API 或 Terraform,然后通过 KubernetesPodOperator 即可实现。
YAML 配置文件简介
使用 Google Cloud CLI 和 API 创建 Kubernetes Secret 或 ConfigMap 时,您需要提供 YAML 格式的文件。此文件必须遵循 和 ConfigMap 所使用的格式相同。Kubernetes 文档 提供了许多 ConfigMap 和 Secret 的代码示例。首先,您可以查看使用 Secret 安全地分发凭据页面和 ConfigMap。
与 Kubernetes Secret 中相同,在 Secret 中定义值时,请使用 base64 表示法。
要对某个值进行编码,您可以使用以下命令(这也是多种方法之一, 以获取 base64 编码的值):
echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64
输出:
cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
以下两个 YAML 文件示例将在本指南后面的示例中使用。Kubernetes Secret 的 YAML 配置文件示例:
apiVersion: v1
kind: Secret
metadata:
name: airflow-secrets
data:
sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==
另一个示例演示了如何包含文件。与上一个相同
例如,先对文件的内容 (cat ./key.json | base64
) 进行编码,然后
在 YAML 文件中提供此值:
apiVersion: v1
kind: Secret
metadata:
name: service-account
data:
service-account.json: |
ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K
ConfigMap 的 YAML 配置文件示例。您无需使用 base64 ConfigMap 中的表示法:
apiVersion: v1
kind: ConfigMap
metadata:
name: example-configmap
data:
example_key: example_value
管理 Kubernetes Secret
在 Cloud Composer 2 中,您可以使用 Google Cloud CLI 和 kubectl
创建 Secret:
获取有关环境集群的信息:
运行以下命令:
gcloud composer environments describe ENVIRONMENT \ --location LOCATION \ --format="value(config.gkeCluster)"
替换:
- 将
ENVIRONMENT
替换为您的环境名称。 - 将
LOCATION
替换为 Cloud Composer 环境所在的区域。
此命令的输出使用以下格式:
projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>
。- 将
为了获取 GKE 集群 ID,请复制
/clusters/
后面的输出内容(以-gke
结尾)。要获取该可用区,请复制
/zones/
后面的输出内容。
使用以下命令连接到您的 GKE 集群:
gcloud container clusters get-credentials CLUSTER_ID \ --project PROJECT \ --zone ZONE
您需要将其中的:
CLUSTER_ID
:环境的集群 ID。PROJECT_ID
:项目 ID。- 将
ZONE
替换为环境集群所在的可用区。
创建 Kubernetes Secret:
以下命令展示了创建 Kubernetes Secret
--from-literal
方法使用键值对。--from-file
方法使用文件内容。如需通过提供键值对来创建 Kubernetes Secret,请运行 。此示例创建了一个名为
airflow-secrets
的 Secret,其中包含一个值为test_value
的sql_alchemy_conn
字段。kubectl create secret generic airflow-secrets \ --from-literal sql_alchemy_conn=test_value
如需通过提供文件内容创建 Kubernetes Secret,请运行 。此示例创建一个名为
service-account
的 Secret,其中包含一个service-account.json
字段,该字段的值取自本地./key.json
文件的内容。kubectl create secret generic service-account \ --from-file service-account.json=./key.json
在 DAG 中使用 Kubernetes Secret
此示例展示了使用 Kubernetes Secret 的两种方法:作为环境变量,以及由 pod 装载的卷。
第一个 Secret (airflow-secrets
) 已设置
名为 SQL_CONN
的 Kubernetes 环境变量(而不是
Airflow 或 Cloud Composer 环境变量)。
第二个 Secret service-account
将包含服务账号令牌的文件 service-account.json
装载到 /var/secrets/google
。
Secret 对象如下所示:
Airflow 2
Airflow 1
第一个 Kubernetes Secret 的名称在 secret_env
变量中定义。
此 Secret 的名称为 airflow-secrets
。deploy_type
参数指定它必须公开为环境变量。环境变量的名称为 SQL_CONN
,如 deploy_target
参数中所指定。最后,
SQL_CONN
环境变量的值已设置为
sql_alchemy_conn
键。
第二个 Kubernetes Secret 的名称在 secret_volume
中定义。
变量。此 Secret 的名称为 service-account
。它以
音量(如 deploy_type
参数中所指定)。要装载的文件的路径 deploy_target
是 /var/secrets/google
。最后,存储在 deploy_target
中的 Secret 的 key
为 service-account.json
。
操作节点配置类似于如下所示:
Airflow 2
Airflow 1
有关 CNCF Kubernetes 提供程序的信息
KubernetesPodOperator 是在
apache-airflow-providers-cncf-kubernetes
提供商。
如需查看 CNCF Kubernetes 提供商的详细版本说明,请访问 CNCF Kubernetes 提供商网站。
版本 6.0.0
在 CNCF Kubernetes Provider 软件包 6.0.0 版中,
KubernetesPodOperator 中默认使用 kubernetes_default
连接。
如果您在版本 5.0.0 中指定了自定义连接,运营商仍会使用此自定义连接。如需改回使用 kubernetes_default
连接,您可能需要相应地调整 DAG。
版本 5.0.0
此版本引入了一些向后不兼容的更改
(与版本 4.4.0 相比)最重要的指标
kubernetes_default
连接(未在 5.0.0 版中使用)。
- 需要修改
kubernetes_default
连接。Kubernetes 配置 路径必须设置为/home/airflow/composer_kube_config
(如下图所示)。作为替代方案,config_file
添加到 KubernetesPodOperator 配置(如 以下代码示例)。
- 按照以下方式使用 KubernetesPodOperator 修改任务的代码:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
有关 5.0.0 版的详细信息,请参阅 CNCF Kubernetes 提供程序版本说明。
问题排查
本部分提供了有关排查常见 KubernetesPodOperator 问题的建议:
查看日志
排查问题时,您可以按以下顺序查看日志:
Airflow 任务日志:
在 Google Cloud 控制台中,前往环境页面。
在环境列表中,点击您的环境名称。环境详情页面会打开。
转到 DAG 标签页。
点击 DAG 的名称,然后点击 DAG 运行作业以查看详细信息 和日志。
Airflow 调度器日志:
前往环境详情页面。
前往日志标签页。
检查 Airflow 调度程序日志。
Google Cloud 控制台中的 GKE 下的 Pod 日志 工作负载这些日志包括 Pod 定义 YAML 文件、Pod 事件 Pod 详情。
非零返回代码
使用 KubernetesPodOperator(和 GKEStartPodOperator)时,容器入口点的返回代码确定任务是否成功。返回非零代码表示失败。
常见模式是执行一个 shell 脚本作为容器入口点,以将容器内的多个操作分组到一起。
如果您要编写这样的脚本,建议您在脚本的顶部加入 set -e
命令,因而脚本中失败的命令会终止脚本并将故障传播至 Airflow 任务实例。
Pod 超时
KubernetesPodOperator 的默认超时时间为 120 秒,这会导致在较大的映像下载完成之前发生超时。您可以通过在创建 KubernetesPodOperator 时更改 startup_timeout_seconds
参数来增加超时值。
如果某个 pod 超时,您可从 Airflow 界面中查看相应任务专属的日志。例如:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
在以下情况下,也会出现 Pod 超时: Cloud Composer 服务账号 缺少必要的 IAM 权限,无法在 。要验证这一点,请使用 GKE 信息中心:用于查看 或使用 Cloud Logging。
无法建立新连接
GKE 集群默认启用自动升级。 如果集群中的某个节点池正在升级,您可能会看到以下错误:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
如需检查您的集群是否正在升级,请在 Google Cloud 控制台中前往 Kubernetes 集群页面,然后查看您的环境的集群名称旁是否有加载图标。