Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Apache Airflow 具有指令列介面 (CLI),可用於執行觸發及管理 DAG、取得 DAG 執行和工作相關資訊,以及新增和刪除連線和使用者等工作。
支援的 Airflow CLI 指令
Airflow 使用 Airflow 2 CLI 語法,相關說明請參閱 Airflow 說明文件。
如需支援的 Airflow CLI 指令完整清單,請參閱 gcloud composer environments run
指令的參考資料。
事前準備
您必須具備權限,才能搭配 Cloud Composer 使用 Google Cloud CLI,並執行 Airflow CLI 指令。
Airflow CLI 指令會耗用
environments.executeAirflowCommand
配額。在 2.4.0 之前的 Cloud Composer 版本中,您需要存取環境叢集的控制層,才能執行 Airflow CLI 指令。
執行 Airflow CLI 指令
如要在環境中執行 Airflow CLI 指令,請使用 gcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
SUBCOMMAND \
-- SUBCOMMAND_ARGUMENTS
取代下列項目:
ENVIRONMENT_NAME
:環境名稱。LOCATION
:環境所在的區域。SUBCOMMAND
:支援的 Airflow CLI 指令之一。SUBCOMMAND_ARGUMENTS
,並加入 Airflow CLI 指令的引數。
子指令引數分隔符
以 --
分隔指定 Airflow CLI 指令的引數:
- 將複合 CLI 指令指定為子指令。
- 在
--
分隔符號後,將複合指令的任何引數指定為子指令引數。
範例:
gcloud composer environments run example-environment \
dags list -- --output=json
預設位置
大多數 gcloud composer
指令都需要位置。如要指定位置,您可以使用 --location
標記或設定預設位置。
舉例來說,如要在 Cloud Composer 環境中觸發名稱為 sample_quickstart
且 ID 為 5077
的 DAG,請使用以下指令:
gcloud composer environments run example-environment \
--location us-central1 dags trigger -- sample_quickstart \
--run-id=5077
在私人 IP 環境中執行指令
在 2.4.0 之前的 Cloud Composer 版本中:
如要在私人 IP 環境中執行 Airflow CLI 指令,請在可存取 GKE 叢集控制層端點的機器上執行指令。可用選項可能因私有叢集設定而異。
如果環境叢集停用了公開端點存取權,就無法使用 gcloud composer
指令執行 Airflow CLI。如要執行 Airflow CLI 指令,請完成下列步驟:
- 在虛擬私有雲網路中建立 VM
取得叢集憑證。執行下列指令:
gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip
使用 kubectl
執行 Airflow 指令。例如:
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
--container airflow-scheduler -- airflow dags list
將 COMPOSER_NAMESPACE
替換為類似下列的命名空間:
composer-2-0-28-airflow-2-3-394zxc12411
。您可以在工作負載清單中找到 Cloud Composer,也可以使用 kubectl get namespaces
指令。
如果環境的叢集已啟用公開端點存取權,您也可以從外部 IP 位址已新增至授權網路的機器,執行 Airflow CLI 指令。如要從電腦啟用存取權,請將電腦的外部位址新增至環境的授權網路清單。
執行 gcloud composer environments run
或 kubectl
指令時,可能會遇到下列錯誤:
Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"
症狀:這則錯誤訊息表示您執行這些指令的電腦沒有網路連線。
解決方案:請按照「在私有 IP 環境中執行指令」一節中的指南操作,或使用「kubectl
指令逾時」一節中的操作說明。
透過 Cloud Composer API 執行 Airflow CLI 指令
從 Cloud Composer 2.4.0 版開始,您可以透過 Cloud Composer API 執行 Airflow CLI 指令。
執行指令
建構 environments.executeAirflowCommand
API 要求:
{
"environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
"command": "AIRFLOW_COMMAND",
"subcommand": "AIRFLOW_SUBCOMMAND",
"parameters": [
"SUBCOMMAND_PARAMETER"
]
}
更改下列內容:
PROJECT_ID
:專案 ID。LOCATION
:環境所在的區域。ENVIRONMENT_NAME
:環境名稱。AIRFLOW_COMMAND
:您要執行的 Airflow CLI 指令,例如dags
。AIRFLOW_SUBCOMMAND
:要執行的 Airflow CLI 指令子指令,例如list
。- (選用)
SUBCOMMAND_PARAMETER
:子指令的參數。如要使用多個參數,請在清單中新增更多項目。
範例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
"environment": "projects/example-project/locations/us-central1/environments/example-environment",
"command": "dags",
"subcommand": "list",
"parameters": [
"-o json",
"--verbose"
]
}
輪詢指令狀態
透過 Cloud Composer API 執行 Airflow CLI 指令後,請發出 PollAirflowCommand 要求,並檢查 exitInfo
中的欄位是否有錯誤和狀態碼,確認指令是否順利完成。output
欄位包含記錄檔行。
如要取得指令執行狀態並擷取記錄,請提供 executionId
、pod
和 podNamespace
值 (由 ExecuteAirflowCommandRequest
傳回):
範例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
"executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
"pod": "airflow-scheduler-1327d8cd68-hblpd",
"podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
"nextLineNumber": 1
}