Using the KubernetesPodOperator

This page describes how to use the KubernetesPodOperator to launch Kubernetes pods from Cloud Composer into the Google Kubernetes Engine cluster that is part of your Cloud Composer environment, and how to ensure that your environment has the appropriate resources.

Figure: Cloud Composer Kubernetes Pod Launch Location. Launched pods run in the same GKE cluster as the Airflow workers, Redis, the Airflow scheduler, and the Cloud SQL Proxy in your environment.

The KubernetesPodOperator is a good option if you require:

  • Custom Python dependencies that are not available through the public PyPI repository.
  • Binary dependencies that aren't available in the stock Cloud Composer worker image.

This page walks you through an example DAG that includes the following KubernetesPodOperator configurations:

  • Minimal Configuration
  • Template Configuration
  • Secret Variables Configuration
  • Pod Affinity Configuration
  • Full Configuration

Before you begin

Ensuring appropriate resources for your environment

When you create a Cloud Composer environment, you specify the amount of computing power for the environment, and a certain amount is allocated to the GKE cluster. Launching Kubernetes pods into the environment can cause competition for resources, such as CPU or memory. Because the Airflow scheduler and workers are in the same GKE cluster, the scheduler and workers won't work properly if the competition results in resource starvation.

To prevent resource starvation, take one or more of the following actions:

  • Create a node pool.
  • Increase the number of nodes in your environment.
  • Specify the appropriate machine type.

Creating a node pool

The preferred way to prevent resource starvation in the Cloud Composer environment is to create a new node pool and configure the Kubernetes pods to execute using only resources from that pool.

To create a node pool in an existing cluster, perform the following steps:

Console

  1. In GCP Console, go to the GKE menu.

    Visit the GKE menu

  2. Select the desired cluster.

  3. Click Edit.

  4. From Node pools, click Add node pool.

  5. Configure your node pool.

  6. (Optional) Enable advanced options, such as automatic upgrades and autoscaling.

  7. Click Save.

gcloud

Enter the following command:

gcloud container node-pools create POOL_NAME \
    --cluster CLUSTER_NAME \
    --project PROJECT_ID \
    --zone ZONE \
    ADDITIONAL_FLAGS 

where:

  • POOL_NAME is the desired name of the node pool.
  • CLUSTER_NAME is the name of the cluster in which to create the node pool.
  • PROJECT_ID is the ID of the project that contains your Cloud Composer environment.
  • ZONE is the zone where the GKE cluster is located.

    For the list of options, see the gcloud container node-pools create documentation.

    A successful node-pools create request returns the node pool information:

    Creating node pool example-pool...done.
    Created [https://container.googleapis.com/v1/projects/kubernetes-engine-docs/zones/us-central1-f/clusters/example-cluster/nodePools/example-pool].
    NAME          MACHINE_TYPE   DISK_SIZE_GB  NODE_VERSION
    example-pool  n1-standard-1  100           1.2.4
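
After the node pool exists, you can restrict launched pods to it through the operator's affinity parameter. The Pod Affinity Configuration section later on this page walks through the complete example; the following is a minimal sketch that assumes the imports from the sample DAG on this page and a node pool named example-pool:

kubernetes_pool_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='pool-pinned-example',
    name='pool-pinned-example',
    namespace='default',
    image='ubuntu',
    cmds=['echo', 'hello'],
    # GKE labels every node in a pool with
    # 'cloud.google.com/gke-nodepool: <pool name>', so this affinity rule
    # schedules the pod only onto nodes in the hypothetical example-pool.
    affinity={
        'nodeAffinity': {
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        'values': ['example-pool'],
                    }]
                }]
            }
        }
    })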

Increasing the number of nodes in your environment

Increasing the number of nodes in your Cloud Composer environment increases the computing power available to your workers. This increase does not provide additional resources for tasks that require more CPU or RAM than the specified machine type provides.

To increase node count, update your environment.
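
For example, assuming you use the gcloud CLI, a command of the following form increases the node count, where ENVIRONMENT, LOCATION, and NODE_COUNT are placeholders for your environment name, its region, and the new number of nodes:

gcloud composer environments update ENVIRONMENT \
    --location LOCATION \
    --node-count NODE_COUNT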

Specifying the appropriate machine type

During Cloud Composer environment creation, you can specify a machine type. To ensure available resources, specify the ideal machine type for the type of computing that occurs in your Cloud Composer environment.
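
For example, assuming you use the gcloud CLI, you can set the machine type when you create the environment, where ENVIRONMENT, LOCATION, and MACHINE_TYPE (such as n1-standard-4) are placeholders:

gcloud composer environments create ENVIRONMENT \
    --location LOCATION \
    --machine-type MACHINE_TYPE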

KubernetesPodOperator Configuration

To follow along with this example, put the entire kubernetes_pod_operator.py file in your environment's dags/ folder or add the relevant KubernetesPodOperator code to a DAG.

The following sections explain each KubernetesPodOperator configuration in the example. For information about each configuration variable, see the Airflow reference.

import datetime
from airflow import models
from airflow.contrib.kubernetes import pod
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator


# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, and the task logs will contain the
# information required to debug the failure.
with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config` if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum',
        # Name of task you want to run, used to generate Pod ID.
        name='pod-ex-minimum',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # The namespace to run within Kubernetes. The default namespace is
        # `default`. Launching pods can starve the Airflow workers and
        # scheduler of resources within the Cloud Composer environment; the
        # recommended solution is to increase the number of nodes to satisfy
        # the computing requirements. Alternatively, launching pods into a
        # custom namespace stops the pods from fighting over resources with
        # the Airflow workloads.
        namespace='default',
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images.
        image='gcr.io/gcp-runtimes/ubuntu_16_0_4')


    kubernetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-templates',
        name='ex-kube-templates',
        namespace='default',
        image='bash',
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/code.html#default-variables

        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=['{{ ds }}'],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does not
        # provide valid credentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config.
        config_file="{{ conf.get('core', 'kube_config') }}")
    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-secrets',
        name='ex-kube-secrets',
        namespace='default',
        image='ubuntu',
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={'EXAMPLE_VAR': '/example/value'})
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-pod-affinity',
        name='ex-pod-affinity',
        namespace='default',
        image='perl',
        cmds=['perl'],
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value 'pool-0' or
        # 'pool-1' is not found on any nodes, the pod will fail to schedule.
        affinity={
            'nodeAffinity': {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [{
                        'matchExpressions': [{
                            # When nodepools are created in Google Kubernetes
                            # Engine, the nodes inside of that nodepool are
                            # automatically assigned the label
                            # 'cloud.google.com/gke-nodepool' with the value of
                            # the nodepool's name.
                            'key': 'cloud.google.com/gke-nodepool',
                            'operator': 'In',
                            # The label key's value that pods can be scheduled
                            # on.
                            'values': [
                                'pool-0',
                                'pool-1',
                            ]
                        }]
                    }]
                }
            }
        })
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-all-configs',
        name='pi',
        namespace='default',
        image='perl',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['perl'],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={'pod-label': 'label-name'},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={'EXAMPLE_VAR': '/example/value'},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy='Always',
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={'key1': 'value1'},
        # Resource specifications for Pod, this will allow you to set both cpu
        # and memory limits and requirements.
        resources=pod.Resources(),
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file='/home/airflow/composer_kube_config',
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={})

Minimal Configuration

To create a KubernetesPodOperator, only name, namespace, image, and task_id are required.

When you place the following code snippet in a DAG, the configuration uses the defaults in /home/airflow/composer_kube_config. You don't need to modify the code for the pod-ex-minimum task to succeed.

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id='pod-ex-minimum',
    # Name of task you want to run, used to generate Pod ID.
    name='pod-ex-minimum',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # The namespace to run within Kubernetes. The default namespace is
    # `default`. Launching pods can starve the Airflow workers and
    # scheduler of resources within the Cloud Composer environment; the
    # recommended solution is to increase the number of nodes to satisfy
    # the computing requirements. Alternatively, launching pods into a
    # custom namespace stops the pods from fighting over resources with
    # the Airflow workloads.
    namespace='default',
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images.
    image='gcr.io/gcp-runtimes/ubuntu_16_0_4')

Template Configuration

Airflow supports using Jinja Templating. You must declare the required variables (task_id, name, namespace, image) with the operator. As shown in the following example, you can template all other parameters with Jinja, including cmds, arguments, env_vars, and config_file.

kubernetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-templates',
    name='ex-kube-templates',
    namespace='default',
    image='bash',
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/code.html#default-variables

    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=['{{ ds }}'],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does not
    # provide valid credentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config.
    config_file="{{ conf.get('core', 'kube_config') }}")

Without changing the DAG or your environment, the ex-kube-templates task fails because of two errors. Let's walk through how to debug the task and resolve the errors.

  1. Verify the ex-kube-templates task fails.
  2. Check the ex-kube-templates task logs.

    The logs show this task is failing because the appropriate variable does not exist (my_value).

  3. To set my_value with gcloud or the Airflow UI:

    gcloud

    Enter the following command:

    gcloud composer environments run ENVIRONMENT \
        --location LOCATION \
        variables -- \
        --set my_value example_value 

    where:

    • ENVIRONMENT is the name of the Cloud Composer environment
    • LOCATION is the region where the Cloud Composer environment is located

    Airflow UI

    1. In the toolbar, click Admin > Variables.
    2. Click Create.
    3. Enter the following information:
      • Key: my_value
      • Val: example_value
    4. Click Save.
  4. Run the ex-kube-templates task again.

  5. Verify the ex-kube-templates task status.

    The ex-kube-templates task still fails! If you look at the logs, the task now fails because core/kubeconfig is not found in config. To refer to a custom config_file (a Kubernetes configuration file), you need to set the kube_config variable in the airflow.cfg file to a valid Kubernetes configuration.

  6. To set the kube_config variable, enter the following command:

    gcloud composer environments update ENVIRONMENT \
        --location LOCATION \
        --update-airflow-configs=core-kube_config=/home/airflow/composer_kube_config

    where:

    • ENVIRONMENT is the name of the Cloud Composer environment
    • LOCATION is the region where the Cloud Composer environment is located
  7. Wait a few minutes for your environment to update.

  8. Run the ex-kube-templates task again.

  9. Verify the ex-kube-templates task succeeds.

Secret Variables Configuration

A Kubernetes secret is an object that contains a small amount of sensitive data. You can pass secrets to the Kubernetes pods by using the KubernetesPodOperator. Secrets must be defined in Kubernetes, or the pod fails to launch.

In this example, we expose the Kubernetes secret airflow-secrets to the pod as an environment variable named SQL_CONN (as opposed to an Airflow or Cloud Composer environment variable).

Here's what the secret looks like:

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')

The name of the Kubernetes secret is defined in the secret variable; this particular secret is named airflow-secrets. It is exposed as an environment variable, as dictated by deploy_type. The environment variable that it sets, deploy_target, is SQL_CONN. Finally, the key of the secret stored in the deploy_target is sql_alchemy_conn.

Here's what the operator configuration looks like:

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-secrets',
    name='ex-kube-secrets',
    namespace='default',
    image='ubuntu',
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={'EXAMPLE_VAR': '/example/value'})

Without making any changes to the DAG or your environment, the ex-kube-secrets task fails. Let's walk through how to debug the task and resolve the errors.

  1. Verify the ex-kube-secrets task fails.
  2. Check the ex-kube-secrets task logs.

    If you look at the logs, you'll see that the task fails with a "Pod took too long to start" error. This error occurs because Airflow cannot find the secret specified in the configuration, secret_env.

  3. To set the secret using gcloud:

    1. View details about your Cloud Composer environment by running the following command:

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"

      where:

      • ENVIRONMENT is the name of the Cloud Composer environment
      • LOCATION is the region where the Cloud Composer environment is located

        The output is similar to the following: projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>

    2. To get your cluster ID, copy the part of the output after /clusters/ (it ends in -gke) and save it for later use.

    3. To get the zone, copy the part of the output after /zones/ and save it for later use.

    4. Connect to your GKE cluster by running the following command:

      gcloud container clusters get-credentials CLUSTER_ID \
      --zone ZONE \
      --project PROJECT
      

      where:

      • CLUSTER_ID is your GKE cluster ID
      • ZONE is the zone where your GKE is located
      • PROJECT is the ID of your GCP Project
    5. Create a Kubernetes secret that sets the value of sql_alchemy_conn to test_value by running the following command:

      kubectl create secret generic airflow-secrets \
      --from-literal sql_alchemy_conn=test_value
      
  4. After you set the secret, run the ex-kube-secrets task again.

  5. Verify the ex-kube-secrets task succeeds.

Pod Affinity Configuration

When you configure the affinity parameter in the KubernetesPodOperator, you control which nodes your pods are scheduled on, for example nodes only in a particular node pool. In this example, the pod runs only on nodes in the node pools named pool-0 and pool-1.

Figure: Cloud Composer Kubernetes Pod Launch Location with Pod Affinity. Launched pods run in the same GKE cluster as the Airflow workers, Redis, the Airflow scheduler, and the Cloud SQL Proxy, but only in the selected node pools, pool-0 and pool-1.


kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-pod-affinity',
    name='ex-pod-affinity',
    namespace='default',
    image='perl',
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value 'pool-0' or
    # 'pool-1' is not found on any nodes, the pod will fail to schedule.
    affinity={
        'nodeAffinity': {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # When nodepools are created in Google Kubernetes
                        # Engine, the nodes inside of that nodepool are
                        # automatically assigned the label
                        # 'cloud.google.com/gke-nodepool' with the value of
                        # the nodepool's name.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # The label key's value that pods can be scheduled
                        # on.
                        'values': [
                            'pool-0',
                            'pool-1',
                        ]
                    }]
                }]
            }
        }
    })

As the example is currently configured, the task fails. Let's walk through how to debug the task and resolve the errors.

  1. Verify the ex-pod-affinity task fails.
  2. Check the ex-pod-affinity task logs.

    If you look at the logs, you'll see that the task fails because node pools pool-0 and pool-1 do not exist.

  3. To make sure the node pools in values exist, make any of the following configuration changes:

    • If you created a node pool earlier, replace pool-0 and pool-1 with the names of your node pools and upload your DAG again.
    • Create a node pool named pool-0 or pool-1. You can create both, but the task needs only one to succeed.
    • Replace pool-0 and pool-1 with default-pool, which is the default pool that Airflow uses. Then, upload your DAG again. Note: By default, the Kubernetes pods are scheduled in the default-pool. If you add pools later, the pools will be restricted to the default-pool.
  4. Wait a few minutes for your environment to update.

  5. Run the ex-pod-affinity task again.

  6. Verify the ex-pod-affinity task succeeds.

Full Configuration

This example shows all the variables that you can configure in the KubernetesPodOperator. You don't need to modify the code for the ex-all-configs task to succeed.

For details on each variable, see the Airflow KubernetesPodOperator reference.

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-all-configs',
    name='pi',
    namespace='default',
    image='perl',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['perl'],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={'pod-label': 'label-name'},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={'EXAMPLE_VAR': '/example/value'},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy='Always',
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={'key1': 'value1'},
    # Resource specifications for Pod, this will allow you to set both cpu
    # and memory limits and requirements.
    resources=pod.Resources(),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file='/home/airflow/composer_kube_config',
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={})

Managing DAGs

Viewing a task's status

  1. Go to the Airflow web interface.
  2. On the DAGs page, click the DAG name (such as composer_sample_kubernetes_pod).
  3. On the DAG details page, click Graph View.
  4. Check status:

    • Failed: The task has a red box around it (such as ex-kube-templates). You can also hold the pointer over the task and look for State: Failed.

    • Succeeded: The task has a green box around it (such as pod-ex-minimum). You can also hold the pointer over the task and check for State: Success.

Checking task logs

  1. In the Airflow UI, view the task's status.
  2. In the Graph View for the DAG, click the task name.
  3. On the Task Instance Context menu, click View Log.

Running a task again

  1. To return to the DAG:
    1. In the toolbar in the Airflow UI, click DAGs.
    2. Click the DAG name (such as composer_sample_kubernetes_pod).
  2. To run the task again:
    1. Click the task name.
    2. Click Clear and then click OK. The task runs again automatically.

Troubleshooting

Tips for troubleshooting Pod failures

In addition to checking the task logs, also check the following logs:

  • Output of the Airflow scheduler and workers:

    1. Go to the Cloud Composer environment's Cloud Storage bucket. This is the bucket where the DAGs are located.
    2. Review the logs at logs/DAG_NAME/TASK_ID/EXECUTION_DATE.
  • Detailed pod logs in the GCP Console under GKE workloads. These logs include the pod definition YAML file, pod events, and pod details.

Non-zero return codes when also using the GKEPodOperator

When using the KubernetesPodOperator and the GKEPodOperator, the return code of the container's entrypoint determines whether the task is considered successful or not. Non-zero return codes indicate failure.

A common pattern when using the KubernetesPodOperator and the GKEPodOperator is to execute a shell script as the container entrypoint to group together multiple operations within the container.

If you are writing such a script, we recommend that you include the set -e command at the top of the script so that failed commands in the script terminate the script and propagate the failure to the Airflow task instance.
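
For example, a hypothetical entrypoint script might look like the following; the bucket, paths, and commands are illustrative. With set -e, the first failing command terminates the script with a non-zero exit code, and the Airflow task instance is marked as failed:

#!/bin/bash
# Hypothetical entrypoint script that groups several operations.
# `set -e` exits the script as soon as any command fails, so the container
# returns a non-zero exit code and the Airflow task fails.
set -e

gsutil cp gs://example-bucket/input.csv /tmp/input.csv
python /app/transform.py /tmp/input.csv /tmp/output.csv
gsutil cp /tmp/output.csv gs://example-bucket/output.csv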

Task fails despite pod success

For Cloud Composer environments running composer-1.4.1-airflow-* or earlier:

If an Airflow task runs for an hour and the task logs end with kubernetes.client.rest.ApiException: (401) and Reason: Unauthorized, the underlying Kubernetes job continues executing after that point and even might succeed. However, Airflow reports the task as failed.

To fix this issue, add an explicit PyPI package dependency on kubernetes>=8.0.1.
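
For example, assuming you use the gcloud CLI, a command of the following form adds the dependency, where ENVIRONMENT and LOCATION are placeholders:

gcloud composer environments update ENVIRONMENT \
    --location LOCATION \
    --update-pypi-package "kubernetes>=8.0.1"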

Pod timeouts

The default timeout for KubernetesPodOperator is 120 seconds, which can result in timeouts occurring before larger images download. You can increase the timeout by altering the startup_timeout_seconds parameter when you create the KubernetesPodOperator.
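
For example, the following sketch raises the timeout to 600 seconds; the task name and image are illustrative, and the imports from the sample DAG on this page are assumed:

kubernetes_slow_start_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='slow-start-example',
    name='slow-start-example',
    namespace='default',
    # A hypothetical large image that can take several minutes to pull.
    image='gcr.io/my-project/large-image:latest',
    cmds=['echo'],
    # Allow up to 10 minutes for the image pull and pod startup instead of
    # the 120-second default.
    startup_timeout_seconds=600)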

When a pod times out, the task-specific log is available in the Airflow web UI. For example:

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Failed to Establish a New Connection

Auto-upgrade is enabled by default in GKE clusters. If a node pool is in a cluster that is upgrading, you may see the following error:

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new connection: [Errno 111] Connection refused

To check if your cluster is upgrading, go to the GKE menu in the GCP Console and look for the loading icon next to your environment's cluster name.
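
Alternatively, assuming you use the gcloud CLI, you can check the status from the command line; a STATUS of RECONCILING on the cluster or on one of its node pools typically indicates that an upgrade is in progress. CLUSTER_NAME and ZONE are placeholders for your environment's GKE cluster name and zone:

gcloud container clusters list \
    --format="table(name,status)"

gcloud container node-pools list \
    --cluster CLUSTER_NAME \
    --zone ZONE \
    --format="table(name,status)"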
