Cloud Composer 3 | Cloud Composer 2 | Cloud Composer�
Apache Airflow 具有指令列介面 (CLI),可用於執行觸發及管理 DAG、取得 DAG 執行和工作相關資訊,以及新增和刪除連線和使用者等工作。
支援的 Airflow CLI 指令
Airflow 使用 Airflow 2 CLI 語法,相關說明請參閱 Airflow 說明文件。
如需支援的 Airflow CLI 指令完整清單,請參閱 gcloud composer environments run
指令的參考資料。
事前準備
您必須具備權限,才能搭配使用 Google Cloud CLI 與 Cloud Composer,並執行 Airflow CLI 指令。
Airflow CLI 指令會耗用
environments.executeAirflowCommand
配額。在 2.4.0 之前的 Cloud Composer 版本中,您需要存取環境叢集的控制層,才能執行 Airflow CLI 指令。
執行 Airflow CLI 指令
如要在環境中執行 Airflow CLI 指令,請使用 gcloud CLI:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
SUBCOMMAND \
-- SUBCOMMAND_ARGUMENTS
取代下列項目:
ENVIRONMENT_NAME
:環境名稱。LOCATION
:環境所在的區域。SUBCOMMAND
:支援的 Airflow CLI 指令之一。SUBCOMMAND_ARGUMENTS
,並加入 Airflow CLI 指令的引數。
子指令引數分隔符
以 --
分隔指定 Airflow CLI 指令的引數:
- 將複合 CLI 指令指定為子指令。
- 在
--
分隔符號後,將複合指令的任何引數指定為子指令引數。
範例:
gcloud composer environments run example-environment \
dags list -- --output=json
預設位置
大多數 gcloud composer
指令都需要位置。如要指定位置,您可以使用 --location
標記或設定預設位置。
舉例來說,如要在 Cloud Composer 環境中觸發名稱為 sample_quickstart
且 ID 為 5077
的 DAG,請使用以下指令:
gcloud composer environments run example-environment \
--location us-central1 dags trigger -- sample_quickstart \
--run-id=5077
在私人 IP 環境中執行指令
在 2.4.0 之前的 Cloud Composer 版本中:
如要在私人 IP 環境中執行 Airflow CLI 指令,請在可存取 GKE 叢集控制層端點的機器上執行指令。可用選項可能因私有叢集設定而異。
如果環境叢集停用了公開端點存取權,就無法使用 gcloud composer
指令執行 Airflow CLI。如要執行 Airflow CLI 指令,請完成下列步驟:
- 在虛擬私有雲網路中建立 VM
取得叢集憑證。執行下列指令:
gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip
使用 kubectl
執行 Airflow 指令。例如:
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
--container airflow-scheduler -- airflow dags list
將 COMPOSER_NAMESPACE
替換為類似下列的命名空間:
composer-2-0-28-airflow-2-3-394zxc12411
。您可以在工作負載清單中找到 Cloud Composer,也可以使用 kubectl get namespaces
指令。
如果環境的叢集已啟用公開端點存取權,您也可以從外部 IP 位址已新增至授權網路的機器,執行 Airflow CLI 指令。如要從電腦啟用存取權,請將電腦的外部位址新增至環境的授權網路清單。
執行 gcloud composer environments run
或 kubectl
指令時,可能會遇到下列錯誤:
Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"
症狀:這則錯誤訊息表示執行這些指令的電腦沒有網路連線。
解決方案:請按照「在私有 IP 環境中執行指令」一節中的指南操作,或使用「kubectl
指令逾時」一節中的操作說明。
透過 Cloud Composer API 執行 Airflow CLI 指令
從 Cloud Composer 2.4.0 版開始,您可以使用 Cloud Composer API 執行 Airflow CLI 指令。
執行指令
建構 environments.executeAirflowCommand
API 要求:
{
"environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
"command": "AIRFLOW_COMMAND",
"subcommand": "AIRFLOW_SUBCOMMAND",
"parameters": [
"SUBCOMMAND_PARAMETER"
]
}
更改下列內容:
PROJECT_ID
:專案 ID。LOCATION
:環境所在的區域。ENVIRONMENT_NAME
:環境名稱。AIRFLOW_COMMAND
:您要執行的 Airflow CLI 指令,例如dags
。AIRFLOW_SUBCOMMAND
:要執行的 Airflow CLI 指令子指令,例如list
。- (選用)
SUBCOMMAND_PARAMETER
:子指令的參數。如要使用多個參數,請在清單中新增更多項目。
範例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
"environment": "projects/example-project/locations/us-central1/environments/example-environment",
"command": "dags",
"subcommand": "list",
"parameters": [
"-o json",
"--verbose"
]
}
輪詢指令狀態
透過 Cloud Composer API 執行 Airflow CLI 指令後,請發出 PollAirflowCommand 要求,並檢查 exitInfo
中的欄位是否有錯誤和狀態碼,確認指令是否順利完成。output
欄位包含記錄檔行。
如要取得指令執行狀態並擷取記錄,請提供 executionId
、pod
和 podNamespace
值 (由 ExecuteAirflowCommandRequest
傳回):
範例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
"executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
"pod": "airflow-scheduler-1327d8cd68-hblpd",
"podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
"nextLineNumber": 1
}