本文档介绍了如何创建用于发送 Pub/Sub 通知的批处理作业。您可以使用 Pub/Sub 在作业或任务状态发生变化或进入特定状态时接收通知。如需了解详情,请参阅使用通知监控作业。
准备工作
- 如果您之前未使用过批处理功能,请参阅开始使用批处理,并完成适用于项目和用户的前提条件,以启用批处理功能。
- 为批处理通知创建或指定 Pub/Sub 主题。
- 配置订阅以接收和使用通知。
所需的角色
-
如需获得创建和运行用于发送通知的作业所需的权限,请让您的管理员为您授予以下 IAM 角色:
-
项目的 Batch Job Editor (
roles/batch.jobsEditor
)。 -
作业的服务账号(默认是 默认的 Compute Engine 服务账号)上的 Service Account User (
roles/iam.serviceAccountUser
) 角色。 -
Pub/Sub 主题或项目的 Pub/Sub Editor (
roles/pubsub.editor
)。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
-
项目的 Batch Job Editor (
-
除非您为作业的服务账号使用默认配置,否则请确保该账号具有必要的权限。
为了确保作业的服务账号拥有发布 Pub/Sub 通知所需的权限,请让您的管理员为作业的服务账号授予您的 Pub/Sub 主题的 Pub/Sub Publisher (
roles/pubsub.publisher
) IAM 角色。 - 如果您希望作业向与作业位于不同项目中的 Pub/Sub 主题发布通知,则必须向作业所在项目的批处理服务代理授予向该主题发布通知的权限。
为了确保作业所属项目的批处理服务代理拥有向另一个项目中的 Pub/Sub 主题发布 Pub/Sub 通知所需的权限,请让您的管理员向作业所属项目的批处理服务代理授予该 Pub/Sub 主题的 Pub/Sub Publisher (
roles/pubsub.publisher
) IAM 角色。
创建并运行用于发送通知的作业
您可以通过以下步骤创建用于发送 Pub/Sub 通知的批处理作业:
gcloud
使用 Google Cloud CLI 创建作业,在 JSON 文件的正文中添加 notifications
字段和一个或多个 jobNotification
对象:
{
...
"notifications": [
{
"pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
"message": {
ATTRIBUTES
}
}
]
...
}
替换以下内容:
PROJECT_ID
:包含 Pub/Sub 主题的项目的 ID。TOPIC_ID
:您在启用 Pub/Sub 通知时创建的主题的 Pub/Sub 主题 ID。ATTRIBUTES
:指定以下一个或多个属性,您可以通过每个属性接收有关作业或其所有任务的状态的通知。如需有关所有作业状态更改的通知,请指定以下各项:
"type": "JOB_STATE_CHANGED"
如需有关特定作业状态更改的通知,请指定以下内容:
"type": "JOB_STATE_CHANGED", "newJobState": "JOB_STATE"
将
JOB_STATE
替换为以下某个作业状态:QUEUED
SCHEDULED
RUNNING
SUCCEEDED
FAILED
如需详细了解作业状态,请参阅作业生命周期。
如需有关所有任务状态更改的通知,请指定以下各项:
"type": "TASK_STATE_CHANGED"
如需有关特定任务状态更改的通知,请指定以下内容:
"type": "TASK_STATE_CHANGED", "newTaskState": "TASK_STATE"
将
TASK_STATE
替换为以下某个任务状态:PENDING
ASSIGNED
RUNNING
SUCCEEDED
FAILED
如需详细了解任务状态,请参阅作业生命周期。
例如,假设您希望收到有关所有作业状态变更以及每次任务失败的通知。为此,您可以使用类似于以下内容的 JSON 配置文件:
{
"taskGroups": [
{
"taskSpec": {
"runnables": [
{
"script": {
"text": "echo Hello World! This is task $BATCH_TASK_INDEX."
}
}
]
},
"taskCount": 3,
}
],
"logsPolicy": {
"destination": "CLOUD_LOGGING"
},
"notifications": [
{
"pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
"message": {
"type": "JOB_STATE_CHANGED"
}
},
{
"pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
"message": {
"type": "TASK_STATE_CHANGED",
"newTaskState": "FAILED"
}
}
]
}
API
使用 REST API 在 JSON 文件的主正文中创建作业,其中包含 notifications
字段和一个或多个 jobNotification
对象:
{
...
"notifications": [
{
"pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
"message": {
ATTRIBUTES
}
}
]
...
}
替换以下内容:
PROJECT_ID
:包含 Pub/Sub 主题的项目的 ID。TOPIC_ID
:您在启用 Pub/Sub 通知时创建的主题的 Pub/Sub 主题 ID。ATTRIBUTES
:指定以下一个或多个属性,您可以通过每个属性接收有关作业或其所有任务的状态的通知。如需有关所有作业状态更改的通知,请指定以下各项:
"type": "JOB_STATE_CHANGED"
如需有关特定作业状态更改的通知,请指定以下内容:
"type": "JOB_STATE_CHANGED", "newJobState": "JOB_STATE"
将
JOB_STATE
替换为以下某个作业状态:QUEUED
SCHEDULED
RUNNING
SUCCEEDED
FAILED
如需详细了解作业状态,请参阅作业生命周期。
如需有关所有任务状态更改的通知,请指定以下各项:
"type": "TASK_STATE_CHANGED"
如需有关特定任务状态更改的通知,请指定以下内容:
"type": "TASK_STATE_CHANGED", "newTaskState": "TASK_STATE"
将
TASK_STATE
替换为以下某个任务状态:PENDING
ASSIGNED
RUNNING
SUCCEEDED
FAILED
如需详细了解任务状态,请参阅作业生命周期。
例如,假设您希望收到有关所有作业状态变更以及每次任务失败的通知。为此,您可以使用类似于以下内容的 JSON 配置文件:
{
"taskGroups": [
{
"taskSpec": {
"runnables": [
{
"script": {
"text": "echo Hello World! This is task $BATCH_TASK_INDEX."
}
}
]
},
"taskCount": 3,
}
],
"logsPolicy": {
"destination": "CLOUD_LOGGING"
},
"notifications": [
{
"pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
"message": {
"type": "JOB_STATE_CHANGED"
}
},
{
"pubsubTopic": "projects/PROJECT_ID/topics/TOPIC_ID",
"message": {
"type": "TASK_STATE_CHANGED",
"newTaskState": "FAILED"
}
}
]
}
Go
Java
Node.js
Python
作业开始运行后,您就可以使用其通知了。例如,如果作业的 Pub/Sub 主题有一个订阅会将通知流式传输到 BigQuery,您可以在 BigQuery 中分析 Pub/Sub 通知。
后续步骤
- 详细了解如何使用 Pub/Sub 通知和 BigQuery 监控作业状态。
- 如果您在创建或运行作业时遇到问题,请参阅问题排查。
- 查看作业和任务。
- 不妨详细了解作业创建选项。