이 페이지에서는 Google Kubernetes Engine 연산자를 사용하여 Google Kubernetes Engine에서 클러스터를 만들고 이러한 클러스터에서 Kubernetes 포드를 실행하는 방법을 설명합니다.
Google Kubernetes Engine 연산자는 지정된 클러스터에서 Kubernetes 포드를 실행합니다. 이 클러스터는 환경과 관련이 없는 별도의 클러스터일 수 있습니다.
이와 달리 KubernetesPodOperator는 환경의 클러스터에서 Kubernetes 포드를 실행합니다.
이 페이지에서는 GKECreateClusterOperator로 Google Kubernetes Engine 클러스터를 만들고, 다음 구성과 함께 GKEStartPodOperator를 사용하고, 나중에 GKEDeleteClusterOperator를 사용하여 삭제하는 예시 DAG를 설명합니다.
이 예시를 따르려면 전체 gke_operator.py 파일을 사용자 환경의 dags/ 폴더에 넣거나 DAG에 관련성이 높은 코드를 추가합니다.
클러스터 만들기
여기에 표시된 코드는 각각 하나의 노드가 있는 두 개의 노드 풀(pool-0 및 pool-1)이 있는 Google Kubernetes Engine 클러스터를 만듭니다. 필요한 경우 body의 일부로 Google Kubernetes Engine API에서 다른 매개변수를 설정할 수 있습니다.
apache-airflow-providers-google 버전 5.1.0이 출시되기 전에는 GKECreateClusterOperator에서 node_pools 객체를 전달할 수 없었습니다. Airflow 2를 사용하는 경우 환경에서 apache-airflow-providers-google 버전 5.1.0 이상을 사용하는지 확인합니다. apache-airflow-providers-google 및 >=5.1.0을 필수 버전으로 지정하면 이 PyPI 패키지의 최신 버전을 설치할 수 있습니다.
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
CLUSTER = {
"name": CLUSTER_NAME,
"node_pools": [
{"name": "pool-0", "initial_node_count": 1},
{"name": "pool-1", "initial_node_count": 1},
],
}
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
body=CLUSTER,
)
클러스터에서 워크로드 실행
다음 섹션에서는 예시의 각 GKEStartPodOperator 구성에 대해 설명합니다. 각 구성 변수에 대한 상세 설명은 GKE 연산자의 Airflow 참조를 확인하세요.
from airflow import models
from airflow.providers.google.cloud.operators.kubernetes_engine import (
GKECreateClusterOperator,
GKEDeleteClusterOperator,
GKEStartPodOperator,
)
from airflow.utils.dates import days_ago
from kubernetes.client import models as k8s_models
with models.DAG(
"example_gcp_gke",
schedule_interval=None, # Override to match your needs
start_date=days_ago(1),
tags=["example"],
) as dag:
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
CLUSTER = {
"name": CLUSTER_NAME,
"node_pools": [
{"name": "pool-0", "initial_node_count": 1},
{"name": "pool-1", "initial_node_count": 1},
],
}
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
body=CLUSTER,
)
kubernetes_min_pod = GKEStartPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes, default namespace is
# `default`.
namespace="default",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
kubenetes_template_ex = GKEStartPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
)
kubernetes_affinity_ex = GKEStartPodOperator(
task_id="ex-pod-affinity",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
name="ex-pod-affinity",
namespace="default",
image="perl",
cmds=["perl"],
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# affinity allows you to constrain which nodes your pod is eligible to
# be scheduled on, based on labels on the node. In this case, if the
# label 'cloud.google.com/gke-nodepool' with value
# 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
# nodes, it will fail to schedule.
affinity={
"nodeAffinity": {
# requiredDuringSchedulingIgnoredDuringExecution means in order
# for a pod to be scheduled on a node, the node must have the
# specified labels. However, if labels on a node change at
# runtime such that the affinity rules on a pod are no longer
# met, the pod will still continue to run on the node.
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
# The label key's value that pods can be scheduled
# on.
"values": [
"pool-1",
],
}
]
}
]
}
}
},
)
kubernetes_full_pod = GKEStartPodOperator(
task_id="ex-all-configs",
name="full",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 120.
startup_timeout_seconds=120,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image, if 'IfNotPresent' will cause
# the Kubelet to skip pulling an image if it already exists. If you
# want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for Pod, this will allow you to
# set both cpu and memory limits and requirements.
# Prior to Airflow 2.3 and the cncf providers package 5.0.0
# resources were passed as a dictionary. This change was made in
# https://github.com/apache/airflow/pull/27197
# Additionally, "memory" and "cpu" were previously named
# "limit_memory" and "limit_cpu"
# resources={'limit_memory': "250M", 'limit_cpu': "100m"},
container_resources=k8s_models.V1ResourceRequirements(
limits={"memory": "250M", "cpu": "100m"},
),
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
affinity={},
)
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=PROJECT_ID,
location=CLUSTER_REGION,
)
create_cluster >> kubernetes_min_pod >> delete_cluster
create_cluster >> kubernetes_full_pod >> delete_cluster
create_cluster >> kubernetes_affinity_ex >> delete_cluster
create_cluster >> kubenetes_template_ex >> delete_cluster
DAG에 다음 코드 스니펫을 배치하면 이전에 나열된 매개변수가 정의되고 유효한 경우 pod-ex-minimum 태스크가 성공합니다.
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_min_pod = GKEStartPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes, default namespace is
# `default`.
namespace="default",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
템플릿 구성
Airflow는 Jinja 템플릿 사용을 지원합니다.
연산자와 함께 필수 변수(task_id, name, namespace, image)를 선언해야 합니다. 다음 예시와 같이 Jinja를 사용하여 다른 모든 매개변수(cmds, arguments, env_vars)를 템플릿으로 만들 수 있습니다.
DAG 또는 사용자 환경을 변경하지 않으면 ex-kube-templates 태스크가 실패합니다. 이 DAG가 성공하도록 my_value라는 Airflow 변수를 설정합니다.
gcloud 또는 Airflow UI를 통해 my_value를 설정하려면 다음 안내를 따릅니다.
gcloud
Airflow 2의 경우 다음 명령어를 입력합니다.
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
다음과 같이 바꿉니다.
ENVIRONMENT을 환경 이름으로 바꿉니다.
LOCATION을 환경이 위치한 리전으로 바꿉니다.
Airflow UI
Airflow 2 UI에서 다음 안내를 따르세요.
툴바에서 관리 > 변수를 선택합니다.
변수 나열 페이지에서 새 레코드 추가를 클릭합니다.
변수 추가 페이지에서 다음 정보를 입력합니다.
키: my_value
Val: example_value
저장을 클릭합니다.
템플릿 구성
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubenetes_template_ex = GKEStartPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
)
포드 어피니티 구성
GKEStartPodOperator에서 affinity 매개변수를 구성할 때 특정 노드 풀의 노드 등 포드를 예약할 노드를 제어합니다. 클러스터를 만들 때 pool-0 및 pool-1이라는 노드 풀 두 개를 만들었습니다. 이 연산자는 포드가 pool-1에서만 실행되도록 지정합니다.
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_affinity_ex = GKEStartPodOperator(
task_id="ex-pod-affinity",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
name="ex-pod-affinity",
namespace="default",
image="perl",
cmds=["perl"],
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# affinity allows you to constrain which nodes your pod is eligible to
# be scheduled on, based on labels on the node. In this case, if the
# label 'cloud.google.com/gke-nodepool' with value
# 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
# nodes, it will fail to schedule.
affinity={
"nodeAffinity": {
# requiredDuringSchedulingIgnoredDuringExecution means in order
# for a pod to be scheduled on a node, the node must have the
# specified labels. However, if labels on a node change at
# runtime such that the affinity rules on a pod are no longer
# met, the pod will still continue to run on the node.
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
# The label key's value that pods can be scheduled
# on.
"values": [
"pool-1",
],
}
]
}
]
}
}
},
)
전체 구성
이 예시에서는 GKEStartPodOperator에서 구성할 수 있는 모든 변수를 보여줍니다 코드를 수정하지 않아도 ex-all-configs 작업이 성공적으로 수행됩니다.
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_full_pod = GKEStartPodOperator(
task_id="ex-all-configs",
name="full",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 120.
startup_timeout_seconds=120,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image, if 'IfNotPresent' will cause
# the Kubelet to skip pulling an image if it already exists. If you
# want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for Pod, this will allow you to
# set both cpu and memory limits and requirements.
# Prior to Airflow 2.3 and the cncf providers package 5.0.0
# resources were passed as a dictionary. This change was made in
# https://github.com/apache/airflow/pull/27197
# Additionally, "memory" and "cpu" were previously named
# "limit_memory" and "limit_cpu"
# resources={'limit_memory': "250M", 'limit_cpu': "100m"},
container_resources=k8s_models.V1ResourceRequirements(
limits={"memory": "250M", "cpu": "100m"},
),
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
affinity={},
)
[[["이해하기 쉬움","easyToUnderstand","thumb-up"],["문제가 해결됨","solvedMyProblem","thumb-up"],["기타","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["번역 문제","translationIssue","thumb-down"],["기타","otherDown","thumb-down"]],["최종 업데이트: 2024-09-01(UTC)"],[],[]]