访问 Airflow 命令行界面

Cloud Composer 1 | Cloud Composer 2

Apache Airflow 具有命令行界面 (CLI),可用于执行任务,例如触发和管理 DAG、获取有关 DAG 运行和任务的信息、添加和删除连接和用户。

CLI 语法版本简介

Cloud Composer 2 中的 Airflow 使用 Airflow 2 CLI 语法

支持的 Airflow CLI 命令

如需查看支持的 Airflow CLI 命令的完整列表,请参阅 gcloud composer environments run 参考文档。

准备工作

  • 您必须具有足够的权限,才能将 gcloud 命令行工具与 Cloud Composer 搭配使用并运行 Airflow CLI 命令。如需了解详情,请参阅访问权限控制

  • 在 2.4.0 之前的 Cloud Composer 版本中,您需要访问环境集群的控制平面才能运行 Airflow CLI 命令。

运行 Airflow CLI 命令

要在您的环境中运行 Airflow CLI 命令,请使用 gcloud

gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    SUBCOMMAND \
    -- SUBCOMMAND_ARGUMENTS

您需要在其中:

  • ENVIRONMENT_NAME 替换为环境的名称。
  • LOCATION 替换为环境所在的区域。
  • SUBCOMMAND 替换为受支持的 Airflow CLI 命令之一。
  • SUBCOMMAND_ARGUMENTS 替换为 Airflow CLI 命令的参数。

子命令参数分隔符

使用 -- 分隔指定的 Airflow CLI 命令的参数:

Airflow 2

对于 Airflow 2 CLI 语法:

  • 将复合 CLI 命令指定为子命令。
  • -- 分隔符后面,将复合命令的任何参数指定为子命令参数。
gcloud composer environments run example-environment \
    dags list -- --output=json

Airflow 1

Cloud Composer 2 仅支持 Airflow 2。

默认位置

大多数 gcloud composer 命令需要位置信息。您可以使用 --location 标志指定位置,也可以设置默认位置

示例

例如,如需在 Cloud Composer 环境中触发名为 sample_quickstart 且 ID 为 5077 的 DAG,请使用以下命令:

Airflow 2

gcloud composer environments run example-environment \
    --location us-central1 dags trigger -- sample_quickstart \
    --run-id=5077

Airflow 1

Cloud Composer 2 仅支持 Airflow 2。

在专用 IP 环境中运行命令

从 Cloud Composer 2.4.0 版开始,您可以在专用 IP 环境中运行 Airflow CLI 命令,无需进行其他配置。您的机器无需访问环境的集群控制平面端点即可运行这些命令。

在 2.4.0 之前的 Cloud Composer 版本中:

如需在专用 IP 环境中运行 Airflow CLI 命令,请在可访问 GKE 集群控制平面端点的机器上运行这些命令。您的选项可能会因专用集群配置而有所不同。

如果您的环境集群中停用了公共端点访问权限,则无法使用 gcloud composer 命令运行 Airflow CLI。为了能够运行 Airflow CLI 命令,请执行以下步骤:

  1. 在 VPC 网络中创建虚拟机
  2. 获取集群凭据。运行以下命令: bash gcloud container clusters get-credentials CLUSTER_NAME \ --region REGION \ --project PROJECT \ --internal-ip
  • 使用 kubectl 运行 Airflow 命令。例如:
kubectl exec deployment/airflow-scheduler -n COMPOSER_NAMESPACE \
  --container airflow-scheduler -- airflow dags list

COMPOSER_NAMESPACE 替换为类似于 composer-2-0-28-airflow-2-3-394zxc12411 的命名空间。您可以在工作负载列表中找到 Cloud Composer,也可以使用 kubectl get namespaces 命令进行查找。

如果您的环境集群中启用了公共端点访问权限,您还可以通过计算机(具有已添加到授权网络的外部 IP 地址)运行 Airflow CLI 命令。如需启用从机器进行访问,请将机器的外部地址添加到环境的授权网络列表中。

通过 Cloud Composer API 运行 Airflow CLI 命令

从 Cloud Composer 2.4.0 版开始,您可以通过 Cloud Composer API 运行 Airflow CLI 命令。

执行一项命令

构建 environments.executeAirflowCommand API 请求:

{
  "environment": "projects/PROJECT_ID/locations/LOCATION/environments/ENVIRONMENT_NAME",
  "command": "AIRFLOW_COMMAND",
  "subcommand": "AIRFLOW_SUBCOMMAND",
  "parameters": [
    "SUBCOMMAND_PARAMETER"
  ]
}

替换以下内容:

  • PROJECT_ID项目 ID
  • LOCATION:环境所在的区域。
  • ENVIRONMENT_NAME:您的环境的名称。
  • AIRFLOW_COMMAND:您要执行的 Airflow CLI 命令,例如 dags
  • AIRFLOW_SUBCOMMAND:您要执行的 Airflow CLI 命令的子命令,例如 list
  • (可选)SUBCOMMAND_PARAMETER:子命令的参数。如果要使用多个参数,请在列表中添加更多项。

示例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:executeAirflowCommand
{
  "environment": "projects/example-project/locations/us-central1/environments/example-environment",
  "command": "dags",
  "subcommand": "list",
  "parameters": [
    "-o json",
    "--verbose"
  ]
}

意见调查命令状态

通过 Cloud Composer API 执行 Airflow CLI 命令后,请发出 PollAirflowCommand 请求并检查 exitInfo 中的字段是否存在错误和状态代码,检查该命令是否成功完成。output 字段包含日志行。

如需获取命令执行状态和提取日志,请提供 ExecuteAirflowCommandRequest 返回的 executionIdpodpodNamespace 值:

示例:

// POST https://composer.googleapis.com/v1/{environment=projects/*/locations/*/environments/*}:pollAirflowCommand
{
  "executionId": "39b82312-3a19-4d21-abac-7f8f19855ce7",
  "pod": "airflow-scheduler-1327d8cd68-hblpd",
  "podNamespace": "composer-2-4-0-airflow-2-5-3-184dadaf",
  "nextLineNumber": 1
}

问题排查

没有与集群控制平面的连接

运行 gcloud composer environments runkubectl 命令时,您可能会遇到以下错误:

Get "https://<IP Address>/api?timeout=32s": dial tcp <IP Address>:443: i/o timeout"

症状:此错误消息表示您运行这些命令的计算机没有网络连接。

解决方案:遵循在专用 IP 环境中运行命令部分中提供的准则,或使用 kubectl 命令超时部分中提供的说明。