CeleryKubernetesExecutor 사용

Cloud Composer 1 | Cloud Composer 2

이 페이지에서는 Cloud Composer에서 CeleryKubernetesExecutor를 사용 설정하고 DAG에서 KubernetesExecutor를 사용하는 방법을 설명합니다.

CeleryKubernetesExecutor 정보

CeleryKubernetesExecutor는 CeleryExecutor와 KubernetesExecutor를 동시에 사용할 수 있는 실행자 유형입니다. Airflow는 태스크에 정의한 큐를 기준으로 실행자를 선택합니다. 하나의 DAG에서 일부 태스크는 CeleryExecutor로, 다른 태스크는 KubernetesExecutor로 실행할 수 있습니다.

  • CeleryExecutor는 빠르고 확장 가능한 태스크 실행에 최적화되어 있습니다.
  • KubernetesExecutor는 리소스 집약적인 태스크를 실행하고 태스크를 격리하여 실행하도록 설계되었습니다.

Cloud Composer의 CeleryKubernetesExecutor

Cloud Composer의 CeleryKubernetesExecutor는 작업에 KubernetesExecutor를 사용할 수 있는 기능을 제공합니다. CeleryKubernetesExecutor와 별도로 Cloud Composer에서 KubernetesExecutor를 사용할 수 없습니다.

Cloud Composer는 Airflow 작업자와 동일한 네임스페이스에서 환경 클러스터의 KubernetesExecutor로 실행하는 태스크를 실행합니다. 이러한 태스크에는 Airflow 작업자와 동일한 바인딩이 있으며 프로젝트의 리소스에 액세스할 수 있습니다.

KubernetesExecutor로 실행하는 태스크에는 Cloud Composer 가격 책정 모델이 사용됩니다. 이러한 태스크가 포함된 포드가 환경의 클러스터에서 실행되기 때문입니다. 이 포드에 Cloud Composer 컴퓨팅 SKU(CPU, 메모리, 스토리지용)가 적용됩니다.

다음과 같은 경우 CeleryExecutor로 태스크를 실행하는 것이 좋습니다.

  • 태스크 시작 시간이 중요한 경우
  • 태스크는 런타임 격리가 필요하지 않으며 리소스가 많이 사용되지 않는 경우

다음과 같은 경우에는 KubernetesExecutor로 태스크를 실행하는 것이 좋습니다.

  • 태스크에 런타임 격리가 필요한 경우. 예를 들어 태스크가 자체 포드에서 실행되기 때문에 메모리와 CPU를 두고 경쟁하지 않습니다.
  • 태스크에는 추가 시스템 라이브러리(또는 PyPI 패키지)가 필요한 경우
  • 태스크에 리소스가 많이 사용되므로 사용 가능한 CPU 및 메모리 리소스를 제어하려는 경우

KubernetesExecutor와 KubernetesPodOperator 비교

KubernetesExecutor로 태스크를 실행하는 것은 KubernetesPodOperator를 사용해 태스크를 실행하는 것과 유사합니다. 태스크는 포드에서 실행되므로 포드 수준의 태스크 격리와 더 나은 리소스 관리를 제공합니다.

하지만 몇 가지 주요 차이점이 있습니다.

  • KubernetesExecutor는 환경의 버전이 지정된 Cloud Composer 네임스페이스에서만 태스크를 실행합니다. Cloud Composer에서는 이 네임스페이스를 변경할 수 없습니다. KubernetesPodOperator가 포드 태스크를 실행하는 네임스페이스를 지정할 수 있습니다.
  • KubernetesExecutor는 모든 기본 제공 Airflow 연산자를 사용할 수 있습니다. KubernetesPodOperator는 컨테이너의 진입점으로 정의된 제공된 스크립트만 실행합니다.
  • KubernetesExecutor는 Cloud Composer 환경에 정의된 것과 동일한 Python, Airflow 구성 옵션 재정의, 환경 변수, PyPI 패키지와 함께 기본 Cloud Composer Docker 이미지를 사용합니다.

Docker 이미지 정보

기본적으로 KubernetesExecutor는 Cloud Composer가 Celery 작업자에 사용하는 것과 동일한 Docker 이미지를 사용하여 태스크를 시작합니다. 커스텀 PyPI 패키지 또는 환경 변수와 같이 환경에 지정한 모든 변경사항이 포함된 환경의 Cloud Composer 이미지입니다.

시작하기 전에

  • Cloud Composer 3에서 CeleryKubernetesExecutor를 사용할 수 있습니다.

  • Cloud Composer 3에서는 CeleryKubernetesExecutor 이외의 실행자를 사용할 수 없습니다. 즉, 하나의 DAG에서 CeleryExecutor, KubernetesExecutor 또는 둘 다를 사용하여 태스크를 실행할 수 있지만 KubernetesExecutor 또는 CeleryExecutor만 사용하도록 환경을 구성할 수는 없습니다.

CeleryKubernetesExecutor 구성

KubernetesExecutor와 관련된 기존 Airflow 구성 옵션을 재정의할 수 있습니다.

  • [kubernetes]worker_pods_creation_batch_size

    이 옵션은 스케줄러 루프당 Kubernetes 작업자 포드 생성 호출 수를 정의합니다. 기본값은 1이므로 스케줄러 하트비트당 단일 포드만 시작됩니다. KubernetesExecutor를 많이 사용하는 경우 이 값을 늘리는 것이 좋습니다.

  • [kubernetes]worker_pods_pending_timeout

    이 옵션은 작업자가 실패한 것으로 간주되기 전에 작업자가 Pending상태(포드 생성 중)에 머무를 수 있는 시간(초)을 정의합니다. 기본값은 5분입니다.

KubernetesExecutor 또는 CeleryExecutor로 태스크 실행

하나의 DAG에서 CeleryExecutor, KubernetesExecutor 또는 둘 다를 사용하여 태스크를 실행할 수 있습니다.

  • KubernetesExecutor로 태스크를 실행하려면 태스크의 queue 매개변수에 kubernetes 값을 지정합니다.
  • CeleryExecutor로 태스크를 실행하려면 queue 매개변수를 생략합니다.

다음 예시에서는 KubernetesExecutor를 사용하여 task-kubernetes 태스크를, CeleryExecutor를 사용하여 task-celery 태스크를 실행합니다.

import datetime
import airflow
from airflow.operators.python_operator import PythonOperator

with airflow.DAG(
  "composer_sample_celery_kubernetes",
  start_date=datetime.datetime(2022, 1, 1),
  schedule_interval="@daily") as dag:

  def kubernetes_example():
      print("This task runs using KubernetesExecutor")

  def celery_example():
      print("This task runs using CeleryExecutor")

  # To run with KubernetesExecutor, set queue to kubernetes
  task_kubernetes = PythonOperator(
    task_id='task-kubernetes',
    python_callable=kubernetes_example,
    dag=dag,
    queue='kubernetes')

  # To run with CeleryExecutor, omit the queue argument
  task_celery = PythonOperator(
    task_id='task-celery',
    python_callable=celery_example,
    dag=dag)

  task_kubernetes >> task_celery

KubernetesExecutor와 관련된 Airflow CLI 명령어 실행

gcloud를 사용하여 KubernetesExecutor와 관련된 여러 Airflow CLI 명령어를 실행할 수 있습니다.

작업자 포드 사양 맞춤설정

작업자 포드 사양을 태스크의 executor_config 매개변수에 전달하여 맞춤설정할 수 있습니다. 이를 통해 커스텀 CPU 및 메모리 요구사항을 정의할 수 있습니다.

태스크를 실행하는 데 사용되는 전체 작업자 포드 사양을 재정의할 수 있습니다. KubernetesExecutor에서 사용하는 태스크의 포드 사양을 검색하려면 kubernetes generate-dag-yaml Airflow CLI 명령어를 실행하면 됩니다.

작업자 포드 사양 맞춤설정에 대한 자세한 내용은 Airflow 문서를 참조하세요.

다음 예시에서는 커스텀 작업자 포드 사양을 사용하는 태스크를 보여줍니다.

PythonOperator(
    task_id='custom-spec-example',
    python_callable=f,
    dag=dag,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(requests={
                            'cpu': '500m',
                            'memory': '1000Mi',
                        })
                    ),
                ],
            ),
        )
    },
)

태스크 로그 보기

KubernetesExecutor에서 실행한 태스크 로그는 CeleryExecutor에서 실행한 태스크 로그와 함께 로그 탭에서 확인할 수 있습니다.

  1. Google Cloud 콘솔에서 환경 페이지로 이동합니다.

    환경으로 이동

  2. 환경 목록에서 환경 이름을 클릭합니다. 환경 세부정보 페이지가 열립니다.

  3. 로그 탭으로 이동합니다.

  4. 모든 로그 > Airflow 로그 > 작업자로 이동합니다.

  5. airflow-k8s-worker라는 작업자는 KubernetesExecutor 태스크를 실행합니다. 특정 태스크의 로그를 찾으려면 검색에서 DAG ID 또는 태스크 ID를 키워드로 사용하면 됩니다.

다음 단계