Cloud Composer 2 uses GKE clusters with Workload Identity. By default, Pods that run in a newly created namespace or in the composer-user-workloads namespace cannot access Google Cloud resources. When you use Workload Identity, Kubernetes service accounts associated with a namespace must be mapped to Google Cloud service accounts to enable service identity authorization for requests to Google APIs and other services.
Because of this, if you run Pods in the composer-user-workloads namespace or in a newly created namespace in your environment's cluster, the appropriate IAM bindings between Kubernetes and Google Cloud service accounts are not created, and these Pods cannot access resources in your Google Cloud project.
To let Pods access Google Cloud resources, use the composer-user-workloads namespace or create your own namespace, as described later.
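As a hedged illustration of this mapping (not the official setup procedure), the sketch below annotates a Kubernetes service account with the Google Cloud service account it should impersonate, using the Kubernetes Python client. The namespace, service account names, and project are placeholders; the Google Cloud service account must also grant roles/iam.workloadIdentityUser to the Kubernetes service account member, which is typically done with gcloud and is not shown here.
from kubernetes import client, config

# Sketch only: all names below are placeholders, not values created by Cloud Composer.
config.load_kube_config()  # use load_incluster_config() when running inside the cluster
core_v1 = client.CoreV1Api()
core_v1.patch_namespaced_service_account(
    name="example-ksa",
    namespace="example-namespace",
    body={
        "metadata": {
            "annotations": {
                # Google Cloud service account that Pods using this KSA act as.
                "iam.gke.io/gcp-service-account": (
                    "example-gsa@example-project.iam.gserviceaccount.com"
                ),
            }
        }
    },
)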
"""Example DAG using KubernetesPodOperator."""
import datetime
from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models
# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
# Expose the secret as environment variable.
deploy_type="env",
# The name of the environment variable, since deploy_type is `env` rather
# than `volume`.
deploy_target="SQL_CONN",
# Name of the Kubernetes Secret
secret="airflow-secrets",
# Key of a secret stored in this Secret object
key="sql_alchemy_conn",
)
secret_volume = Secret(
deploy_type="volume",
# Path where we mount the secret as volume
deploy_target="/var/secrets/google",
# Name of Kubernetes Secret
secret="service-account",
# Key in the form of service account file name
key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
# If a Pod fails to launch, or an error occurs in the container, Airflow
# marks the task as failed, and the task logs contain the information
# required to debug the failure.
with models.DAG(
dag_id="composer_sample_kubernetes_pod",
schedule_interval=datetime.timedelta(days=1),
start_date=YESTERDAY,
) as dag:
# Only name, namespace, image, and task_id are required to create a
# KubernetesPodOperator. In Cloud Composer, the config file found at
# `/home/airflow/composer_kube_config` contains credentials for
# Cloud Composer's Google Kubernetes Engine cluster that is created
# upon environment creation.
kubernetes_min_pod = KubernetesPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes. In Composer 2 environments
# after December 2022, the default namespace is
# `composer-user-workloads`.
namespace="composer-user-workloads",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
# Specifies path to kubernetes config. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# Identifier of connection that should be used
kubernetes_conn_id="kubernetes_default",
)
kubernetes_template_ex = KubernetesPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
namespace="composer-user-workloads",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
# Sets the config file to a kubernetes config file specified in
# airflow.cfg. If the configuration file does not exist or does
# not provide valid credentials, the Pod will fail to launch. If not
# specified, config_file defaults to ~/.kube/config
config_file="{{ conf.get('core', 'kube_config') }}",
# Identifier of connection that should be used
kubernetes_conn_id="kubernetes_default",
)
kubernetes_secret_vars_ex = KubernetesPodOperator(
task_id="ex-kube-secrets",
name="ex-kube-secrets",
namespace="composer-user-workloads",
image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
startup_timeout_seconds=300,
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[secret_env, secret_volume],
cmds=["echo"],
# env_vars allows you to specify environment variables for your
# container to use. env_vars is templated.
env_vars={
"EXAMPLE_VAR": "/example/value",
"GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
},
# Specifies path to kubernetes config. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# Identifier of connection that should be used
kubernetes_conn_id="kubernetes_default",
)
kubernetes_full_pod = KubernetesPodOperator(
task_id="ex-all-configs",
name="pi",
namespace="composer-user-workloads",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 600.
startup_timeout_seconds=600,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image; 'IfNotPresent' causes the
# kubelet to skip pulling an image that already exists on the node. If
# you want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for the Pod; this lets you set
# both cpu and memory requests and limits.
# Prior to Airflow 2.3 and the cncf providers package 5.0.0
# resources were passed as a dictionary. This change was made in
# https://github.com/apache/airflow/pull/27197
# Additionally, "memory" and "cpu" were previously named
# "limit_memory" and "limit_cpu"
# resources={'limit_memory': "250M", 'limit_cpu': "100m"},
container_resources=k8s_models.V1ResourceRequirements(
requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
),
# Specifies path to kubernetes config. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Identifier of connection that should be used
kubernetes_conn_id="kubernetes_default",
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
affinity={},
)
If you place the following code snippet in a DAG, the default configuration at /home/airflow/composer_kube_config is used. You do not need to change the code for the pod-ex-minimum task to succeed.
kubernetes_min_pod = KubernetesPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes. In Composer 2 environments
# after December 2022, the default namespace is
# `composer-user-workloads`.
namespace="composer-user-workloads",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
# Specifies path to kubernetes config. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# Identifier of connection that should be used
kubernetes_conn_id="kubernetes_default",
)
kubernetes_template_ex = KubernetesPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
namespace="composer-user-workloads",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
# Sets the config file to a kubernetes config file specified in
# airflow.cfg. If the configuration file does not exist or does
# not provide valid credentials, the Pod will fail to launch. If not
# specified, config_file defaults to ~/.kube/config
config_file="{{ conf.get('core', 'kube_config') }}",
# Identifier of connection that should be used
kubernetes_conn_id="kubernetes_default",
)
If you do not change the DAG or your environment, the ex-kube-templates task fails with two errors. The logs show that this task fails because the appropriate variable (my_value) does not exist. The second error, which you can get after fixing the first one, shows that the task fails because core/kube_config is not found in config.
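To clear the first error, the my_value Airflow variable must exist before the task runs. You can set it in the Airflow UI (Admin > Variables) or with the Airflow CLI; as a minimal sketch, it can also be set programmatically from code running in the Airflow environment (the value below is a placeholder):
from airflow.models import Variable

# Sketch only: creates the Airflow variable that the templated env_vars reference.
# Must run in a context that can reach the Airflow metadata database.
Variable.set("my_value", "example-value")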
secret_env = Secret(
# Expose the secret as environment variable.
deploy_type="env",
# The name of the environment variable, since deploy_type is `env` rather
# than `volume`.
deploy_target="SQL_CONN",
# Name of the Kubernetes Secret
secret="airflow-secrets",
# Key of a secret stored in this Secret object
key="sql_alchemy_conn",
)
secret_volume = Secret(
deploy_type="volume",
# Path where we mount the secret as volume
deploy_target="/var/secrets/google",
# Name of Kubernetes Secret
secret="service-account",
# Key in the form of service account file name
key="service-account.json",
)
kubernetes_secret_vars_ex = KubernetesPodOperator(
task_id="ex-kube-secrets",
name="ex-kube-secrets",
namespace="composer-user-workloads",
image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
startup_timeout_seconds=300,
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[secret_env, secret_volume],
cmds=["echo"],
# env_vars allows you to specify environment variables for your
# container to use. env_vars is templated.
env_vars={
"EXAMPLE_VAR": "/example/value",
"GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
},
# Specifies path to kubernetes config. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# Identifier of connection that should be used
kubernetes_conn_id="kubernetes_default",
)
If you do not change the DAG or your environment, the ex-kube-secrets task fails. If you check the logs, the task fails because of a Pod took too long to start error. This error occurs because the Secret specified in the configuration (secret_env) does not exist.
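For the task to succeed, the airflow-secrets and service-account Kubernetes Secrets referenced above must exist in the composer-user-workloads namespace. The sketch below creates them with the Kubernetes Python client; the secret values are placeholders, and you can equally create the Secrets with kubectl:
from kubernetes import client, config

# Sketch only: secret values are placeholders.
config.load_kube_config()
core_v1 = client.CoreV1Api()

core_v1.create_namespaced_secret(
    namespace="composer-user-workloads",
    body=client.V1Secret(
        metadata=client.V1ObjectMeta(name="airflow-secrets"),
        string_data={"sql_alchemy_conn": "example-connection-string"},
    ),
)
core_v1.create_namespaced_secret(
    namespace="composer-user-workloads",
    body=client.V1Secret(
        metadata=client.V1ObjectMeta(name="service-account"),
        # Placeholder content; normally the JSON key of a service account.
        string_data={"service-account.json": "{}"},
    ),
)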
kubernetes_full_pod = KubernetesPodOperator(
task_id="ex-all-configs",
name="pi",
namespace="composer-user-workloads",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 600.
startup_timeout_seconds=600,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image; 'IfNotPresent' causes the
# kubelet to skip pulling an image that already exists on the node. If
# you want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for the Pod; this lets you set
# both cpu and memory requests and limits.
# Prior to Airflow 2.3 and the cncf providers package 5.0.0
# resources were passed as a dictionary. This change was made in
# https://github.com/apache/airflow/pull/27197
# Additionally, "memory" and "cpu" were previously named
# "limit_memory" and "limit_cpu"
# resources={'limit_memory': "250M", 'limit_cpu': "100m"},
container_resources=k8s_models.V1ResourceRequirements(
requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
),
# Specifies path to kubernetes config. The config_file is templated.
config_file="/home/airflow/composer_kube_config",
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Identifier of connection that should be used
kubernetes_conn_id="kubernetes_default",
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
affinity={},
)
KubernetesPodOperator(
# config_file parameter - can be skipped if connection contains this setting
config_file="/home/airflow/composer_kube_config",
# definition of connection to be used by the operator
kubernetes_conn_id='kubernetes_default',
...
)
When a Pod times out, task-specific logs are available in the Airflow UI. For example:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 27, in <module>
args.func(args)
File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod timeouts can also occur when the Cloud Composer service account lacks the IAM permissions necessary to perform the task at hand. To verify this, look at Pod-level errors in the GKE dashboard to examine the logs for the particular workload, or use Cloud Logging.
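As a hedged sketch, the snippet below uses the google-cloud-logging client library to read recent Pod-level log entries for a workload; the cluster name, namespace, and Pod name prefix in the filter are placeholders, and the same filter can be used in the Logs Explorer:
from google.cloud import logging

# Sketch only: filter values are placeholders.
client = logging.Client()
pod_log_filter = (
    'resource.type="k8s_pod" '
    'resource.labels.cluster_name="example-composer-cluster" '
    'resource.labels.namespace_name="composer-user-workloads" '
    'resource.labels.pod_name:"pod-ex-minimum"'
)
for entry in client.list_entries(
    filter_=pod_log_filter, order_by=logging.DESCENDING, max_results=20
):
    print(entry.timestamp, entry.payload)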