存取 Airflow 指令列介面

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Apache Airflow 具有指令列介面 (CLI),可用於執行觸發及管理 DAG、取得 DAG 執行和工作相關資訊,以及新增和刪除連線和使用者等工作。

支援的 Airflow CLI 指令

Airflow 使用 Airflow 2 CLI 語法,相關說明請參閱 Airflow 說明文件。

如需支援的 Airflow CLI 指令完整清單,請參閱 gcloud composer environments run 指令的參考資料。

事前準備

  • 您必須具備權限,才能搭配 Cloud Composer 使用 Google Cloud CLI,並執行 Airflow CLI 指令。

  • Airflow CLI 指令會耗用environments.executeAirflowCommand 配額

  • 在 2.4.0 之前的 Cloud Composer 版本中,您需要存取環境叢集的控制層,才能執行 Airflow CLI 指令。

執行 Airflow CLI 指令

如要在環境中執行 Airflow CLI 指令,請使用 gcloud CLI:

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

取代下列項目:

  • ENVIRONMENT_NAME:環境名稱。
  • LOCATION:環境所在的區域。
  • SUBCOMMAND支援的 Airflow CLI 指令之一。
  • SUBCOMMAND_ARGUMENTS,並加入 Airflow CLI 指令的引數。

子指令引數分隔符

-- 分隔指定 Airflow CLI 指令的引數:

  • 將複合 CLI 指令指定為子指令。
  • -- 分隔符號後,將複合指令的任何引數指定為子指令引數。

範例:

gcloud composer environments run example-environment \
    dags list -- --output=json

預設位置

大多數 gcloud composer 指令都需要位置。如要指定位置,您可以使用 --location 標記或設定預設位置

舉例來說,如要在 Cloud Composer 環境中觸發名稱為 sample_quickstart 且 ID 為 5077 的 DAG,請使用以下指令:

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

在私人 IP 環境中執行指令

在 2.4.0 之前的 Cloud Composer 版本中:

如要在私人 IP 環境中執行 Airflow CLI 指令,請在可存取 GKE 叢集控制層端點的機器上執行指令。可用選項可能因私有叢集設定而異。

如果環境叢集停用了公開端點存取權,就無法使用 gcloud composer 指令執行 Airflow CLI。如要執行 Airflow CLI 指令,請完成下列步驟:

  1. 在虛擬私有雲網路中建立 VM
  2. 取得叢集憑證。執行下列指令:

    gcloud container clusters get-credentials CLUSTER_NAME \
      --region REGION \
      --project PROJECT \
      --internal-ip
    

使用 kubectl 執行 Airflow 指令。例如:

kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
  --container airflow-scheduler -- airflow dags list

COMPOSER_NAMESPACE 替換為類似下列的命名空間: composer-2-0-28-airflow-2-3-394zxc12411。您可以在工作負載清單中找到 Cloud Composer,也可以使用 kubectl get namespaces 指令。

如果環境的叢集已啟用公開端點存取權,您也可以從外部 IP 位址已新增至授權網路的機器,執行 Airflow CLI 指令。如要從電腦啟用存取權,請將電腦的外部位址新增至環境的授權網路清單

執行 gcloud composer environments runkubectl 指令時,可能會遇到下列錯誤:

Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"

症狀:這則錯誤訊息表示您執行這些指令的電腦沒有網路連線。

解決方案:請按照「在私有 IP 環境中執行指令」一節中的指南操作,或使用「kubectl 指令逾時」一節中的操作說明。

透過 Cloud Composer API 執行 Airflow CLI 指令

從 Cloud Composer 2.4.0 版開始,您可以透過 Cloud Composer API 執行 Airflow CLI 指令。

執行指令

建構 environments.executeAirflowCommand API 要求:

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

更改下列內容:

  • PROJECT_ID專案 ID
  • LOCATION:環境所在的區域。
  • ENVIRONMENT_NAME:環境名稱。
  • AIRFLOW_COMMAND:您要執行的 Airflow CLI 指令,例如 dags
  • AIRFLOW_SUBCOMMAND:要執行的 Airflow CLI 指令子指令,例如 list
  • (選用) SUBCOMMAND_PARAMETER:子指令的參數。如要使用多個參數,請在清單中新增更多項目。

範例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

輪詢指令狀態

透過 Cloud Composer API 執行 Airflow CLI 指令後,請發出 PollAirflowCommand 要求,並檢查 exitInfo 中的欄位是否有錯誤和狀態碼,確認指令是否順利完成。output 欄位包含記錄檔行。

如要取得指令執行狀態並擷取記錄,請提供 executionIdpodpodNamespace 值 (由 ExecuteAirflowCommandRequest 傳回):

範例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
  "pod": "airflow-scheduler-1327d8cd68-hblpd",
  "podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
  "nextLineNumber": 1
}

後續步驟