import datetime
from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models
# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
# Expose the secret as environment variable.
deploy_type="env",
# The name of the environment variable, since deploy_type is `env` rather
# than `volume`.
deploy_target="SQL_CONN",
# Name of the Kubernetes Secret
secret="airflow-secrets",
# Key of a secret stored in this Secret object
key="sql_alchemy_conn",
)
secret_volume = Secret(
deploy_type="volume",
# Path where we mount the secret as volume
deploy_target="/var/secrets/google",
# Name of Kubernetes Secret
secret="service-account",
# Key in the form of service account file name
key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, and the task logs will contain the information
# required to debug.
with models.DAG(
dag_id="composer_sample_kubernetes_pod",
schedule_interval=datetime.timedelta(days=1),
start_date=YESTERDAY,
) as dag:
# Only name, namespace, image, and task_id are required to create a
# KubernetesPodOperator. In Cloud Composer, currently the operator defaults
# to using the config file found at `/home/airflow/composer_kube_config` if
# no `config_file` parameter is specified. By default it will contain the
# credentials for Cloud Composer's Google Kubernetes Engine cluster that is
# created upon environment creation.
kubernetes_min_pod = KubernetesPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes, default namespace is
# `default`. In Composer 1 there is the potential for
# the resource starvation of Airflow workers and scheduler
# within the Cloud Composer environment;
# the recommended solution is to increase the number of nodes in order
# to satisfy the computing requirements. Alternatively, launching pods
# into a custom namespace will stop fighting over resources,
# and using Composer 2 will mean the environment will autoscale.
namespace="default",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
kubernetes_template_ex = KubernetesPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
namespace="default",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
# Sets the config file to a kubernetes config file specified in
# airflow.cfg. If the configuration file does not exist or does
# not provide valid credentials, the pod will fail to launch. If not
# specified, config_file defaults to ~/.kube/config
config_file="{{ conf.get('core', 'kube_config') }}",
)
kubernetes_secret_vars_ex = KubernetesPodOperator(
task_id="ex-kube-secrets",
name="ex-kube-secrets",
namespace="default",
image="ubuntu",
startup_timeout_seconds=300,
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[secret_env, secret_volume],
# env_vars allows you to specify environment variables for your
# container to use. env_vars is templated.
env_vars={
"EXAMPLE_VAR": "/example/value",
"GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
},
)
# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
task_id="ex-pod-affinity",
name="ex-pod-affinity",
namespace="default",
image="perl:5.34.0",
cmds=["perl"],
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# affinity allows you to constrain which nodes your pod is eligible to
# be scheduled on, based on labels on the node. In this case, if the
# label 'cloud.google.com/gke-nodepool' with value
# 'pool-0' or 'pool-1' is not found on any
# nodes, it will fail to schedule.
affinity={
"nodeAffinity": {
# requiredDuringSchedulingIgnoredDuringExecution means in order
# for a pod to be scheduled on a node, the node must have the
# specified labels. However, if labels on a node change at
# runtime such that the affinity rules on a pod are no longer
# met, the pod will still continue to run on the node.
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
# The label key's value that pods can be scheduled
# on.
"values": [
"pool-0",
"pool-1",
],
}
]
}
]
}
}
},
)
kubernetes_full_pod = KubernetesPodOperator(
task_id="ex-all-configs",
name="pi",
namespace="default",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 120.
startup_timeout_seconds=120,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image. 'IfNotPresent' will cause
# the Kubelet to skip pulling an image if it already exists. If you
# want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for Pod, this will allow you to
# set both cpu and memory limits and requirements.
# Prior to Airflow 2.3 and the cncf providers package 5.0.0
# resources were passed as a dictionary. This change was made in
# https://github.com/apache/airflow/pull/27197
# Additionally, "memory" and "cpu" were previously named
# "limit_memory" and "limit_cpu"
# resources={'limit_memory': "250M", 'limit_cpu': "100m"},
container_resources=k8s_models.V1ResourceRequirements(
limits={"memory": "250M", "cpu": "100m"},
),
# Specifies the path to the Kubernetes config. If no config is specified,
# it will default to '~/.kube/config'. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
affinity={},
)
Airflow 1
import datetime
from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator
# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = secret.Secret(
# Expose the secret as environment variable.
deploy_type="env",
# The name of the environment variable, since deploy_type is `env` rather
# than `volume`.
deploy_target="SQL_CONN",
# Name of the Kubernetes Secret
secret="airflow-secrets",
# Key of a secret stored in this Secret object
key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
deploy_type="volume",
# Path where we mount the secret as volume
deploy_target="/var/secrets/google",
# Name of Kubernetes Secret
secret="service-account",
# Key in the form of service account file name
key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, and the task logs will contain the information
# required to debug.
with models.DAG(
dag_id="composer_sample_kubernetes_pod",
schedule_interval=datetime.timedelta(days=1),
start_date=YESTERDAY,
) as dag:
# Only name, namespace, image, and task_id are required to create a
# KubernetesPodOperator. In Cloud Composer, currently the operator defaults
# to using the config file found at `/home/airflow/composer_kube_config` if
# no `config_file` parameter is specified. By default it will contain the
# credentials for Cloud Composer's Google Kubernetes Engine cluster that is
# created upon environment creation.
kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes, default namespace is
# `default`. There is the potential for the resource starvation of
# Airflow workers and scheduler within the Cloud Composer environment;
# the recommended solution is to increase the number of nodes in order
# to satisfy the computing requirements. Alternatively, launching pods
# into a custom namespace will stop fighting over resources.
namespace="default",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
kubernetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
namespace="default",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
# Sets the config file to a kubernetes config file specified in
# airflow.cfg. If the configuration file does not exist or does
# not provide valid credentials, the pod will fail to launch. If not
# specified, config_file defaults to ~/.kube/config
config_file="{{ conf.get('core', 'kube_config') }}",
)
kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-kube-secrets",
name="ex-kube-secrets",
namespace="default",
image="ubuntu",
startup_timeout_seconds=300,
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[secret_env, secret_volume],
# env_vars allows you to specify environment variables for your
# container to use. env_vars is templated.
env_vars={
"EXAMPLE_VAR": "/example/value",
"GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
},
)
kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-pod-affinity",
name="ex-pod-affinity",
namespace="default",
image="perl:5.34.0",
cmds=["perl"],
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# affinity allows you to constrain which nodes your pod is eligible to
# be scheduled on, based on labels on the node. In this case, if the
# label 'cloud.google.com/gke-nodepool' with value
# 'pool-0' or 'pool-1' is not found on any
# nodes, it will fail to schedule.
affinity={
"nodeAffinity": {
# requiredDuringSchedulingIgnoredDuringExecution means in order
# for a pod to be scheduled on a node, the node must have the
# specified labels. However, if labels on a node change at
# runtime such that the affinity rules on a pod are no longer
# met, the pod will still continue to run on the node.
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
# The label key's value that pods can be scheduled
# on.
"values": [
"pool-0",
"pool-1",
],
}
]
}
]
}
}
},
)
kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-all-configs",
name="pi",
namespace="default",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 120.
startup_timeout_seconds=120,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image. 'IfNotPresent' will cause
# the Kubelet to skip pulling an image if it already exists. If you
# want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for Pod, this will allow you to
# set both cpu and memory limits and requirements.
# Prior to Airflow 1.10.4, resource specifications were
# passed as a Pod Resources class object.
# If using this example on a version of Airflow prior to 1.10.4,
# import the "pod" package from airflow.contrib.kubernetes and use
# resources = pod.Resources() instead of passing a dict.
# For more info see:
# https://github.com/apache/airflow/pull/4551
resources={"limit_memory": "250M", "limit_cpu": "100m"},
# Specifies the path to the Kubernetes config. If no config is specified,
# it will default to '~/.kube/config'. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
affinity={},
)
If you place the following code snippet in a DAG, the configuration defaults to /home/airflow/composer_kube_config. You do not need to modify the code for the pod-ex-minimum task to succeed.
Airflow 2
kubernetes_min_pod = KubernetesPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes, default namespace is
# `default`. In Composer 1 there is the potential for
# the resource starvation of Airflow workers and scheduler
# within the Cloud Composer environment;
# the recommended solution is to increase the number of nodes in order
# to satisfy the computing requirements. Alternatively, launching pods
# into a custom namespace will stop fighting over resources,
# and using Composer 2 will mean the environment will autoscale.
namespace="default",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
Airflow 1
kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes, default namespace is
# `default`. There is the potential for the resource starvation of
# Airflow workers and scheduler within the Cloud Composer environment;
# the recommended solution is to increase the number of nodes in order
# to satisfy the computing requirements. Alternatively, launching pods
# into a custom namespace will stop fighting over resources.
namespace="default",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
kubernetes_template_ex = KubernetesPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
namespace="default",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
# Sets the config file to a kubernetes config file specified in
# airflow.cfg. If the configuration file does not exist or does
# not provide valid credentials, the pod will fail to launch. If not
# specified, config_file defaults to ~/.kube/config
config_file="{{ conf.get('core', 'kube_config') }}",
)
Airflow 1
kubernetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
namespace="default",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
# Sets the config file to a kubernetes config file specified in
# airflow.cfg. If the configuration file does not exist or does
# not provide valid credentials, the pod will fail to launch. If not
# specified, config_file defaults to ~/.kube/config
config_file="{{ conf.get('core', 'kube_config') }}",
)
If you do not change the DAG or your environment, the ex-kube-templates task fails with two errors. The logs show that the task fails because the appropriate variable (my_value) does not exist. The second error, which you can get after fixing the first one, shows that the task fails because core/kube_config is not found in config.
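To clear the first error, you can create the my_value Airflow variable before re-running the task (for example through the Airflow UI under Admin > Variables, or with the Airflow CLI). A minimal programmatic sketch, assuming it runs where the Airflow metadata database is reachable, for example from a short maintenance DAG inside the environment; the value shown is only a placeholder:
from airflow.models import Variable

# Create (or overwrite) the `my_value` variable that the templated
# env_vars entry above reads via {{ var.value.my_value }}.
Variable.set("my_value", "example_value")

# Confirm the variable is visible to templating.
print(Variable.get("my_value"))
The second error goes away if you point config_file at a configuration that exists, for example the Composer-provided /home/airflow/composer_kube_config, instead of the core/kube_config entry.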
secret_env = Secret(
# Expose the secret as environment variable.
deploy_type="env",
# The name of the environment variable, since deploy_type is `env` rather
# than `volume`.
deploy_target="SQL_CONN",
# Name of the Kubernetes Secret
secret="airflow-secrets",
# Key of a secret stored in this Secret object
key="sql_alchemy_conn",
)
secret_volume = Secret(
deploy_type="volume",
# Path where we mount the secret as volume
deploy_target="/var/secrets/google",
# Name of Kubernetes Secret
secret="service-account",
# Key in the form of service account file name
key="service-account.json",
)
Airflow 1
secret_env = secret.Secret(
# Expose the secret as environment variable.
deploy_type="env",
# The name of the environment variable, since deploy_type is `env` rather
# than `volume`.
deploy_target="SQL_CONN",
# Name of the Kubernetes Secret
secret="airflow-secrets",
# Key of a secret stored in this Secret object
key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
deploy_type="volume",
# Path where we mount the secret as volume
deploy_target="/var/secrets/google",
# Name of Kubernetes Secret
secret="service-account",
# Key in the form of service account file name
key="service-account.json",
)
kubernetes_secret_vars_ex = KubernetesPodOperator(
task_id="ex-kube-secrets",
name="ex-kube-secrets",
namespace="default",
image="ubuntu",
startup_timeout_seconds=300,
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[secret_env, secret_volume],
# env_vars allows you to specify environment variables for your
# container to use. env_vars is templated.
env_vars={
"EXAMPLE_VAR": "/example/value",
"GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
},
)
Airflow 1
kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-kube-secrets",
name="ex-kube-secrets",
namespace="default",
image="ubuntu",
startup_timeout_seconds=300,
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[secret_env, secret_volume],
# env_vars allows you to specify environment variables for your
# container to use. env_vars is templated.
env_vars={
"EXAMPLE_VAR": "/example/value",
"GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
},
)
If you do not change the DAG or your environment, the ex-kube-secrets task fails. If you check the logs, the task fails because of a Pod took too long to start error. This error occurs because the Secret specified in the configuration (secret_env) cannot be found.
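The Secrets referenced by secret_env and secret_volume must exist in the GKE cluster itself; Airflow only mounts them. A minimal sketch of creating them with the official kubernetes Python client, assuming your kubeconfig points at the environment's cluster and using placeholder values (in practice you would typically create the same Secrets with kubectl create secret generic):
from kubernetes import client, config

# Load cluster credentials from the local kubeconfig.
config.load_kube_config()
v1 = client.CoreV1Api()

# Secret consumed by secret_env; the key must match key="sql_alchemy_conn".
v1.create_namespaced_secret(
    namespace="default",
    body=client.V1Secret(
        metadata=client.V1ObjectMeta(name="airflow-secrets"),
        string_data={"sql_alchemy_conn": "placeholder-connection-string"},
    ),
)

# Secret consumed by secret_volume; the key becomes the mounted file name.
v1.create_namespaced_secret(
    namespace="default",
    body=client.V1Secret(
        metadata=client.V1ObjectMeta(name="service-account"),
        string_data={"service-account.json": "{}"},  # placeholder content
    ),
)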
# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
task_id="ex-pod-affinity",
name="ex-pod-affinity",
namespace="default",
image="perl:5.34.0",
cmds=["perl"],
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# affinity allows you to constrain which nodes your pod is eligible to
# be scheduled on, based on labels on the node. In this case, if the
# label 'cloud.google.com/gke-nodepool' with value
# 'pool-0' or 'pool-1' is not found on any
# nodes, it will fail to schedule.
affinity={
"nodeAffinity": {
# requiredDuringSchedulingIgnoredDuringExecution means in order
# for a pod to be scheduled on a node, the node must have the
# specified labels. However, if labels on a node change at
# runtime such that the affinity rules on a pod are no longer
# met, the pod will still continue to run on the node.
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
# The label key's value that pods can be scheduled
# on.
"values": [
"pool-0",
"pool-1",
],
}
]
}
]
}
}
},
)
Airflow 1
kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-pod-affinity",
name="ex-pod-affinity",
namespace="default",
image="perl:5.34.0",
cmds=["perl"],
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# affinity allows you to constrain which nodes your pod is eligible to
# be scheduled on, based on labels on the node. In this case, if the
# label 'cloud.google.com/gke-nodepool' with value
# 'pool-0' or 'pool-1' is not found on any
# nodes, it will fail to schedule.
affinity={
"nodeAffinity": {
# requiredDuringSchedulingIgnoredDuringExecution means in order
# for a pod to be scheduled on a node, the node must have the
# specified labels. However, if labels on a node change at
# runtime such that the affinity rules on a pod are no longer
# met, the pod will still continue to run on the node.
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
# The label key's value that pods can be scheduled
# on.
"values": [
"pool-0",
"pool-1",
],
}
]
}
]
}
}
},
)
kubernetes_full_pod = KubernetesPodOperator(
task_id="ex-all-configs",
name="pi",
namespace="default",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 120.
startup_timeout_seconds=120,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image. 'IfNotPresent' will cause
# the Kubelet to skip pulling an image if it already exists. If you
# want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for Pod, this will allow you to
# set both cpu and memory limits and requirements.
# Prior to Airflow 2.3 and the cncf providers package 5.0.0
# resources were passed as a dictionary. This change was made in
# https://github.com/apache/airflow/pull/27197
# Additionally, "memory" and "cpu" were previously named
# "limit_memory" and "limit_cpu"
# resources={'limit_memory': "250M", 'limit_cpu': "100m"},
container_resources=k8s_models.V1ResourceRequirements(
limits={"memory": "250M", "cpu": "100m"},
),
# Specifies the path to the Kubernetes config. If no config is specified,
# it will default to '~/.kube/config'. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
affinity={},
)
Airflow 1
kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-all-configs",
name="pi",
namespace="default",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 120.
startup_timeout_seconds=120,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image. 'IfNotPresent' will cause
# the Kubelet to skip pulling an image if it already exists. If you
# want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for Pod, this will allow you to
# set both cpu and memory limits and requirements.
# Prior to Airflow 1.10.4, resource specifications were
# passed as a Pod Resources class object.
# If using this example on a version of Airflow prior to 1.10.4,
# import the "pod" package from airflow.contrib.kubernetes and use
# resources = pod.Resources() instead of passing a dict.
# For more info see:
# https://github.com/apache/airflow/pull/4551
resources={"limit_memory": "250M", "limit_cpu": "100m"},
# Specifies the path to the Kubernetes config. If no config is specified,
# it will default to '~/.kube/config'. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
affinity={},
)
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
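If you rely on kubernetes_conn_id instead of config_file, the cluster details must be stored in an Airflow connection with that ID. A minimal sketch of registering such a connection programmatically; the kube_config_path extra used here is an assumption, so check the CNCF Kubernetes provider documentation for the exact extras supported by your provider version:
import json

from airflow import settings
from airflow.models import Connection

# Hypothetical connection that points the operator at the
# Composer-provided kubeconfig. Skip this if a connection with the same
# conn_id already exists.
conn = Connection(
    conn_id="kubernetes_default",
    conn_type="kubernetes",
    extra=json.dumps({"kube_config_path": "/home/airflow/composer_kube_config"}),
)

session = settings.Session()
session.add(conn)
session.commit()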
When a Pod times out, the task-specific log is available in the Airflow UI. For example:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod timeouts can also occur when the Cloud Composer service account lacks the IAM permissions necessary to perform the task at hand. To verify this, look at Pod-level errors using the GKE dashboards to examine the logs for your particular workload, or use Cloud Logging.
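One way to inspect those Pod-level errors from Cloud Logging is the google-cloud-logging client library. A minimal sketch, assuming Application Default Credentials and a hypothetical filter; adjust the namespace and pod name to match the workload reported in the Airflow task log:
from google.cloud import logging

client = logging.Client()

# Hypothetical filter: narrow the results to Pod events in the namespace
# used by the examples above.
log_filter = (
    'resource.type="k8s_pod" '
    'resource.labels.namespace_name="default" '
    'resource.labels.pod_name:"ex-all-configs"'
)

for entry in client.list_entries(
    filter_=log_filter, order_by=logging.DESCENDING, max_results=20
):
    print(entry.timestamp, entry.payload)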
[[["わかりやすい","easyToUnderstand","thumb-up"],["問題の解決に役立った","solvedMyProblem","thumb-up"],["その他","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["翻訳に関する問題","translationIssue","thumb-down"],["その他","otherDown","thumb-down"]],["最終更新日 2024-05-28 UTC。"],[],[]]