Using the KubernetesPodOperator

This page describes how to use the KubernetesPodOperator to launch Kubernetes pods from Cloud Composer into the Google Kubernetes Engine (GKE) cluster that is part of your Cloud Composer environment, and how to ensure that your environment has the appropriate resources.

If you require custom dependencies that are not available via the public PyPI repository, the KubernetesPodOperator is one option that you can use. For example, the KubernetesPodOperator enables you to include packages available only in a private Python package repository or binary dependencies that aren't available in the stock Cloud Composer worker image.

For information about installing packages hosted in a private package repository or additional options to manage dependencies, see Installing Python dependencies.

Before you begin

  • The minimum Cloud Composer version required is 1.0.0. The current version is listed under Image version in the environment details.

  • The KubernetesPodOperator is best suited for launching pods into the GKE cluster for your Cloud Composer environment.

    To authorize the Airflow workers in your Cloud Composer environment to launch pods into other network-accessible GKE clusters, you can use the GKEPodOperator, which is available as of composer-1.1.0-airflow-1.9.0. To kick off a job, specify the project_id, cluster_name, and location in addition to the KubernetesPodOperator arguments.

  • Make sure your environment has sufficient resources. Launching pods into a resource-starved environment can cause worker and scheduler errors.

Ensuring appropriate resources for your environment

When you create a Cloud Composer environment, you specify the amount of computing power for the environment, and a certain amount is allocated to the GKE cluster. Launching Kubernetes pods into the environment can cause competition between programs for resources, such as CPU or memory. Because the Airflow scheduler and workers are in the same GKE cluster, the schedulers and workers won't work properly if the competition results in resource starvation.

Creating a node pool

The preferred way to prevent resource starvation in the Cloud Composer environment is to create a new node pool and configure the Kubernetes pods to execute using only resources from that pool.

Step 1: Create a new node pool

If you use a customized service account or OAuth scopes for your Cloud Composer environment, use gcloud to create a new node pool.


To create a node pool in an existing cluster, perform the following steps:

  1. Visit the GKE menu in the GCP Console.

  2. Select the desired cluster.

  3. Click Edit.

  4. From Node pools, click Add node pool.

  5. Configure your node pool.

  6. (Optional) Enable advanced options, such as automatic upgrades and autoscaling.

  7. Click Save.


Alternatively, to create a node pool in an existing cluster with gcloud, enter the following command:

gcloud container node-pools create POOL_NAME \
    --cluster CLUSTER_NAME \
    --project PROJECT_ID


  • POOL_NAME is the desired name of the node pool.
  • CLUSTER_NAME is the name of the cluster to create the node pool in.
  • PROJECT_ID is the ID of the project that contains your Cloud Composer environment.

    For the list of options, see the gcloud container node-pools create documentation.

    A successful node-pools create request returns the node pool information:

    Creating node pool example-pool...done.
    Created [].
    NAME          MACHINE_TYPE   DISK_SIZE_GB  NODE_VERSION
    example-pool  n1-standard-1  100           1.2.4

Step 2: Configure affinity

Configure the affinity parameter when creating the KubernetesPodOperator. This parameter allows finer grained control of what nodes to schedule pods on, such as only nodes in a particular node pool.

The following example shows KubernetesPodOperator with affinity:

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-pod-affinity',
    name='ex-pod-affinity',
    namespace='default',
    image='perl',
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        'nodeAffinity': {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # When nodepools are created in Google Kubernetes
                        # Engine, the nodes inside of that nodepool are
                        # automatically assigned the label
                        # 'cloud.google.com/gke-nodepool' with the value of
                        # the nodepool's name.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # The label key's values that pods can be scheduled
                        # on.
                        'values': [
                            'nodepool-label-value',
                            'nodepool-label-value2',
                        ]
                    }]
                }]
            }
        }
    })
Increasing the number of nodes in your environment

Increasing the number of nodes in your Cloud Composer environment increases the computing power available to your workers. This increase does not provide additional resources for tasks that require more CPU or RAM than the specified machine type provides.

To increase node count, update your environment.
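As a sketch, assuming the Cloud Composer 1 gcloud surface, the node count can be changed from the command line; ENVIRONMENT_NAME and LOCATION are placeholders for your environment's name and region:

```shell
# Increase the environment's node count; placeholders must be replaced
# with your own environment name and location.
gcloud composer environments update ENVIRONMENT_NAME \
    --location LOCATION \
    --node-count 6
```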

Specifying the appropriate machine type

To ensure that sufficient resources are available, specify the ideal machine type for the kind of computing that occurs in your Cloud Composer environment. Note that you can specify the machine type only during Cloud Composer environment creation.

Launching pods

If you created a new node pool, use the same service account and OAuth scopes that you used to create your Cloud Composer environment to avoid workflow failure.

To launch containers using a pod within Cloud Composer's GKE cluster:

  1. Add a KubernetesPodOperator object to a Python DAG.

    import datetime

    from airflow import models
    from airflow.contrib.operators import kubernetes_pod_operator

    YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

    # If a Pod fails to launch, or has an error occur in the container, Airflow
    # will show the task as failed, as well as contain all of the task logs
    # required to debug.
    with models.DAG(
            dag_id='composer_sample_kubernetes_pod',
            schedule_interval=datetime.timedelta(days=1),
            start_date=YESTERDAY) as dag:
        # Only name, namespace, image, and task_id are required to create a
        # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
        # to using the config file found at `/home/airflow/composer_kube_config` if
        # no `config_file` parameter is specified. By default it will contain the
        # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
        # created upon environment creation.
        kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
            # The ID specified for the task.
            task_id='pod-ex-minimum',
            # Name of task you want to run, used to generate Pod ID.
            name='pod-ex-minimum',
            # Entrypoint of the container, if not specified the Docker container's
            # entrypoint is used. The cmds parameter is templated.
            cmds=['echo'],
            # The namespace to run within Kubernetes; the default namespace is
            # `default`. There is the potential for resource starvation of the
            # Airflow workers and scheduler within the Cloud Composer environment;
            # the recommended solution is to increase the number of nodes in order
            # to satisfy the computing requirements. Alternatively, launching pods
            # into a custom namespace will stop the fighting over resources.
            namespace='default',
            # Docker image specified. Defaults to hub.docker.com, but any fully
            # qualified URLs will point to a custom repository. Supports private
            # images if the Composer Environment is under the same
            # project-id as the images.
            image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
  2. Place the entire Python .py file for the DAG into an environment's /dags folder in Cloud Storage.
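For example, assuming the DAG file is named composer_kubernetes_pod.py and using a placeholder for your environment's bucket, the upload can be done with gsutil:

```shell
# Copy the DAG file into the environment's dags/ folder in Cloud Storage.
# BUCKET_NAME is a placeholder for your environment's bucket.
gsutil cp composer_kubernetes_pod.py gs://BUCKET_NAME/dags/
```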


Non-zero return codes when also using the GKEPodOperator

When using the KubernetesPodOperator or the GKEPodOperator, the return code of the container's entrypoint determines whether the task is considered successful. Non-zero return codes indicate failure.

A common pattern when using either operator is to execute a shell script as the container entrypoint to group together multiple operations within the container.

If you are writing such a script, we recommend that you include the set -e command at the top of the script so that failed commands in the script terminate the script and propagate the failure to the Airflow task instance.
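As a minimal sketch with a hypothetical entrypoint script: with set -e, the first failing command aborts the script, and its non-zero exit status is what Airflow uses to mark the task as failed.

```shell
# Write a hypothetical container entrypoint that groups several operations.
cat > /tmp/entrypoint.sh <<'EOF'
#!/bin/bash
set -e
echo "step 1: preparing"
false                        # a failing command: with set -e the script stops here
echo "step 2: never reached"
EOF

# Run it the way a container runtime would run an entrypoint and report the
# exit status that would be propagated to the Airflow task instance.
bash /tmp/entrypoint.sh > /tmp/entrypoint.out || echo "entrypoint exited with status $?"
cat /tmp/entrypoint.out
```

Without set -e, execution would continue past the failure and the script could exit 0, causing Airflow to mark the task as successful despite the error.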

Task fails despite pod success

For Cloud Composer environments running composer-1.4.1-airflow-* or earlier:

If an Airflow task runs for an hour and the task logs end with (401) and Reason: Unauthorized, the underlying Kubernetes job continues executing after that point and might even succeed. However, Airflow reports the task as failed.

To fix this issue, add an explicit PyPI package dependency on kubernetes>=8.0.1.

Pod timeouts

The default timeout for KubernetesPodOperator is 120 seconds, which can result in timeouts occurring before larger images download. You can increase the timeout by altering the startup_timeout_seconds parameter when you create the KubernetesPodOperator.

When a pod times out, the task-specific log is available in the Airflow web UI. For example:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/']
Event: pod-name-9a8e9d06 had an event of type Pending
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/", line 392, in run
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Troubleshooting tips for Pod failures

  • View KubernetesPodOperator specific logs:

    1. In the Airflow web UI, click the failed task.
    2. Click 'View Log'.
  • Check the output of the Airflow scheduler and workers:

    1. Go to the Cloud Composer environment's Cloud Storage bucket. This is the bucket where the DAGs are located.
    2. Review the logs at logs/DAG_NAME/TASK_ID/EXECUTION_DATE.
  • Review the detailed pod logs in the GCP Console under GKE workloads. These logs include the pod definition YAML file, pod events, and pod details.
