Use the KubernetesPodOperator

This page describes how to use the KubernetesPodOperator to deploy Kubernetes pods from Cloud Composer into the Google Kubernetes Engine cluster that is part of your Cloud Composer environment and to ensure that your environment has the appropriate resources.

KubernetesPodOperator launches Kubernetes pods in your environment's cluster. In comparison, Google Kubernetes Engine operators run Kubernetes pods in a specified cluster, which can be a separate cluster that is not related to your environment. You can also create and delete clusters using Google Kubernetes Engine operators.

The KubernetesPodOperator is a good option if you require:

  • Custom Python dependencies that are not available through the public PyPI repository.
  • Binary dependencies that are not available in the stock Cloud Composer worker image.

This page walks you through an example Airflow DAG that includes the following KubernetesPodOperator configurations: minimal configuration, template configuration, and full configuration.

Before you begin

  • In Cloud Composer 3, your environment's cluster scales automatically. Extra workloads that you run using KubernetesPodOperator scale independently from your environment. Your environment is not affected by the increased resource demand, but your environment's cluster scales up and down depending on the resource demand. The pricing for the extra workloads that you run in your environment's cluster follows the Cloud Composer 2 pricing model and uses Cloud Composer Compute SKUs.

  • In Cloud Composer 3, your environment's cluster is located in the tenant project. However, KubernetesPodOperator works the same way, without any code changes required compared to Cloud Composer 2. Pods are executed in the environment's cluster, in an isolated namespace, but with access to your VPC network (if it's enabled).

  • Cloud Composer 3 uses GKE clusters with Workload Identity. By default, Pods that run in newly created namespaces or in the composer-user-workloads namespace cannot access Google Cloud resources. When using Workload Identity, Kubernetes service accounts associated with namespaces must be mapped to Google Cloud service accounts to enable service identity authorization for requests to Google APIs and other services.

    Because of this, if you run Pods in the composer-user-workloads namespace or a newly created namespace in your environment's cluster, then proper IAM bindings between Kubernetes and Google Cloud service accounts are not created, and these Pods cannot access resources of your Google Cloud project.

    If you want your Pods to have access to Google Cloud resources, then use the composer-user-workloads namespace or create your own namespace, and set up the required bindings as described further.

    To provide access to your project's resources, follow the guidance in Workload Identity and set up the bindings:

    1. Create a separate namespace in your environment's cluster.
    2. Create a binding between the composer-user-workloads/<namespace_name> Kubernetes Service Account and your environment's service account.
    3. Add your environment's service account annotation to the Kubernetes service account.
    4. When you use KubernetesPodOperator, specify the namespace and the Kubernetes service account in the namespace and service_account_name parameters, as shown in the sketch after this list.
  • Cloud Composer 3 uses GKE clusters with Workload Identity. The GKE metadata server takes a few seconds to start accepting requests on a newly created Pod. Therefore, attempts to authenticate using Workload Identity within the first few seconds of a Pod's life might fail. For more information about this limitation, see Restrictions of Workload Identity.

  • Cloud Composer 2 uses Autopilot clusters, which introduce the notion of compute classes:

    • By default, if no class is selected then the general-purpose class is assumed when you create Pods using KubernetesPodOperator.

    • Each class is associated with specific properties and resource limits. You can read about them in the Autopilot documentation. For example, Pods that run within the general-purpose class can use up to 110 GiB of memory.
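
The following minimal sketch illustrates step 4 of the Workload Identity setup described earlier in this section. The names example-namespace and example-ksa are hypothetical placeholders for the namespace and Kubernetes service account that you create and bind to your environment's service account; the remaining parameters follow the examples later on this page.

run_in_custom_namespace = KubernetesPodOperator(
    task_id="pod-in-custom-namespace",
    name="pod-in-custom-namespace",
    # Hypothetical namespace that you created in your environment's cluster.
    namespace="example-namespace",
    # Hypothetical Kubernetes service account in that namespace, mapped to
    # your environment's service account through Workload Identity bindings.
    service_account_name="example-ksa",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    cmds=["echo"],
    config_file="/home/airflow/composer_kube_config",
    kubernetes_conn_id="kubernetes_default",
)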

KubernetesPodOperator configuration

To follow along with this example, put the entire kubernetes_pod_operator.py file in your environment's dags/ folder or add the relevant KubernetesPodOperator code to a DAG.

The following sections explain each KubernetesPodOperator configuration in the example. For information about each configuration variable, see the Airflow reference.

"""Example DAG using KubernetesPodOperator."""
import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, the config file found at
    # `/home/airflow/composer_kube_config` contains credentials for
    # Cloud Composer's Google Kubernetes Engine cluster that is created
    # upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes. In Composer 2 environments
        # after December 2022, the default namespace is
        # `composer-user-workloads`.
        namespace="composer-user-workloads",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="composer-user-workloads",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide valid credentials, the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="composer-user-workloads",
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        cmds=["echo"],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="composer-user-workloads",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120 seconds.
        startup_timeout_seconds=600,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
            limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        ),
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )

Minimal configuration

To create a KubernetesPodOperator, you only need to specify the Pod's name, the namespace to run the Pod in, the image to use, and the task_id.

When you place the following code snippet in a DAG, the configuration uses the defaults in /home/airflow/composer_kube_config. You don't need to modify the code for the pod-ex-minimum task to succeed.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Template configuration

Airflow supports using Jinja Templating. You must declare the required variables (task_id, name, namespace, and image) with the operator. As shown in the following example, you can template all other parameters with Jinja, including cmds, arguments, env_vars, and config_file.

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide valid credentials, the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

Without changing the DAG or your environment, the ex-kube-templates task fails because of two errors. The logs show that the task first fails because the required Airflow variable (my_value) does not exist. The second error, which appears after you fix the first one, shows that the task fails because core/kube_config is not found in the Airflow configuration.

To fix both errors, follow the steps below.

To set my_value with gcloud or the Airflow UI:

Airflow UI

In the Airflow 2 UI:

  1. Go to the Airflow UI.

  2. In the toolbar, select Admin > Variables.

  3. On the List Variable page, click Add a new record.

  4. On the Add Variable page, enter the following information:

    • Key: my_value
    • Val: example_value
  5. Click Save.

gcloud

For Airflow 2, enter the following command:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Replace:

  • ENVIRONMENT with the name of the environment.
  • LOCATION with the region where the environment is located.

To refer to a custom config_file (a Kubernetes configuration file), override the kube_config Airflow configuration option to a valid Kubernetes configuration:

  • Section: core
  • Key: kube_config
  • Value: /home/airflow/composer_kube_config

Wait a few minutes for your environment to update. Then run the ex-kube-templates task again and verify that it succeeds.

Full configuration

This example shows all the variables that you can configure in the KubernetesPodOperator. You don't need to modify the code for the ex-all-configs task to succeed.

For details on each variable, see the Airflow KubernetesPodOperator reference.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120 seconds.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Information about CNCF Kubernetes Provider

GKEStartPodOperator and KubernetesPodOperator are implemented within the apache-airflow-providers-cncf-kubernetes provider.

For detailed release notes for the CNCF Kubernetes provider, refer to the CNCF Kubernetes Provider website.

Version 6.0.0

In version 6.0.0 of the CNCF Kubernetes Provider package, the kubernetes_default connection is used by default in the KubernetesPodOperator.

If you specified a custom connection in version 5.0.0, the operator continues to use that custom connection. To switch back to the kubernetes_default connection, adjust your DAGs accordingly.
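
For example, a minimal sketch of a task that explicitly switches back to the default connection (the task ID, name, and image are placeholders):

pod_with_default_connection = KubernetesPodOperator(
    task_id="pod-ex-default-connection",
    name="pod-ex-default-connection",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    cmds=["echo"],
    # Explicitly select the default connection instead of a custom one. In
    # version 6.0.0 and later, omitting this parameter has the same effect.
    kubernetes_conn_id="kubernetes_default",
    config_file="/home/airflow/composer_kube_config",
)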

Version 5.0.0

This version introduces a few backward incompatible changes compared to version 4.4.0. The most important ones are related to the kubernetes_default connection, which is not used in version 5.0.0.

  • The kubernetes_default connection needs to be modified: set the Kube config path to /home/airflow/composer_kube_config (as shown in Figure 1). As an alternative, config_file must be added to the KubernetesPodOperator configuration (as shown in the following code example).
Figure 1. The Kube config path field when modifying the kubernetes_default connection in the Airflow UI
  • Modify the code of a task using KubernetesPodOperator in the following way:
KubernetesPodOperator(
    # The config_file parameter can be skipped if the connection already
    # contains this setting.
    config_file="/home/airflow/composer_kube_config",
    # The connection to be used by the operator.
    kubernetes_conn_id="kubernetes_default",
    ...
)

For more information about version 5.0.0, refer to the CNCF Kubernetes Provider release notes.

Troubleshooting

Tips for troubleshooting Pod failures

In addition to checking the task logs in the Airflow UI, also check the following logs:

  • Output of the Airflow scheduler and workers:

    1. In the Google Cloud console, go to the Environments page.

    2. Follow the DAGs link for your environment.

    3. In the bucket of your environment, go up one level.

    4. Review the logs in the logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE> folder.

  • Detailed pod logs in the Google Cloud console under GKE workloads. These logs include the pod definition YAML file, pod events, and pod details.

Non-zero return codes when also using the GKEStartPodOperator

When using the KubernetesPodOperator and the GKEStartPodOperator, the return code of the container's entrypoint determines whether the task is considered successful or not. Non-zero return codes indicate failure.

A common pattern when using the KubernetesPodOperator and the GKEStartPodOperator is to execute a shell script as the container entrypoint to group together multiple operations within the container.

If you are writing such a script, we recommend that you include the set -e command at the top of the script so that failed commands in the script terminate the script and propagate the failure to the Airflow task instance.
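
For illustration, here is a minimal sketch of a task that groups several shell commands into one container run. The task ID and commands are placeholders; instead of a script file with set -e at the top, this sketch passes the -e flag directly to bash, which has the same effect.

grouped_commands = KubernetesPodOperator(
    task_id="grouped-shell-commands",
    name="grouped-shell-commands",
    namespace="composer-user-workloads",
    image="bash",
    # The -e flag (errexit) makes bash exit on the first failing command, so
    # the container returns a non-zero code and Airflow marks the task failed.
    cmds=["bash", "-ce"],
    arguments=["echo 'download data'; echo 'transform data'; echo 'upload results'"],
    config_file="/home/airflow/composer_kube_config",
    kubernetes_conn_id="kubernetes_default",
)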

Pod timeouts

The default timeout for KubernetesPodOperator is 120 seconds, which can result in timeouts before larger images finish downloading. You can increase the timeout by setting the startup_timeout_seconds parameter when you create the KubernetesPodOperator.
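
For example, a minimal sketch that raises the startup timeout to ten minutes; the task ID and image are placeholders for a task that pulls a large image:

slow_starting_pod = KubernetesPodOperator(
    task_id="slow-starting-pod",
    name="slow-starting-pod",
    namespace="composer-user-workloads",
    # Placeholder for a large image that takes a long time to pull.
    image="gcr.io/PROJECT_ID/large-image:latest",
    cmds=["echo"],
    # Allow up to 10 minutes for the image to pull and the Pod to start,
    # instead of the 120-second default.
    startup_timeout_seconds=600,
    config_file="/home/airflow/composer_kube_config",
    kubernetes_conn_id="kubernetes_default",
)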

When a pod times out, the task-specific log is available in the Airflow UI. For example:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Pod timeouts can also occur when the Cloud Composer service account lacks the necessary IAM permissions to perform the task at hand. To verify this, check for pod-level errors in the GKE dashboards by reviewing the logs for your particular workload, or use Cloud Logging.

Failed to establish a new connection

Auto-upgrade is enabled by default in GKE clusters. If a node pool is in a cluster that is upgrading, you might see the following error:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

To check if your cluster is upgrading, in Google Cloud console, go to the Kubernetes clusters page and look for the loading icon next to your environment's cluster name.

What's next