创建和管理作业

运行 Apache Flink 流水线的作业。您可以在现有部署中运行作业,也可以运行按需作业。

如需为作业创建部署,请参阅创建部署

必需的 API

如需创建和管理适用于 Apache Flink 的 BigQuery 引擎作业,您必须启用适用于 Apache Flink 的 BigQuery 引擎 API。

gcloud services enable managedflink.googleapis.com

如果您的流水线需要,您可能需要启用 Cloud Storage 等其他 API。

所需的角色和权限

本部分介绍了管理作业时所需的角色。如需详细了解适用于 Apache Flink 的 BigQuery Engine 角色,请参阅 适用于 Apache Flink 的 BigQuery Engine 预定义角色

创建、更新和删除

如需获得创建、更新和删除作业所需的权限,请让您的管理员为您授予项目的 Managed Flink Developer (roles/managedflink.developer) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建、更新和删除作业所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建、更新和删除作业,您需要具备以下权限:

  • 创建作业: managedflink.jobs.create
  • 更新作业: managedflink.jobs.update
  • 删除作业: managedflink.jobs.delete

您也可以使用自定义角色或其他预定义角色来获取这些权限。

获取和列出

如需获得检索作业相关信息和列出作业所需的权限,请让管理员向您授予项目的 Managed Flink Viewer (roles/managedflink.viewer) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含检索作业相关信息和列出作业所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需检索作业相关信息和列出作业,您需要拥有以下权限:

  • 获取作业的详细信息: managedflink.jobs.get
  • 列出作业: managedflink.jobs.list

您也可以使用自定义角色或其他预定义角色来获取这些权限。

作业的属性

适用于 Apache Flink 的 BigQuery 引擎作业具有以下属性。

作业 ID

作业的 ID。在您创建作业时,适用于 Apache Flink 的 BigQuery 引擎会自动生成作业 ID。

作业名称

作业的可选名称。作业名称不必具有唯一性。

位置

运行作业的位置。该位置必须是受支持的 Google Cloud 区域之一。如果作业是在现有部署中创建的,则作业位置必须与部署位置一致。您无法更改作业的地理位置。如需查看可用位置的列表,请参阅 适用于 Apache Flink 的 BigQuery 引擎位置

项目 ID

您要创建的作业的 Google Cloud 项目的 ID。 系统会在您指定的项目中创建作业。如果作业是在现有部署中创建的,则作业项目必须与部署项目一致。您无法更改作业的项目。如需了解 Google Cloud 项目 ID 和项目编号,请参阅识别项目

部署 ID

要用于此作业的适用于 Apache Flink 的 BigQuery 引擎部署的名称。 如果您未指定现有部署,系统会创建一次性部署来运行作业。该部署仅在作业运行时存在,无法用于运行其他作业。

作业文件

创建作业时,您需要指定一个文件来定义 Apache Flink 流水线。适用于 Apache Flink 的 BigQuery 引擎会使用此文件来执行作业。

适用于 Apache Flink 的 BigQuery 引擎支持 JAR、Python 和 SQL 文件。

如需详细了解这些作业类型,请参阅 Apache Flink 文档中的以下页面:

  • 如需了解如何使用 Apache Flink JAR 文件,请参阅程序打包和分布式执行

  • 如需了解 PyFlink(Apache Flink 的 Python API),请参阅 Python API

  • 如需了解如何使用 Apache Flink SQL,请参阅 SQL

Python 虚拟环境

对于 Python 作业,您必须提供一个归档文件,用于为作业打包 Python 虚拟环境。按如下方式创建归档文件:

  1. 验证您是否已安装 Python 3.11 版。适用于 Apache Flink 的 BigQuery 引擎需要版本 3.11。

    python3 --version
    
  2. 创建 Python 虚拟环境

    python -m venv pyflink_venv
    source pyflink_venv/bin/activate
    
  3. 安装 apache-flink Python 库以及作业所需的任何其他依赖项。

    pip install "apache-flink==1.19.0" venv-pack
    # Install other dependencies that your job needs
    # pip install ...
    
  4. 使用 venv-pack 工具打包环境。

    venv-pack -o pyflink_venv.tar.gz
    
  5. 使用 gcloud storage cp 命令将归档文件上传到 Cloud Storage。

    gcloud storage cp pyflink_venv.tar.gz gs://BUCKET_NAME/pyflink_venv.tar.gz
    
  6. 运行作业时,请在 gcloud alpha managed-flink jobs create 命令的 --python-venv 参数中指定 Cloud Storage 位置。

    --python-venv=gs://BUCKET_NAME/pyflink_venv.tar.gz \
    

创建按需作业

请按照以下步骤创建按需作业,按需作业不会与现有部署相关联。

如需使用 gcloud CLI 创建按需作业,请使用 gcloud alpha managed-flink jobs create 命令。

gcloud alpha managed-flink jobs create FILE \
  --location=REGION \
  --project=PROJECT_ID \
  --name=JOB_NAME \
  --staging-location=STAGING_LOCATION \
  --min-parallelism=MINIMUM_SLOTS \
  --max-parallelism=MAXIMUM_SLOTS \
  -- JOB_ARGUMENTS

替换以下内容:

  • FILE作业文件的绝对路径。对于 JAR 文件,您还可以指定存储在 Artifact Registry 中的工件的路径。如需了解详情,请参阅使用 Artifact Registry
  • REGION:适用于 Apache Flink 的 BigQuery 引擎区域,例如 us-central1
  • PROJECT_ID:适用于 Apache Flink 的 BigQuery 引擎项目 ID
  • JOB_NAME:作业的名称
  • STAGING_LOCATION:用于暂存作业工件的 Cloud Storage 位置
  • MAXIMUM_SLOTS:作业可用的任务槽数上限
  • MINIMUM_SLOTS:作业可用的任务槽数下限
  • JOB_ARGUMENTS:要传递给 Apache Flink 作业的作业参数列表

根据您的作业,您可能需要指定以下其他参数:

  • --class:对于 Java,指定 Apache Flink 作业的 main 类。如果 JAR 文件清单不包含 main 类,则此参数是必需的。
  • --jars:为作业指定其他 JAR 文件。
  • --python-venv:对于 Python,指定作业的已归档虚拟环境的 Cloud Storage 位置。Python 作业需要此参数。如需了解详情,请参阅 Python 虚拟环境

如需使用服务账号模拟,请参阅使用服务账号模拟

首次在项目或子网中创建部署或按需作业时,创建过程可能需要 30 分钟或更长时间才能完成。之后,创建新的部署或作业所需的时间会缩短。

在现有部署中创建作业

请按照以下步骤在现有部署中创建作业。如需创建部署,请参阅创建和管理部署

如需使用 gcloud CLI 创建作业,请使用 gcloud alpha managed-flink jobs create 命令。

gcloud alpha managed-flink jobs create FILE \
  --location=REGION \
  --project=PROJECT_ID \
  --deployment=DEPLOYMENT_ID \
  --name=JOB_NAME \
  --staging-location=STAGING_LOCATION \
  --min-parallelism=MINIMUM_SLOTS \
  --max-parallelism=MAXIMUM_SLOTS \
  -- JOB_ARGUMENTS

替换以下内容:

  • FILE作业文件的绝对路径。对于 JAR 文件,您还可以指定存储在 Artifact Registry 中的工件的路径。如需了解详情,请参阅使用 Artifact Registry
  • REGION:适用于 Apache Flink 的 BigQuery 引擎区域,例如 us-central1
  • PROJECT_ID:适用于 Apache Flink 的 BigQuery 引擎项目 ID
  • DEPLOYMENT_ID:适用于 Apache Flink 的 BigQuery 引擎部署的名称。
  • JOB_NAME:作业的名称
  • STAGING_LOCATION:用于暂存作业工件的 Cloud Storage 位置
  • MAXIMUM_SLOTS:作业可用的任务槽数上限
  • MINIMUM_SLOTS:作业可用的任务槽数下限
  • JOB_ARGUMENTS:要传递给 Apache Flink 作业的作业参数列表

根据您的作业,您可能需要指定以下其他参数:

  • --class:对于 Java,指定 Apache Flink 作业的 main 类。如果 JAR 文件清单不包含 main 类,则此参数是必需的。
  • --jars:为作业指定其他 JAR 文件。
  • --python-venv:对于 Python,指定作业的已归档虚拟环境的 Cloud Storage 位置。Python 作业需要此参数。如需了解详情,请参阅 Python 虚拟环境

如需使用服务账号模拟,请参阅使用服务账号模拟

使用 Artifact Registry

对于 Java 作业,您可以使用 Artifact Registry 存储和管理作业的 JAR 文件。使用 Artifact Registry 后,您无需在本地计算机上拥有 JAR 文件即可运行作业,并且 CI/CD 流水线无需构建或下载 JAR 即可提交作业。您需要具有 Artifact Registry Reader Identity and Access Management 角色才能提交作业。

如需运行存储在 Artifact Registry 中的 JAR 文件作业,请在 gcloud alpha managed-flink jobs create 命令中将 Artifact Registry 路径指定为作业文件。请使用以下格式为 Artifact Registry 路径:

ar://PROJECT_ID/LOCATION/REPOSITORY/FILE_PATH

替换以下内容:

  • PROJECT_ID:Artifact Registry 的项目 ID
  • REGION:代码库所在的区域
  • REPOSITORY:代码库的名称
  • ARTIFACT:工件名称
  • FILE_PATH:JAR 文件的路径;如需了解详情,请参阅列出文件

示例:

gcloud alpha managed-flink jobs create  \
  ar://my-project/us-central1/my-repo/com/example/word-count/1.0/word-count-1.0-20241021.203909-1.jar
  ...

如需详细了解如何使用 Artifact Registry 管理 Java 软件包,请参阅管理 Java 软件包

更新作业

您可以修改作业的自动调整设置。如需了解详情,请参阅适用于 Apache Flink 的 BigQuery Engine 自动扩缩

获取作业的详细信息

控制台

如需在 Google Cloud 控制台中获取作业相关信息,请按以下步骤操作:

  1. 在 Google Cloud 控制台中,前往“适用于 Apache Flink 的 BigQuery Engine”作业页面。

    转到作业

    作业页面会显示作业的详细信息,包括作业状态。

  2. 如需打开作业详情页面,请点击作业的名称。在作业详情页面上,您可以查看作业图和作业指标。

gcloud

如需使用 gcloud CLI 检索作业相关信息,请使用 gcloud alpha managed-flink jobs describe 命令。此命令会检索初始作业实现和作业的状态。

gcloud alpha managed-flink jobs describe \
  JOB_ID \
  --project=PROJECT_ID \
  --location=REGION

替换以下内容:

  • JOB_ID:适用于 Apache Flink 的 BigQuery 引擎作业的 ID
  • PROJECT_ID:适用于 Apache Flink 的 BigQuery 引擎项目 ID
  • REGION:适用于 Apache Flink 的 BigQuery 引擎作业的所在区域

列出作业

控制台

如需查看作业列表,请在 Google Cloud 控制台中,前往“适用于 Apache Flink 的 BigQuery Engine”作业页面。

转到作业

gcloud

如需使用 gcloud CLI 列出项目中的作业,请使用 gcloud alpha managed-flink jobs list 命令。此命令会列出指定区域和项目中的所有作业。

gcloud alpha managed-flink jobs list \
  REGION \
  --project=PROJECT_ID

替换以下内容:

  • REGION:适用于 Apache Flink 的 BigQuery 引擎作业所在的区域
  • PROJECT_ID:适用于 Apache Flink 的 BigQuery 引擎项目 ID

删除作业

gcloud

如需使用 gcloud CLI 删除作业,请使用 gcloud alpha managed-flink jobs delete 命令。

gcloud alpha managed-flink jobs delete \
  JOB_ID \
  --project=PROJECT_ID \
  --location=REGION

替换以下内容:

  • JOB_ID:适用于 Apache Flink 的 BigQuery 引擎作业的 ID
  • PROJECT_ID:适用于 Apache Flink 的 BigQuery 引擎项目 ID
  • REGION:适用于 Apache Flink 的 BigQuery 引擎作业的所在区域

限制

  • 您的 Apache Flink 流水线必须与 Apache Flink 1.19 兼容。
  • Python 流水线必须使用 Python 3.11 版。

后续步骤