Cloud Composer 1 is in post-maintenance mode. Google does not release any further updates to Cloud Composer 1, including new Airflow versions, bug fixes, and security updates. We recommend planning a migration to Cloud Composer 2.
This page describes how to use the KubernetesPodOperator to deploy Kubernetes pods from Cloud Composer into the Google Kubernetes Engine cluster that is part of your Cloud Composer environment, and how to make sure that your environment has the appropriate resources.
The KubernetesPodOperator launches Kubernetes pods in your environment's cluster. In comparison, the Google Kubernetes Engine operators run Kubernetes pods in a specified cluster, which can be a separate cluster that is not related to your environment. You can also create and delete clusters by using the Google Kubernetes Engine operators.
The KubernetesPodOperator is a good option if you require:
Custom Python dependencies that are not available through the public PyPI repository.
Binary dependencies that are not available in the stock Cloud Composer worker image.
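This page walks you through an example Airflow DAG that includes the following KubernetesPodOperator configurations: minimal configuration, template configuration, secret variables configuration, pod affinity configuration, and full configuration.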
When you create a Cloud Composer environment, you specify its performance parameters, including performance parameters for the environment's cluster. Launching Kubernetes pods into the environment's cluster can cause competition for cluster resources, such as CPU or memory. Because the Airflow scheduler and workers run in the same GKE cluster, they stop working properly if this competition results in resource starvation.
To prevent resource starvation, take one or more of the following actions:
The preferred way to prevent resource starvation in the Cloud Composer environment is to create a new node pool and configure the Kubernetes pods to run using only resources from that pool.
Console
In the Google Cloud console, go to the Environments page.
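Once a dedicated node pool exists, pods can be pinned to it with a node affinity rule. The following is a minimal sketch of a helper that builds such a rule as a plain dictionary, in the same shape as the affinity example used in this page's DAG. The helper name `node_pool_affinity` and the pool name `pod-workloads` are hypothetical and serve only as illustration.

```python
def node_pool_affinity(pool_names):
    """Build an affinity dict that restricts a pod to the given GKE node pools."""
    if not pool_names:
        raise ValueError("at least one node pool name is required")
    return {
        "nodeAffinity": {
            # The rule is enforced when the pod is scheduled, not afterwards.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # GKE labels each node with its node pool name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                "values": list(pool_names),
                            }
                        ]
                    }
                ]
            }
        }
    }


# "pod-workloads" is a hypothetical node pool name.
affinity = node_pool_affinity(["pod-workloads"])
print(affinity)
```

The resulting dictionary could then be passed as the `affinity` argument of a KubernetesPodOperator task, so that the pod does not compete with Airflow components running in the default pool.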
Increasing the number of nodes in your Cloud Composer environment increases the computing power available to your workloads. This increase does not provide additional resources for tasks that require more CPU or RAM than the specified machine type provides.
During Cloud Composer environment creation, you can specify a machine type. To make sure that enough resources are available, choose a machine type that suits the kind of computing that occurs in your Cloud Composer environment.
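The difference between adding nodes and choosing a larger machine type can be illustrated with a small check of whether a single pod fits on a single node. This is only a sketch: it parses a subset of the Kubernetes quantity notation (`m` for millicores, decimal and binary memory suffixes), the function names are hypothetical, and the node sizes used below are illustrative numbers rather than the allocatable resources of any real machine type.

```python
from decimal import Decimal

# Multipliers for the memory suffixes handled by this sketch.
_MEMORY_SUFFIXES = {
    "Ki": 2**10, "Mi": 2**20, "Gi": 2**30,
    "k": 10**3, "M": 10**6, "G": 10**9,
}


def cpu_to_millicores(quantity):
    """Convert a CPU quantity such as "100m" or "2" to millicores."""
    if quantity.endswith("m"):
        return int(quantity[:-1])
    return int(Decimal(quantity) * 1000)


def memory_to_bytes(quantity):
    """Convert a memory quantity such as "250M" or "1Gi" to bytes."""
    # Two-letter suffixes are checked first so "Mi" is not read as "M".
    for suffix in ("Ki", "Mi", "Gi", "k", "M", "G"):
        if quantity.endswith(suffix):
            number = Decimal(quantity[: -len(suffix)])
            return int(number * _MEMORY_SUFFIXES[suffix])
    return int(quantity)


def fits_on_node(pod_cpu, pod_memory, node_cpu, node_memory):
    """Return True if one pod's request fits within one node's resources."""
    return (
        cpu_to_millicores(pod_cpu) <= cpu_to_millicores(node_cpu)
        and memory_to_bytes(pod_memory) <= memory_to_bytes(node_memory)
    )


# Hypothetical node size: 1 CPU and 3Gi of memory available to pods.
print(fits_on_node("100m", "250M", "1", "3Gi"))
print(fits_on_node("2", "250M", "1", "3Gi"))
```

A pod that requests two CPUs never fits on a node that offers one, regardless of how many such nodes the environment has. That case calls for a larger machine type rather than a higher node count.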
The following sections explain each KubernetesPodOperator configuration in the example. For information about each configuration variable, see the Airflow reference.
Airflow 2

import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config` if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. In Composer 1 there is the potential for
        # the resource starvation of Airflow workers and scheduler
        # within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources,
        # and using Composer 2 will mean the environment will autoscale.
        namespace="default",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )
    kubenetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="default",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide valid credentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="default",
        image="ubuntu",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
    )
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    kubernetes_affinity_ex = KubernetesPodOperator(
        task_id="ex-pod-affinity",
        name="ex-pod-affinity",
        namespace="default",
        image="perl:5.34.0",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'pool-0' or 'pool-1' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
                                        "pool-0",
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="default",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            limits={"memory": "250M", "cpu": "100m"},
        ),
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )
Airflow 1

import datetime

from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config` if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.
    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace="default",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )
    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="default",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide valid credentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
    )
    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="default",
        image="ubuntu",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
    )
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-pod-affinity",
        name="ex-pod-affinity",
        namespace="default",
        image="perl:5.34.0",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'pool-0' or 'pool-1' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
                                        "pool-0",
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="default",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead of passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={"limit_memory": "250M", "limit_cpu": "100m"},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={},
    )
Minimal configuration
To create a KubernetesPodOperator, only the pod's name, the namespace where the pod runs, the image to use, and the task_id are required.
When you place the following code snippet in a DAG, the configuration uses the defaults in /home/airflow/composer_kube_config. You don't need to modify the code for the pod-ex-minimum task to succeed.
Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
Template configuration
Airflow supports Jinja templating. You must declare the required variables (task_id, name, namespace, and image) with the operator. As the following example shows, you can template all other parameters with Jinja, including cmds, arguments, env_vars, and config_file.
Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide valid credentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)
Airflow 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide valid credentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)
If you don't change the DAG or your environment, the ex-kube-templates task fails because of two errors. The logs first show that the task fails because the required variable (my_value) does not exist. The second error, which appears after you fix the first one, shows that the task fails because core/kube_config is not found in the configuration.
To fix both errors, follow the steps outlined below.
To set my_value with gcloud or the Airflow UI:
To refer to a custom config_file (a Kubernetes configuration file), override the kube_config Airflow configuration option so that it points to a valid Kubernetes configuration:
Section    Key            Value
core       kube_config    /home/airflow/composer_kube_config
Wait a few minutes for your environment to finish updating. Then run the ex-kube-templates task again and verify that it succeeds.
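The second error comes down to a configuration lookup for a key that is not defined. The sketch below imitates that lookup with Python's standard `configparser` module, which stands in for Airflow's own configuration object here. The function name `kube_config_path` and the two configuration strings are hypothetical, and the exception type raised by Airflow itself may differ.

```python
import configparser


def kube_config_path(cfg_text):
    """Look up core/kube_config in an INI-style configuration string."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    return parser.get("core", "kube_config")


# Configuration before and after the override described above.
WITHOUT_OVERRIDE = "[core]\n"
WITH_OVERRIDE = "[core]\nkube_config = /home/airflow/composer_kube_config\n"

try:
    kube_config_path(WITHOUT_OVERRIDE)
except configparser.NoOptionError as err:
    # The key is missing, so the lookup fails before any pod is launched.
    print(f"lookup failed: {err}")

print(kube_config_path(WITH_OVERRIDE))
```

With the override in place, the lookup returns the path to the Kubernetes configuration file, which is the value that the templated `config_file` parameter needs.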
Secret variables configuration
A Kubernetes secret is an object that contains sensitive data. You can pass secrets to Kubernetes pods by using the KubernetesPodOperator.
Secrets must be defined in Kubernetes, or the pod fails to launch.
This example shows two ways of using Kubernetes secrets: as an environment variable, and as a volume mounted by the pod.
The first secret, airflow-secrets, is set to a Kubernetes environment variable named SQL_CONN (as opposed to an Airflow or Cloud Composer environment variable).
The second secret, service-account, mounts service-account.json, a file with a service account token, to /var/secrets/google.
The secrets look like the following:
Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
The name of the first Kubernetes secret is defined in the secret parameter. This particular secret is named airflow-secrets. It is exposed as an environment variable, as dictated by deploy_type. The environment variable that it sets, deploy_target, is SQL_CONN. Finally, the key of the secret that is stored in deploy_target is sql_alchemy_conn.
The name of the second Kubernetes secret is also defined in the secret parameter. This particular secret is named service-account. It is exposed as a volume, as dictated by deploy_type. The path where the volume is mounted, deploy_target, is /var/secrets/google. Finally, the key of the secret that is stored in deploy_target is service-account.json.
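To make the two deploy types more concrete, the following sketch shows roughly what each Secret definition corresponds to in a Kubernetes pod specification. It builds plain dictionaries and does not call Airflow or Kubernetes. The function names and the volume name `secret-volume` are hypothetical, and the objects that the operator actually generates may differ in detail (for example, in how the volume is named).

```python
def env_secret_fragment(deploy_target, secret, key):
    """Container env entry that reads one key of a Secret into a variable."""
    return {
        "name": deploy_target,
        "valueFrom": {"secretKeyRef": {"name": secret, "key": key}},
    }


def volume_secret_fragments(deploy_target, secret, key, volume_name="secret-volume"):
    """Pod volume and container volumeMount that expose a Secret key as a file."""
    volume = {
        "name": volume_name,
        "secret": {
            "secretName": secret,
            # The key becomes a file with the same name inside the mount path.
            "items": [{"key": key, "path": key}],
        },
    }
    mount = {"name": volume_name, "mountPath": deploy_target, "readOnly": True}
    return volume, mount


env_entry = env_secret_fragment("SQL_CONN", "airflow-secrets", "sql_alchemy_conn")
volume, mount = volume_secret_fragments(
    "/var/secrets/google", "service-account", "service-account.json"
)
print(env_entry)
print(volume)
print(mount)
```

In this picture, the file from the second secret ends up at /var/secrets/google/service-account.json inside the container, which is the path that the GOOGLE_APPLICATION_CREDENTIALS variable points to in the operator configuration.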
Here's what the operator configuration looks like:
Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
)
Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
)
Without making any changes to the DAG or your environment, the ex-kube-secrets task fails. If you look at the logs, the task fails because of a Pod took too long to start error. This error occurs because Airflow cannot find the secret specified in the configuration, secret_env.
gcloud
To set the secrets using gcloud:
Get information about your Cloud Composer environment cluster.
Create a Kubernetes secret that sets the value of service-account.json to the local path of a service account key file called key.json by running the following command:
After you set the secrets, run the ex-kube-secrets task again in the Airflow UI.
Verify that the ex-kube-secrets task succeeds.
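The commands for these steps are not reproduced on this page. As a hedged sketch only, the following snippet assembles kubectl create secret generic invocations for the two secrets described earlier and prints them for review instead of running them. The value test_value and the path ./key.json are placeholders, the helper name secret_commands is hypothetical, and the sketch assumes kubectl is already configured for your environment's cluster.

```python
import shlex


def secret_commands(sql_conn_value="test_value", key_file="./key.json"):
    """Return kubectl argv lists for the two secrets used by ex-kube-secrets."""
    return [
        # Secret exposed to the pod as the SQL_CONN environment variable.
        [
            "kubectl", "create", "secret", "generic", "airflow-secrets",
            "--from-literal", f"sql_alchemy_conn={sql_conn_value}",
        ],
        # Secret mounted into the pod as a volume at /var/secrets/google.
        [
            "kubectl", "create", "secret", "generic", "service-account",
            "--from-file", f"service-account.json={key_file}",
        ],
    ]


for argv in secret_commands():
    # Print only; pass argv to subprocess.run(argv, check=True) to execute.
    print(shlex.join(argv))
```

Building the commands as argument lists keeps the secret names and keys in one place, so they stay consistent with the secret_env and secret_volume definitions in the DAG.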
Pod affinity configuration
When you configure the affinity parameter in KubernetesPodOperator, you control which nodes pods are scheduled on, such as nodes only in a particular node pool. In this example, the operator runs only on node pools named pool-0 and pool-1. Your Cloud Composer 1 environment nodes are in default-pool, so your pods do not run on the nodes in your environment.
Airflow 2
# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'pool-0' or 'pool-1' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)
Airflow 1
kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'pool-0' or 'pool-1' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)
As the example is currently configured, the task fails. If you look at the logs, the task fails because node pools pool-0 and pool-1 do not exist.
To make sure the node pools in values exist, make any of the following configuration changes:
If you created a node pool earlier, replace pool-0 and pool-1 with the names of your node pools and upload your DAG again.
Create a node pool named pool-0 or pool-1. You can create both, but the task needs only one to succeed.
Replace pool-0 and pool-1 with default-pool, which is the default pool that Airflow uses. Then, upload your DAG again.
After you make the changes, wait a few minutes for your environment to update.
Then run the ex-pod-affinity task again and verify that it succeeds.
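Because all three options only change the list of node pool names, it can help to generate the affinity dictionary from that list. The following is a minimal sketch, not part of the original sample; the helper name node_pool_affinity is hypothetical, and the result has the same structure as the affinity value in the ex-pod-affinity example.

```python
def node_pool_affinity(pool_names):
    """Build a nodeAffinity dict that restricts scheduling to the given node pools."""
    if not pool_names:
        raise ValueError("at least one node pool name is required")
    return {
        "nodeAffinity": {
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # GKE labels every node with its node pool name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                "values": list(pool_names),
                            }
                        ]
                    }
                ]
            }
        }
    }


# The returned dict can be passed as the operator's affinity argument.
affinity = node_pool_affinity(["default-pool"])
print(affinity["nodeAffinity"]["requiredDuringSchedulingIgnoredDuringExecution"])
```

Switching between pool-0, pool-1, and default-pool then becomes a one-line change in the DAG.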
Full configuration
This example shows all the variables that you can configure in KubernetesPodOperator. You do not need to modify the code for the ex-all-configs task to succeed.
Airflow 2
kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    # k8s_models here refers to the kubernetes.client.models module.
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)
Airflow 1
kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead of passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)
Information about the CNCF Kubernetes Provider
GKEStartPodOperator and KubernetesPodOperator are implemented in the apache-airflow-providers-cncf-kubernetes provider.
Version 6.0.0
In version 6.0.0 of the CNCF Kubernetes Provider package, the kubernetes_default connection is used by default in KubernetesPodOperator.
If you specified a custom connection in version 5.0.0, the operator still uses this custom connection. To switch back to the kubernetes_default connection, you might need to adjust your DAGs.
Version 5.0.0
This version introduces some backward-incompatible changes compared to version 4.4.0. The most important ones relate to the kubernetes_default connection, which is not used in version 5.0.0.
The kubernetes_default connection needs to be modified: the Kube config path must be set to /home/airflow/composer_kube_config (see Figure 1), or config_file must be added to the KubernetesPodOperator configuration (as presented below).
Modify the code of a task that uses KubernetesPodOperator in the following way:
KubernetesPodOperator(
    # config_file parameter - can be skipped if connection contains this setting
    config_file="/home/airflow/composer_kube_config",
    # definition of connection to be used by the operator
    kubernetes_conn_id='kubernetes_default',
    ...
)
Review the logs in the logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE> folder.
Review the detailed pod logs in the Google Cloud console under GKE workloads. These logs include the pod definition YAML file, pod events, and pod details.
Non-zero return codes when also using the GKEStartPodOperator
When you use KubernetesPodOperator and GKEStartPodOperator, the return code of the container's entry point determines whether the task is considered successful. Non-zero return codes indicate failure.
A common pattern when using KubernetesPodOperator and GKEStartPodOperator is to run a shell script as the container entry point to group several operations within the container.
If you write such a script, we recommend that you include the set -e command at the top of the script, so that a failed command terminates the script and propagates the failure to the Airflow task instance.
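To illustrate why set -e matters, the following minimal sketch runs two hypothetical entry point scripts locally and compares their exit codes. It assumes a POSIX sh is available on the machine where you run it; the script contents and the helper name exit_code are made up for illustration.

```python
import subprocess

# Two hypothetical entry point scripts; the first command fails in both.
WITHOUT_SET_E = "false\necho 'later step ran'\n"
WITH_SET_E = "set -e\nfalse\necho 'later step ran'\n"


def exit_code(script):
    """Run a script with sh and return its exit code."""
    result = subprocess.run(["sh", "-c", script], capture_output=True, text=True)
    return result.returncode


# Without set -e the script continues and reports the status of its last command.
print("without set -e:", exit_code(WITHOUT_SET_E))
# With set -e the script stops at the failing command and exits non-zero.
print("with set -e:", exit_code(WITH_SET_E))
```

In the first case the failure is masked by the final echo, so a task that used this script as its entry point would be reported as successful even though a step failed.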
Pod timeouts
The default timeout for KubernetesPodOperator is 120 seconds, which can expire before larger images finish downloading. You can increase the timeout by changing the startup_timeout_seconds parameter when you create the KubernetesPodOperator.
When a pod times out, the task-specific log is available in the Airflow UI. For example:
Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Pod timeouts can also occur when the Cloud Composer service account lacks the IAM permissions required to perform the task at hand. To verify this, look at pod-level errors by using the GKE dashboards to view the logs for your particular workload, or use Cloud Logging.
Failed to establish a new connection
Auto-upgrade is enabled by default in GKE clusters.
If a node pool is in a cluster that is upgrading, you might see the following error:
<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused
To check whether your cluster is upgrading, in the Google Cloud console, go to the Kubernetes clusters page and look for the loading icon next to your environment's cluster name.
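As a quick triage aid, the two error messages quoted on this page can be matched against a task log and mapped to the checks suggested here. This is a minimal sketch; the names KNOWN_ERRORS and diagnose are hypothetical, and the hints only restate this page's advice.

```python
# Error fragments quoted on this page, mapped to the checks the page suggests.
KNOWN_ERRORS = {
    "Pod took too long to start": (
        "Check that the referenced secrets exist, raise "
        "startup_timeout_seconds for large images, and verify the IAM "
        "permissions of the environment's service account."
    ),
    "Failed to establish a new connection": (
        "Check in the Google Cloud console whether the environment's "
        "cluster is being upgraded."
    ),
}


def diagnose(log_text):
    """Return the hints whose error fragment appears in the task log text."""
    # Collapse whitespace so fragments wrapped across lines still match.
    normalized = " ".join(log_text.split())
    return [hint for fragment, hint in KNOWN_ERRORS.items() if fragment in normalized]


sample = "airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start"
for hint in diagnose(sample):
    print(hint)
```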
Last updated 2024-03-30 UTC.