使用 KubernetesPodOperator

Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3

本页介绍如何使用 KubernetesPodOperator 将 Kubernetes pod 从 Cloud Composer 部署到作为 Cloud Composer 环境一部分的 Google Kubernetes Engine 集群。

KubernetesPodOperator 启动 您的环境的集群中的 Kubernetes Pod。相比之下,Google Kubernetes Engine Operator 在指定集群中运行 Kubernetes pod,该集群可以是与您的环境无关的独立集群。您还可以使用 Google Kubernetes Engine 运算符。

如果您需要以下内容,KubernetesPodOperator 是一个不错的选择:

  • 无法通过公共 PyPI 代码库获得的自定义 Python 依赖项。
  • 无法通过现有 Cloud Composer 工作器映像获得的二进制依赖项。

准备工作

请查看以下列表,了解 KubernetesPodOperator 与 Cloud Composer 3 和 Cloud Composer 2,并确保您的 DAG 兼容:

  • 无法在 Cloud Composer 3 中创建自定义命名空间。Pod 始终在 composer-user-workloads 命名空间中运行,即使指定了其他命名空间也是如此。此命名空间中的 Pod 无需进行额外配置,即可访问您项目的资源和 VPC 网络(如果已启用)。

  • 无法使用 Kubernetes API 创建 Kubernetes Secret 和 ConfigMap。Cloud Composer 提供 Google Cloud CLI 命令 Terraform 资源和 Cloud Composer API 来管理 Kubernetes Secret 和 ConfigMap如需了解详情,请参阅 使用 Kubernetes Secret 和 ConfigMap

  • 与 Cloud Composer 2 中一样,Pod 亲和性配置不可用。如果 如果您要使用 Pod 亲和性,请使用 GKE 运算符 来启动 Pod

Cloud Composer 3 中的 KubernetesPodOperator 简介

本部分介绍 KubernetesPodOperator 在 Cloud Composer 3 中的工作原理。

资源使用情况

在 Cloud Composer 3 中,您的环境的集群 自动扩缩使用 KubernetesPodOperator 运行的额外工作负载会独立于您的环境进行扩缩。您的 环境不受资源需求增加的影响, 您的环境的集群会根据资源情况进行扩缩 需求

您在环境的集群中运行的额外工作负载的价格 遵循 Cloud Composer 3 价格模式, Cloud Composer 3 SKU。

Cloud Composer 3 使用的 Autopilot 集群引入了这一概念 包括以下类别的计算类

  • Cloud Composer 仅支持 general-purpose 计算类。

  • 默认情况下,如果未选择任何类,则在您使用 KubernetesPodOperator 创建 Pod 时,系统会假定 general-purpose 类。

  • 每个类都与特定的属性和资源限制相关联, 有关详情,请参见 Autopilot 文档。对于 例如,在 general-purpose 类中运行的 Pod 最多可以使用 GiB 内存。

对项目资源的访问权限

在 Cloud Composer 3 中,您的环境的集群位于 在租户项目中执行 Pod 时,Pod 会在环境的 隔离的命名空间中

在 Cloud Composer 3 中,Pod 始终在 composer-user-workloads 中运行 即使指定了其他命名空间也是如此。 此命名空间中的 Pod 可以访问 Google Cloud 项目和您的 VPC 网络中的资源(如果 即可。 您的环境的服务账号可用于访问这些 资源。无法指定其他服务账号。

最少的配置工作量

如需创建 KubernetesPodOperator,只能使用 Pod 的 nameimagetask_id 参数为必填项。/home/airflow/composer_kube_config 包含用于向 GKE 进行身份验证的凭据。

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`. Always use the
    # `composer-user-workloads` namespace with Composer 3.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

其他配置

此示例展示了您可以在 KubernetesPodOperator 中配置的其他参数。

有关参数的详细信息,请参阅 Airflow 参考 KubernetesPodOperator 即可实现。有关使用 Kubernetes Secret 和 ConfigMap,请参阅使用 Kubernetes Secret 和 ConfigMap。对于 如需了解如何将 Jinja 模板与 KubernetesPodOperator 搭配使用,请参阅 使用 Jinja 模板

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The Docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container.
    # The env_vars parameter is templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

使用 Jinja 模板

Airflow 支持 DAG 中的 Jinja 模板

您必须声明必需的 Airflow 参数(task_idnameimage)。如以下示例所示,您可以搭配使用 Jinja 和所有其他参数生成模板,包括 cmdsargumentsenv_varsconfig_file

示例中的 env_vars 参数是通过名为 my_valueAirflow 变量设置的。示例 DAG 从 Airflow 中的 vars 模板变量获取值。Airflow 提供了 变量,以便用户访问不同类型的信息。例如,您可以使用 conf 模板变量来访问 Airflow 配置选项的值。如需了解详情和 Airflow 中可用的变量列表,请参阅 Airflow 中的模板参考文档 文档。

如果不更改 DAG 或创建 env_vars 变量,示例中的 ex-kube-templates 任务会因变量不存在而失败。在 Airflow 界面中或使用 Google Cloud CLI 创建此变量:

Airflow 界面

  1. 转到 Airflow 界面

  2. 在工具栏中,依次选择 Admin > Variables

  3. List Variable 页面上,点击 Add a new record

  4. Add Variable 页面上,输入以下信息:

    • Key:my_value
    • Val:example_value
  5. 点击保存

gcloud

输入以下命令:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

您需要将其中的:

  • ENVIRONMENT 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。

以下示例演示了如何将 Jinja 模板与 KubernetesPodOperator:

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below can be templated with Jinja. For more information
    # and the list of variables available in Airflow, see
    # the Airflow templates reference:
    # https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in Jinja is the execution date as YYYY-MM-DD, this Docker image
    # will echo the execution date. Arguments to the entrypoint. The Docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI. The env_vars parameter
    # is templated.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Specifies path to Kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

使用 Kubernetes Secret 和 ConfigMap

一个 Kubernetes Secret 是包含敏感数据的对象。Kubernetes ConfigMap 对象包含 键值对中的非机密数据。

在 Cloud Composer 3 中,您可以使用 Google Cloud CLI、API 或 Terraform,然后通过 KubernetesPodOperator:

  • 使用 Google Cloud CLI 和 API 时,您需要提供 YAML 配置文件。
  • 借助 Terraform,您可以将 Secret 和 ConfigMap 定义为 Terraform 配置文件。

YAML 配置文件简介

使用 Google Cloud CLI 创建 Kubernetes Secret 或 ConfigMap 时 API 时,您需要提供 YAML 格式的文件。此文件必须遵循 和 ConfigMap 所使用的格式相同。Kubernetes 文档提供了许多 ConfigMap 和 Secret 的代码示例。首先,您可以查看使用 Secret 安全地分发凭据页面和 ConfigMap

与 Kubernetes Secret 中相同,在 Secret 中定义值时,请使用 base64 表示法。

要对某个值进行编码,您可以使用以下命令(这也是多种方法之一, 以获取 base64 编码的值):

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

输出:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

以下两个 YAML 文件示例将在本指南后面的示例中使用。Kubernetes Secret 的 YAML 配置文件示例:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

另一个示例演示了如何包含文件。与上一个示例一样,先对文件内容 (cat ./key.json | base64) 进行编码,然后在 YAML 文件中提供此值:

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

ConfigMap 的 YAML 配置文件示例。您无需在 ConfigMap 中使用 base64 表示法:

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

管理 Kubernetes Secret

gcloud

创建 Secret

如需创建 Kubernetes Secret,请运行以下命令:

gcloud beta composer environments user-workloads-secrets create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • SECRET_FILE:包含 Secret 的 配置。

示例:

gcloud beta composer environments user-workloads-secrets create \
  --environment example-environment \
  --location us-central1 \
  --secret-file-path ./secrets/example-secret.yaml

更新 Secret

如需更新 Kubernetes Secret,请运行以下命令。密钥的名称将 取自指定的 YAML 文件,并且 Secret 的内容将 已替换。

gcloud beta composer environments user-workloads-secrets update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --secret-file-path SECRET_FILE

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • SECRET_FILE:包含 Secret 配置的本地 YAML 文件的路径。在此文件的 metadata > name 字段中指定 Secret 的名称。

列出 Secret

如需获取环境中的 Secret 及其字段的列表,请运行 。输出中的键值将替换为星号。

gcloud beta composer environments user-workloads-secrets list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。

获取 Secret 的详细信息

如需获取有关 Secret 的详细信息,请运行以下命令。图例 输出中的值将被替换为星号。

gcloud beta composer environments user-workloads-secrets describe \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

替换以下内容:

  • SECRET_NAME:密钥的名称,如包含密钥配置的 YAML 文件中的 metadata> name 字段中所定义。
  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。

删除 Secret

如需删除 Secret,请运行以下命令:

gcloud beta composer environments user-workloads-secrets delete \
  SECRET_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • SECRET_NAME:Secret 的名称,如 metadata 中所定义 > name 字段,其中包含 Secret 的 配置。
  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。

API

创建 Secret

  1. 创建 environments.userWorkloadsSecrets.create API 请求。

  2. 在此请求中:

    1. 在请求正文的 name 字段中,指定新 Secret 的 URI。
    2. 在请求正文的 data 字段中,指定键和 Secret 的 base64 编码值。

示例:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo="
  }
}

更新 Secret

  1. 创建 environments.userWorkloadsSecrets.update API 请求。

  2. 在此请求中:

    1. 在请求正文的 name 字段中,指定 Secret。
    2. 在请求正文的 data 字段中,为 Secret 指定密钥和 base64 编码值。这些值将被替换。

示例:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret",
  "data": {
    "example": "ZXhhbXBsZV92YWx1ZSAtbgo=",
    "another-example": "YW5vdGhlcl9leGFtcGxlX3ZhbHVlIC1uCg=="
  }
}

列出 Secret

创建 environments.userWorkloadsSecrets.list API 请求。输出中的键值将替换为星号。时间是 可以通过此请求使用分页功能,请参阅 。

示例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets

获取 Secret 的详细信息

创建 environments.userWorkloadsSecrets.get API 请求。输出中的键值将替换为星号。

示例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

删除 Secret

创建 environments.userWorkloadsSecrets.delete API 请求。

示例:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsSecrets/example-secret

Terraform

google_composer_user_workloads_secret resource 定义一个 Kubernetes Secret,并使用 data 代码块。

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "SECRET_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME:环境资源的名称,其中包含 Terraform 中的环境定义。实际 环境名称也在此资源中指定。
  • LOCATION:环境所在的区域。
  • SECRET_NAME:Secret 的名称。
  • KEY_NAME:此 Secret 的一个或多个密钥。
  • KEY_VALUE:键的 base64 编码值。您可以使用 base64encode 函数来对值进行编码(请参阅示例)。

以下两个 Kubernetes Secret 示例将在本指南后面的示例中使用。

resource "google_composer_user_workloads_secret" "example_secret" {
  provider = google-beta

  name = "airflow-secrets"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    sql_alchemy_conn: base64encode("postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db")
  }
}

另一个示例演示了如何包含文件。您可以使用 file 函数以字符串形式读取文件的内容,然后对其进行 base64 编码:

resource "google_composer_user_workloads_secret" "service_account_secret" {
  provider = google-beta

  name = "service-account"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "service-account.json": base64encode(file("./key.json"))
  }
}

在 DAG 中使用 Kubernetes Secret

此示例展示了将 Kubernetes Secret 用作环境的两种方法 变量,并将其装载为 Pod 所装载的卷。

第一个 Secret airflow-secrets 设置为名为 SQL_CONN 的 Kubernetes 环境变量(而不是 Airflow 或 Cloud Composer 环境变量)。

第二个 Secret service-account 会装载 service-account.json 文件, 服务账号到 /var/secrets/google

Secret 对象如下所示:

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

第一个 Kubernetes Secret 的名称在 secret_env 变量中定义。 此 Secret 的名称为 airflow-secretsdeploy_type 参数指定它必须公开为环境变量。环境变量的 名称为 SQL_CONN(如 deploy_target 参数中所指定)。最后, SQL_CONN 环境变量的值已设置为 sql_alchemy_conn 键。

第二个 Kubernetes Secret 的名称在 secret_volume 中定义。 变量。此 Secret 的名称为 service-account。它显示为卷,如 deploy_type 参数中所指定。目标文件路径 装载项 deploy_target 的状态为 /var/secrets/google。最后,存储在 deploy_target 中的 Secret 的 keyservice-account.json

操作节点配置类似于如下所示:

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. The env_vars parameter is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

管理 Kubernetes ConfigMap

gcloud

创建 ConfigMap

如需创建 ConfigMap,请运行以下命令:

gcloud beta composer environments user-workloads-config-maps create \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • CONFIG_MAP_FILE:本地 YAML 文件的路径,该文件包含 ConfigMap 的 配置。

示例:

gcloud beta composer environments user-workloads-config-maps create \
  --environment example-environment \
  --location us-central1 \
  --config-map-file-path ./configs/example-configmap.yaml

更新 ConfigMap

如需更新 ConfigMap,请运行以下命令。ConfigMap 的名称将从指定的 YAML 文件中获取,并且 ConfigMap 的内容将被替换。

gcloud beta composer environments user-workloads-config-maps update \
  --environment ENVIRONMENT_NAME \
  --location LOCATION \
  --config-map-file-path CONFIG_MAP_FILE

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。
  • CONFIG_MAP_FILE:本地 YAML 文件的路径,该文件包含 ConfigMap 的 配置。在此文件的 metadata > name 字段中指定 ConfigMap 的名称。

列出 ConfigMap

如需获取环境的 ConfigMap 及其字段的列表,请运行以下命令。输出中的键值对将按原样显示。

gcloud beta composer environments user-workloads-config-maps list \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

替换以下内容:

  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。

获取 ConfigMap 的详细信息

如需获取有关 ConfigMap 的详细信息,请运行以下命令。输出中的键值将按原样显示。

gcloud beta composer environments user-workloads-config-maps describe \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION

替换以下内容:

  • CONFIG_MAP_NAME:ConfigMap 的名称,如 YAML 文件中的 metadata > name 字段, ConfigMap 的配置。
  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。

删除 ConfigMap

如需删除 ConfigMap,请运行以下命令:

gcloud beta composer environments user-workloads-config-maps delete \
  CONFIG_MAP_NAME \
  --environment ENVIRONMENT_NAME \
  --location LOCATION
  • CONFIG_MAP_NAME:ConfigMap 的名称,如在包含 ConfigMap 配置的 YAML 文件的 metadata > name 字段中所定义。
  • ENVIRONMENT_NAME:您的环境的名称。
  • LOCATION:环境所在的区域。

API

创建 ConfigMap

  1. 创建 environments.userWorkloadsConfigMaps.create API 请求。

  2. 在此请求中:

    1. 在请求正文的 name 字段中,指定 新 ConfigMap。
    2. 在请求正文的 data 字段中,指定键和 ConfigMap 的实际值。

示例:

// POST https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value"
  }
}

更新 ConfigMap

  1. 创建 environments.userWorkloadsConfigMaps.update API 请求。

  2. 在此请求中:

    1. 在请求正文的 name 字段中,指定 ConfigMap 的映射。
    2. 在请求正文的 data 字段中,指定 ConfigMap 的键和值。这些值将被替换。

示例:

// PUT https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

{
  "name": "projects/example-project/locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap",
  "data": {
    "example_key": "example_value",
    "another_key": "another_value"
  }
}

列出 ConfigMap

创建 environments.userWorkloadsConfigMaps.list API 请求。输出中的键值将按原样显示。您可以对此请求使用分页功能,如需了解详情,请参阅该请求的参考文档。

示例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps

获取 ConfigMap 的详细信息

创建 environments.userWorkloadsConfigMaps.get API 请求。输出中的键值将按原样显示。

示例:

// GET https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

删除 ConfigMap

创建 environments.userWorkloadsConfigMaps.delete API 请求。

示例:

// DELETE https://composer.googleapis.com/v1beta1/projects/example-project/
// locations/us-central1/environments/example-environment/userWorkloadsConfigMaps/example-configmap

Terraform

通过 google_composer_user_workloads_config_map 资源定义了一个 ConfigMap,该 ConfigMap 具有 data 代码块。

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta
  environment = google_composer_environment.ENVIRONMENT_RESOURCE_NAME.name
  name = "CONFIG_MAP_NAME"
  region = "LOCATION"

  data = {
    KEY_NAME: "KEY_VALUE"
  }
}
  • ENVIRONMENT_RESOURCE_NAME:环境资源的名称, 包含 Terraform 中环境的定义。此资源中还指定了实际环境的名称。
  • LOCATION:环境所在的区域。
  • CONFIG_MAP_NAME:ConfigMap 的名称。
  • KEY_NAME:此 ConfigMap 的一个或多个键。
  • KEY_VALUE:键的值。

示例:

resource "google_composer_user_workloads_config_map" "example_config_map" {
  provider = google-beta

  name = "example-config-map"

  environment = google_composer_environment.example_environment.name
  region = "us-central1"

  data = {
    "example_key": "example_value"
  }
}

在 DAG 中使用 ConfigMap

此示例展示了如何在 DAG 中使用 ConfigMap。

在以下示例中,系统会在 configmaps 参数中传递 ConfigMap。此 ConfigMap 的所有键都以环境变量的形式提供:

import datetime

from airflow import models
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_env_vars',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'echo "Value: $example_key"',
    ],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

以下示例展示了如何将 ConfigMap 挂载为卷:

import datetime

from airflow import models
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

volume_mount = k8s.V1VolumeMount(name='confmap-example',
  mount_path='/config',
  sub_path=None,
  read_only=False)

volume = k8s.V1Volume(name='confmap-example',
  config_map=k8s.V1ConfigMapVolumeSource(name='example-configmap'))

with models.DAG(
    dag_id="composer_kubernetes_pod_configmap",
    schedule_interval=None,
    start_date=datetime.datetime(2024, 1, 1),
) as dag:

  KubernetesPodOperator(
    task_id='kpo_configmap_volume_mount',
    image='busybox:1.28',
    cmds=['sh'],
    arguments=[
        '-c',
        'ls /config'
    ],
    volumes=[volume],
    volume_mounts=[volume_mount],
    configmaps=["example-configmap"],
    config_file="/home/airflow/composer_kube_config",
  )

CNCF Kubernetes 提供商的相关信息

KubernetesPodOperator 是在 apache-airflow-providers-cncf-kubernetes 提供商。

如需了解 CNCF Kubernetes 提供程序的详细版本说明,请参阅 CNCF Kubernetes 提供商网站

问题排查

本部分提供有关排查常见 KubernetesPodOperator 问题的建议 问题:

查看日志

排查问题时,您可以按以下顺序查看日志:

  1. Airflow 任务日志:

    1. 在 Google Cloud 控制台中,前往环境页面。

      转到“环境”

    2. 在环境列表中,点击您的环境名称。环境详情页面会打开。

    3. 转到 DAG 标签页。

    4. 点击 DAG 的名称,然后点击 DAG 运行作业以查看详细信息和日志。

  2. Airflow 调度器日志:

    1. 前往环境详情页面。

    2. 前往日志标签页。

    3. 检查 Airflow 调度程序日志。

  3. 用户工作负载日志:

    1. 前往环境详情页面。

    2. 转到监控标签页。

    3. 选择用户工作负载

    4. 检查已执行的工作负载列表。您可以查看每个工作负载的日志和资源利用率信息。

非零返回代码

使用 KubernetesPodOperator(和 GKEStartPodOperator)时,容器入口点的返回代码确定任务是否成功。返回非零代码表示失败。

一种常见的模式是执行 Shell 脚本作为容器入口点 将容器中的多个操作组合在一起。

如果您要编写这样的脚本,建议您在脚本的顶部加入 set -e 命令,因而脚本中失败的命令会终止脚本并将故障传播至 Airflow 任务实例。

Pod 超时

KubernetesPodOperator 的默认超时时间为 120 秒, 可能会导致在下载较大图片之前发生超时。您可以 在以下情况下,可通过更改 startup_timeout_seconds 参数来延长超时时间: 您需要创建 KubernetesPodOperator

当 Pod 超时时,可通过 Airflow 界面例如:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

在以下情况下,也会出现 Pod 超时: Cloud Composer 服务账号 缺少必要的 IAM 权限,无法在 。要验证这一点,请使用 GKE 信息中心:用于查看 或使用 Cloud Logging。

后续步骤