Using the KubernetesPodOperator

This page describes how to use the KubernetesPodOperator to launch Kubernetes pods from Cloud Composer into the Google Kubernetes Engine cluster that is part of your Cloud Composer environment, and how to ensure that your environment has the appropriate resources.

Before you begin

  • The minimum Cloud Composer version required is 1.0.0. The current version is listed under Image version in the environment details.

  • The KubernetesPodOperator is best suited for launching pods into the GKE cluster for your Cloud Composer environment.

    To launch pods into network-accessible GKE clusters from the Airflow workers in your Cloud Composer environment, you can use the GKEPodOperator, which is available in composer-1.1.0-airflow-1.9.0 and later. To kick off a job, specify the project_id, cluster_name, and location in addition to the KubernetesPodOperator arguments; see the sketch after this list.

  • Make sure your environment has sufficient resources. Launching pods into a resource-starved environment can cause worker and scheduler errors.

  • Due to an issue with the Kubernetes Python client library, your Kubernetes pods should be designed to take no more than an hour to run.
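
The following is a minimal sketch of a GKEPodOperator task. The import path assumes Airflow's contrib gcp_container_operator module, and the project, location, and cluster values are hypothetical placeholders:

from airflow.contrib.operators import gcp_container_operator

gke_pod_task = gcp_container_operator.GKEPodOperator(
    task_id='pod-ex-gke',
    name='pod-ex-gke',
    # These three arguments identify the target GKE cluster; the values
    # shown are hypothetical placeholders.
    project_id='my-project-id',
    location='us-central1-a',
    cluster_name='my-gke-cluster',
    namespace='default',
    image='gcr.io/gcp-runtimes/ubuntu_16_0_4')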

Ensuring appropriate resources for your environment

When you create a Cloud Composer environment, you specify the amount of computing power for the environment, and a certain amount is allocated to the GKE cluster. Launching Kubernetes pods into the environment can cause programs to compete for resources such as CPU and memory. Because the Airflow scheduler and workers run in the same GKE cluster, they won't work properly if this competition results in resource starvation.

Creating a node pool

The preferred way to prevent resource starvation in the Cloud Composer environment is to create a new node pool solely for launching pods into. This method provides resource separation and prevents the pods from competing with Airflow for resources.

Step 1: Create a new node pool

Console

To create a node pool in an existing cluster, perform the following steps:

  1. Visit the GKE menu in GCP Console.

  2. Select the desired cluster.

  3. Click Edit.
  4. From Node pools, click Add node pool.
  5. Configure your node pool.
  6. To enable advanced options such as automatic upgrades and autoscaling, click More.
  7. Click Save.

gcloud

To create a node pool in an existing cluster, enter the following command:

gcloud container node-pools create POOL_NAME \
    --cluster CLUSTER_NAME \
    --project PROJECT_ID \
    ADDITIONAL_FLAGS

where:

  • POOL_NAME is the desired name of the node pool.
  • CLUSTER_NAME is the name of the cluster to create the node pool in.
  • PROJECT_ID is the ID of your Cloud Composer project.

For the list of options, see the gcloud container node-pools create documentation.
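
For example, the following command creates a three-node pool with a larger machine type. The pool, cluster, and project names are hypothetical placeholders:

gcloud container node-pools create example-pool \
    --cluster example-cluster \
    --project my-project-id \
    --machine-type n1-standard-2 \
    --num-nodes 3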

A successful node-pools create request returns the node pool information:

Creating node pool example-pool...done.
  Created [https://container.googleapis.com/v1/projects/kubernetes-engine-docs/zones/us-central1-f/clusters/example-cluster/nodePools/example-pool].
  NAME          MACHINE_TYPE   DISK_SIZE_GB  NODE_VERSION
  example-pool  n1-standard-1  100           1.2.4

Step 2: Configure affinity

Configure the affinity parameter when creating the KubernetesPodOperator. This parameter gives you finer-grained control over where pods are scheduled, for example, only on nodes in a particular node pool.

The following example shows KubernetesPodOperator with affinity:

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-pod-affinity',
    name='ex-pod-affinity',
    namespace='default',
    image='perl',
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        'nodeAffinity': {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # When nodepools are created in Google Kubernetes
                        # Engine, the nodes inside of that nodepool are
                        # automatically assigned the label
                        # 'cloud.google.com/gke-nodepool' with the value of
                        # the nodepool's name.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # The label key's value that pods can be scheduled
                        # on.
                        'values': [
                            'node-pool-name-1',
                            'node-pool-name-2',
                        ]
                    }]
                }]
            }
        }
    })

Increasing the number of nodes in your environment

Increasing the number of nodes in your Cloud Composer environment increases the computing power available to your workers. This increase does not provide additional resources for tasks that require more CPU or RAM than the specified machine type provides.

To increase node count, update your environment.
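
For example, the following command scales an environment to six nodes. The environment name and location are hypothetical placeholders, and the --node-count flag assumes the Cloud Composer gcloud surface:

gcloud composer environments update example-environment \
    --location us-central1 \
    --node-count 6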

Specifying the appropriate machine type

To ensure available resources, specify the ideal machine type for the type of computing that occurs in your Cloud Composer environment. You can only specify machine type during Cloud Composer environment creation.
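
For example, the following command creates an environment whose nodes use a larger machine type. The environment name, location, and zone are hypothetical placeholders, and the --zone and --machine-type flags assume the Cloud Composer gcloud surface:

gcloud composer environments create example-environment \
    --location us-central1 \
    --zone us-central1-a \
    --machine-type n1-standard-2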

Launching pods

To launch containers using a pod in your Cloud Composer environment's GKE cluster, add a KubernetesPodOperator object to a DAG and place the DAG in your environment's dags/ folder.

import datetime

from airflow import models
from airflow.contrib.operators import kubernetes_pod_operator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, the operator currently
    # defaults to using the config file found at
    # `/home/airflow/composer_kube_config` if no `config_file` parameter is
    # specified. By default, that file contains the credentials for the
    # Google Kubernetes Engine cluster that is created with your Cloud
    # Composer environment.
    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum',
        # Name of task you want to run, used to generate Pod ID.
        name='pod-ex-minimum',
        # The namespace to run within Kubernetes; the default namespace is
        # `default`. Launching pods here can starve the Airflow workers and
        # scheduler of resources within the Cloud Composer environment; the
        # recommended solution is to increase the number of nodes to
        # satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace stops them from fighting over resources.
        namespace='default',
        # The Docker image to use. Images default to hub.docker.com, but a
        # fully qualified URL points to a custom repository. Private gcr.io
        # images are supported if the Cloud Composer environment is in the
        # same project as the gcr.io images.
        image='gcr.io/gcp-runtimes/ubuntu_16_0_4')

Troubleshooting

Pod timeouts

The default timeout for the KubernetesPodOperator is 120 seconds, which can cause timeouts before larger images finish downloading. You can increase the timeout by setting the startup_timeout_seconds parameter when you create the KubernetesPodOperator.
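
For example, the following minimal sketch raises the startup timeout to five minutes; the task and pod names are hypothetical placeholders:

kubernetes_pod_timeout = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='pod-ex-timeout',
    name='pod-ex-timeout',
    namespace='default',
    image='gcr.io/gcp-runtimes/ubuntu_16_0_4',
    # Allow up to 300 seconds for the pod to start before Airflow marks
    # the task as failed; the default is 120 seconds.
    startup_timeout_seconds=300)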

When a pod times out, the task-specific log is available in the Airflow web UI. For example:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in 
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Pod failures

  • View KubernetesPodOperator specific logs:

    1. In the Airflow web UI, click the failed task.
    2. Click 'View Log'.
  • Check the output of the Airflow scheduler and workers:

    1. Go to the Cloud Composer environment's Cloud Storage bucket. This is the bucket where the DAGs are located.
    2. Review the logs at logs/DAG_NAME/TASK_ID/EXECUTION_DATE; see the gsutil sketch after this list.
  • Review the detailed pod logs in the GCP Console under GKE workloads. These logs include the pod definition YAML file, pod events, and pod details.
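
For example, you can list a task's log files directly from the command line with gsutil. The bucket name here is a hypothetical placeholder, and DAG_NAME and TASK_ID follow the path format above:

gsutil ls gs://example-bucket/logs/DAG_NAME/TASK_ID/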
