Cloud Composer 1 | Cloud Composer 2
このページでは、Google Kubernetes Engine 演算子を使用して Google Kubernetes Engine でクラスタを作成し、それらのクラスタで Kubernetes Pod を起動する方法について説明します。
Google Kubernetes Engine 演算子は、指定したクラスタで Kubernetes Pod を実行しますが、このクラスタは、使用中の環境とは関係のない別のクラスタでも構いません。これに対し、KubernetesPodOperator
は環境のクラスタ内で Kubernetes Pod を実行します。
このページでは、GKECreateClusterOperator
を使用して Google Kubernetes Engine クラスタを作成し、次の構成で GKEStartPodOperator
を使用してから、その後 GKEDeleteClusterOperator
で削除する DAG の例について説明します。
- 最小の構成: 必要なパラメータのみを設定します。
- テンプレートの構成: Jinja でテンプレートを作成できるパラメータを使用します。
- Pod アフィニティの構成: Pod のスケジュールに使用できるノードを制限します。
- すべての構成: すべての構成が含まれます。
始める前に
GKE 演算子の構成
この例の操作を行うために、gke_operator.py
ファイル全体をご使用の環境の dags/
フォルダに配置するか、DAG に関連するコードをに追加します。
クラスタの作成
ここに示すコードでは、2 つのノードプール(pool-0
と pool-1
、それぞれ 1 つのノードを持つ)を持つ Google Kubernetes Engine クラスタを作成します。必要に応じて、body
の一部として Google Kubernetes Engine API から他のパラメータを設定できます。
apache-airflow-providers-google
バージョン 5.1.0 のリリース前は、GKECreateClusterOperator
で node_pools
オブジェクトを渡すことができませんでした。Airflow 2 を使用する場合は、環境で apache-airflow-providers-google
バージョン 5.1.0 以降を使用していることを確認してください。apache-airflow-providers-google
~>=5.1.0
を必要なバージョンとして指定することで、この PyPI パッケージの最新バージョンをインストールできます。
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
CLUSTER = {
"name": CLUSTER_NAME,
"node_pools": [
{"name": "pool-0", "initial_node_count": 1},
{"name": "pool-1", "initial_node_count": 1},
],
}
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
body=CLUSTER,
)
クラスタでワークロードを起動する
次のセクションでは、この例のそれぞれの GKEStartPodOperator
構成について説明します。各構成変数について詳しくは、GKE 演算子ーの Airflow リファレンスをご覧ください。
from airflow import models
from airflow.providers.google.cloud.operators.kubernetes_engine import (
GKECreateClusterOperator,
GKEDeleteClusterOperator,
GKEStartPodOperator,
)
from airflow.utils.dates import days_ago
from kubernetes.client import models as k8s_models
with models.DAG(
"example_gcp_gke",
schedule_interval=None, # Override to match your needs
start_date=days_ago(1),
tags=["example"],
) as dag:
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
CLUSTER = {
"name": CLUSTER_NAME,
"node_pools": [
{"name": "pool-0", "initial_node_count": 1},
{"name": "pool-1", "initial_node_count": 1},
],
}
create_cluster = GKECreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
body=CLUSTER,
)
kubernetes_min_pod = GKEStartPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes, default namespace is
# `default`.
namespace="default",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
kubenetes_template_ex = GKEStartPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
)
kubernetes_affinity_ex = GKEStartPodOperator(
task_id="ex-pod-affinity",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
name="ex-pod-affinity",
namespace="default",
image="perl",
cmds=["perl"],
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# affinity allows you to constrain which nodes your pod is eligible to
# be scheduled on, based on labels on the node. In this case, if the
# label 'cloud.google.com/gke-nodepool' with value
# 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
# nodes, it will fail to schedule.
affinity={
"nodeAffinity": {
# requiredDuringSchedulingIgnoredDuringExecution means in order
# for a pod to be scheduled on a node, the node must have the
# specified labels. However, if labels on a node change at
# runtime such that the affinity rules on a pod are no longer
# met, the pod will still continue to run on the node.
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
# The label key's value that pods can be scheduled
# on.
"values": [
"pool-1",
],
}
]
}
]
}
}
},
)
kubernetes_full_pod = GKEStartPodOperator(
task_id="ex-all-configs",
name="full",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 120.
startup_timeout_seconds=120,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image, if 'IfNotPresent' will cause
# the Kubelet to skip pulling an image if it already exists. If you
# want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for Pod, this will allow you to
# set both cpu and memory limits and requirements.
# Prior to Airflow 2.3 and the cncf providers package 5.0.0
# resources were passed as a dictionary. This change was made in
# https://github.com/apache/airflow/pull/27197
# Additionally, "memory" and "cpu" were previously named
# "limit_memory" and "limit_cpu"
# resources={'limit_memory': "250M", 'limit_cpu': "100m"},
container_resources=k8s_models.V1ResourceRequirements(
limits={"memory": "250M", "cpu": "100m"},
),
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
affinity={},
)
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=PROJECT_ID,
location=CLUSTER_REGION,
)
create_cluster >> kubernetes_min_pod >> delete_cluster
create_cluster >> kubernetes_full_pod >> delete_cluster
create_cluster >> kubernetes_affinity_ex >> delete_cluster
create_cluster >> kubenetes_template_ex >> delete_cluster
構成を最小限にしたい
GKE クラスタで GKEStartPodOperator
を使用して Pod を起動するには、project_id
、location
、cluster_name
、name
、namespace
、image
、task_id
オプションは必須です。
次のコード スニペットを DAG に配置すると、前述のパラメータが定義済みで有効である限り、pod-ex-minimum
タスクは成功します。
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_min_pod = GKEStartPodOperator(
# The ID specified for the task.
task_id="pod-ex-minimum",
# Name of task you want to run, used to generate Pod ID.
name="pod-ex-minimum",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# The namespace to run within Kubernetes, default namespace is
# `default`.
namespace="default",
# Docker image specified. Defaults to hub.docker.com, but any fully
# qualified URLs will point to a custom repository. Supports private
# gcr.io images if the Composer Environment is under the same
# project-id as the gcr.io images and the service account that Composer
# uses has permission to access the Google Container Registry
# (the default service account has permission)
image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)
テンプレートの構成
Airflow は Jinja テンプレートの使用をサポートしています。必須変数(task_id
、name
、namespace
、image
など)を演算子を使用して宣言する必要があります。次の例で示すように、Jinja を使用してその他のすべてのパラメータ(cmds
、arguments
、env_vars
など)をテンプレート化できます。
DAG または環境を変更しない場合は、ex-kube-templates
タスクが失敗します。この DAG を成功させるには、my_value
という名前の Airflow 変数を設定します。
my_value
と gcloud
または Airflow UI を設定するには、次の手順を行います。
gcloud
Airflow 2 の場合は、次のコマンドを入力します。
gcloud composer environments run ENVIRONMENT \
--location LOCATION \
variables set -- \
my_value example_value
次のように置き換えます。
ENVIRONMENT
を環境の名前にする。LOCATION
は、環境が配置されているリージョン。
Airflow UI
Airflow 2 UI で:
ツールバーで、[管理者] > [変数] を選択します。
[List Variable] ページで、[新しいレコードを追加する] をクリックします。
[Add Variable] ページで、次の情報を入力します。
- Key:
my_value
- Val:
example_value
- Key:
[保存] をクリックします。
テンプレートの構成
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubenetes_template_ex = GKEStartPodOperator(
task_id="ex-kube-templates",
name="ex-kube-templates",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="bash",
# All parameters below are able to be templated with jinja -- cmds,
# arguments, env_vars, and config_file. For more information visit:
# https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["echo"],
# DS in jinja is the execution date as YYYY-MM-DD, this docker image
# will echo the execution date. Arguments to the entrypoint. The docker
# image's CMD is used if this is not provided. The arguments parameter
# is templated.
arguments=["{{ ds }}"],
# The var template variable allows you to access variables defined in
# Airflow UI. In this case we are getting the value of my_value and
# setting the environment variable `MY_VALUE`. The pod will fail if
# `my_value` is not set in the Airflow UI.
env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
)
Pod アフィニティの構成
GKEStartPodOperator
で affinity
パラメータを構成する際に、Pod のスケジュールを設定する対象ノードを制御します(特定のノードプール内のノードなど)。クラスタの作成時に、pool-0
と pool-1
という名前の 2 つのノードプールを作成しました。この演算子は、Pod が pool-1
でのみ実行される必要があることを示します。
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_affinity_ex = GKEStartPodOperator(
task_id="ex-pod-affinity",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
name="ex-pod-affinity",
namespace="default",
image="perl",
cmds=["perl"],
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# affinity allows you to constrain which nodes your pod is eligible to
# be scheduled on, based on labels on the node. In this case, if the
# label 'cloud.google.com/gke-nodepool' with value
# 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
# nodes, it will fail to schedule.
affinity={
"nodeAffinity": {
# requiredDuringSchedulingIgnoredDuringExecution means in order
# for a pod to be scheduled on a node, the node must have the
# specified labels. However, if labels on a node change at
# runtime such that the affinity rules on a pod are no longer
# met, the pod will still continue to run on the node.
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
# The label key's value that pods can be scheduled
# on.
"values": [
"pool-1",
],
}
]
}
]
}
}
},
)
すべての構成
この例では、GKEStartPodOperator
で構成できるすべての変数が示されます。ex-all-configs
タスクを成功させるのに、コードを変更する必要はありません。
各変数について詳しくは、GKE 演算子の Airflow リファレンスをご覧ください。
# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_full_pod = GKEStartPodOperator(
task_id="ex-all-configs",
name="full",
project_id=PROJECT_ID,
location=CLUSTER_REGION,
cluster_name=CLUSTER_NAME,
namespace="default",
image="perl:5.34.0",
# Entrypoint of the container, if not specified the Docker container's
# entrypoint is used. The cmds parameter is templated.
cmds=["perl"],
# Arguments to the entrypoint. The docker image's CMD is used if this
# is not provided. The arguments parameter is templated.
arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
# The secrets to pass to Pod, the Pod will fail to create if the
# secrets you specify in a Secret object do not exist in Kubernetes.
secrets=[],
# Labels to apply to the Pod.
labels={"pod-label": "label-name"},
# Timeout to start up the Pod, default is 120.
startup_timeout_seconds=120,
# The environment variables to be initialized in the container
# env_vars are templated.
env_vars={"EXAMPLE_VAR": "/example/value"},
# If true, logs stdout output of container. Defaults to True.
get_logs=True,
# Determines when to pull a fresh image, if 'IfNotPresent' will cause
# the Kubelet to skip pulling an image if it already exists. If you
# want to always pull a new image, set it to 'Always'.
image_pull_policy="Always",
# Annotations are non-identifying metadata you can attach to the Pod.
# Can be a large range of data, and can include characters that are not
# permitted by labels.
annotations={"key1": "value1"},
# Optional resource specifications for Pod, this will allow you to
# set both cpu and memory limits and requirements.
# Prior to Airflow 2.3 and the cncf providers package 5.0.0
# resources were passed as a dictionary. This change was made in
# https://github.com/apache/airflow/pull/27197
# Additionally, "memory" and "cpu" were previously named
# "limit_memory" and "limit_cpu"
# resources={'limit_memory': "250M", 'limit_cpu': "100m"},
container_resources=k8s_models.V1ResourceRequirements(
limits={"memory": "250M", "cpu": "100m"},
),
# If true, the content of /airflow/xcom/return.json from container will
# also be pushed to an XCom when the container ends.
do_xcom_push=False,
# List of Volume objects to pass to the Pod.
volumes=[],
# List of VolumeMount objects to pass to the Pod.
volume_mounts=[],
# Affinity determines which nodes the Pod can run on based on the
# config. For more information see:
# https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
affinity={},
)
クラスタの削除
ここで表示されるコードにより、ガイドの最初から作成されたクラスタが削除されます。
delete_cluster = GKEDeleteClusterOperator(
task_id="delete_cluster",
name=CLUSTER_NAME,
project_id=PROJECT_ID,
location=CLUSTER_REGION,
)