import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config` if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. In Composer 1 there is the potential for
        # the resource starvation of Airflow workers and scheduler
        # within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources,
        # and using Composer 2 will mean the environment will autoscale.
        namespace="default",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )

    kubenetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="default",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide valid credentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
    )

    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="default",
        image="ubuntu",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
    )

    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    kubernetes_affinity_ex = KubernetesPodOperator(
        task_id="ex-pod-affinity",
        name="ex-pod-affinity",
        namespace="default",
        image="perl:5.34.0",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
                                        "pool-0",
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )

    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="default",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            limits={"memory": "250M", "cpu": "100m"},
        ),
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )
Airflow 1
import datetime

from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config` if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.
    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace="default",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )

    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="default",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide valid credentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
    )

    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="default",
        image="ubuntu",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
    )

    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-pod-affinity",
        name="ex-pod-affinity",
        namespace="default",
        image="perl:5.34.0",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
                                        "pool-0",
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )

    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="default",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object.
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead of passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={"limit_memory": "250M", "limit_cpu": "100m"},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={},
    )
If you place the following code snippet in a DAG, the default configuration at /home/airflow/composer_kube_config is used. You do not need to modify the code for the pod-ex-minimum task to succeed.
Airflow 2
kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
Airflow 1
kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide valid credentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)
Airflow 1
kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide valid credentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)
If you do not change the DAG or your environment, the ex-kube-templates task fails with two errors. The logs show that the task fails because the required variable (my_value) does not exist. The second error, which you can get after fixing the first one, shows that the task fails because core/kube_config is not found in config. One way to clear the first error is shown in the sketch below.
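You can define the my_value Airflow variable in the Airflow UI (Admin > Variables) or with the Airflow CLI. As a minimal sketch, the variable can also be set programmatically with Airflow's Variable model from any code that can reach the Airflow metadata database; the value "example_value" below is only a placeholder, not something the sample requires.

from airflow.models import Variable

# Store a placeholder value for `my_value`. The Jinja expression
# "{{ var.value.my_value }}" in the DAG renders to whatever is stored here.
Variable.set("my_value", "example_value")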
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
Airflow 1
secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
)
Airflow 1
kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
)
If you do not change the DAG or your environment, the ex-kube-secrets task fails. If you look at the logs, the task fails because of a Pod took too long to start error. This error occurs because the Secret specified in the configuration (secret_env) cannot be found in the cluster; a sketch of creating it follows.
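You can create the missing Secret with kubectl against the environment's GKE cluster, or, as a minimal sketch, with the official kubernetes Python client as below. The connection string is a placeholder, and the service-account Secret referenced by secret_volume must exist as well before that volume can be mounted.

from kubernetes import client, config

# Load credentials for the target cluster; in Cloud Composer the config file
# at /home/airflow/composer_kube_config points at the environment's cluster.
config.load_kube_config(config_file="/home/airflow/composer_kube_config")

# Create the Secret that secret_env refers to: name "airflow-secrets" with the
# key "sql_alchemy_conn". The value is a placeholder.
client.CoreV1Api().create_namespaced_secret(
    namespace="default",
    body=client.V1Secret(
        metadata=client.V1ObjectMeta(name="airflow-secrets"),
        string_data={"sql_alchemy_conn": "placeholder-connection-string"},
    ),
)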
# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)
Airflow 1
kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)
kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)
Airflow 1
kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object.
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead of passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)
KubernetesPodOperator(
    # config_file parameter - can be skipped if connection contains this setting
    config_file="/home/airflow/composer_kube_config",
    # definition of connection to be used by the operator
    kubernetes_conn_id='kubernetes_default',
    ...
)
When Pods time out, task-specific logs are available in the Airflow UI. For example:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod timeouts can also happen when the Cloud Composer service account lacks the necessary IAM permissions to perform the task at hand. To verify this, look at Pod-level errors using the GKE dashboards to inspect the logs for your particular workload, or use Cloud Logging.
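As an alternative that is not part of the documented steps above, if you have access to the environment's cluster, the Pod's own events usually name the underlying reason (image pull failures, missing Secrets, unschedulable Pods, and so on). The following rough sketch lists those events with the official kubernetes Python client; running kubectl get events -n default against the cluster gives the same information.

from kubernetes import client, config

# Load cluster credentials, for example the Composer config file or ~/.kube/config.
config.load_kube_config(config_file="/home/airflow/composer_kube_config")

# Print recent events for the namespace used by the examples; scheduling and
# permission problems behind "Pod took too long to start" often show up here.
for event in client.CoreV1Api().list_namespaced_event("default").items:
    print(event.last_timestamp, event.reason, event.message)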
[[["わかりやすい","easyToUnderstand","thumb-up"],["問題の解決に役立った","solvedMyProblem","thumb-up"],["その他","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["翻訳に関する問題","translationIssue","thumb-down"],["その他","otherDown","thumb-down"]],["最終更新日 2024-05-28 UTC。"],[],[]]