KubernetesPodOperator 사용

Cloud Composer 1 | Cloud Composer 2

이 페이지에서는 KubernetesPodOperator를 사용해서 Kubernetes 포드를 Cloud Composer에서 Cloud Composer 환경에 포함된 Google Kubernetes Engine 클러스터에 배포하고 해당 환경에 적절한 리소스가 포함되었는지 확인하는 방법을 설명합니다.

KubernetesPodOperator환경의 클러스터에서 Kubernetes 포드를 실행합니다. 이에 비해 Google Kubernetes Engine 연산자는 지정된 클러스터에서 Kubernetes 포드를 실행합니다. 이 클러스터는 환경과 관련이 없는 별도의 클러스터일 수 있습니다. Google Kubernetes Engine 연산자를 사용하여 클러스터를 만들고 삭제할 수도 있습니다.

KubernetesPodOperator는 다음이 필요한 경우에 적합한 옵션입니다.

  • 공개 PyPI 저장소를 통해 사용할 수 없는 커스텀 Python 종속 항목
  • 스톡 Cloud Composer 작업자 이미지에서 사용할 수 없는 바이너리 종속 항목

이 페이지에서는 다음 KubernetesPodOperator 구성을 포함하는 예시 Airflow DAG를 안내합니다.

시작하기 전에

  • Cloud Composer 2에서 환경 클러스터는 자동으로 확장됩니다. KubernetesPodOperator를 사용하여 실행하는 추가 워크로드는 사용자 환경과 독립적으로 확장됩니다. 사용자 환경이 증가한 리소스 수요의 영향을 받지 않지만 사용자 환경의 클러스터는 리소스 수요에 따라 확장 및 축소됩니다. 사용자 환경의 클러스터에서 실행되는 추가 워크로드의 가격 책정은 Cloud Composer 2 가격 책정 모델을 따르고 Cloud Composer Compute SKU를 사용합니다.

  • Cloud Composer 2는 워크로드 아이덴티티와 함께 GKE 클러스터를 사용합니다. 기본적으로 새로 생성된 네임스페이스 또는 composer-user-workloads 네임스페이스에서 실행되는 포드는 Google Cloud 리소스에 액세스할 수 없습니다. 워크로드 아이덴티티를 사용할 때 Google API 및 기타 서비스에 대한 요청에 서비스 ID 승인을 사용 설정하려면 네임스페이스와 연결된 Kubernetes 서비스 계정을 Google Cloud 서비스 계정에 매핑해야 합니다.

    따라서 환경의 클러스터에서 composer-user-workloads 네임스페이스 또는 새로 생성된 네임스페이스에서 포드를 실행하면 Kubernetes 서비스 계정과 Google Cloud 서비스 계정 간에 적절한 IAM 바인딩이 생성되지 않으며 이러한 포드는 Google Cloud 프로젝트의 리소스에 액세스할 수 없습니다.

    포드가 Google Cloud 리소스에 액세스하도록 하려면 자세히 설명된 바와 같이 composer-user-workloads 네임스페이스를 사용하거나 자체 네임스페이스를 만듭니다.

    프로젝트 리소스에 대한 액세스 권한을 제공하려면 워크로드 아이덴티티의 안내를 따르고 바인딩을 설정합니다.

    1. 환경 클러스터에 개별 네임스페이스를 만듭니다.
    2. composer-user-workloads/<namespace_name> Kubernetes 서비스 계정과 사용자 환경의 서비스 계정 사이에 바인딩을 만듭니다.
    3. 사용자 환경의 서비스 계정 주석을 Kubernetes 서비스 계정에 추가합니다.
    4. KubernetesPodOperator를 사용하는 경우 namespaceservice_account_name 매개변수에 네임스페이스와 Kubernetes 서비스 계정을 지정합니다.
  • Cloud Composer 2는 워크로드 아이덴티티와 함께 GKE 클러스터를 사용합니다. GKE 메타데이터 서버는 새로 생성된 포드에서 요청 수락을 시작하는 데 몇 초 정도 걸립니다. 따라서 포드 수명의 처음 몇 초 내에 워크로드 아이덴티티를 사용하여 인증을 시도하면 실패할 수 있습니다. 이 제한사항에 대한 자세한 내용은 워크로드 아이덴티티 제한사항을 참조하세요.

  • Cloud Composer 2는 컴퓨팅 클래스의 개념을 도입하는 Autopilot 클러스터를 사용합니다.

    • 기본적으로 클래스를 선택하지 않으면 KubernetesPodOperator를 사용하여 포드를 만들 때 general-purpose 클래스로 간주됩니다.

    • 각 클래스는 특정 속성 및 리소스 한도와 연결됩니다. 자세한 내용은 Autopilot 문서를 참조하세요. 예를 들어 general-purpose 클래스 내에서 실행되는 포드는 최대 110GiB 메모리를 사용할 수 있습니다.

  • CNCF Kubernetes Provider 버전 5.0.0이 사용되는 경우 CNCF Kubernetes Provider 섹션에 설명된 안내를 따릅니다.

KubernetesPodOperator 구성

이 예시를 따르려면 전체 kubernetes_pod_operator.py 파일을 사용자 환경의 dags/ 폴더에 넣거나 DAG에 관련성이 높은 KubernetesPodOperator 코드를 추가합니다.

다음 섹션에서는 예시의 각 KubernetesPodOperator 구성에 대해 설명합니다. 각 구성 변수에 대한 자세한 내용은 Airflow 참조를 확인하세요.

"""Example DAG using KubernetesPodOperator."""
import datetime

from airflow import models
from airflow.kubernetes.secret import Secret
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s_models

# A Secret is an object that contains a small amount of sensitive data such as
# a password, a token, or a key. Such information might otherwise be put in a
# Pod specification or in an image; putting it in a Secret object allows for
# more control over how it is used, and reduces the risk of accidental
# exposure.
secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)
# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

# If a Pod fails to launch, or has an error occur in the container, Airflow
# will show the task as failed, as well as contain all of the task logs
# required to debug.
with models.DAG(
    dag_id="composer_sample_kubernetes_pod",
    schedule_interval=datetime.timedelta(days=1),
    start_date=YESTERDAY,
) as dag:
    # Only name, namespace, image, and task_id are required to create a
    # KubernetesPodOperator. In Cloud Composer, the config file found at
    # `/home/airflow/composer_kube_config` contains credentials for
    # Cloud Composer's Google Kubernetes Engine cluster that is created
    # upon environment creation.
    kubernetes_min_pod = KubernetesPodOperator(
        # The ID specified for the task.
        task_id="pod-ex-minimum",
        # Name of task you want to run, used to generate Pod ID.
        name="pod-ex-minimum",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # The namespace to run within Kubernetes. In Composer 2 environments
        # after December 2022, the default namespace is
        # `composer-user-workloads`.
        namespace="composer-user-workloads",
        # Docker image specified. Defaults to hub.docker.com, but any fully
        # qualified URLs will point to a custom repository. Supports private
        # gcr.io images if the Composer Environment is under the same
        # project-id as the gcr.io images and the service account that Composer
        # uses has permission to access the Google Container Registry
        # (the default service account has permission)
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_template_ex = KubernetesPodOperator(
        task_id="ex-kube-templates",
        name="ex-kube-templates",
        namespace="composer-user-workloads",
        image="bash",
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["echo"],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=["{{ ds }}"],
        # The var template variable allows you to access variables defined in
        # Airflow UI. In this case we are getting the value of my_value and
        # setting the environment variable `MY_VALUE`. The pod will fail if
        # `my_value` is not set in the Airflow UI.
        env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
        # Sets the config file to a kubernetes config file specified in
        # airflow.cfg. If the configuration file does not exist or does
        # not provide validcredentials the pod will fail to launch. If not
        # specified, config_file defaults to ~/.kube/config
        config_file="{{ conf.get('core', 'kube_config') }}",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_secret_vars_ex = KubernetesPodOperator(
        task_id="ex-kube-secrets",
        name="ex-kube-secrets",
        namespace="composer-user-workloads",
        image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
        startup_timeout_seconds=300,
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[secret_env, secret_volume],
        cmds=["echo"],
        # env_vars allows you to specify environment variables for your
        # container to use. env_vars is templated.
        env_vars={
            "EXAMPLE_VAR": "/example/value",
            "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
        },
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
    )
    kubernetes_full_pod = KubernetesPodOperator(
        task_id="ex-all-configs",
        name="pi",
        namespace="composer-user-workloads",
        image="perl:5.34.0",
        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=["perl"],
        # Arguments to the entrypoint. The docker image's CMD is used if this
        # is not provided. The arguments parameter is templated.
        arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
        # The secrets to pass to Pod, the Pod will fail to create if the
        # secrets you specify in a Secret object do not exist in Kubernetes.
        secrets=[],
        # Labels to apply to the Pod.
        labels={"pod-label": "label-name"},
        # Timeout to start up the Pod, default is 600.
        startup_timeout_seconds=600,
        # The environment variables to be initialized in the container
        # env_vars are templated.
        env_vars={"EXAMPLE_VAR": "/example/value"},
        # If true, logs stdout output of container. Defaults to True.
        get_logs=True,
        # Determines when to pull a fresh image, if 'IfNotPresent' will cause
        # the Kubelet to skip pulling an image if it already exists. If you
        # want to always pull a new image, set it to 'Always'.
        image_pull_policy="Always",
        # Annotations are non-identifying metadata you can attach to the Pod.
        # Can be a large range of data, and can include characters that are not
        # permitted by labels.
        annotations={"key1": "value1"},
        # Optional resource specifications for Pod, this will allow you to
        # set both cpu and memory limits and requirements.
        # Prior to Airflow 2.3 and the cncf providers package 5.0.0
        # resources were passed as a dictionary. This change was made in
        # https://github.com/apache/airflow/pull/27197
        # Additionally, "memory" and "cpu" were previously named
        # "limit_memory" and "limit_cpu"
        # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
        container_resources=k8s_models.V1ResourceRequirements(
            requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
            limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        ),
        # Specifies path to kubernetes config. The config_file is templated.
        config_file="/home/airflow/composer_kube_config",
        # If true, the content of /airflow/xcom/return.json from container will
        # also be pushed to an XCom when the container ends.
        do_xcom_push=False,
        # List of Volume objects to pass to the Pod.
        volumes=[],
        # List of VolumeMount objects to pass to the Pod.
        volume_mounts=[],
        # Identifier of connection that should be used
        kubernetes_conn_id="kubernetes_default",
        # Affinity determines which nodes the Pod can run on based on the
        # config. For more information see:
        # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
        # Pod affinity with the KubernetesPodOperator
        # is not supported with Composer 2
        # instead, create a cluster and use the GKEStartPodOperator
        # https://cloud.google.com/composer/docs/using-gke-operator
        affinity={},
    )

최소 구성

KubernetesPodOperator를 만들려면 포드의 name, 포드를 실행할 namespace, 사용할 image, task_id만 필요합니다.

DAG에 다음 코드 스니펫을 배치하면 구성에서 /home/airflow/composer_kube_config의 기본값을 사용합니다. 코드를 수정하지 않아도 pod-ex-minimum 작업이 성공적으로 수행됩니다.

kubernetes_min_pod = KubernetesPodOperator(
    # The ID specified for the task.
    task_id="pod-ex-minimum",
    # Name of task you want to run, used to generate Pod ID.
    name="pod-ex-minimum",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # The namespace to run within Kubernetes. In Composer 2 environments
    # after December 2022, the default namespace is
    # `composer-user-workloads`.
    namespace="composer-user-workloads",
    # Docker image specified. Defaults to hub.docker.com, but any fully
    # qualified URLs will point to a custom repository. Supports private
    # gcr.io images if the Composer Environment is under the same
    # project-id as the gcr.io images and the service account that Composer
    # uses has permission to access the Google Container Registry
    # (the default service account has permission)
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

템플릿 구성

Airflow는 Jinja 템플릿 사용을 지원합니다. 연산자와 함께 필수 변수(task_id, name, namespace, image)를 선언해야 합니다. 다음 예시와 같이 Jinja를 사용하여 다른 모든 매개변수(cmds, arguments, env_vars, config_file)를 템플릿으로 만들 수 있습니다.

kubernetes_template_ex = KubernetesPodOperator(
    task_id="ex-kube-templates",
    name="ex-kube-templates",
    namespace="composer-user-workloads",
    image="bash",
    # All parameters below are able to be templated with jinja -- cmds,
    # arguments, env_vars, and config_file. For more information visit:
    # https://airflow.apache.org/docs/apache-airflow/stable/macros-ref.html
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["echo"],
    # DS in jinja is the execution date as YYYY-MM-DD, this docker image
    # will echo the execution date. Arguments to the entrypoint. The docker
    # image's CMD is used if this is not provided. The arguments parameter
    # is templated.
    arguments=["{{ ds }}"],
    # The var template variable allows you to access variables defined in
    # Airflow UI. In this case we are getting the value of my_value and
    # setting the environment variable `MY_VALUE`. The pod will fail if
    # `my_value` is not set in the Airflow UI.
    env_vars={"MY_VALUE": "{{ var.value.my_value }}"},
    # Sets the config file to a kubernetes config file specified in
    # airflow.cfg. If the configuration file does not exist or does
    # not provide validcredentials the pod will fail to launch. If not
    # specified, config_file defaults to ~/.kube/config
    config_file="{{ conf.get('core', 'kube_config') }}",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

DAG 또는 사용자 환경을 변경하지 않으면 두 오류로 인해 ex-kube-templates 작업이 실패합니다. 로그는 적합한 변수가 존재하지 않아(my_value) 이 작업이 실패함을 보여줍니다. 첫 번째 오류를 해결한 후 발생할 수 있는 두 번째 오류는 config에서 core/kube_config를 찾을 수 없어서 작업이 실패함을 보여줍니다.

두 오류를 해결하려면 추가로 설명된 단계를 따릅니다.

gcloud 또는 Airflow UI를 통해 my_value를 설정하려면 다음 안내를 따릅니다.

Airflow UI

Airflow 2 UI에서 다음 안내를 따르세요.

  1. Airflow UI로 이동합니다.

  2. 툴바에서 관리 > 변수를 선택합니다.

  3. 변수 나열 페이지에서 새 레코드 추가를 클릭합니다.

  4. 변수 추가 페이지에서 다음 정보를 입력합니다.

    • 키: my_value
    • Val: example_value
  5. 저장을 클릭합니다.

gcloud

Airflow 2의 경우 다음 명령어를 입력합니다.

gcloud composer environments run ENVIRONMENT \
    --location LOCATION \
    variables set -- \
    my_value example_value

다음과 같이 바꿉니다.

  • ENVIRONMENT을 환경 이름으로 바꿉니다.
  • LOCATION을 환경이 위치한 리전으로 바꿉니다.

커스텀 config_file(Kubernetes 구성 파일)을 참조하려면 kube_config Airflow 구성 옵션을 유효한 Kubernetes 구성으로 재정의합니다.

섹션
core kube_config /home/airflow/composer_kube_config

사용자 환경이 업데이트될 때까지 몇 분 정도 기다립니다. 그런 다음 ex-kube-templates 작업을 다시 실행하고 ex-kube-templates 작업이 성공하는지 확인합니다.

보안 비밀 변수 구성

Kubernetes 보안 비밀은 민감한 정보를 포함하는 객체입니다. KubernetesPodOperator를 사용하여 보안 비밀을 Kubernetes 포드에 전달할 수 있습니다. 보안 비밀이 Kubernetes에 정의되어 있지 않으면 포드가 실행되지 않습니다.

이 예시에서는 Kubernetes 보안 비밀을 환경 변수와 포드가 마운트한 볼륨으로 사용하는 두 가지 방법을 보여줍니다.

Airflow 또는 Cloud Composer 환경 변수와 반대로 첫 번째 보안 비밀 airflow-secretsSQL_CONN이라는 Kubernetes 환경 변수로 설정됩니다.

두 번째 보안 비밀 service-account는 서비스 계정 토큰이 있는 파일인 service-account.json/var/secrets/google에 마운트합니다.

보안 비밀은 다음과 같습니다.

secret_env = Secret(
    # Expose the secret as environment variable.
    deploy_type="env",
    # The name of the environment variable, since deploy_type is `env` rather
    # than `volume`.
    deploy_target="SQL_CONN",
    # Name of the Kubernetes Secret
    secret="airflow-secrets",
    # Key of a secret stored in this Secret object
    key="sql_alchemy_conn",
)
secret_volume = Secret(
    deploy_type="volume",
    # Path where we mount the secret as volume
    deploy_target="/var/secrets/google",
    # Name of Kubernetes Secret
    secret="service-account",
    # Key in the form of service account file name
    key="service-account.json",
)

첫 번째 Kubernetes 보안 비밀의 이름은 secret 변수에 정의됩니다. 이 특정 보안 비밀의 이름은 airflow-secrets입니다. deploy_type의 지시에 따라 환경 변수로 표시됩니다. 설정되는 환경 변수 deploy_targetSQL_CONN입니다. 마지막으로 deploy_target에 저장된 보안 비밀의 keysql_alchemy_conn입니다.

두 번째 Kubernetes 보안 비밀의 이름은 secret 변수에 정의됩니다. 이 특정 보안 비밀의 이름은 service-account입니다. deploy_type의 지시에 따라 볼륨으로 표시됩니다. 마운트할 파일의 경로 deploy_target/var/secrets/google입니다. 마지막으로 deploy_target에 저장된 보안 비밀의 keyservice-account.json입니다.

연산자 구성은 다음과 같습니다.

kubernetes_secret_vars_ex = KubernetesPodOperator(
    task_id="ex-kube-secrets",
    name="ex-kube-secrets",
    namespace="composer-user-workloads",
    image="gcr.io/gcp-runtimes/ubuntu_20_0_4",
    startup_timeout_seconds=300,
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[secret_env, secret_volume],
    cmds=["echo"],
    # env_vars allows you to specify environment variables for your
    # container to use. env_vars is templated.
    env_vars={
        "EXAMPLE_VAR": "/example/value",
        "GOOGLE_APPLICATION_CREDENTIALS": "/var/secrets/google/service-account.json",
    },
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
)

DAG 또는 사용자 환경을 변경하지 않으면 ex-kube-secrets 태스크가 실패합니다. 로그를 살펴보면 Pod took too long to start 오류로 인해 태스크가 실패합니다. 이 오류는 Airflow가 구성에 지정된 보안 비밀 secret_env를 찾을 수 없기 때문에 발생합니다.

gcloud

gcloud를 사용하여 보안 비밀을 설정하는 방법은 다음과 같습니다.

  1. Cloud Composer 환경 클러스터에 대한 정보를 가져옵니다.

    1. 다음 명령어를 실행합니다.

      gcloud composer environments describe ENVIRONMENT \
          --location LOCATION \
          --format="value(config.gkeCluster)"
      

      다음과 같이 바꿉니다.

      • ENVIRONMENT를 환경 이름으로 바꿉니다.
      • LOCATION을 Cloud Composer 환경이 위치한 리전으로 바꿉니다.

      이 명령어 출력에는 projects/<your-project-id>/locations/<location-of-composer-env>/clusters/<your-cluster-id> 형식이 사용됩니다.

    2. GKE 클러스터 ID를 가져오려면 /clusters/ 다음의 출력을 복사합니다(-gke로 끝남).

  2. 다음 명령어를 실행하여 GKE 클러스터에 연결합니다.

    gcloud container clusters get-credentials CLUSTER_ID \
      --project PROJECT \
      --region LOCATION
    

    다음과 같이 바꿉니다.

    • CLUSTER_ID를 GKE 클러스터 ID로 바꿉니다.
    • PROJECT를 Google Cloud 프로젝트의 ID로 바꿉니다.
    • LOCATION을 Cloud Composer 환경이 위치한 리전으로 바꿉니다.

  3. Kubernetes 보안 비밀을 만듭니다.

    1. 다음 명령어를 실행하여 sql_alchemy_conn 값을 test_value로 설정하는 Kubernetes 보안 비밀을 생성합니다.

      kubectl create secret generic airflow-secrets \
        --from-literal sql_alchemy_conn=test_value -n composer-user-workloads
      

    2. 다음 명령어를 실행하여 service-account.json의 값을 key.json이라는 서비스 계정 키 파일의 로컬 경로로 설정하는 Kubernetes 보안 비밀을 만듭니다.

      kubectl create secret generic service-account \
        --from-file service-account.json=./key.json -n composer-user-workloads
      

  4. 보안 비밀을 설정한 후 Airflow UI에서 ex-kube-secrets 태스크를 다시 실행합니다.

  5. ex-kube-secrets 작업 성공을 확인합니다.

전체 구성

이 예시에서는 KubernetesPodOperator에서 구성할 수 있는 모든 변수를 보여줍니다 코드를 수정하지 않아도 ex-all-configs 작업이 성공적으로 수행됩니다.

각 변수에 대한 세부정보는 Airflow KubernetesPodOperator 참조를 확인하세요.

kubernetes_full_pod = KubernetesPodOperator(
    task_id="ex-all-configs",
    name="pi",
    namespace="composer-user-workloads",
    image="perl:5.34.0",
    # Entrypoint of the container, if not specified the Docker container's
    # entrypoint is used. The cmds parameter is templated.
    cmds=["perl"],
    # Arguments to the entrypoint. The docker image's CMD is used if this
    # is not provided. The arguments parameter is templated.
    arguments=["-Mbignum=bpi", "-wle", "print bpi(2000)"],
    # The secrets to pass to Pod, the Pod will fail to create if the
    # secrets you specify in a Secret object do not exist in Kubernetes.
    secrets=[],
    # Labels to apply to the Pod.
    labels={"pod-label": "label-name"},
    # Timeout to start up the Pod, default is 600.
    startup_timeout_seconds=600,
    # The environment variables to be initialized in the container
    # env_vars are templated.
    env_vars={"EXAMPLE_VAR": "/example/value"},
    # If true, logs stdout output of container. Defaults to True.
    get_logs=True,
    # Determines when to pull a fresh image, if 'IfNotPresent' will cause
    # the Kubelet to skip pulling an image if it already exists. If you
    # want to always pull a new image, set it to 'Always'.
    image_pull_policy="Always",
    # Annotations are non-identifying metadata you can attach to the Pod.
    # Can be a large range of data, and can include characters that are not
    # permitted by labels.
    annotations={"key1": "value1"},
    # Optional resource specifications for Pod, this will allow you to
    # set both cpu and memory limits and requirements.
    # Prior to Airflow 2.3 and the cncf providers package 5.0.0
    # resources were passed as a dictionary. This change was made in
    # https://github.com/apache/airflow/pull/27197
    # Additionally, "memory" and "cpu" were previously named
    # "limit_memory" and "limit_cpu"
    # resources={'limit_memory': "250M", 'limit_cpu': "100m"},
    container_resources=k8s_models.V1ResourceRequirements(
        requests={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
        limits={"cpu": "1000m", "memory": "10G", "ephemeral-storage": "10G"},
    ),
    # Specifies path to kubernetes config. The config_file is templated.
    config_file="/home/airflow/composer_kube_config",
    # If true, the content of /airflow/xcom/return.json from container will
    # also be pushed to an XCom when the container ends.
    do_xcom_push=False,
    # List of Volume objects to pass to the Pod.
    volumes=[],
    # List of VolumeMount objects to pass to the Pod.
    volume_mounts=[],
    # Identifier of connection that should be used
    kubernetes_conn_id="kubernetes_default",
    # Affinity determines which nodes the Pod can run on based on the
    # config. For more information see:
    # https://kubernetes.io/docs/concepts/configuration/assign-pod-node/
    # Pod affinity with the KubernetesPodOperator
    # is not supported with Composer 2
    # instead, create a cluster and use the GKEStartPodOperator
    # https://cloud.google.com/composer/docs/using-gke-operator
    affinity={},
)

CNCF Kubernetes Provider 정보

GKEStartPodOperator 및 KubernetesPodOperator는 apache-airflow-providers-cncf-kubernetes Provider 내에서 구현됩니다.

CNCF Kubernetes Provider의 자세한 출시 노트는 CNCF Kubernetes Provider 웹사이트를 참조하세요.

버전 6.0.0

CNCF Kubernetes Provider 패키지 버전 6.0.0에서는 기본적으로 kubernetes_default 연결이 KubernetesPodOperator에서 사용됩니다.

버전 5.0.0에서 커스텀 연결을 지정한 경우 이 커스텀 연결이 연산자에서 계속 사용합니다. kubernetes_default 연결을 사용하도록 다시 전환하려면 이에 따라 DAG를 조정해야 할 수 있습니다.

버전 5.0.0

이 버전은 버전 4.4.0과 비교하여 이전 버전과 호환되지 않는 몇 가지 변경사항을 도입합니다. 가장 중요한 것은 버전 5.0.0에서 사용되지 않는 kubernetes_default 연결과 관련이 있습니다.

  • kubernetes_default 연결을 수정해야 합니다. Kube 구성 경로를 /home/airflow/composer_kube_config로 설정해야 합니다(그림 1 참조). 또는 config_fileKubernetesPodOperator 구성에 추가해야 합니다(다음 코드 예시 참조).
Airflow UI의 Kube 구성 경로 필드
그림 1. Airflow UI, kubernetes_default 연결 수정(확대하려면 클릭)
  • 다음과 같은 방법으로 KubernetesPodOperator를 사용하여 태스크 코드를 수정합니다.
KubernetesPodOperator(
  # config_file parameter - can be skipped if connection contains this setting
  config_file="/home/airflow/composer_kube_config",
  # definition of connection to be used by the operator
  kubernetes_conn_id='kubernetes_default',
  ...
)

버전 5.0.0에 대한 자세한 내용은 CNCF Kubernetes Provider 출시 노트를 참조하세요.

문제 해결

포드 오류 문제 해결 관련 팁

Airflow UI에서 태스크 로그를 확인하는 것 외에 다음 로그도 확인하세요.

  • Airflow 스케줄러 및 작업자의 출력

    1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

      환경으로 이동

    2. 환경의 DAG 링크를 따릅니다.

    3. 사용자 환경 버킷에서 한 수준 위로 이동합니다.

    4. logs/<DAG_NAME>/<TASK_ID>/<EXECUTION_DATE> 폴더의 로그를 검토합니다.

  • GKE 워크로드 내 Google Cloud 콘솔의 자세한 포드 로그. 이러한 로그에는 포드 정의 YAML 파일, 포드 이벤트, 포드 세부정보가 포함됩니다.

GKEStartPodOperator를 사용할 때의 0이 아닌 반환 코드

KubernetesPodOperatorGKEStartPodOperator를 사용할 때 컨테이너 진입점의 반환 코드를 통해 작업의 성공 여부를 알 수 있습니다. 0이 아닌 반환 코드는 실패를 나타냅니다.

KubernetesPodOperatorGKEStartPodOperator 사용 시 공통적인 패턴은 셸 스크립트를 컨테이너 진입점으로 실행하여 컨테이너 내에서 여러 작업을 그룹화합니다.

이러한 스크립트를 작성 중인 경우 스크립트 상단에 set -e 명령어를 사용하여 스크립트의 실패한 명령어가 스크립트를 종료하고 실패를 Airflow 작업 인스턴스에 적용하도록 합니다.

포드 제한 시간

KubernetesPodOperator의 기본 제한 시간은 120초이므로 큰 이미지를 다운로드하기 전에 시간 초과가 발생할 수 있습니다. 제한 시간을 변경하려면 KubernetesPodOperator 생성 시 startup_timeout_seconds 매개변수를 변경합니다.

포드가 시간 초과되면 Airflow UI에서 작업별 로그를 사용할 수 있습니다. 예를 들면 다음과 같습니다.

Executing <Task(KubernetesPodOperator): ex-all-configs> on 2018-07-23 19:06:58.133811
Running: ['bash', '-c', u'airflow run kubernetes-pod-example ex-all-configs 2018-07-23T19:06:58.133811 --job_id 726 --raw -sd DAGS_FOLDER/kubernetes_pod_operator_sample.py']
Event: pod-name-9a8e9d06 had an event of type Pending
...
...
Event: pod-name-9a8e9d06 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start

Cloud Composer 서비스 계정에 작업을 수행하는 데 필요한 IAM 권한이 없는 경우에도 포드 시간 초과가 발생할 수 있습니다. 이를 확인하려면 GKE 대시보드를 사용하여 포드 수준 오류를 보고 특정 워크로드의 로그를 확인하거나 Cloud Logging을 사용하세요.

새 연결을 설정할 수 없음

자동 업그레이드는 GKE 클러스터에서 기본적으로 사용 설정됩니다. 노드 풀이 업그레이드 중인 클러스터에 있는 경우 다음 오류가 표시될 수 있습니다.

<Task(KubernetesPodOperator): gke-upgrade> Failed to establish a new
connection: [Errno 111] Connection refused

클러스터가 업그레이드 중인지 확인하려면 Google Cloud Console에서 Kubernetes 클러스터 페이지로 이동하여 환경의 클러스터 이름 옆에 있는 로드 아이콘을 찾으세요.

다음 단계