Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Apache Airflow 具有命令行界面 (CLI),可用于执行任务,例如触发和管理 DAG、获取有关 DAG 运行和任务的信息、添加和删除连接和用户。
CLI 语法版本简介
Cloud Composer 2 中的 Airflow 使用 Airflow 2 CLI 语法
支持的 Airflow CLI 命令
如需查看支持的 Airflow CLI 命令的完整列表,请参阅 gcloud composer environments run
参考文档。
准备工作
您必须具有足够的权限才能使用
gcloud
命令行 工具并运行 Airflow CLI 命令。如需了解详情,请参阅访问权限控制。在 2.4.0 之前的 Cloud Composer 版本中,您需要访问环境集群的控制平面才能运行 Airflow CLI 命令。
运行 Airflow CLI 命令
要在您的环境中运行 Airflow CLI 命令,请使用 gcloud
:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
SUBCOMMAND \
-- SUBCOMMAND_ARGUMENTS
替换:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。- 将
SUBCOMMAND
替换为受支持的 Airflow CLI 命令之一。 - 将
SUBCOMMAND_ARGUMENTS
替换为 Airflow CLI 命令的参数。
子命令参数分隔符
使用 --
分隔指定的 Airflow CLI 命令的参数:
Airflow 2
对于 Airflow 2 CLI 语法:
- 将复合 CLI 命令指定为子命令。
- 在
--
分隔符后面指定复合命令的任何参数,作为子命令参数。
gcloud composer environments run example-environment \
dags list -- --output=json
Airflow 1
Cloud Composer 2 仅支持 Airflow 2。
默认位置
大多数 gcloud composer
命令需要位置信息。您可以使用 --location
标志指定位置,也可以设置默认位置。
示例
例如,如需在 Cloud Composer 环境中触发名为 sample_quickstart
且 ID 为 5077
的 DAG,请使用以下命令:
Airflow 2
gcloud composer environments run example-environment \
--location us-central1 dags trigger -- sample_quickstart \
--run-id=5077
Airflow 1
Cloud Composer 2 仅支持 Airflow 2。
在专用 IP 环境中运行命令
从 Cloud Composer 2.4.0 版开始,您可以运行 Airflow CLI 而无需额外配置。您的机器无需访问环境的集群控制平面端点即可运行这些命令。
在 Cloud Composer 2.4.0 之前的版本中:
如需在专用 IP 环境中运行 Airflow CLI 命令,请执行以下操作: 在可以访问 GKE 集群 控制平面端点您的选项可能会因专用集群配置而有所不同。
如果您环境的集群中停用了公共端点访问权限,则无法使用 gcloud composer
命令运行 Airflow CLI。为了能够运行 Airflow CLI 命令,请执行以下步骤:
- 在 VPC 网络中创建虚拟机
- 获取集群凭据。运行以下命令:
bash gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip
- 使用
kubectl
运行 Airflow 命令。例如:
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
--container airflow-scheduler -- airflow dags list
将 COMPOSER_NAMESPACE
替换为类似于 composer-2-0-28-airflow-2-3-394zxc12411
的命名空间。您可以在这里找到 Cloud Composer
或者使用 kubectl get namespaces
命令来查看。
如果您在环境的集群中启用了公共端点访问权限,可以执行以下操作: 还可以从外部 IP 地址为 添加到已获授权的网络中要从您的计算机启用访问权限,请将 虚拟机的外部地址 授权网络列表。
通过 Cloud Composer API 运行 Airflow CLI 命令
从 Cloud Composer 2.4.0 版开始,您可以通过 Cloud Composer API 运行 Airflow CLI 命令。
执行一项命令
构建一个 environments.executeAirflowCommand
API 请求:
{
"environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
"command": "AIRFLOW_COMMAND",
"subcommand": "AIRFLOW_SUBCOMMAND",
"parameters": [
"SUBCOMMAND_PARAMETER"
]
}
替换以下内容:
PROJECT_ID
:项目 ID。LOCATION
:环境所在的区域。ENVIRONMENT_NAME
:您的环境的名称。AIRFLOW_COMMAND
:您要执行的 Airflow CLI 命令,例如dags
。AIRFLOW_SUBCOMMAND
:您要执行的 Airflow CLI 命令的子命令,例如list
。- (可选)
SUBCOMMAND_PARAMETER
:子命令的参数。如果您 要使用多个参数,请向列表中添加更多项。
示例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
"environment": "projects/example-project/locations/us-central1/environments/example-environment",
"command": "dags",
"subcommand": "list",
"parameters": [
"-o json",
"--verbose"
]
}
轮询命令状态
通过 Cloud Composer API 执行 Airflow CLI 命令后,请检查
此命令已成功完成
PollAirflowCommand 请求,并检查
exitInfo
中的字段,表示错误和状态代码。output
字段包含
日志行。
如需获取命令执行状态和提取日志,请提供 executionId
。
ExecuteAirflowCommandRequest
返回的 pod
和 podNamespace
值:
示例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
"executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
"pod": "airflow-scheduler-1327d8cd68-hblpd",
"podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
"nextLineNumber": 1
}
问题排查
无法连接到集群控制平面
运行 gcloud composer environments run
或 kubectl
命令时,您可能会遇到以下错误:
Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"
症状:此错误消息表示没有网络 从运行这些命令的计算机进行连接。
解决方案:遵循
在专用 IP 环境中运行命令
部分或参阅
kubectl
命令超时部分。