This page describes how to use the `KubernetesPodOperator` to launch Kubernetes pods from Cloud Composer into the Google Kubernetes Engine cluster that is part of your Cloud Composer environment, and how to ensure that your environment has the appropriate resources.

The `KubernetesPodOperator` is a good option if you require:
- Custom Python dependencies that are not available through the public PyPI repository.
- Binary dependencies that aren't available in the stock Cloud Composer worker image.
This page walks you through an example DAG that includes the following `KubernetesPodOperator` configurations:
- Minimal configuration: Sets only the required parameters.
- Template configuration: Uses parameters that you can template with Jinja.
- Secret variables configuration: Passes a Kubernetes Secret object to the pod.
- Pod affinity configuration: Limits the available nodes to schedule pods on.
- Full configuration: Includes all configurations.
Before you begin
- We recommend using the latest version of Cloud Composer. At a minimum, the version used should be supported as part of the Cloud Composer Deprecation and Support policy. To check the image version, see the environment details.
- Make sure your environment has sufficient resources. Launching pods into a resource-starved environment can cause Airflow worker and Airflow scheduler errors.
Ensuring appropriate resources for your environment
When you create a Cloud Composer environment, you specify the amount of computing power for the environment, and a certain amount is allocated to the GKE cluster. Launching Kubernetes pods into the environment can cause competition for resources, such as CPU or memory. Because the Airflow scheduler and workers are in the same GKE cluster, the schedulers and workers won't work properly if this competition results in resource starvation.
To prevent resource starvation, take one or more of the following actions:
- Create a node pool (preferred)
- Increase the number of nodes in your environment
- Specify the appropriate machine type
Creating a node pool
The preferred way to prevent resource starvation in the Cloud Composer environment is to create a new node pool and configure the Kubernetes pods to execute using only resources from that pool.
To create a node pool in an existing cluster, perform the following steps:
Console
In Cloud Console, go to the GKE menu.
Select the desired cluster.
Click Edit.
From Node pools, click Add node pool.
Configure your node pool.
(Optional) Enable advanced options, such as automatic upgrades and autoscaling.
Click Save.
gcloud
Enter the following command:
```
gcloud container node-pools create POOL_NAME \
  --cluster CLUSTER_NAME \
  --project PROJECT_ID \
  --zone ZONE \
  ADDITIONAL_FLAGS
```
where:

- `POOL_NAME` is the desired name of the node pool.
- `CLUSTER_NAME` is the name of the cluster to create the node pool in.
- `PROJECT_ID` is the Cloud Composer project name.
- `ZONE` is the zone where the GKE cluster is located.

Useful optional flags include:

- `--machine-type`: The Compute Engine machine type (such as `n1-standard-1`) to use for instances in the node pool. If unspecified, the default machine type is `n1-standard-1`. Refer to the `gcloud container node-pools create --machine-type` documentation for information on specifying machine types.
- `--num-nodes`: The number of nodes to create in the node pool. The default is 3. Your project must have enough quota available for the number of nodes, as well as sufficient route quota.
- `--disk-size`: The size in GB for node VM boot disks. Defaults to 100 GB.
- `--image-type`: The image type to use for the node pool. The image type specifies the base OS that runs on nodes in the node pool. If an image type is specified, it is assigned to the node pool and all future upgrades use the specified image type. If it is not specified, the server chooses the default image type.
  To see the default image type and a list of valid image types, enter:

```
gcloud container get-server-config
```
- `--min-cpu-platform`: The minimum CPU platform to run on the nodes.
- `--scopes SCOPE,[SCOPE,...]`: The scopes for the node instances. You can specify multiple, comma-delimited scopes; each `SCOPE` can be the full URI of the scope or an alias. The `compute-rw` and `storage-ro` scopes are required for the cluster to function properly and are added even if unspecified. For example:

```
gcloud container node-pools create POOL_NAME \
  --cluster CLUSTER_NAME \
  --project PROJECT_ID \
  --scopes https://www.googleapis.com/auth/devstorage.read_only

gcloud container node-pools create POOL_NAME \
  --cluster CLUSTER_NAME \
  --project PROJECT_ID \
  --scopes bigquery,storage-rw,compute-ro
```
For the list of options, see the `gcloud container node-pools create` documentation.

A successful `node-pools create` request returns the node pool information:

```
Creating node pool example-pool...done.
Created [https://container.googleapis.com/v1/projects/kubernetes-engine-docs/zones/us-central1-f/clusters/example-cluster/nodePools/example-pool].
NAME          MACHINE_TYPE   DISK_SIZE_GB  NODE_VERSION
example-pool  n1-standard-1  100           1.2.4
```
Increasing the number of nodes in your environment
Increasing the number of nodes in your Cloud Composer environment increases the computing power available to your workers. This increase does not provide additional resources for tasks that require more CPU or RAM than the specified machine type provides.
To increase node count, update your environment.
Specifying the appropriate machine type
During Cloud Composer environment creation, you can specify a machine type. To ensure available resources, specify the ideal machine type for the type of computing that occurs in your Cloud Composer environment.
KubernetesPodOperator Configuration
To follow along with this example, put the entire `kubernetes_pod_operator.py` file in your environment's `dags/` folder, or add the relevant `KubernetesPodOperator` code to a DAG.

The following sections explain each `KubernetesPodOperator` configuration in the example. For information about each configuration variable, see the Airflow reference.
Minimal Configuration
To create a `KubernetesPodOperator`, only `name`, `namespace`, `image`, and `task_id` are required. When you place the following code snippet in a DAG, the configuration uses the defaults in `/home/airflow/composer_kube_config`. You don't need to modify the code for the `pod-ex-minimum` task to succeed.
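As a minimal sketch of such a task, assuming the Airflow 1.x contrib import path and using an illustrative `ubuntu` image and `echo` command:

```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

kubernetes_min_pod = KubernetesPodOperator(
    # The task ID that appears in the Airflow UI.
    task_id='pod-ex-minimum',
    # The pod name: alphanumeric characters, dashes, and dots only.
    name='pod-ex-minimum',
    # The Kubernetes namespace to launch the pod in.
    namespace='default',
    # Any container image reachable from the cluster works here;
    # ubuntu is an illustrative placeholder.
    image='ubuntu',
    cmds=['echo'],
)
```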
Template Configuration
Airflow supports using Jinja Templating.
You must declare the required variables (task_id
, name
, namespace
, image
)
with the operator. As shown in the following example, you can template
all other parameters with Jinja, including cmds
, arguments
, env_vars
,
and config_file
.
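A sketch of a templated task, assuming the same contrib import as above; `{{ ds }}` and `{{ var.value.my_value }}` are standard Jinja expressions that Airflow resolves at run time:

```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

kubernetes_template_ex = KubernetesPodOperator(
    task_id='ex-kube-templates',
    name='ex-kube-templates',
    namespace='default',
    image='bash',
    # Templated: {{ ds }} renders as the execution date of the DAG run.
    cmds=['echo'],
    arguments=['{{ ds }}'],
    # Templated: reads the Airflow variable my_value (set in the steps below).
    env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
    # Templated: reads the kube_config path from the [core] section of airflow.cfg.
    config_file='{{ conf.get("core", "kube_config") }}',
)
```

This configuration is what makes the task fail twice out of the box: the `my_value` variable and the `core/kube_config` setting both have to exist before the templates can render.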
Without changing the DAG or your environment, the `ex-kube-templates` task fails because of two errors. Let's walk through how to debug the task and resolve the errors.

- Verify that the `ex-kube-templates` task fails.
- Check the `ex-kube-templates` task logs. The logs show that the task is failing because the appropriate variable (`my_value`) does not exist.

To set `my_value` with `gcloud` or the Airflow UI:

gcloud

Enter the following command:

```
gcloud composer environments run ENVIRONMENT \
  --location LOCATION \
  variables -- \
  --set my_value example_value
```

where:

- `ENVIRONMENT` is the name of the Cloud Composer environment.
- `LOCATION` is the region where the Cloud Composer environment is located.
Airflow UI

- In the toolbar, click Admin > Variables.
- Click Create.
- Enter the following information:
  - Key: `my_value`
  - Val: `example_value`
- Click Save.
Verify the `ex-kube-templates` task status. The `ex-kube-templates` task still fails! If you look at the logs, the task now fails because `core/kubeconfig` is not found in `config`. To refer to a custom `config_file` (a Kubernetes configuration file), you need to set the `kube_config` variable in the `airflow.cfg` file to a valid Kubernetes configuration.

To set the `kube_config` variable, enter the following command:

```
gcloud composer environments update ENVIRONMENT \
  --location LOCATION \
  --update-airflow-configs=core-kube_config=/home/airflow/composer_kube_config
```

where:

- `ENVIRONMENT` is the name of the Cloud Composer environment.
- `LOCATION` is the region where the Cloud Composer environment is located.
Wait a few minutes for your environment to update.
Run the `ex-kube-templates` task again.
Secret Variables Configuration
A Kubernetes secret is an object that contains a small amount of sensitive data. You can pass secrets to the Kubernetes pods by using the `KubernetesPodOperator`. Secrets must be defined in Kubernetes, or the pod fails to launch.

In this example, we deploy the Kubernetes secret, `airflow-secrets`, to a Kubernetes environment variable named `SQL_CONN` (as opposed to an Airflow or Cloud Composer environment variable).
Here's what the secret looks like:
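A sketch of that Secret definition, assuming the Airflow 1.x contrib `Secret` class; the names match the description that follows:

```python
from airflow.contrib.kubernetes import secret

secret_env = secret.Secret(
    # Expose the secret as an environment variable in the pod.
    deploy_type='env',
    # The name of the environment variable to set.
    deploy_target='SQL_CONN',
    # The name of the Kubernetes Secret object.
    secret='airflow-secrets',
    # The key of the secret whose value is placed in deploy_target.
    key='sql_alchemy_conn',
)
```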
The name of the Kubernetes secret is defined in the `secret` variable. This particular secret is named `airflow-secrets`. It is exposed as an environment variable, as dictated by the `deploy_type`. The environment variable it sets, `deploy_target`, is `SQL_CONN`. Finally, the `key` of the secret we are storing in the `deploy_target` is `sql_alchemy_conn`.
Here's what the operator configuration looks like:
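As a sketch, reusing `secret_env` from above (the `ubuntu` image and the 300-second startup timeout are illustrative choices):

```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id='ex-kube-secrets',
    name='ex-kube-secrets',
    namespace='default',
    image='ubuntu',
    # Allow extra time for the pod to start.
    startup_timeout_seconds=300,
    # The Secret objects to expose inside the pod.
    secrets=[secret_env],
)
```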
Without making any changes to the DAG or your environment, the `ex-kube-secrets` task fails. Let's walk through how to debug the task and resolve the errors.
- Verify that the `ex-kube-secrets` task fails.
- Check the `ex-kube-secrets` task logs. If you look at the logs, you'll see that the task fails because of a `Pod took too long to start` error. This error occurs because Airflow cannot find the secret specified in the configuration, `secret_env`.

To set the secret using `gcloud`:

View details about your Cloud Composer environment by running the following command:

```
gcloud composer environments describe ENVIRONMENT \
  --location LOCATION \
  --format="value(config.gkeCluster)"
```

where:

- `ENVIRONMENT` is the name of the Cloud Composer environment.
- `LOCATION` is the region where the Cloud Composer environment is located.

The output is similar to the following:

```
projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>
```
To get the GKE cluster ID, copy the part of the output after `/clusters/` (which ends in `-gke`) somewhere you can retrieve it later; this is your cluster ID. To get the zone, copy the part of the output after `/zones/` somewhere you can retrieve it later.

Connect to your GKE cluster by running the following command:

```
gcloud container clusters get-credentials CLUSTER_ID \
  --zone ZONE \
  --project PROJECT
```

where:

- `CLUSTER_ID` is your GKE cluster ID.
- `ZONE` is the zone where your GKE cluster is located.
- `PROJECT` is the ID of your Google Cloud project.
Create a Kubernetes secret that sets the value of `sql_alchemy_conn` to `test_value` by running the following command:

```
kubectl create secret generic airflow-secrets \
  --from-literal sql_alchemy_conn=test_value
```

After you set the secret, run the `ex-kube-secrets` task again.
Pod Affinity Configuration
When you configure the `affinity` parameter in the `KubernetesPodOperator`, you control which nodes to schedule pods on, such as nodes only in a particular node pool. In this example, the operator runs only on node pools named `pool-0` and `pool-1`.
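A sketch of that affinity configuration, assuming GKE's standard `cloud.google.com/gke-nodepool` node label (the `perl` image and pi computation are illustrative):

```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

kubernetes_affinity_ex = KubernetesPodOperator(
    task_id='ex-pod-affinity',
    name='ex-pod-affinity',
    namespace='default',
    image='perl',
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    affinity={
        'nodeAffinity': {
            # Pods may only be scheduled on nodes matching these terms.
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # GKE labels every node with the name of its node pool.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # The task fails until one of these pools exists.
                        'values': ['pool-0', 'pool-1'],
                    }]
                }]
            }
        }
    },
)
```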
As the example is currently configured, the task fails. Let's walk through how to debug the task and resolve the errors.
- Verify that the `ex-pod-affinity` task fails.
- Check the `ex-pod-affinity` task logs. If you look at the logs, you'll see that the task fails because node pools `pool-0` and `pool-1` do not exist.
- To make sure the node pools in `values` exist, make any of the following configuration changes:
  - If you created a node pool earlier, replace `pool-0` and `pool-1` with the names of your node pools and upload your DAG again.
  - Create a node pool named `pool-0` or `pool-1`. You can create both, but the task needs only one to succeed.
  - Replace `pool-0` and `pool-1` with `default-pool`, which is the default pool that Airflow uses, and then upload your DAG again. Note: By default, Kubernetes pods are scheduled in the `default-pool`. If you add pools later, pods remain restricted to the `default-pool`.
Wait a few minutes for your environment to update.
Run the `ex-pod-affinity` task again.
Full Configuration
This example shows all the variables that you can configure in the `KubernetesPodOperator`. You don't need to modify the code for the `ex-all-configs` task to succeed.
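A sketch of such a fully specified task; every value below is an illustrative placeholder, and the `resources` dict follows the Airflow 1.x contrib operator's convention:

```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

kubernetes_full_pod = KubernetesPodOperator(
    task_id='ex-all-configs',
    name='ex-all-configs',
    namespace='default',
    image='perl',
    # Entrypoint and arguments for the container.
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # Kubernetes Secrets to expose in the pod (none here).
    secrets=[],
    # Labels to apply to the pod.
    labels={'pod-label': 'label-name'},
    startup_timeout_seconds=120,
    # Environment variables set inside the container.
    env_vars={'EXAMPLE_VAR': '/example/value'},
    # Stream the container's stdout into the Airflow task log.
    get_logs=True,
    image_pull_policy='Always',
    # Annotations to apply to the pod.
    annotations={'key1': 'value1'},
    # Resource limits for the container.
    resources={'limit_memory': 1, 'limit_cpu': 1},
    # The Kubernetes config file the operator should use.
    config_file='/home/airflow/composer_kube_config',
    # Whether to push the container's stdout to XCom.
    xcom_push=False,
    # Kubernetes Volumes and mounts for the pod (none here).
    volumes=[],
    volume_mounts=[],
    # Pod affinity rules (unrestricted here).
    affinity={},
)
```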
For details on each variable, see the Airflow `KubernetesPodOperator` reference.
Managing DAGs
Viewing a task's status
- Go to the Airflow web interface.
- On the DAGs page, click the DAG name (such as `composer_sample_kubernetes_pod`).
- On the DAG Details page, click Graph View.
Check the status:

- Failed: The task has a red box around it (such as `ex-kube-templates`). You can also hold the pointer over the task and look for State: Failed.
- Succeeded: The task has a green box around it (such as `pod-ex-minimum`). You can also hold the pointer over the task and check for State: Success.
Checking task logs
- In the Airflow UI, view the task's status.
- In the Graph View for the DAG, click the task name.
- On the Task Instance Context menu, click View Log.
Running a task again
- To return to the DAG:
- In the toolbar in the Airflow UI, click DAGs.
- Click the DAG name (such as `composer_sample_kubernetes_pod`).
- To run the task again:
- Click the task name.
- Click Clear and then click OK. The task runs again automatically.
Troubleshooting
Tips for troubleshooting Pod failures
In addition to checking the task logs, also check the following logs:
Output of the Airflow scheduler and workers:
- Go to the Cloud Composer environment's Cloud Storage bucket. This is the bucket where the DAGs are located.
- Review the logs at `logs/DAG_NAME/TASK_ID/EXECUTION_DATE`.
Detailed pod logs in the Cloud Console under GKE workloads. These logs include the pod definition YAML file, pod events, and pod details.
Non-zero return codes when using the GKEPodOperator

When using the `KubernetesPodOperator` and the `GKEPodOperator`, the return code of the container's entrypoint determines whether the task is considered successful. Non-zero return codes indicate failure.
A common pattern when using the `KubernetesPodOperator` and the `GKEPodOperator` is to execute a shell script as the container entrypoint to group multiple operations within the container. If you are writing such a script, we recommend that you include the `set -e` command at the top of the script so that failed commands in the script terminate the script and propagate the failure to the Airflow task instance.
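A sketch of the pattern, with the shell script inlined for brevity (in practice the script is usually baked into the image); the task name is hypothetical:

```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

grouped_steps = KubernetesPodOperator(
    task_id='ex-entrypoint-script',  # hypothetical task name
    name='ex-entrypoint-script',
    namespace='default',
    image='bash',
    cmds=['bash', '-c'],
    # set -e stops the script at the first failing command, so the
    # container exits non-zero and Airflow marks the task as failed.
    arguments=['set -e; echo "step 1"; false; echo "never reached"'],
)
```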
Task fails despite pod success
For Cloud Composer environments running `composer-1.4.1-airflow-*` or earlier: If an Airflow task runs for an hour and the task logs end with `kubernetes.client.rest.ApiException: (401)` and `Reason: Unauthorized`, the underlying Kubernetes job continues executing after that point and might even succeed. However, Airflow reports the task as failed.

To fix this issue, add an explicit PyPI package dependency on `kubernetes>=8.0.1`.
Pod timeouts
The default timeout for the `KubernetesPodOperator` is 120 seconds, which can result in timeouts that occur before larger images finish downloading. You can increase the timeout by altering the `startup_timeout_seconds` parameter when you create the `KubernetesPodOperator`.
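As a minimal sketch (the task name and the 10-minute value are illustrative):

```python
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

slow_image_pod = KubernetesPodOperator(
    task_id='ex-startup-timeout',  # hypothetical task name
    name='ex-startup-timeout',
    namespace='default',
    image='ubuntu',
    cmds=['echo'],
    # Allow up to 10 minutes (instead of the 120-second default)
    # for the image to download and the pod to start.
    startup_timeout_seconds=600,
)
```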
When a pod times out, the task-specific log is available in the Airflow web UI. For example:
```
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
```
Pod timeouts can also occur when the Cloud Composer service account lacks the necessary IAM permissions to perform the task at hand. To verify this, look at pod-level errors using the GKE Dashboards to view the logs for your particular workload, or use Stackdriver Logging.
Failed to Establish a New Connection
Auto-upgrade is enabled by default in GKE clusters. If a node pool is in a cluster that is upgrading, you may see the following error:
```
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new connection: [Errno 111] Connection refused
```
To check if your cluster is upgrading, go to the GKE menu in the Cloud Console and look for the loading icon next to your environment's cluster name.