Cloud Composer 1 |Cloud Composer 2 |Cloud Composer 3
Apache Airflow 具有命令行界面 (CLI),可用于执行任务,例如触发和管理 DAG、获取有关 DAG 运行和任务的信息、添加和删除连接和用户。
CLI 语法版本简介
Cloud Composer 2 中的 Airflow 使用 Airflow 2 CLI 语法
支持的 Airflow CLI 命令
如需查看支持的 Airflow CLI 命令的完整列表,请参阅 gcloud composer environments run
参考文档。
准备工作
您必须拥有将
gcloud
命令行工具与 Cloud Composer 搭配使用以及运行 Airflow CLI 命令的足够权限。有关 相关信息,请参阅访问权限控制。在 Cloud Composer 2.4.0 之前的版本中, 对环境集群的控制平面的访问权限 来运行 Airflow CLI 命令
运行 Airflow CLI 命令
要在您的环境中运行 Airflow CLI 命令,请使用 gcloud
:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
SUBCOMMAND \
-- SUBCOMMAND_ARGUMENTS
替换:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。- 将
SUBCOMMAND
替换为受支持的 Airflow CLI 命令之一。 - 将
SUBCOMMAND_ARGUMENTS
替换为 Airflow CLI 命令的参数。
子命令参数分隔符
使用 --
分隔指定的 Airflow CLI 命令的参数:
Airflow 2
对于 Airflow 2 CLI 语法:
- 将复合 CLI 命令指定为子命令。
- 将复合命令的任何参数指定为子命令参数;
--
分隔符后。
gcloud composer environments run example-environment \
dags list -- --output=json
Airflow 1
Cloud Composer 2 仅支持 Airflow 2。
默认位置
大多数 gcloud composer
命令需要位置信息。您可以使用 --location
标志指定位置,也可以设置默认位置。
示例
例如,如需在 Cloud Composer 环境中触发名为 sample_quickstart
且 ID 为 5077
的 DAG,请使用以下命令:
Airflow 2
gcloud composer environments run example-environment \
--location us-central1 dags trigger -- sample_quickstart \
--run-id=5077
Airflow 1
Cloud Composer 2 仅支持 Airflow 2。
在专用 IP 环境中运行命令
从 Cloud Composer 2.4.0 版开始,您可以运行 Airflow CLI 而无需额外配置。您的机器无需访问环境的集群控制平面端点即可运行这些命令。
在 Cloud Composer 2.4.0 之前的版本中:
如需在专用 IP 环境中运行 Airflow CLI 命令,请执行以下操作: 在可以访问 GKE 集群 控制平面端点您的选项可能会因专用集群配置而有所不同。
如果您环境的集群中停用了公共端点访问权限,则无法使用 gcloud composer
命令运行 Airflow CLI。为了能够运行 Airflow CLI 命令,请执行以下步骤:
- 在 VPC 网络中创建虚拟机
- 获取集群凭据。运行以下命令:
bash gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip
- 使用
kubectl
运行 Airflow 命令。例如:
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
--container airflow-scheduler -- airflow dags list
将 COMPOSER_NAMESPACE
替换为类似于以下内容的命名空间:
composer-2-0-28-airflow-2-3-394zxc12411
。您可以在工作负载列表中或使用 kubectl get namespaces
命令找到您的 Cloud Composer。
如果您在环境的集群中启用了公共端点访问权限,可以执行以下操作: 还可以从外部 IP 地址为 添加到已获授权的网络中要从您的计算机启用访问权限,请将 虚拟机的外部地址 授权网络列表。
通过 Cloud Composer API 运行 Airflow CLI 命令
从 Cloud Composer 2.4.0 版开始,您可以运行 Airflow CLI 通过 Cloud Composer API 编写命令
执行一项命令
构建一个 environments.executeAirflowCommand
API 请求:
{
"environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
"command": "AIRFLOW_COMMAND",
"subcommand": "AIRFLOW_SUBCOMMAND",
"parameters": [
"SUBCOMMAND_PARAMETER"
]
}
替换以下内容:
PROJECT_ID
:项目 ID。LOCATION
:环境所在的区域。ENVIRONMENT_NAME
:您的环境的名称。AIRFLOW_COMMAND
:您要执行的 Airflow CLI 命令,例如dags
。AIRFLOW_SUBCOMMAND
:用于 例如list
。- (可选)
SUBCOMMAND_PARAMETER
:子命令的参数。如果您 要使用多个参数,请向列表中添加更多项。
示例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
"environment": "projects/example-project/locations/us-central1/environments/example-environment",
"command": "dags",
"subcommand": "list",
"parameters": [
"-o json",
"--verbose"
]
}
投票命令状态
通过 Cloud Composer API 执行 Airflow CLI 命令后,请发出 PollAirflowCommand 请求并检查 exitInfo
中的字段是否存在错误和状态代码,以确定命令是否已成功完成。output
字段包含
日志行。
如需获取命令执行状态并提取日志,请提供 ExecuteAirflowCommandRequest
返回的 executionId
、pod
和 podNamespace
值:
示例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
"executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
"pod": "airflow-scheduler-1327d8cd68-hblpd",
"podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
"nextLineNumber": 1
}
问题排查
无法连接到集群控制平面
运行 gcloud composer environments run
或 kubectl
命令时,您可能会遇到以下错误:
Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"
问题现象:此错误消息表示运行这些命令的计算机没有网络连接。
解决方案:遵循
在专用 IP 环境中运行命令
部分或参阅
kubectl
命令超时部分。