This page describes how to use the KubernetesPodOperator to deploy
Kubernetes pods
from Cloud Composer into the Google Kubernetes Engine
cluster that is part of your Cloud Composer environment, and how to ensure
that your environment has the appropriate resources.
KubernetesPodOperator launches Kubernetes pods in your
environment's cluster. In comparison,
Google Kubernetes Engine operators
run Kubernetes pods in a specified cluster, which can be a separate cluster
that is not related to your environment. You can also create and delete
clusters using Google Kubernetes Engine operators.
The KubernetesPodOperator is a good option if you require:
Custom Python dependencies that are not available through the public PyPI
repository.
Binary dependencies that are not available in the stock
Cloud Composer worker image.
This page walks you through an example Airflow DAG that includes the following
KubernetesPodOperator configurations: Minimal configuration, Template
configuration, Secret variables configuration, Pod affinity configuration, and
Full configuration.
We recommend using the latest version of Cloud Composer.
At a minimum, the version that you use must be supported as part of
the deprecation and support policy.
Make sure that your
environment has sufficient resources.
Launching pods into a resource-starved environment can cause Airflow
worker and Airflow scheduler errors.
Set up your Cloud Composer environment resources
When you create a Cloud Composer environment, you specify its
performance parameters, including performance parameters for the environment's
cluster. Launching Kubernetes pods into the environment's cluster can cause
competition for cluster resources, such as CPU or memory. Because the Airflow
scheduler and workers are in the same GKE cluster, the
schedulers and workers won't work properly if the competition results in
resource starvation.
To prevent resource starvation, take one or more of the following actions:
Increase the number of nodes in your Cloud Composer environment. Doing so
increases the computing power available to your workloads. However, this
increase does not provide additional resources for tasks that require
more CPU or RAM than the specified machine type provides.
Specify the appropriate machine type. During Cloud Composer environment
creation, you can specify a machine type. To ensure that resources are
available, specify a machine type for the type of computing
that occurs in your Cloud Composer environment.
KubernetesPodOperator configuration
To follow along with this example, put the entire kubernetes_pod_operator.py
file in your environment's dags/ folder or
add the relevant KubernetesPodOperator code to a DAG.
The following sections explain each KubernetesPodOperator configuration
in the example. For information about each configuration variable,
see the Airflow reference.
import datetime
from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models
# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
# Expose the secret as environment variable.
deploy_type="env",
# The name of the environment variable, since deploy_type is `env` rather
# than `volume`.
deploy_target="SQL_CONN",
# Name of the Kubernetes Secret
secret="airflow-secrets",
# Key of a secret stored in this Secret object
key="sql_alchemy_conn",
)
secret_volume = Secret(
deploy_type="volume",
# Path where we mount the secret as volume
deploy_target="/var/secrets/google",
# Name of Kubernetes Secret
secret="service-account",
# Key in the form of service account file name
key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
dag_id="composer_sample_kubernetes_pod",
schedule_interval=datetime.timedelta(days=1),
start_date=YESTERDAY,
) as dag:
# Only name, namespace, image, and task_id are required to create a
# KubernetesPodOperator. In Cloud Composer, currently the operator defaults
# to using the config file found at `/home/airflow/composer_kube_config` if
# no `config_file` parameter is specified. By default it will contain the
# credentials for Cloud Composer's Google Kubernetes Engine cluster that is
# created upon environment creation.
kubernetes_min_pod = KubernetesPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes. The default namespace is
# `default`. In Composer 1 there is the potential for resource starvation
# of Airflow workers and the scheduler within the Cloud Composer
# environment; the recommended solution is to increase the number of
# nodes to satisfy the computing requirements. Alternatively, launching
# pods into a custom namespace prevents them from competing with Airflow
# components for resources, and in Composer 2 the environment autoscales.
namespace="default",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
kubenetes_template_ex = KubernetesPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
namespace="default",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
# Sets the config file to a kubernetes config file specified in
# airflow.cfg. If the configuration file does not exist or does
# not provide valid credentials, the pod will fail to launch. If not
# specified, config_file defaults to ~/.kube/config
config_file="{{ conf.get('core', 'kube_config') }}",
)
kubernetes_secret_vars_ex = KubernetesPodOperator(
task_id="ex-kube-secrets",
name="ex-kube-secrets",
namespace="default",
image="ubuntu",
startup_timeout_seconds=300,
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[secret_env, secret_volume],
# env_vars allows you to specify environment variables for your
# container to use. env_vars is templated.
env_vars={
"EXAMPLE_VAR": "/example/value",
"GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
},
)
# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
task_id="ex-pod-affinity",
name="ex-pod-affinity",
namespace="default",
image="perl:5.34.0",
cmds=["perl"],
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# affinity allows you to constrain which nodes your pod is eligible to
# be scheduled on, based on labels on the node. In this case, if the
# label 'cloud.google.com/gke-nodepool' with value
# 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
# nodes, it will fail to schedule.
affinity={
"nodeAffinity": {
# requiredDuringSchedulingIgnoredDuringExecution means in order
# for a pod to be scheduled on a node, the node must have the
# specified labels. However, if labels on a node change at
# runtime such that the affinity rules on a pod are no longer
# met, the pod will still continue to run on the node.
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
# The label key's value that pods can be scheduled
# on.
"values": [
"pool-0",
"pool-1",
],
}
]
}
]
}
}
},
)
kubernetes_full_pod = KubernetesPodOperator(
task_id="ex-all-configs",
name="pi",
namespace="default",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 120.
startup_timeout_seconds=120,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image, if 'IfNotPresent' will cause
# the Kubelet to skip pulling an image if it already exists. If you
# want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for Pod, this will allow you to
# set both cpu and memory limits and requirements.
# Prior to Airflow 2.3 and the cncf providers package 5.0.0
# resources were passed as a dictionary. This change was made in
# https://github.com/apache/airflow/pull/27197
# Additionally, "memory" and "cpu" were previously named
# "limit_memory" and "limit_cpu"
# resources={'limit_memory': "250M", 'limit_cpu': "100m"},
container_resources=k8s_models.V1ResourceRequirements(
limits={"memory": "250M", "cpu": "100m"},
),
# Specifies path to kubernetes config. If no config is specified will
# default to '~/.kube/config'. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
affinity={},
)
import datetime
from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator
# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = secret.Secret(
# Expose the secret as environment variable.
deploy_type="env",
# The name of the environment variable, since deploy_type is `env` rather
# than `volume`.
deploy_target="SQL_CONN",
# Name of the Kubernetes Secret
secret="airflow-secrets",
# Key of a secret stored in this Secret object
key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
deploy_type="volume",
# Path where we mount the secret as volume
deploy_target="/var/secrets/google",
# Name of Kubernetes Secret
secret="service-account",
# Key in the form of service account file name
key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
dag_id="composer_sample_kubernetes_pod",
schedule_interval=datetime.timedelta(days=1),
start_date=YESTERDAY,
) as dag:
# Only name, namespace, image, and task_id are required to create a
# KubernetesPodOperator. In Cloud Composer, currently the operator defaults
# to using the config file found at `/home/airflow/composer_kube_config` if
# no `config_file` parameter is specified. By default it will contain the
# credentials for Cloud Composer's Google Kubernetes Engine cluster that is
# created upon environment creation.
kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes. The default namespace is
# `default`. There is the potential for resource starvation of Airflow
# workers and the scheduler within the Cloud Composer environment; the
# recommended solution is to increase the number of nodes to satisfy the
# computing requirements. Alternatively, launching pods into a custom
# namespace prevents them from competing with Airflow components for
# resources.
namespace="default",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
namespace="default",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
# Sets the config file to a kubernetes config file specified in
# airflow.cfg. If the configuration file does not exist or does
# not provide valid credentials, the pod will fail to launch. If not
# specified, config_file defaults to ~/.kube/config
config_file="{{ conf.get('core', 'kube_config') }}",
)
kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-kube-secrets",
name="ex-kube-secrets",
namespace="default",
image="ubuntu",
startup_timeout_seconds=300,
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[secret_env, secret_volume],
# env_vars allows you to specify environment variables for your
# container to use. env_vars is templated.
env_vars={
"EXAMPLE_VAR": "/example/value",
"GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
},
)
kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-pod-affinity",
name="ex-pod-affinity",
namespace="default",
image="perl:5.34.0",
cmds=["perl"],
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# affinity allows you to constrain which nodes your pod is eligible to
# be scheduled on, based on labels on the node. In this case, if the
# label 'cloud.google.com/gke-nodepool' with value
# 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
# nodes, it will fail to schedule.
affinity={
"nodeAffinity": {
# requiredDuringSchedulingIgnoredDuringExecution means in order
# for a pod to be scheduled on a node, the node must have the
# specified labels. However, if labels on a node change at
# runtime such that the affinity rules on a pod are no longer
# met, the pod will still continue to run on the node.
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
# The label key's value that pods can be scheduled
# on.
"values": [
"pool-0",
"pool-1",
],
}
]
}
]
}
}
},
)
kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-all-configs",
name="pi",
namespace="default",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 120.
startup_timeout_seconds=120,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image, if 'IfNotPresent' will cause
# the Kubelet to skip pulling an image if it already exists. If you
# want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for Pod, this will allow you to
# set both cpu and memory limits and requirements.
# Prior to Airflow 1.10.4, resource specifications were
# passed as a Pod Resources Class object,
# If using this example on a version of Airflow prior to 1.10.4,
# import the "pod" package from airflow.contrib.kubernetes and use
# resources = pod.Resources() instead passing a dict
# For more info see:
# https://github.com/apache/airflow/pull/4551
resources={"limit_memory": "250M", "limit_cpu": "100m"},
# Specifies path to kubernetes config. If no config is specified will
# default to '~/.kube/config'. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
affinity={},
)
Minimal configuration
To create a KubernetesPodOperator, you need to specify only the pod's name,
the namespace in which to run the pod, the image to use, and the task_id.
When you place the following code snippet in a DAG, the configuration uses the
defaults in /home/airflow/composer_kube_config. You don't need to modify the
code for the pod-ex-minimum task to succeed.
kubernetes_min_pod = KubernetesPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes. The default namespace is
# `default`. In Composer 1 there is the potential for resource starvation
# of Airflow workers and the scheduler within the Cloud Composer
# environment; the recommended solution is to increase the number of
# nodes to satisfy the computing requirements. Alternatively, launching
# pods into a custom namespace prevents them from competing with Airflow
# components for resources, and in Composer 2 the environment autoscales.
namespace="default",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes. The default namespace is
# `default`. There is the potential for resource starvation of Airflow
# workers and the scheduler within the Cloud Composer environment; the
# recommended solution is to increase the number of nodes to satisfy the
# computing requirements. Alternatively, launching pods into a custom
# namespace prevents them from competing with Airflow components for
# resources.
namespace="default",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
Template configuration
Airflow supports using
Jinja Templating.
You must declare the required variables (task_id, name, namespace,
and image) with the operator. As shown in the following example, you can
template all other parameters with Jinja, including cmds, arguments,
env_vars, and config_file.
kubenetes_template_ex = KubernetesPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
namespace="default",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
# Sets the config file to a kubernetes config file specified in
# airflow.cfg. If the configuration file does not exist or does
# not provide valid credentials, the pod will fail to launch. If not
# specified, config_file defaults to ~/.kube/config
config_file="{{ conf.get('core', 'kube_config') }}",
)
kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
namespace="default",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
# Sets the config file to a kubernetes config file specified in
# airflow.cfg. If the configuration file does not exist or does
# not provide valid credentials, the pod will fail to launch. If not
# specified, config_file defaults to ~/.kube/config
config_file="{{ conf.get('core', 'kube_config') }}",
)
Without changing the DAG or your environment, the ex-kube-templates task
fails because of two errors. The logs show that this task fails because the
my_value Airflow variable does not exist. The second error, which you can get
after fixing the first error, shows that the task fails because
core/kube_config is not found in the Airflow configuration.
To fix both errors, first create the my_value Airflow variable (for example,
in the Airflow UI under Admin > Variables, or with the
gcloud composer environments run command), and then override the kube_config
Airflow configuration option as described below. If you use gcloud commands,
replace LOCATION with the region where the environment is located.
To refer to a custom config_file (a Kubernetes configuration file),
override the kube_config Airflow configuration option so that it points to a
valid Kubernetes configuration:
Section: core
Key: kube_config
Value: /home/airflow/composer_kube_config
Wait a few minutes for your environment to update. Then
run the ex-kube-templates task again and verify that it succeeds.
Secret variables configuration
A Kubernetes secret
is an object that contains sensitive data. You can pass secrets to the
Kubernetes pods by using the KubernetesPodOperator.
Secrets must be defined in Kubernetes, or the pod fails to launch.
This example shows two ways of using Kubernetes Secrets: as an environment
variable, and as a volume mounted by the pod.
The first secret, airflow-secrets, is set
to a Kubernetes environment variable named SQL_CONN (as opposed to an
Airflow or Cloud Composer environment variable).
The second secret, service-account, mounts service-account.json, a file
with a service account token, to /var/secrets/google.
secret_env = Secret(
# Expose the secret as environment variable.
deploy_type="env",
# The name of the environment variable, since deploy_type is `env` rather
# than `volume`.
deploy_target="SQL_CONN",
# Name of the Kubernetes Secret
secret="airflow-secrets",
# Key of a secret stored in this Secret object
key="sql_alchemy_conn",
)
secret_volume = Secret(
deploy_type="volume",
# Path where we mount the secret as volume
deploy_target="/var/secrets/google",
# Name of Kubernetes Secret
secret="service-account",
# Key in the form of service account file name
key="service-account.json",
)
secret_env = secret.Secret(
# Expose the secret as environment variable.
deploy_type="env",
# The name of the environment variable, since deploy_type is `env` rather
# than `volume`.
deploy_target="SQL_CONN",
# Name of the Kubernetes Secret
secret="airflow-secrets",
# Key of a secret stored in this Secret object
key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
deploy_type="volume",
# Path where we mount the secret as volume
deploy_target="/var/secrets/google",
# Name of Kubernetes Secret
secret="service-account",
# Key in the form of service account file name
key="service-account.json",
)
The name of the first Kubernetes Secret is defined in the secret parameter.
This particular Secret is named airflow-secrets. It is exposed as an
environment variable, as dictated by the deploy_type. The name of the
environment variable that it sets, deploy_target, is SQL_CONN. Finally, the
key of the secret that is stored in the deploy_target is sql_alchemy_conn.
The name of the second Kubernetes Secret is defined in the secret parameter.
This particular Secret is named service-account. It is exposed as a volume,
as dictated by the deploy_type. The path of the file to mount,
deploy_target, is /var/secrets/google. Finally, the key of the secret
that is stored in the deploy_target is service-account.json.
Here's what the operator configuration looks like:
kubernetes_secret_vars_ex = KubernetesPodOperator(
task_id="ex-kube-secrets",
name="ex-kube-secrets",
namespace="default",
image="ubuntu",
startup_timeout_seconds=300,
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[secret_env, secret_volume],
# env_vars allows you to specify environment variables for your
# container to use. env_vars is templated.
env_vars={
"EXAMPLE_VAR": "/example/value",
"GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
},
)
kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-kube-secrets",
name="ex-kube-secrets",
namespace="default",
image="ubuntu",
startup_timeout_seconds=300,
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[secret_env, secret_volume],
# env_vars allows you to specify environment variables for your
# container to use. env_vars is templated.
env_vars={
"EXAMPLE_VAR": "/example/value",
"GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
},
)
Without making any changes to the DAG or your environment,
the ex-kube-secrets
task fails. If you look at the logs, the task fails because of
a Pod took too long to start error. This error occurs because Airflow
cannot find the secret specified in the configuration, secret_env.
gcloud
To set the secrets using gcloud and kubectl:
Get information about your Cloud Composer environment cluster and get
kubectl credentials for it.
Create a Kubernetes Secret named airflow-secrets that contains a
sql_alchemy_conn key, and a Kubernetes Secret named service-account that
sets the value of service-account.json to the contents of a local service
account key file called key.json.
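If you prefer to create these Secrets programmatically instead of with
kubectl, the following is a minimal sketch that uses the official Kubernetes
Python client; it is not part of the example DAG. It assumes that a local
kubeconfig with credentials for your environment's cluster is available and
that key.json is the local path to your service account key file; the
test_value placeholder is only an illustration.
import base64

from kubernetes import client, config

# Load credentials for the environment's cluster from the local kubeconfig.
config.load_kube_config()
v1 = client.CoreV1Api()

# Secret referenced by secret_env: exposed to the pod as the SQL_CONN variable.
v1.create_namespaced_secret(
    namespace="default",
    body=client.V1Secret(
        metadata=client.V1ObjectMeta(name="airflow-secrets"),
        string_data={"sql_alchemy_conn": "test_value"},  # placeholder value
    ),
)

# Secret referenced by secret_volume: mounted by the pod at /var/secrets/google.
with open("key.json", "rb") as key_file:  # hypothetical local key file path
    key_b64 = base64.b64encode(key_file.read()).decode()

v1.create_namespaced_secret(
    namespace="default",
    body=client.V1Secret(
        metadata=client.V1ObjectMeta(name="service-account"),
        data={"service-account.json": key_b64},
    ),
)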
After you set the secrets, run the ex-kube-secrets task again in the
Airflow UI.
Verify the ex-kube-secrets task succeeds.
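Inside the pod, the two Secrets appear as an ordinary environment variable
and a mounted file. The following minimal sketch, which assumes that the
container image includes Python, shows how code running in the container
could read them; the variable name SQL_CONN and the mount path come from the
Secret definitions above.
import json
import os

# Value of the sql_alchemy_conn key from the airflow-secrets Secret,
# exposed through the SQL_CONN environment variable by secret_env.
sql_conn = os.environ["SQL_CONN"]

# The service-account Secret is mounted as a volume by secret_volume.
with open("/var/secrets/google/service-account.json") as key_file:
    service_account_info = json.load(key_file)

print(sql_conn)
print(service_account_info.get("client_email"))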
Pod affinity configuration
When you configure the affinity parameter in the KubernetesPodOperator, you
control what nodes to schedule pods on, such as nodes only in a particular
node pool. In this example, the operator runs only on node pools named
pool-0 and pool-1. Your Cloud Composer 1 environment nodes are in
the default-pool, so your pods do not run on the nodes in your environment.
# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
task_id="ex-pod-affinity",
name="ex-pod-affinity",
namespace="default",
image="perl:5.34.0",
cmds=["perl"],
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# affinity allows you to constrain which nodes your pod is eligible to
# be scheduled on, based on labels on the node. In this case, if the
# label 'cloud.google.com/gke-nodepool' with value
# 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
# nodes, it will fail to schedule.
affinity={
"nodeAffinity": {
# requiredDuringSchedulingIgnoredDuringExecution means in order
# for a pod to be scheduled on a node, the node must have the
# specified labels. However, if labels on a node change at
# runtime such that the affinity rules on a pod are no longer
# met, the pod will still continue to run on the node.
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
# The label key's value that pods can be scheduled
# on.
"values": [
"pool-0",
"pool-1",
],
}
]
}
]
}
}
},
)
kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-pod-affinity",
name="ex-pod-affinity",
namespace="default",
image="perl:5.34.0",
cmds=["perl"],
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# affinity allows you to constrain which nodes your pod is eligible to
# be scheduled on, based on labels on the node. In this case, if the
# label 'cloud.google.com/gke-nodepool' with value
# 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
# nodes, it will fail to schedule.
affinity={
"nodeAffinity": {
# requiredDuringSchedulingIgnoredDuringExecution means in order
# for a pod to be scheduled on a node, the node must have the
# specified labels. However, if labels on a node change at
# runtime such that the affinity rules on a pod are no longer
# met, the pod will still continue to run on the node.
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
# The label key's value that pods can be scheduled
# on.
"values": [
"pool-0",
"pool-1",
],
}
]
}
]
}
}
},
)
As the example is currently configured, the task fails. If you look at the
logs, the task fails because node pools pool-0 and pool-1 do not exist.
To make sure the node pools in values exist, make any of the following
configuration changes:
If you created a node pool earlier, replace pool-0 and pool-1 with the
names of your node pools and upload your DAG again.
Create a node pool named pool-0 or pool-1. You can create
both, but the task needs only one to succeed.
Replace pool-0 and pool-1 with default-pool, which is the default pool
that Airflow uses (see the short sketch after this list). Then, upload your
DAG again.
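For example, the third option amounts to changing only the values list in
the affinity block. The following hedged sketch shows the same nodeAffinity
rule targeting default-pool; the rest of the ex-pod-affinity configuration
stays unchanged.
# Sketch: the nodeAffinity rule from ex-pod-affinity, but targeting the
# environment's default-pool node pool instead of pool-0 and pool-1.
affinity = {
    "nodeAffinity": {
        "requiredDuringSchedulingIgnoredDuringExecution": {
            "nodeSelectorTerms": [
                {
                    "matchExpressions": [
                        {
                            "key": "cloud.google.com/gke-nodepool",
                            "operator": "In",
                            "values": ["default-pool"],
                        }
                    ]
                }
            ]
        }
    }
}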
After you make the changes, wait a few minutes for your environment to update.
Then run the ex-pod-affinity task again and verify that the ex-pod-affinity
task succeeds.
Full configuration
This example shows all the variables that you can configure in the
KubernetesPodOperator. You don't need to modify the code for the
ex-all-configs task to succeed.
kubernetes_full_pod = KubernetesPodOperator(
task_id="ex-all-configs",
name="pi",
namespace="default",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 120.
startup_timeout_seconds=120,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image, if 'IfNotPresent' will cause
# the Kubelet to skip pulling an image if it already exists. If you
# want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for Pod, this will allow you to
# set both cpu and memory limits and requirements.
# Prior to Airflow 2.3 and the cncf providers package 5.0.0
# resources were passed as a dictionary. This change was made in
# https://github.com/apache/airflow/pull/27197
# Additionally, "memory" and "cpu" were previously named
# "limit_memory" and "limit_cpu"
# resources={'limit_memory': "250M", 'limit_cpu': "100m"},
container_resources=k8s_models.V1ResourceRequirements(
limits={"memory": "250M", "cpu": "100m"},
),
# Specifies path to kubernetes config. If no config is specified will
# default to '~/.kube/config'. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
affinity={},
)
kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
task_id="ex-all-configs",
name="pi",
namespace="default",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 120.
startup_timeout_seconds=120,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image, if 'IfNotPresent' will cause
# the Kubelet to skip pulling an image if it already exists. If you
# want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for Pod, this will allow you to
# set both cpu and memory limits and requirements.
# Prior to Airflow 1.10.4, resource specifications were
# passed as a Pod Resources Class object,
# If using this example on a version of Airflow prior to 1.10.4,
# import the "pod" package from airflow.contrib.kubernetes and use
# resources = pod.Resources() instead passing a dict
# For more info see:
# https://github.com/apache/airflow/pull/4551
resources={"limit_memory": "250M", "limit_cpu": "100m"},
# Specifies path to kubernetes config. If no config is specified will
# default to '~/.kube/config'. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
affinity={},
)
Information about CNCF Kubernetes Provider
GKEStartPodOperator and KubernetesPodOperator are implemented within the
apache-airflow-providers-cncf-kubernetes provider package.
In version 6.0.0 of the CNCF Kubernetes Provider package,
the kubernetes_default connection is used by default in
the KubernetesPodOperator.
If you specified a custom connection in version 5.0.0, this custom connection
is still used by the operator. To switch back to using the kubernetes_default
connection, you might want to adjust your DAGs accordingly.
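For example, to keep using a custom connection explicitly rather than
relying on the kubernetes_default default, you can pass it through the
kubernetes_conn_id parameter. A minimal sketch, assuming a hypothetical
Airflow connection named my_gke_connection that you created yourself:
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# Sketch: explicitly select the Kubernetes connection used by the operator.
# Omit kubernetes_conn_id to fall back to kubernetes_default in version 6.0.0
# and later of the provider.
explicit_connection_pod = KubernetesPodOperator(
    task_id="ex-explicit-connection",
    name="ex-explicit-connection",
    namespace="default",
    image="ubuntu",
    cmds=["echo", "hello"],
    kubernetes_conn_id="my_gke_connection",  # hypothetical connection ID
)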
Version 5.0.0
This version introduces a few backward-incompatible changes
compared to version 4.4.0. The most important ones that you should be aware
of are related to the kubernetes_default connection, which is not used in
version 5.0.0.
The kubernetes_default connection needs to be modified: the Kube config path
needs to be set to /home/airflow/composer_kube_config, or alternatively the
config_file parameter needs to be added to the KubernetesPodOperator
configuration (as shown below).
Modify the code of a task that uses KubernetesPodOperator in the following way:
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
To troubleshoot a failed task, you can also review the task logs stored in
your environment's bucket: in the bucket of your environment, go up one
level and review the logs in the logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE>
folder.
Detailed pod logs are available in the Google Cloud console
under GKE workloads. These logs include the pod
definition YAML file, pod events, and pod details.
Non-zero return codes when also using the GKEStartPodOperator
When using the KubernetesPodOperator and the GKEStartPodOperator, the
return code of the container's entrypoint determines whether the task is
considered successful or not. Non-zero return codes indicate failure.
A common pattern when using the KubernetesPodOperator and
the GKEStartPodOperator is to execute a shell script as the container
entrypoint to group together multiple operations within the container.
If you are writing such a script, we recommend that you include
the set -e command at the top of the script
so that failed commands in the script terminate the script and
propagate the failure to the Airflow task instance.
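A minimal sketch of this pattern follows; the grouped commands are
hypothetical placeholders. Because the script starts with set -e, the first
failing command makes the container exit with a non-zero code, which in turn
marks the Airflow task instance as failed.
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# Sketch: group several shell commands behind a single bash entrypoint.
# With `set -e`, the script stops at the first failing command and the
# container returns a non-zero exit code, so the Airflow task fails.
grouped_commands = KubernetesPodOperator(
    task_id="ex-grouped-commands",
    name="ex-grouped-commands",
    namespace="default",
    image="ubuntu",
    cmds=["bash", "-c"],
    arguments=["set -e; echo 'step 1'; echo 'step 2'"],
)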
Pod timeouts
The default timeout for KubernetesPodOperator is 120 seconds, which
can result in timeouts occurring before larger images finish downloading. You can
increase the timeout by altering the startup_timeout_seconds
parameter when you create the KubernetesPodOperator.
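For example, the following hedged sketch gives the pod up to 10 minutes to
start; the 600-second value is an arbitrary illustration, not a
recommendation.
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# Sketch: allow more time for the pod to start, for example when a large
# image takes a while to pull. The default is 120 seconds.
slow_start_pod = KubernetesPodOperator(
    task_id="ex-slow-start",
    name="ex-slow-start",
    namespace="default",
    image="ubuntu",
    cmds=["echo", "done"],
    startup_timeout_seconds=600,
)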
When a pod times out, the task-specific log is available in
the Airflow UI. For example:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod timeouts can also occur when the
Cloud Composer Service Account
lacks the necessary IAM permissions to perform the task at hand. To verify
this, look at pod-level errors using the
GKE Dashboards to view the logs for your
particular workload, or use Cloud Logging.
Failed to establish a new connection
Auto-upgrade is enabled by default in GKE clusters.
If a node pool is in a cluster that is upgrading, you might see the following
error:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
To check whether your cluster is upgrading, in the Google Cloud console, go to
the Kubernetes clusters page and look for the loading icon next to your
environment's cluster name.