KubernetesPodOperator 사용하기

이 페이지에서는 KubernetesPodOperator를 사용하여 Cloud Composer에서 Kubernetes pod를 Cloud Composer 환경의 일부인 Google Kubernetes Engine 클러스터로 실행하고 사용자 환경에 적절한 리소스가 있는지 확인하는 방법을 설명합니다.

실행된 pod가 Airflow Workers, Redis, Airflow Scheduler, Cloud SQL 프록시와 동일한 Kubernetes Engine에 위치할 것을 나타내는 화살표가 있는 테넌트 프로젝트 및 고객 프로젝트의 Cloud Composer 환경 리소스
Cloud Composer Kubernetes Pod 실행 위치(확대하려면 클릭)

KubernetesPodOperator는 다음이 필요한 경우에 적합한 옵션입니다.

  • 공개 PyPI 저장소를 통해 사용할 수 없는 커스텀 Python 종속 항목
  • 스톡 Cloud Composer 작업자 이미지에서 사용할 수 없는 바이너리 종속 항목

이 페이지에서는 다음 KubernetesPodOperator 구성을 포함하는 예시 DAG를 안내합니다.

시작하기 전에

사용자 환경에 적합한 리소스 확인

Cloud Composer 환경을 만들 때 환경에 대한 컴퓨팅 성능의 양을 지정하면 특정 양이 GKE 클러스터에 할당됩니다. 환경에 Kubernetes pod를 실행하면 CPU나 메모리와 같은 리소스 프로그램 간 경쟁이 발생할 수 있습니다. Airflow 스케줄러와 작업자가 동일한 GKE 클러스터에 있기 때문에 경쟁으로 인해 리소스 부족이 발생하면 스케줄러와 작업자가 제대로 작동하지 않습니다.

리소스 부족을 방지하려면 다음 작업 중 하나 이상을 수행합니다.

노드 풀 만들기

Cloud Composer 환경에서 리소스 부족을 방지하려면 새 노드 풀을 만들고 Kubernetes pod를 구성하여 해당 풀의 리소스만 사용하는 것이 좋습니다.

기존 클러스터에서 노드 풀을 만들려면 다음 단계를 수행합니다.

콘솔

  1. Cloud Console에서 GKE 메뉴로 이동합니다.

    GKE 메뉴로 이동

  2. 원하는 클러스터를 선택합니다.

  3. 수정을 클릭합니다.

  4. 노드 풀 메뉴에서 노드 풀 추가를 클릭합니다.

  5. 노드 풀을 구성합니다.

  6. (선택사항) 자동 업그레이드 및 자동 확장과 같은 고급 옵션을 사용 설정합니다.

  7. 저장을 클릭합니다.

gcloud

다음 명령어를 입력합니다.

gcloud container node-pools create POOL_NAME \
    --cluster CLUSTER_NAME \
    --project PROJECT_ID \
    --zone ZONE \
    ADDITIONAL_FLAGS 

각 항목의 의미는 다음과 같습니다.

  • POOL_NAME은 원하는 노드 풀의 이름입니다.
  • CLUSTER는 노드 풀을 만들 클러스터의 이름입니다.
  • PROJECT_ID는 Cloud Composer 프로젝트 이름입니다.
  • ZONE은 GKE 클러스터가 위치한 영역입니다.

    옵션 목록은 gcloud container node-pools create 문서를 참조하세요.

    성공적인 node-pools create 요청은 노드 풀 정보를 반환합니다.

    Creating node pool example-pool...done.
    Created [https://container.googleapis.com/v1/projects/kubernetes-engine-docs/zones/us-central1-f/clusters/example-cluster/nodePools/example-pool].
    NAME          MACHINE_TYPE   DISK_SIZE_GB  NODE_VERSION
    example-pool  n1-standard-1  100           1.2.4

사용자 환경에서 노드 수 늘리기

Cloud Composer 환경에서 노드 수를 늘리면 작업자가 사용할 수 있는 컴퓨팅 성능이 향상됩니다. 이렇게 해도 지정된 머신 유형이 제공하는 것보다 많은 CPU 또는 RAM이 필요한 작업에 추가적인 리소스가 제공되지 않습니다.

노드 수를 늘리려면 환경을 업데이트하세요.

적절한 머신 유형 지정

Cloud Composer 환경을 생성하는 동안 머신 유형을 지정할 수 있습니다. 가용 리소스를 확보하려면 Cloud Composer 환경에서 발생하는 컴퓨팅 유형에 적합한 머신 유형을 지정합니다.

KubernetesPodOperator 구성

이 예시를 따르려면 전체 kubernetes_pod_operator.py 파일을 사용자 환경의 dgas/ 폴더에 넣거나 DAG에 관련성이 높은 KubernetesPodOperator 코드를 추가합니다.

다음 섹션에서는 예시의 각 KubernetesPodOperator 구성에 대해 설명합니다. 각 구성 변수에 대한 자세한 내용은 Airflow 참조를 확인하세요.

import datetime

from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    'volume',
    # Path where we mount the secret as volume
    '/var/secrets/google',
    # Name of Kubernetes Secret
    'service-account',
    # Key in the form of service account file name
    'service-account.json')

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, currently the operator defaults
    # to using the config file found at `/home/airflow/composer_kube_config if
    # no `config_file` parameter is specified. By default it will contain the
    # credentials for Cloud Composer's Google Kubernetes Engine cluster that is
    # created upon environment creation.

    kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
        # The ID specified for the task.
        task_id='pod-ex-minimum',
        # Name of task you want to run, used to generate Pod ID.
        name='pod-ex-minimum',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # The namespace to run within Kubernetes, default namespace is
        # `default`. There is the potential for the resource starvation of
        # Airflow workers and scheduler within the Cloud Composer environment,
        # the recommended solution is to increase the amount of nodes in order
        # to satisfy the computing requirements. Alternatively, launching pods
        # into a custom namespace will stop fighting over resources.
        namespace='default',
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-templates',
        name='ex-kube-templates',
        namespace='default',
        image='bash',
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/code.html#default-variables

        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=['{{ ds }}'],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}")
    kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-secrets',
        name='ex-kube-secrets',
        namespace='default',
        image='ubuntu',
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            'EXAMPLE_VAR': '/example/value',
            'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'})
    kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-pod-affinity',
        name='ex-pod-affinity',
        namespace='default',
        image='perl',
        cmds=['perl'],
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # affinity allows you to constrain which nodes your pod is eligible to
        # be scheduled on, based on labels on the node. In this case, if the
        # label 'cloud.google.com/gke-nodepool' with value
        # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
        # nodes, it will fail to schedule.
        affinity={
            'nodeAffinity': {
                # requiredDuringSchedulingIgnoredDuringExecution means in order
                # for a pod to be scheduled on a node, the node must have the
                # specified labels. However, if labels on a node change at
                # runtime such that the affinity rules on a pod are no longer
                # met, the pod will still continue to run on the node.
                'requiredDuringSchedulingIgnoredDuringExecution': {
                    'nodeSelectorTerms': [{
                        'matchExpressions': [{
                            # When nodepools are created in Google Kubernetes
                            # Engine, the nodes inside of that nodepool are
                            # automatically assigned the label
                            # 'cloud.google.com/gke-nodepool' with the value of
                            # the nodepool's name.
                            'key': 'cloud.google.com/gke-nodepool',
                            'operator': 'In',
                            # The label key's value that pods can be scheduled
                            # on.
                            'values': [
                                'pool-0',
                                'pool-1',
                            ]
                        }]
                    }]
                }
            }
        })
    kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-all-configs',
        name='pi',
        namespace='default',
        image='perl',
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['perl'],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={'pod-label': 'label-name'},
        # Timeout to start up the Pod, default is 120.
        startup_timeout_seconds=120,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={'EXAMPLE_VAR': '/example/value'},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy='Always',
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={'key1': 'value1'},
        # Resource specifications for Pod, this will allow you to set both cpu
        # and memory limits and requirements.
        # Prior to Airflow 1.10.4, resource specifications were
        # passed as a Pod Resources Class object,
        # If using this example on a version of Airflow prior to 1.10.4,
        # import the "pod" package from airflow.contrib.kubernetes and use
        # resources = pod.Resources() instead passing a dict
        # For more info see:
        # https://github.com/apache/airflow/pull/4551
        resources={'limit_memory': 1, 'limit_cpu': 1},
        # Specifies path to kubernetes config. If no config is specified will
        # default to '~/.kube/config'. The config_file is templated.
        config_file='/home/airflow/composer_kube_config',
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        affinity={})

최소 구성

KubernetesPodOperator을 만들려면 name, namespace, image, task_id만이 필요합니다.

DAG에 다음 코드 스니펫을 배치하면 구성에서 /home/airflow/composer_kube_config의 기본값을 사용합니다. 코드를 수정하지 않아도 pod-ex-minimum 작업이 성공적으로 수행됩니다.

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id='pod-ex-minimum',
    # Name of task you want to run, used to generate Pod ID.
    name='pod-ex-minimum',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace='default',
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image='gcr.io/gcp-runtimes/ubuntu_18_0_4')

템플릿 구성

Airflow는 Jinja 템플릿 사용을 지원합니다. 연산자와 함께 필수 변수(task_id, name, namespace, image)를 선언해야 합니다. 다음 예시와 같이 Jinja를 사용하여 다른 모든 매개변수(cmds, arguments, env_vars, config_file)를 템플릿으로 만들 수 있습니다.

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-templates',
    name='ex-kube-templates',
    namespace='default',
    image='bash',
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/code.html#default-variables

    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['echo'],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=['{{ ds }}'],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}")

DAG 또는 사용자 환경을 변경하지 않으면 두 오류로 인해 ex-kube-templates 작업이 실패합니다. 작업을 디버그하고 오류를 해결하는 방법을 살펴보겠습니다.

  1. ex-kube-templates 작업 실패를 확인합니다.
  2. ex-kube-templates 작업 로그를 확인합니다.

    로그는 적합한 변수(my_value)가 존재하지 않아 이 작업이 실패함을 보여줍니다.

  3. gcloud 또는 Airflow UI를 통해 my_value를 설정하려면 다음 안내를 따릅니다.

    gcloud

    다음 명령어를 입력합니다.

    gcloud composer environments run ENVIRONMENT \
        --location LOCATION \
        variables -- \
        --set my_value example_value 

    각 항목의 의미는 다음과 같습니다.

    • ENVIRONMENT는 Cloud Composer 환경의 이름입니다.
    • LOCATION은 Cloud Composer 환경이 위치한 리전입니다.

    Airflow UI

    1. 툴바에서 관리 > 변수를 클릭합니다.
    2. 만들기를 클릭합니다.
    3. 다음 정보를 입력합니다.
      • 키: my_value
      • Val: example_value
    4. 저장을 클릭합니다.
  4. ex-kube-templates 작업을 다시 실행합니다.

  5. ex-kube-templates 작업 상태를 확인합니다.

    여전히 ex-kube-templates 작업이 실패합니다. 로그를 살펴보면 현재 configcore/kubeconfig가 없기 때문에 작업이 실패합니다. 커스텀 config_file(Kubernetes 구성 파일)을 참조하려면 airflow.cfg 파일의 kube_config 변수를 유효한 Kubernetes 구성으로 설정해야 합니다.

  6. kube_config 변수를 설정하려면 다음 명령어를 입력합니다.

    gcloud composer environments update ENVIRONMENT \
        --location LOCATION \
        --update-airflow-configs=core-kube_config=/home/airflow/composer_kube_config

    각 항목의 의미는 다음과 같습니다.

    • ENVIRONMENT는 Cloud Composer 환경의 이름입니다.
    • LOCATION은 Cloud Composer 환경이 위치한 리전입니다.
  7. 사용자 환경이 업데이트될 때까지 몇 분 정도 기다립니다.

  8. ex-kube-templates 작업을 다시 실행합니다.

  9. ex-kube-templates 작업 성공을 확인합니다.

보안 비밀 변수 구성

Kubernetes 보안 비밀은 소량의 민감한 정보를 포함하는 객체입니다. KubernetesPodOperator를 사용하여 보안 비밀을 Kubernetes pod에 전달할 수 있습니다. 보안 비밀이 Kubernetes에 정의되어 있지 않으면 pod가 실행되지 않습니다.

이 예시에서는 Kubernetes 보안 비밀 airflow-secretsSQL_CONN이라는 Kubernetes 환경 변수(Airflow 또는 Cloud Composer 환경 변수와 반대)에 배포합니다.

보안 비밀은 다음과 같습니다.

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type='env',
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target='SQL_CONN',
    # Name of the Kubernetes Secret
    secret='airflow-secrets',
    # Key of a secret stored in this Secret object
    key='sql_alchemy_conn')
secret_volume = secret.Secret(
    'volume',
    # Path where we mount the secret as volume
    '/var/secrets/google',
    # Name of Kubernetes Secret
    'service-account',
    # Key in the form of service account file name
    'service-account.json')

Kubernetes 보안 비밀의 이름은 secret 변수에 정의됩니다. 이 특정 보안 비밀의 이름은 airflow-secrets입니다. deploy_type의 지시에 따라 환경 변수로 표시됩니다. 설정되는 환경 변수 deploy_targetSQL_CONN입니다. 마지막으로, deploy_target에 저장하고 있는 보안 비밀의 keysql_alchemy_conn입니다.

연산자 구성은 다음과 같습니다.

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-kube-secrets',
    name='ex-kube-secrets',
    namespace='default',
    image='ubuntu',
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        'EXAMPLE_VAR': '/example/value',
        'GOOGLE_APPLICATION_CREDENTIALS': '/var/secrets/google/service-account.json'})

DAG 또는 사용자 환경을 변경하지 않으면 ex-kube-secrets 작업이 실패합니다. 작업을 디버그하고 오류를 해결하는 방법을 살펴보겠습니다.

  1. ex-kube-secrets 작업 실패를 확인합니다.
  2. ex-kube-secrets 작업 로그를 확인합니다.

    로그를 살펴보면 Pod took too long to start 오류로 인해 작업이 실패함을 알 수 있습니다. 이 오류는 Airflow가 구성에 지정된 보안 비밀 secret_env를 찾을 수 없기 때문에 발생합니다.

  3. gcloud를 사용하여 보안 비밀을 설정하는 방법은 다음과 같습니다.

    1. 다음 명령어를 실행하여 Cloud Composer 환경에 대한 세부정보를 확인합니다.

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"

      각 매개변수는 다음과 같습니다.

      • ENVIRONMENT는 Cloud Composer 환경의 이름입니다.
      • LOCATION은 Cloud Composer 환경이 위치한 리전입니다.

        projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id>와 유사한 출력이 반환됩니다.

    2. GKE 클러스터 ID를 가져오려면 출력을 복사하여 나중에 가져올 수 있는 위치의 /clusters/ 다음(-gke로 끝남)에 붙여넣습니다. 이 출력은 클러스터 ID입니다.

    3. 영역을 가져오려면 출력을 복사하여 나중에 가져올 수 있는 위치의 /zones/ 다음에 붙여넣습니다.

    4. 다음 명령어를 실행하여 GKE 클러스터에 연결합니다.

      gcloud container clusters get-credentials CLUSTER_ID \
      --zone ZONE \
      --project PROJECT
      

      각 항목의 의미는 다음과 같습니다.

      • CLUSTER_ID는 GKE 클러스터 ID입니다.
      • ZONE은 GKE가 위치한 영역입니다.
      • PROJECT는 Google Cloud 프로젝트의 ID입니다.
    5. 다음 명령어를 실행하여 sql_alchemy_conn 값을 test_value로 설정하는 Kubernetes 보안 비밀을 생성합니다.

      kubectl create secret generic airflow-secrets \
      --from-literal sql_alchemy_conn=test_value
      
  4. 보안 비밀을 설정한 후 ex-kube-secrets 작업을 다시 실행합니다.

  5. ex-kube-secrets 작업 성공을 확인합니다.

Pod 어피니티 구성

KubernetesPodOperator에서 affinity 매개변수를 구성할 때 특정 노드 풀의 노드 등 pod를 예약할 노드를 제어합니다. 이 예시에서 연산자는 pool-0pool-1이라는 이름의 노드 풀에서만 실행됩니다.

실행된 pod가 Airflow Workers, Redis, Airflow Scheduler, Cloud SQL 프록시와 동일한 Kubernetes Engine에 위치하지만, Kubernetes Engine 내 별도의 칸에 나타나는 것과 같이 특정 풀, pool-0, pool-1에 위치할 것을 나타내는 화살표가 있는 테넌트 프로젝트 및 고객 프로젝트의 Cloud Composer 환경 리소스
Pod 어피니티를 통한 Cloud Composer Kubernetes Pod 실행 위치(확대하려면 클릭)


kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-pod-affinity',
    name='ex-pod-affinity',
    namespace='default',
    image='perl',
    cmds=['perl'],
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        'nodeAffinity': {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        # When nodepools are created in Google Kubernetes
                        # Engine, the nodes inside of that nodepool are
                        # automatically assigned the label
                        # 'cloud.google.com/gke-nodepool' with the value of
                        # the nodepool's name.
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        # The label key's value that pods can be scheduled
                        # on.
                        'values': [
                            'pool-0',
                            'pool-1',
                        ]
                    }]
                }]
            }
        }
    })

현재 구성되어 있는 예시에 따르면 작업이 실패합니다. 작업을 디버그하고 오류를 해결하는 방법을 살펴보겠습니다.

  1. ex-pod-affinity 작업 실패를 확인합니다.
  2. ex-pod-affinity 작업 로그를 확인합니다.

    로그를 살펴보면 노드 풀 pool-0pool-1이 존재하지 않아 작업이 실패함을 알 수 있습니다.

  3. values에 노드 풀이 있는지 확인하려면 다음 중 원하는 구성 변경을 수행합니다.

    • 이전에 노드 풀을 만든 경우 pool-0pool-1을 노드 풀 이름으로 바꾸고 DAG를 다시 업로드합니다.
    • 이름이 pool-0 또는 pool-1노드 풀을 만듭니다. 둘 다 만들 수 있지만 하나만 있어도 작업이 성공할 수 있습니다.
    • pool-0pool-1을 Airflow가 사용하는 기본 풀인 default-pool로 바꿉니다. DAG를 다시 업로드합니다. 참고: 기본적으로 Kubernetes pod는 default-pool에 예약됩니다. 풀을 나중에 추가하면 풀이 default-pool로 제한됩니다.
  4. 사용자 환경이 업데이트될 때까지 몇 분 정도 기다립니다.

  5. ex-pod-affinity 작업을 다시 실행합니다.

  6. ex-pod-affinity 작업 성공을 확인합니다.

전체 구성

이 예시에서는 KubernetesPodOperator에서 구성할 수 있는 모든 변수를 보여줍니다 코드를 수정하지 않아도 ex-all-configs 작업이 성공적으로 수행됩니다.

각 변수에 대한 세부정보는 Airflow KubernetesPodOperator 참조를 확인하세요.

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id='ex-all-configs',
    name='pi',
    namespace='default',
    image='perl',
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=['perl'],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={'pod-label': 'label-name'},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={'EXAMPLE_VAR': '/example/value'},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy='Always',
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={'key1': 'value1'},
    # Resource specifications for Pod, this will allow you to set both cpu
    # and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={'limit_memory': 1, 'limit_cpu': 1},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file='/home/airflow/composer_kube_config',
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={})

DAG 관리

작업 상태 보기

  1. Airflow 웹 인터페이스로 이동합니다.
  2. DAG 페이지에서 DAG 이름(예: composer_sample_kubernetes_pod)을 클릭합니다.
  3. DAG 세부정보 페이지에서 그래프 보기를 클릭합니다.
  4. 상태를 확인합니다.

    • 실패: 작업 주변이 빨간색 상자로 표시되어 있습니다(예: ex-kube-templates). 포인터를 작업 위로 가져가면 상태: 실패를 확인할 수 있습니다.

    • 성공: 작업 주변이 녹색 상자로 표시되어 있습니다(예: pod-ex-minimum). 포인터를 작업 위로 가져가면 상태: 성공을 확인할 수 있습니다.

작업 로그 확인

  1. Airflow UI에서 작업 상태를 봅니다.
  2. DAG의 그래프 보기에서 작업 이름을 클릭합니다.
  3. 작업 인스턴스 컨텍스트 메뉴에서 로그 보기를 클릭합니다.

작업 다시 실행

  1. DAG로 돌아가려면 다음 안내를 따르세요.
    1. Airflow UI의 툴바에서 DAG를 클릭합니다.
    2. DAG 이름을 클릭합니다(예: composer_samples_kubernetes_pod).
  2. 작업을 다시 실행하려면 다음 안내를 따르세요.
    1. 작업 이름을 클릭합니다.
    2. 지우기를 클릭한 다음 확인을 클릭합니다. 작업이 자동으로 다시 실행됩니다.

문제해결

Pod 오류 문제해결 관련 팁

작업 로그 확인 외에 다음 로그도 확인하세요.

  • Airflow 스케줄러 및 작업자의 출력

    1. Cloud Composer 환경의 Cloud Storage 버킷으로 이동합니다. 이는 DAG가 위치한 버킷입니다.
    2. logs/DAG_NAME/TASK_ID/EXECUTION_DATE에서 로그를 검토합니다.
  • GKE 워크로드 내 Cloud Console의 자세한 pod 로그. 이러한 로그에는 pod 정의 YAML 파일, pod 이벤트, pod 세부정보가 포함됩니다.

GKEPodOperator를 사용할 때의 0이 아닌 반환 코드

KubernetesPodOperatorGKEPodOperator를 사용할 때 컨테이너 진입점의 반환 코드를 통해 작업의 성공 여부를 알 수 있습니다. 0이 아닌 반환 코드는 실패를 나타냅니다.

KubernetesPodOperatorGKEPodOperator 사용 시 공통적인 패턴은 셸 스크립트를 컨테이너 진입점으로 실행하여 컨테이너 내에서 여러 작업을 그룹화합니다.

이러한 스크립트를 작성 중인 경우 스크립트 상단에 set -e 명령어를 사용하여 스크립트의 실패한 명령어가 스크립트를 종료하고 실패를 Airflow 작업 인스턴스에 적용하도록 합니다.

Pod 성공에도 불구하고 작업 실패

composer-1.4.1-airflow-* 이하 버전을 실행하는 Cloud Composer 환경의 경우:

Airflow 작업이 한 시간 동안 실행되고 작업 로그가 kubernetes.client.rest.ApiException: (401)Reason: Unauthorized로 끝나는 경우, 기본 Kubernetes 작업은 그 시점 이후에도 계속 실행되며 심지어는 성공할 수도 있습니다. 하지만 Airflow가 작업을 실패로 보고합니다.

이 문제를 해결하려면 kubernetes>=8.0.1에 대한 명시적 PyPI 패키지 종속성을 추가하세요.

Pod 제한 시간

KubernetesPodOperator의 기본 제한 시간은 120초이므로 큰 이미지를 다운로드하기 전에 시간 초과가 발생할 수 있습니다. 제한 시간을 변경하려면 KubernetesPodOperator 생성 시 startup_timeout_seconds 매개변수를 변경합니다.

Pod가 시간 초과되면 Airflow 웹 UI에서 작업 관련 로그를 사용할 수 있습니다. 예:

Executing  on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in 
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Composer 서비스 계정에 작업을 수행하는 데 필요한 IAM 권한이 없는 경우에도 Pod 시간 초과가 발생할 수 있습니다. 이를 확인하려면 GKE 대시보드를 사용하여 Pod 수준 오류를 보고 특정 워크로드의 로그를 확인하거나 Stackdriver Logging을 사용하세요.

새 연결을 설정할 수 없음

자동 업그레이드는 GKE 클러스터에서 기본적으로 사용 설정됩니다. 노드 풀이 업그레이드 중인 클러스터에 있는 경우 다음 오류가 표시될 수 있습니다.

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new connection: [Errno 111] Connection refuse

클러스터가 업그레이드 중인지 확인하려면 Cloud Console에서 GKE 메뉴로 이동하여 환경의 클러스터 이름 옆에 있는 로드 아이콘을 찾으세요.

관련 리소스