Airflow CLI 액세스

Cloud Composer 1 | Cloud Composer 2

Apache Airflow에는 DAG 트리거 및 관리, DAG 실행 및 태스크 정보 가져오기, 연결 및 사용자 추가와 삭제 등의 작업을 수행하는 데 사용할 수 있는 명령줄 인터페이스(CLI)가 있습니다.

CLI 구문 버전 정보

Cloud Composer 2의 Airflow는 Airflow 2 CLI 구문을 사용합니다

지원되는 Airflow CLI 명령어

지원되는 Airflow 명령어의 전체 목록은 gcloud composer environments run 참조를 확인하세요.

시작하기 전에

Airflow CLI 명령어 실행

환경에서 Airflow CLI 명령어를 실행하려면 gcloud를 사용합니다.

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

다음과 같이 바꿉니다.

  • ENVIRONMENT_NAME을 환경 이름으로 바꿉니다.
  • LOCATION을 환경이 위치한 리전으로 바꿉니다.
  • SUBCOMMAND지원되는 Airflow CLI 명령어 중 하나로 바꿉니다.
  • SUBCOMMAND_ARGUMENTS를 Airflow CLI 명령어의 인수로 바꿉니다.

하위 명령어 인수 구분 기호

지정된 Airflow CLI 명령어의 인수를 --로 구분합니다.

Airflow 2

Airflow 2 CLI 구문의 경우:

  • 복합 CLI 명령어를 하위 명령어로 지정합니다.
  • -- 구분 기호 뒤에 복합 명령어의 인수를 하위 명령어 인수로 지정합니다.
gcloud composer environments run example-environment \
    dags list -- --output=json

Airflow 1

Cloud Composer 2는 Airflow 2만 지원합니다.

기본 위치

대부분 gcloud composer 명령어에는 위치가 필요합니다. --location 플래그로 위치를 지정하거나 기본 위치를 설정할 수 있습니다.

예시

예를 들어 Cloud Composer 환경에서 ID가 5077sample_quickstart DAG를 트리거하려면 다음을 실행합니다.

Airflow 2

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

Airflow 1

Cloud Composer 2는 Airflow 2만 지원합니다.

비공개 IP 환경에서 명령어 실행

Cloud Composer 버전 2.4.0부터는 추가 구성 없이 비공개 IP 환경에서 Airflow CLI 명령어를 실행할 수 있습니다. 머신에서 이러한 명령어를 실행하기 위해 환경의 클러스터 제어 영역 엔드포인트에 액세스할 필요가 없습니다.

Cloud Composer 버전 2.4.0 이하:

비공개 IP 환경에서 Airflow CLI 명령어를 실행하려면 GKE 클러스터의 제어 영역 엔드포인트에 액세스할 수 있는 머신에서 실행합니다. 실행 옵션은 비공개 클러스터 구성에 따라 다를 수 있습니다.

공개 엔드포인트 액세스가 환경의 클러스터에서 중지되면 gcloud composer 명령어를 사용하여 Airflow CLI를 실행할 수 없습니다. Airflow CLI 명령어를 실행하려면 다음 단계를 수행합니다.

  1. VPC 네트워크에 VM을 만듭니다.
  2. 클러스터 사용자 인증 정보를 가져옵니다. bash gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip 명령어를 실행합니다.
  • kubectl을 사용하여 Airflow 명령어를 실행합니다. 예를 들면 다음과 같습니다.
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
  --container airflow-scheduler -- airflow dags list

COMPOSER_NAMESPACEcomposer-2-0-28-airflow-2-3-394zxc12411과 유사한 네임스페이스로 바꿉니다. 워크로드 목록에서 또는 kubectl get namespaces 명령어를 사용하여 Cloud Composer를 찾을 수 있습니다.

공개 엔드포인트 액세스가 환경의 클러스터에서 사용 설정되면 승인된 네트워크에 추가된 외부 IP 주소가 있는 머신에서 Airflow CLI 명령어를 실행할 수도 있습니다. 머신에서 액세스를 사용 설정하려면 머신의 외부 주소를 사용자 환경의 승인된 네트워크 목록에 추가합니다.

Cloud Composer API를 통해 Airflow CLI 명령어 실행

Cloud Composer 버전 2.4.0부터 Cloud Composer API를 통해 Airflow CLI 명령어를 실행할 수 있습니다.

명령어 실행

environments.executeAirflowCommand API 요청을 생성합니다.

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

다음을 바꿉니다.

  • PROJECT_ID: 프로젝트 ID입니다.
  • LOCATION: 환경이 위치한 리전입니다.
  • ENVIRONMENT_NAME: 환경의 이름입니다.
  • AIRFLOW_COMMAND: 실행하려는 Airflow CLI 명령어입니다(예: dags).
  • AIRFLOW_SUBCOMMAND: 실행하려는 Airflow CLI 명령어의 하위 명령어입니다(예: list).
  • (선택사항) SUBCOMMAND_PARAMETER: 하위 명령어의 매개변수입니다. 매개변수를 두 개 이상 사용하려면 목록에 항목을 더 추가합니다.

예:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

Poll 명령어 상태

Cloud Composer API를 통해 Airflow CLI 명령어를 실행한 후 PollAirflowCommand 요청을 전송하고 오류 및 상태 코드에 대해 exitInfo의 필드를 검사하여 명령어가 성공적으로 완료되었는지 확인합니다. output 필드에는 로그 줄이 포함됩니다.

명령어 실행 상태와 로그를 가져오려면 ExecuteAirflowCommandRequest에서 반환한 executionId, pod, podNamespace 값을 제공합니다.

예:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
  "pod": "airflow-scheduler-1327d8cd68-hblpd",
  "podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
  "nextLineNumber": 1
}

문제 해결

클러스터 제어 영역에 연결되지 않음

gcloud composer environments run 또는 kubectl 명령어를 실행할 때 다음과 같은 오류가 발생할 수 있습니다.

Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"

증상: 이 오류 메시지는 이러한 명령어를 실행하는 컴퓨터에서 네트워크 연결이 안됨을 나타냅니다.

솔루션: 비공개 IP 환경에서 명령어 실행 섹션에 제시된 가이드라인을 따르거나 kubectl 명령어 타임아웃 섹션에서 제공되는 안내를 따릅니다.