Use Google Kubernetes Engine operators

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

This page describes how to use Google Kubernetes Engine operators to create clusters in Google Kubernetes Engine and to launch Kubernetes Pods in those clusters.

Google Kubernetes Engine operators run Kubernetes Pods in a specified cluster, which can be a separate cluster that is not related to your environment. In comparison, KubernetesPodOperator runs Kubernetes Pods in your environment's cluster.

This page walks through an example DAG that creates a Google Kubernetes Engine cluster with GKECreateClusterOperator, uses GKEStartPodOperator with the following configurations, and deletes the cluster afterwards with GKEDeleteClusterOperator.

Before you begin

GKE operator configuration

To follow along with this example, put the entire gke_operator.py file in your environment's dags/ folder, or add the relevant code to a DAG.

Create a cluster

The code shown here creates a Google Kubernetes Engine cluster with two node pools, pool-0 and pool-1, each having one node. If needed, you can set other parameters from the Google Kubernetes Engine API as part of the body; a sketch of extra body fields follows the snippet below.

Before version 5.1.0 of apache-airflow-providers-google was released, it was not possible to pass the node_pools object in GKECreateClusterOperator. If you use Airflow 2, make sure that your environment uses version 5.1.0 of apache-airflow-providers-google or later. You can install a newer version of this PyPI package by specifying apache-airflow-providers-google>=5.1.0 as the required version.

# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
CLUSTER = {
    "name": CLUSTER_NAME,
    "node_pools": [
        {"name": "pool-0", "initial_node_count": 1},
        {"name": "pool-1", "initial_node_count": 1},
    ],
}
create_cluster = GKECreateClusterOperator(
    task_id="create_cluster",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    body=CLUSTER,
)
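
The body accepts other fields from the GKE API Cluster and NodePool resources. The following sketch is an illustration only and is not part of the original sample; the machine_type and disk_size_gb values are assumptions, so verify the field names against the GKE API reference before using them.

# Illustrative assumption, not part of the original sample: extend the body
# with NodeConfig fields such as machine type and boot disk size.
CLUSTER_WITH_NODE_CONFIG = {
    "name": CLUSTER_NAME,
    "node_pools": [
        {
            "name": "pool-0",
            "initial_node_count": 1,
            # Example NodeConfig fields for this node pool.
            "config": {"machine_type": "e2-standard-4", "disk_size_gb": 100},
        },
        {"name": "pool-1", "initial_node_count": 1},
    ],
}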

Launch workloads in the cluster

The following sections explain each GKEStartPodOperator configuration in the example. For information about each configuration variable, see the Airflow reference for GKE operators.


from airflow import models
from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKECreateClusterOperator,
    GKEDeleteClusterOperator,
    GKEStartPodOperator,
)
from airflow.utils.dates import days_ago

from kubernetes.client import models as k8s_models


with models.DAG(
    "example_gcp_gke",
    schedule_interval=None,  # Override to match your needs
    start_date=days_ago(1),
    tags=["example"],
) as dag:
    # TODO(developer): update with your values
    PROJECT_ID = "my-project-id"
    # It is recommended to use regional clusters for increased reliability
    # though passing a zone in the location parameter is also valid
    CLUSTER_REGION = "us-west1"
    CLUSTER_NAME = "example-cluster"
    CLUSTER = {
        "name": CLUSTER_NAME,
        "node_pools": [
            {"name": "pool-0", "initial_node_count": 1},
            {"name": "pool-1", "initial_node_count": 1},
        ],
    }
    create_cluster = GKECreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        body=CLUSTER,
    )

    kubernetes_min_pod = GKEStartPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        cluster_name=CLUSTER_NAME,
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes, default namespace is
        # `default`.
        namespace="default",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
    )

    kubenetes_template_ex = GKEStartPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        cluster_name=CLUSTER_NAME,
        namespace="default",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    )

    kubernetes_affinity_ex = GKEStartPodOperator(
        task_id="ex-pod-affinity",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        cluster_name=CLUSTER_NAME,
        name="ex-pod-affinity",
        namespace="default",
        image="perl",
        cmds=["perl"],
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            "nodeAffinity": {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    # When nodepools are created in Google Kubernetes
                                    # Engine, the nodes inside of that nodepool are
                                    # automatically assigned the label
                                    # 'cloud.google.com/gke-nodepool' with the value of
                                    # the nodepool's name.
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    # The label key's value that pods can be scheduled
                                    # on.
                                    "values": [
                                        "pool-1",
                                    ],
                                }
                            ]
                        }
                    ]
                }
            }
        },
    )
    kubernetes_full_pod = GKEStartPodOperator(
        task_id="ex-all-configs",
        name="full",
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
        cluster_name=CLUSTER_NAME,
        namespace="default",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            limits={"memory": "250M", "cpu": "100m"},
        ),
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={},
    )
    delete_cluster = GKEDeleteClusterOperator(
        task_id="delete_cluster",
        name=CLUSTER_NAME,
        project_id=PROJECT_ID,
        location=CLUSTER_REGION,
    )

    create_cluster >> kubernetes_min_pod >> delete_cluster
    create_cluster >> kubernetes_full_pod >> delete_cluster
    create_cluster >> kubernetes_affinity_ex >> delete_cluster
    create_cluster >> kubenetes_template_ex >> delete_cluster

Minimal configuration

To launch a pod in your GKE cluster with GKEStartPodOperator, only the project_id, location, cluster_name, name, namespace, image, and task_id options are required.

When you place the following code snippet in a DAG, the pod-ex-minimum task succeeds as long as the previously listed parameters are defined and valid.

# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_min_pod = GKEStartPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Template configuration

Airflow supports using Jinja templating. You must declare the required variables (task_id, name, namespace, and image) with the operator. As shown in the following example, you can template all other parameters (cmds, arguments, and env_vars) with Jinja.

Without changing the DAG or your environment, the ex-kube-templates task fails. Set an Airflow variable called my_value to make this DAG succeed.

To set my_value with gcloud or the Airflow UI, follow these instructions:

gcloud

For Airflow 2, enter the following command:

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

Replace the following:

  • ENVIRONMENT with the name of the environment.
  • LOCATION with the region where the environment is located.

Airflow UI

In the Airflow 2 UI, follow these steps:

  1. In the toolbar, select Admin > Variables.

  2. On the List Variable page, click Add a new record.

  3. On the Add Variable page, enter the following information:

    • Key: my_value
    • Val: example_value
  4. Click Save.
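
As an alternative to gcloud and the Airflow UI, the variable can also be set from Python code that runs where the Airflow metadata database is reachable, for example in a one-off maintenance DAG. This is a minimal sketch and is not part of the original instructions.

from airflow.models import Variable

# Set the Airflow variable that the templated env_vars in this example expect.
Variable.set("my_value", "example_value")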


# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubenetes_template_ex = GKEStartPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
)

Pod affinity configuration

When you configure the affinity parameter in GKEStartPodOperator, you control which nodes to schedule pods on, such as nodes only in a particular node pool. When you created your cluster, you created two node pools named pool-0 and pool-1. This operator dictates that pods must run only in pool-1.

Figure: Where Cloud Composer launches Kubernetes Pods with Pod affinity. The launched Pod runs in pool-1 of the ephemeral GKE cluster, shown separately from pool-0 within the Kubernetes Engine group.


# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_affinity_ex = GKEStartPodOperator(
    task_id="ex-pod-affinity",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    name="ex-pod-affinity",
    namespace="default",
    image="perl",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Full configuration

This example shows all the variables that you can configure in GKEStartPodOperator. The ex-all-configs task succeeds even if you do not modify the code.

For details on each variable, see the Airflow reference for GKE operators.

# TODO(developer): update with your values
PROJECT_ID = "my-project-id"
# It is recommended to use regional clusters for increased reliability
# though passing a zone in the location parameter is also valid
CLUSTER_REGION = "us-west1"
CLUSTER_NAME = "example-cluster"
kubernetes_full_pod = GKEStartPodOperator(
    task_id="ex-all-configs",
    name="full",
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    cluster_name=CLUSTER_NAME,
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Delete the cluster

The code shown here deletes the cluster that was created at the beginning of this guide.

delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
)
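
In the example DAG, the delete task runs only when its upstream pod tasks succeed. A common variation, shown below as an assumption rather than as part of the original guide, is to set a trigger rule so the cluster is removed even if a pod task fails, which prevents the ephemeral cluster from being left running.

from airflow.utils.trigger_rule import TriggerRule

# Variation on the sample above: delete the cluster once all upstream tasks
# have finished, regardless of whether they succeeded.
delete_cluster = GKEDeleteClusterOperator(
    task_id="delete_cluster",
    name=CLUSTER_NAME,
    project_id=PROJECT_ID,
    location=CLUSTER_REGION,
    trigger_rule=TriggerRule.ALL_DONE,
)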

What's next