KubernetesPodOperator 사용

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

이 페이지에서는 KubernetesPodOperator를 사용하여 Cloud Composer에서 Cloud Composer 환경의 일부인 Google Kubernetes Engine 클러스터로 Kubernetes 포드를 배포하는 방법을 설명합니다.

KubernetesPodOperator는 사용자 환경의 클러스터에서 Kubernetes 포드를 실행합니다. 이에 비해 Google Kubernetes Engine 연산자는 지정된 클러스터에서 Kubernetes 포드를 실행합니다. 이 클러스터는 환경과 관련이 없는 별도의 클러스터일 수 있습니다. Google Kubernetes Engine 연산자를 사용하여 클러스터를 만들고 삭제할 수도 있습니다.

KubernetesPodOperator는 다음이 필요한 경우에 적합한 옵션입니다.

  • 공개 PyPI 저장소를 통해 사용할 수 없는 커스텀 Python 종속 항목
  • 스톡 Cloud Composer 작업자 이미지에서 사용할 수 없는 바이너리 종속 항목

시작하기 전에

Cloud Composer 환경 리소스 설정

Cloud Composer 환경을 만들 때 환경 클러스터의 성능 매개변수를 포함한 성능 매개변수를 지정합니다. Kubernetes 포드를 환경 클러스터에 실행하면 CPU 또는 메모리와 같은 클러스터 리소스에 경쟁이 발생할 수 있습니다. Airflow 스케줄러와 작업자가 동일한 GKE 클러스터에 있기 때문에 경쟁으로 인해 리소스 부족이 발생하면 스케줄러와 작업자가 제대로 작동하지 않습니다.

리소스 부족을 방지하려면 다음 작업 중 하나 이상을 수행합니다.

노드 풀 만들기

Cloud Composer 환경에서 리소스 부족을 방지하려면 새 노드 풀을 만들고 Kubernetes 포드를 구성하여 해당 풀의 리소스만 사용하는 것이 좋습니다.

콘솔

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 이름을 클릭합니다.

  3. 환경 세부정보 페이지에서 환경 구성 탭으로 이동합니다.

  4. 리소스 > GKE 클러스터 섹션에서 클러스터 세부정보 보기 링크를 따릅니다.

  5. 노드 풀 추가에 설명된 대로 노드 풀을 만듭니다.

gcloud

  1. 환경 클러스터의 이름을 확인합니다.

    gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="value(config.gkeCluster)"
    

    다음과 같이 바꿉니다.

    • ENVIRONMENT_NAME: 환경 이름
    • LOCATION을 환경이 위치한 리전으로 바꿉니다.
  2. 출력에는 환경의 클러스터 이름이 포함됩니다. 예를 들어 europe-west3-example-enviro-af810e25-gke일 수 있습니다.

  3. 노드 풀 추가에 설명된 대로 노드 풀을 만듭니다.

사용자 환경에서 노드 수 늘리기

Cloud Composer 환경에서 노드 수를 늘리면 워크로드에 제공되는 컴퓨팅 성능이 향상됩니다. 이렇게 해도 지정된 머신 유형이 제공하는 것보다 많은 CPU 또는 RAM이 필요한 작업에 추가적인 리소스가 제공되지 않습니다.

노드 수를 늘리려면 환경을 업데이트하세요.

적절한 머신 유형 지정

Cloud Composer 환경을 생성하는 동안 머신 유형을 지정할 수 있습니다. 가용 리소스를 확보하려면 Cloud Composer 환경에서 발생하는 컴퓨팅 유형에 적합한 머신 유형을 지정합니다.

최소 구성

KubernetesPodOperator를 만들려면 포드의 name, 사용할 image, task_id 매개변수만 필요합니다. /home/airflow/composer_kube_config에는 GKE에 인증할 사용자 인증 정보가 포함되어 있습니다.

Airflow 2

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. In Composer 1 there is the potential for
    # the resource starvation of Airflow workers and scheduler
    # within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources,
    # and using Composer 2 will mean the environment will autoscale.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

Airflow 1

kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes, default namespace is
    # `default`. There is the potential for the resource starvation of
    # Airflow workers and scheduler within the Cloud Composer environment,
    # the recommended solution is to increase the amount of nodes in order
    # to satisfy the computing requirements. Alternatively, launching pods
    # into a custom namespace will stop fighting over resources.
    namespace="default",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
)

포드 어피니티 구성

KubernetesPodOperator에서 affinity 매개변수를 구성할 때 특정 노드 풀의 노드 등 포드를 예약할 노드를 제어합니다. 이 예시에서 연산자는 pool-0pool-1이라는 이름의 노드 풀에서만 실행됩니다. Cloud Composer 1 환경 노드는 default-pool에 있으므로 포드는 환경의 노드에서 실행되지 않습니다.

Airflow 2

# Pod affinity with the KubernetesPodOperator
# is not supported with Composer 2
# instead, create a cluster and use the GKEStartPodOperator
# https://cloud.google.com/composer/docs/using-gke-operator
kubernetes_affinity_ex = KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

Airflow 1

kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-pod-affinity",
    name="ex-pod-affinity",
    namespace="default",
    image="perl:5.34.0",
    cmds=["perl"],
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # affinity allows you to constrain which nodes your pod is eligible to
    # be scheduled on, based on labels on the node. In this case, if the
    # label 'cloud.google.com/gke-nodepool' with value
    # 'nodepool-label-value' or 'nodepool-label-value2' is not found on any
    # nodes, it will fail to schedule.
    affinity={
        "nodeAffinity": {
            # requiredDuringSchedulingIgnoredDuringExecution means in order
            # for a pod to be scheduled on a node, the node must have the
            # specified labels. However, if labels on a node change at
            # runtime such that the affinity rules on a pod are no longer
            # met, the pod will still continue to run on the node.
            "requiredDuringSchedulingIgnoredDuringExecution": {
                "nodeSelectorTerms": [
                    {
                        "matchExpressions": [
                            {
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                "key": "cloud.google.com/gke-nodepool",
                                "operator": "In",
                                # The label key's value that pods can be scheduled
                                # on.
                                "values": [
                                    "pool-0",
                                    "pool-1",
                                ],
                            }
                        ]
                    }
                ]
            }
        }
    },
)

구성되어 있는 예시에 따르면 작업이 실패합니다. 로그를 살펴보면 노드 풀 pool-0pool-1이 존재하지 않기 때문에 작업이 실패합니다.

values에 노드 풀이 있는지 확인하려면 다음 중 원하는 구성 변경을 수행합니다.

  • 이전에 노드 풀을 만든 경우 pool-0pool-1을 노드 풀 이름으로 바꾸고 DAG를 다시 업로드합니다.

  • 이름이 pool-0 또는 pool-1노드 풀을 만듭니다. 둘 다 만들 수 있지만 하나만 있어도 작업이 성공할 수 있습니다.

  • pool-0pool-1을 Airflow가 사용하는 기본 풀인 default-pool로 바꿉니다. DAG를 다시 업로드합니다.

변경 후 사용자 환경이 업데이트될 때까지 몇 분 정도 기다립니다. 그런 다음 ex-pod-affinity 작업을 다시 실행하고 ex-pod-affinity 작업이 성공하는지 확인합니다.

추가 구성

이 예시에서는 KubernetesPodOperator에서 구성할 수 있는 추가 매개변수를 보여줍니다.

자세한 내용은 다음 리소스를 참고하세요.

Airflow 2

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        limits={"memory": "250M", "cpu": "100m"},
    ),
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

Airflow 1

kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="default",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 120.
    startup_timeout_seconds=120,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 1.10.4, resource specifications were
    # passed as a Pod Resources Class object,
    # If using this example on a version of Airflow prior to 1.10.4,
    # import the "pod" package from airflow.contrib.kubernetes and use
    # resources = pod.Resources() instead passing a dict
    # For more info see:
    # https://github.com/apache/airflow/pull/4551
    resources={"limit_memory": "250M", "limit_cpu": "100m"},
    # Specifies path to kubernetes config. If no config is specified will
    # default to '~/.kube/config'. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    affinity={},
)

Jinja 템플릿 사용

Airflow는 DAG에서 Jinja 템플릿을 지원합니다.

연산자와 함께 필수 Airflow 매개변수(task_id, name, image)를 선언해야 합니다. 다음 예시와 같이 Jinja를 사용하여 다른 모든 매개변수(cmds, arguments, env_vars, config_file)를 템플릿으로 만들 수 있습니다.

예시의 env_vars 매개변수는 my_value라는 Airflow 변수에서 설정됩니다. 예시 DAG는 Airflow의 vars 템플릿 변수에서 값을 가져옵니다. Airflow에는 다양한 유형의 정보에 액세스할 수 있는 더 많은 변수가 있습니다. 예를 들어 conf 템플릿 변수를 사용하여 Airflow 구성 옵션의 값에 액세스할 수 있습니다. 자세한 내용과 Airflow에서 사용할 수 있는 변수 목록은 Airflow 문서의 템플릿 참조를 확인하세요.

DAG를 변경하거나 env_vars 변수를 만들지 않으면 변수가 없으므로 예시의 ex-kube-templates 태스크가 실패합니다. Airflow UI 또는 Google Cloud CLI에서 이 변수를 만듭니다.

Airflow UI

  1. Airflow UI로 이동합니다.

  2. 툴바에서 관리 > 변수를 선택합니다.

  3. 변수 나열 페이지에서 새 레코드 추가를 클릭합니다.

  4. 변수 추가 페이지에서 다음 정보를 입력합니다.

    • 키: my_value
    • Val: example_value
  5. 저장을 클릭합니다.

환경에서 Airflow 1을 사용하는 경우 다음 명령어를 대신 실행합니다.

  1. Airflow UI로 이동합니다.

  2. 툴바에서 관리 > 변수를 선택합니다.

  3. 변수 페이지에서 만들기 탭을 클릭합니다.

  4. 변수 페이지에서 다음 정보를 입력합니다.

    • 키: my_value
    • Val: example_value
  5. 저장을 클릭합니다.

gcloud

다음 명령어를 입력합니다.

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

환경에서 Airflow 1을 사용하는 경우 다음 명령어를 대신 실행합니다.

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables -- \
    --set my_value example_value

다음과 같이 바꿉니다.

  • ENVIRONMENT: 환경 이름
  • LOCATION을 환경이 위치한 리전으로 바꿉니다.

다음 예시는 KubernetesPodOperator와 함께 Jinja 템플릿을 사용하는 방법을 보여줍니다.

Airflow 2

kubenetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Airflow 1

kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="default",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
)

Kubernetes 보안 비밀 및 ConfigMap 사용

Kubernetes 보안 비밀은 민감한 정보를 포함하는 객체입니다. Kubernetes ConfigMap은 키-값 쌍에 비기밀 데이터를 포함하는 객체입니다.

Cloud Composer 2에서는 Google Cloud CLI, API 또는 Terraform을 사용하여 보안 비밀 및 ConfigMap을 만든 다음 KubernetesPodOperator에서 액세스할 수 있습니다.

YAML 구성 파일 정보

Google Cloud CLI 및 API를 사용하여 Kubernetes 보안 비밀 또는 ConfigMap을 만들 때 YAML 형식의 파일을 제공합니다. 이 파일은 Kubernetes 보안 비밀 및 ConfigMap에서 사용하는 것과 동일한 형식을 따라야 합니다. Kubernetes 문서에서는 ConfigMap 및 보안 비밀의 여러 코드 샘플을 제공합니다. 시작하려면 보안 비밀을 사용하여 사용자 인증 정보를 안전하게 배포 페이지와 ConfigMaps을 참조하세요.

Kubernetes 보안 비밀과 동일하게 보안 비밀에서 값을 정의할 때는 base64 표현을 사용합니다.

값을 인코딩하려면 다음 명령어를 사용할 수 있습니다. 이는 base64로 인코딩된 값을 가져오는 여러 가지 방법 중 하나입니다.

echo "postgresql+psycopg2://root:example-password@127.0.0.1:3306/example-db" -n | base64

출력:

cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

이 가이드 뒷부분의 샘플에는 다음 두 개의 YAML 파일 예시가 사용됩니다. Kubernetes 보안 비밀의 YAML 구성 파일 예시:

apiVersion: v1
kind: Secret
metadata:
  name: airflow-secrets
data:
  sql_alchemy_conn: cG9zdGdyZXNxbCtwc3ljb3BnMjovL3Jvb3Q6ZXhhbXBsZS1wYXNzd29yZEAxMjcuMC4wLjE6MzMwNi9leGFtcGxlLWRiIC1uCg==

파일을 포함하는 방법을 보여주는 또 다른 예시입니다. 이전 예시와 마찬가지로 먼저 파일(cat ./key.json | base64)의 콘텐츠를 인코딩한 다음 YAML 파일에 이 값을 제공합니다.

apiVersion: v1
kind: Secret
metadata:
  name: service-account
data:
  service-account.json: |
    ewogICJ0eXBl...mdzZXJ2aWNlYWNjb3VudC5jb20iCn0K

ConfigMap의 YAML 구성 파일 예시. ConfigMap에서는 base64 표현을 사용할 필요가 없습니다.

apiVersion: v1
kind: ConfigMap
metadata:
  name: example-configmap
data:
  example_key: example_value

Kubernetes 보안 비밀 관리

Cloud Composer 2에서는 Google Cloud CLI 및 kubectl을 사용하여 보안 비밀을 만듭니다.

  1. 환경 클러스터에 대한 정보를 얻습니다.

    1. 다음 명령어를 실행합니다.

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      다음과 같이 바꿉니다.

      • ENVIRONMENT를 환경 이름으로 바꿉니다.
      • LOCATION을 Cloud Composer 환경이 위치한 리전으로 바꿉니다.

      이 명령어 출력에는 projects/<your-project-id>/zones/<zone-of-composer-env>/clusters/<your-cluster-id> 형식이 사용됩니다.

    2. GKE 클러스터 ID를 가져오려면 /clusters/ 다음의 출력을 복사합니다(-gke로 끝남).

    3. 영역을 가져오려면 /zones/ 뒤에 출력을 복사합니다.

  2. 다음 명령어를 사용하여 GKE 클러스터에 연결합니다.

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --zone ZONE
    

    다음과 같이 바꿉니다.

    • CLUSTER_ID: 환경의 클러스터 ID
    • PROJECT_ID: 프로젝트 ID
    • ZONE을 환경의 클러스터가 있는 영역으로 바꿉니다.
  3. Kubernetes 보안 비밀 만들기:

    다음 명령어는 Kubernetes 보안 비밀을 만드는 두 가지 다른 접근 방식을 보여줍니다. --from-literal 방식은 키-값 쌍을 사용합니다. --from-file 방식은 파일 콘텐츠를 사용합니다.

    • 키-값 쌍을 제공하여 Kubernetes 보안 비밀을 만들려면 다음 명령어를 실행합니다. 이 예에서는 sql_alchemy_conn 필드에 test_value 값이 있는 airflow-secrets라는 이름의 보안 비밀을 만듭니다.

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value
      
    • 파일 콘텐츠를 제공하여 Kubernetes 보안 비밀을 만들려면 다음 명령어를 실행합니다. 이 예에서는 로컬 ./key.json 파일의 콘텐츠에서 가져온 값이 있는 service-account.json 필드가 있는 service-account라는 이름의 보안 비밀을 만듭니다.

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json
      

DAG에서 Kubernetes 보안 비밀 사용

이 예시에서는 Kubernetes 보안 비밀을 환경 변수와 포드가 마운트한 볼륨으로 사용하는 두 가지 방법을 보여줍니다.

Airflow 또는 Cloud Composer 환경 변수와 반대로 첫 번째 보안 비밀 airflow-secretsSQL_CONN이라는 Kubernetes 환경 변수로 설정됩니다.

두 번째 보안 비밀 service-account는 서비스 계정 토큰이 있는 파일인 service-account.json/var/secrets/google에 마운트합니다.

보안 비밀 객체는 다음과 같습니다.

Airflow 2

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

Airflow 1

secret_env = secret.Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = secret.Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

첫 번째 Kubernetes 보안 비밀의 이름은 secret_env 변수에 정의됩니다. 이 보안 비밀의 이름은 airflow-secrets입니다. deploy_type 매개변수에서는 이 보안 비밀이 환경 변수로 노출되어야 한다는 것을 지정합니다. 환경 변수의 이름은 deploy_target 매개변수에 지정된 대로 SQL_CONN입니다. 마지막으로 SQL_CONN 환경 변수의 값이 sql_alchemy_conn 키의 값으로 설정됩니다.

두 번째 Kubernetes 보안 비밀의 이름은 secret_volume 변수에 정의됩니다. 이 보안 비밀의 이름은 service-account입니다. 이 보안 비밀은 deploy_type 매개변수에 지정된 대로 볼륨으로 노출됩니다. 마운트할 파일 deploy_target의 경로는 /var/secrets/google입니다. 마지막으로 deploy_target에 저장된 보안 비밀의 keyservice-account.json입니다.

연산자 구성은 다음과 같습니다.

Airflow 2

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

Airflow 1

kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="default",
    image="ubuntu",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json ",
    },
)

CNCF Kubernetes Provider 정보

KubernetesPodOperator는 apache-airflow-providers-cncf-kubernetes Provider에 구현됩니다.

CNCF Kubernetes Provider의 자세한 출시 노트는 CNCF Kubernetes Provider 웹사이트를 참조하세요.

버전 6.0.0

CNCF Kubernetes Provider 패키지 버전 6.0.0에서는 기본적으로 kubernetes_default 연결이 KubernetesPodOperator에서 사용됩니다.

버전 5.0.0에서 커스텀 연결을 지정한 경우 이 커스텀 연결이 연산자에서 계속 사용합니다. kubernetes_default 연결을 사용하도록 다시 전환하려면 이에 따라 DAG를 조정해야 할 수 있습니다.

버전 5.0.0

이 버전은 버전 4.4.0과 비교하여 이전 버전과 호환되지 않는 몇 가지 변경사항을 도입합니다. 가장 중요한 것은 버전 5.0.0에서 사용되지 않는 kubernetes_default 연결과 관련이 있습니다.

  • kubernetes_default 연결을 수정해야 합니다. 다음 그림과 같이 Kubernetes 구성 경로는 /home/airflow/composer_kube_config로 설정해야 합니다. 또는 다음 코드 예시와 같이 config_file을 KubernetesPodOperator 구성에 추가해야 합니다.
Airflow UI의 Kube 구성 경로 필드
그림 1. Airflow UI, kubernetes_default 연결 수정(확대하려면 클릭)
  • 다음과 같은 방법으로 KubernetesPodOperator를 사용하여 태스크 코드를 수정합니다.
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

버전 5.0.0에 대한 자세한 내용은 CNCF Kubernetes Provider 출시 노트를 참조하세요.

문제 해결

이 섹션에서는 일반적인 KubernetesPodOperator 문제를 해결하기 위한 조언을 제공합니다.

로그 보기

문제를 해결할 때는 다음 순서로 로그를 확인할 수 있습니다.

  1. Airflow 태스크 로그:

    1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

      환경으로 이동

    2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

    3. DAG 탭으로 이동합니다.

    4. DAG 이름을 클릭한 다음 DAG 실행을 클릭하여 세부정보와 로그를 확인합니다.

  2. Airflow 스케줄러 로그:

    1. 환경 세부정보 페이지로 이동합니다.

    2. 로그 탭으로 이동합니다.

    3. Airflow 스케줄러 로그를 검사합니다.

  3. GKE 워크로드 내 Google Cloud 콘솔의 포드 로그 이러한 로그에는 포드 정의 YAML 파일, 포드 이벤트, 포드 세부정보가 포함됩니다.

0이 아닌 반환 코드

KubernetesPodOperator(및 GKEStartPodOperator)를 사용할 때 컨테이너 진입점의 반환 코드를 통해 작업의 성공 여부를 알 수 있습니다. 0이 아닌 반환 코드는 실패를 나타냅니다.

일반적인 패턴은 셸 스크립트를 컨테이너 진입점으로 실행하여 컨테이너 내에서 여러 작업을 함께 그룹화하는 것입니다.

이러한 스크립트를 작성 중인 경우 스크립트 상단에 set -e 명령어를 사용하여 스크립트의 실패한 명령어가 스크립트를 종료하고 실패를 Airflow 작업 인스턴스에 적용하도록 합니다.

포드 제한 시간

KubernetesPodOperator의 기본 제한 시간은 120초이므로 큰 이미지를 다운로드하기 전에 시간 초과가 발생할 수 있습니다. 제한 시간을 변경하려면 KubernetesPodOperator 생성 시 startup_timeout_seconds 매개변수를 변경합니다.

포드가 시간 초과되면 Airflow UI에서 작업별 로그를 사용할 수 있습니다. 예를 들면 다음과 같습니다.

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Cloud Composer 서비스 계정에 작업을 수행하는 데 필요한 IAM 권한이 없는 경우에도 포드 시간 초과가 발생할 수 있습니다. 이를 확인하려면 GKE 대시보드를 사용하여 포드 수준 오류를 보고 특정 워크로드의 로그를 확인하거나 Cloud Logging을 사용하세요.

새 연결을 설정할 수 없음

자동 업그레이드는 GKE 클러스터에서 기본적으로 사용 설정됩니다. 노드 풀이 업그레이드 중인 클러스터에 있는 경우 다음 오류가 표시될 수 있습니다.

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

클러스터가 업그레이드 중인지 확인하려면 Google Cloud 콘솔에서 Kubernetes 클러스터 페이지로 이동하여 환경의 클러스터 이름 옆에 있는 로드 아이콘을 찾으세요.

다음 단계