运行 Apache Flink 流水线的作业。您可以在现有部署中运行作业,也可以运行按需作业。
如需为作业创建部署,请参阅创建部署。
必需的 API
如需创建和管理适用于 Apache Flink 的 BigQuery 引擎作业,您必须启用适用于 Apache Flink 的 BigQuery 引擎 API。
gcloud services enable managedflink.googleapis.com
如果您的流水线需要,您可能需要启用 Cloud Storage 等其他 API。
所需的角色和权限
本部分介绍了管理作业时所需的角色。如需详细了解适用于 Apache Flink 的 BigQuery Engine 角色,请参阅 适用于 Apache Flink 的 BigQuery Engine 预定义角色。
创建、更新和删除
如需获得创建、更新和删除作业所需的权限,请让您的管理员为您授予项目的 Managed Flink Developer (roles/managedflink.developer
) IAM 角色。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
此预定义角色包含创建、更新和删除作业所需的权限。如需查看所需的确切权限,请展开所需权限部分:
所需权限
如需创建、更新和删除作业,您需要具备以下权限:
-
创建作业:
managedflink.jobs.create
-
更新作业:
managedflink.jobs.update
-
删除作业:
managedflink.jobs.delete
获取和列出
如需获得检索作业相关信息和列出作业所需的权限,请让管理员向您授予项目的 Managed Flink Viewer (roles/managedflink.viewer
) IAM 角色。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
此预定义角色包含检索作业相关信息和列出作业所需的权限。如需查看所需的确切权限,请展开所需权限部分:
所需权限
如需检索作业相关信息和列出作业,您需要拥有以下权限:
-
获取作业的详细信息:
managedflink.jobs.get
-
列出作业:
managedflink.jobs.list
作业的属性
适用于 Apache Flink 的 BigQuery 引擎作业具有以下属性。
作业 ID
作业的 ID。在您创建作业时,适用于 Apache Flink 的 BigQuery 引擎会自动生成作业 ID。
作业名称
作业的可选名称。作业名称不必具有唯一性。
位置
运行作业的位置。该位置必须是受支持的 Google Cloud 区域之一。如果作业是在现有部署中创建的,则作业位置必须与部署位置一致。您无法更改作业的地理位置。如需查看可用位置的列表,请参阅 适用于 Apache Flink 的 BigQuery 引擎位置。
项目 ID
您要创建的作业的 Google Cloud 项目的 ID。 系统会在您指定的项目中创建作业。如果作业是在现有部署中创建的,则作业项目必须与部署项目一致。您无法更改作业的项目。如需了解 Google Cloud 项目 ID 和项目编号,请参阅识别项目。
部署 ID
要用于此作业的适用于 Apache Flink 的 BigQuery 引擎部署的名称。 如果您未指定现有部署,系统会创建一次性部署来运行作业。该部署仅在作业运行时存在,无法用于运行其他作业。
作业文件
创建作业时,您需要指定一个文件来定义 Apache Flink 流水线。适用于 Apache Flink 的 BigQuery 引擎会使用此文件来执行作业。
适用于 Apache Flink 的 BigQuery 引擎支持 JAR、Python 和 SQL 文件。
如需详细了解这些作业类型,请参阅 Apache Flink 文档中的以下页面:
如需了解如何使用 Apache Flink JAR 文件,请参阅程序打包和分布式执行。
如需了解 PyFlink(Apache Flink 的 Python API),请参阅 Python API。
如需了解如何使用 Apache Flink SQL,请参阅 SQL。
Python 虚拟环境
对于 Python 作业,您必须提供一个归档文件,用于为作业打包 Python 虚拟环境。按如下方式创建归档文件:
验证您是否已安装 Python 3.11 版。适用于 Apache Flink 的 BigQuery 引擎需要版本 3.11。
python3 --version
创建 Python 虚拟环境。
python -m venv pyflink_venv source pyflink_venv/bin/activate
安装
apache-flink
Python 库以及作业所需的任何其他依赖项。pip install "apache-flink==1.19.0" venv-pack # Install other dependencies that your job needs # pip install ...
使用
venv-pack
工具打包环境。venv-pack -o pyflink_venv.tar.gz
使用
gcloud storage cp
命令将归档文件上传到 Cloud Storage。gcloud storage cp pyflink_venv.tar.gz gs://BUCKET_NAME/pyflink_venv.tar.gz
运行作业时,请在
gcloud alpha managed-flink jobs create
命令的--python-venv
参数中指定 Cloud Storage 位置。--python-venv=gs://BUCKET_NAME/pyflink_venv.tar.gz \
创建按需作业
请按照以下步骤创建按需作业,按需作业不会与现有部署相关联。
如需使用 gcloud CLI 创建按需作业,请使用 gcloud alpha managed-flink jobs create
命令。
gcloud alpha managed-flink jobs create FILE \
--location=REGION \
--project=PROJECT_ID \
--name=JOB_NAME \
--staging-location=STAGING_LOCATION \
--min-parallelism=MINIMUM_SLOTS \
--max-parallelism=MAXIMUM_SLOTS \
-- JOB_ARGUMENTS
替换以下内容:
FILE
:作业文件的绝对路径。对于 JAR 文件,您还可以指定存储在 Artifact Registry 中的工件的路径。如需了解详情,请参阅使用 Artifact Registry。REGION
:适用于 Apache Flink 的 BigQuery 引擎区域,例如us-central1
PROJECT_ID
:适用于 Apache Flink 的 BigQuery 引擎项目 IDJOB_NAME
:作业的名称STAGING_LOCATION
:用于暂存作业工件的 Cloud Storage 位置MAXIMUM_SLOTS
:作业可用的任务槽数上限MINIMUM_SLOTS
:作业可用的任务槽数下限JOB_ARGUMENTS
:要传递给 Apache Flink 作业的作业参数列表
根据您的作业,您可能需要指定以下其他参数:
--class
:对于 Java,指定 Apache Flink 作业的main
类。如果 JAR 文件清单不包含main
类,则此参数是必需的。--jars
:为作业指定其他 JAR 文件。--python-venv
:对于 Python,指定作业的已归档虚拟环境的 Cloud Storage 位置。Python 作业需要此参数。如需了解详情,请参阅 Python 虚拟环境。
如需使用服务账号模拟,请参阅使用服务账号模拟。
首次在项目或子网中创建部署或按需作业时,创建过程可能需要 30 分钟或更长时间才能完成。之后,创建新的部署或作业所需的时间会缩短。
在现有部署中创建作业
请按照以下步骤在现有部署中创建作业。如需创建部署,请参阅创建和管理部署。
如需使用 gcloud CLI 创建作业,请使用 gcloud alpha managed-flink jobs create
命令。
gcloud alpha managed-flink jobs create FILE \
--location=REGION \
--project=PROJECT_ID \
--deployment=DEPLOYMENT_ID \
--name=JOB_NAME \
--staging-location=STAGING_LOCATION \
--min-parallelism=MINIMUM_SLOTS \
--max-parallelism=MAXIMUM_SLOTS \
-- JOB_ARGUMENTS
替换以下内容:
FILE
:作业文件的绝对路径。对于 JAR 文件,您还可以指定存储在 Artifact Registry 中的工件的路径。如需了解详情,请参阅使用 Artifact Registry。REGION
:适用于 Apache Flink 的 BigQuery 引擎区域,例如us-central1
PROJECT_ID
:适用于 Apache Flink 的 BigQuery 引擎项目 IDDEPLOYMENT_ID
:适用于 Apache Flink 的 BigQuery 引擎部署的名称。JOB_NAME
:作业的名称STAGING_LOCATION
:用于暂存作业工件的 Cloud Storage 位置MAXIMUM_SLOTS
:作业可用的任务槽数上限MINIMUM_SLOTS
:作业可用的任务槽数下限JOB_ARGUMENTS
:要传递给 Apache Flink 作业的作业参数列表
根据您的作业,您可能需要指定以下其他参数:
--class
:对于 Java,指定 Apache Flink 作业的main
类。如果 JAR 文件清单不包含main
类,则此参数是必需的。--jars
:为作业指定其他 JAR 文件。--python-venv
:对于 Python,指定作业的已归档虚拟环境的 Cloud Storage 位置。Python 作业需要此参数。如需了解详情,请参阅 Python 虚拟环境。
如需使用服务账号模拟,请参阅使用服务账号模拟。
使用 Artifact Registry
对于 Java 作业,您可以使用 Artifact Registry 存储和管理作业的 JAR 文件。使用 Artifact Registry 后,您无需在本地计算机上拥有 JAR 文件即可运行作业,并且 CI/CD 流水线无需构建或下载 JAR 即可提交作业。您需要具有 Artifact Registry Reader Identity and Access Management 角色才能提交作业。
如需运行存储在 Artifact Registry 中的 JAR 文件作业,请在 gcloud alpha managed-flink jobs create
命令中将 Artifact Registry 路径指定为作业文件。请使用以下格式为 Artifact Registry 路径:
ar://PROJECT_ID/LOCATION/REPOSITORY/FILE_PATH
替换以下内容:
PROJECT_ID
:Artifact Registry 的项目 IDREGION
:代码库所在的区域REPOSITORY
:代码库的名称ARTIFACT
:工件名称FILE_PATH
:JAR 文件的路径;如需了解详情,请参阅列出文件
示例:
gcloud alpha managed-flink jobs create \
ar://my-project/us-central1/my-repo/com/example/word-count/1.0/word-count-1.0-20241021.203909-1.jar
...
如需详细了解如何使用 Artifact Registry 管理 Java 软件包,请参阅管理 Java 软件包。
更新作业
您可以修改作业的自动调整设置。如需了解详情,请参阅适用于 Apache Flink 的 BigQuery Engine 自动扩缩。
获取作业的详细信息
控制台
如需在 Google Cloud 控制台中获取作业相关信息,请按以下步骤操作:
在 Google Cloud 控制台中,前往“适用于 Apache Flink 的 BigQuery Engine”作业页面。
作业页面会显示作业的详细信息,包括作业状态。
如需打开作业详情页面,请点击作业的名称。在作业详情页面上,您可以查看作业图和作业指标。
gcloud
如需使用 gcloud CLI 检索作业相关信息,请使用 gcloud alpha managed-flink jobs describe
命令。此命令会检索初始作业实现和作业的状态。
gcloud alpha managed-flink jobs describe \
JOB_ID \
--project=PROJECT_ID \
--location=REGION
替换以下内容:
JOB_ID
:适用于 Apache Flink 的 BigQuery 引擎作业的 IDPROJECT_ID
:适用于 Apache Flink 的 BigQuery 引擎项目 IDREGION
:适用于 Apache Flink 的 BigQuery 引擎作业的所在区域
列出作业
控制台
如需查看作业列表,请在 Google Cloud 控制台中,前往“适用于 Apache Flink 的 BigQuery Engine”作业页面。
gcloud
如需使用 gcloud CLI 列出项目中的作业,请使用 gcloud alpha managed-flink jobs list
命令。此命令会列出指定区域和项目中的所有作业。
gcloud alpha managed-flink jobs list \
REGION \
--project=PROJECT_ID
替换以下内容:
REGION
:适用于 Apache Flink 的 BigQuery 引擎作业所在的区域PROJECT_ID
:适用于 Apache Flink 的 BigQuery 引擎项目 ID
删除作业
gcloud
如需使用 gcloud CLI 删除作业,请使用 gcloud alpha managed-flink jobs delete
命令。
gcloud alpha managed-flink jobs delete \
JOB_ID \
--project=PROJECT_ID \
--location=REGION
替换以下内容:
JOB_ID
:适用于 Apache Flink 的 BigQuery 引擎作业的 IDPROJECT_ID
:适用于 Apache Flink 的 BigQuery 引擎项目 IDREGION
:适用于 Apache Flink 的 BigQuery 引擎作业的所在区域
限制
- 您的 Apache Flink 流水线必须与 Apache Flink 1.19 兼容。
- Python 流水线必须使用 Python 3.11 版。