Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Apache Airflow 具有命令行界面 (CLI),可用于执行任务,例如触发和管理 DAG、获取有关 DAG 运行和任务的信息、添加和删除连接和用户。
CLI 语法版本简介
Cloud Composer 2 中的 Airflow 使用 Airflow 2 CLI 语法
支持的 Airflow CLI 命令
如需查看支持的 Airflow CLI 命令的完整列表,请参阅 gcloud composer environments run
参考文档。
准备工作
您必须拥有足够的权限,才能将 Google Cloud CLI 与 Cloud Composer 搭配使用并运行 Airflow CLI 命令。
在 2.4.0 之前的 Cloud Composer 版本中,您需要访问环境集群的控制平面才能运行 Airflow CLI 命令。
运行 Airflow CLI 命令
要在您的环境中运行 Airflow CLI 命令,请使用 gcloud
:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
SUBCOMMAND \
-- SUBCOMMAND_ARGUMENTS
替换:
ENVIRONMENT_NAME
替换为环境的名称。LOCATION
替换为环境所在的区域。- 将
SUBCOMMAND
替换为受支持的 Airflow CLI 命令之一。 - 将
SUBCOMMAND_ARGUMENTS
替换为 Airflow CLI 命令的参数。
子命令参数分隔符
使用 --
分隔指定的 Airflow CLI 命令的参数:
Airflow 2
对于 Airflow 2 CLI 语法:
- 将复合 CLI 命令指定为子命令。
- 在
--
分隔符后面指定复合命令的任何参数,作为子命令参数。
gcloud composer environments run example-environment \
dags list -- --output=json
Airflow 1
Cloud Composer 2 仅支持 Airflow 2。
默认位置
大多数 gcloud composer
命令需要位置信息。您可以使用 --location
标志指定位置,也可以设置默认位置。
示例
例如,如需在 Cloud Composer 环境中触发名为 sample_quickstart
且 ID 为 5077
的 DAG,请使用以下命令:
Airflow 2
gcloud composer environments run example-environment \
--location us-central1 dags trigger -- sample_quickstart \
--run-id=5077
Airflow 1
Cloud Composer 2 仅支持 Airflow 2。
在专用 IP 环境中运行命令
从 Cloud Composer 2.4.0 版开始,您无需进行额外配置,即可在专用 IP 环境中运行 Airflow CLI 命令。您的机器无需访问环境的集群控制平面端点即可运行这些命令。
在 2.4.0 之前的 Cloud Composer 版本中:
如需在专用 IP 环境中运行 Airflow CLI 命令,请在可访问 GKE 集群的控制平面端点的机器上运行这些命令。您的选项可能会因专用集群配置而有所不同。
如果您环境的集群中停用了公共端点访问权限,则无法使用 gcloud composer
命令运行 Airflow CLI。如需能够运行 Airflow CLI 命令,请执行以下步骤:
- 在 VPC 网络中创建虚拟机
- 获取集群凭据。运行以下命令:
bash gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip
- 使用
kubectl
运行 Airflow 命令。例如:
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
--container airflow-scheduler -- airflow dags list
将 COMPOSER_NAMESPACE
替换为类似于 composer-2-0-28-airflow-2-3-394zxc12411
的命名空间。您可以在工作负载列表中或使用 kubectl get namespaces
命令找到 Cloud Composer。
如果环境的集群中启用了公共端点访问权限,您还可以从其外部 IP 地址已添加到已获授权的网络的机器运行 Airflow CLI 命令。如需启用从您的机器访问集群的功能,请将您机器的外部地址添加到环境的已获授权的网络列表中。
通过 Cloud Composer API 运行 Airflow CLI 命令
从 Cloud Composer 2.4.0 版开始,您可以通过 Cloud Composer API 运行 Airflow CLI 命令。
执行一项命令
构建 environments.executeAirflowCommand
API 请求:
{
"environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
"command": "AIRFLOW_COMMAND",
"subcommand": "AIRFLOW_SUBCOMMAND",
"parameters": [
"SUBCOMMAND_PARAMETER"
]
}
替换以下内容:
PROJECT_ID
:项目 ID。LOCATION
:环境所在的区域。ENVIRONMENT_NAME
:您的环境的名称。AIRFLOW_COMMAND
:您要执行的 Airflow CLI 命令,例如dags
。AIRFLOW_SUBCOMMAND
:您要执行的 Airflow CLI 命令的子命令,例如list
。- (可选)
SUBCOMMAND_PARAMETER
:子命令的参数。如果您想使用多个参数,请向列表中添加更多项。
示例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
"environment": "projects/example-project/locations/us-central1/environments/example-environment",
"command": "dags",
"subcommand": "list",
"parameters": [
"-o json",
"--verbose"
]
}
轮询命令状态
通过 Cloud Composer API 执行 Airflow CLI 命令后,请发出 PollAirflowCommand 请求并检查 exitInfo
中的字段是否存在错误和状态代码,以确认命令是否已成功完成。output
字段包含日志行。
如需获取命令执行状态并提取日志,请提供 ExecuteAirflowCommandRequest
返回的 executionId
、pod
和 podNamespace
值:
示例:
// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
"executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
"pod": "airflow-scheduler-1327d8cd68-hblpd",
"podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
"nextLineNumber": 1
}
问题排查
无法连接到集群控制平面
运行 gcloud composer environments run
或 kubectl
命令时,您可能会遇到以下错误:
Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"
问题现象:此错误消息表示运行这些命令的计算机没有网络连接。
解决方法:请遵循在专用 IP 环境中运行命令部分中介绍的指南,或使用kubectl
命令超时部分中提供的说明。