BigQuery Data Transfer Service 运行通知

本页面简要介绍了 BigQuery Data Transfer Service 的运行通知。

您可以为 BigQuery Data Transfer Service 配置两种类型的运行通知:

  • Pub/Sub 通知:转移作业运行成功或失败时发送的机器可读通知
  • 电子邮件通知:转移作业运行失败时发送的人类可读通知

您可以单独配置每种类型,也可以同时使用 Pub/Sub 和电子邮件两种运行通知。

Pub/Sub 通知

Pub/Sub 通知功能会将转移作业运行的相关信息发送到 Pub/Sub 主题。当已完成的转移作业运行处于以下状态时,系统将触发 Pub/Sub 通知:

  • SUCCEEDED
  • FAILED
  • CANCELLED

您可以向您具有足够权限的任何项目中的任何 Pub/Sub 主题发送通知。Pub/Sub 主题收到通知生成的消息后,系统可以将这些消息发送给该主题的任意数量的订阅者。

准备工作

在配置 Pub/Sub 转移作业运行通知之前,您应该完成以下准备工作:

  1. 为将接收通知的项目启用 Pub/Sub API。

    启用该 API

  2. 对接收通知的项目拥有足够的权限:

    • 如果您拥有接收通知的项目,则您很可能具备所需的权限。

    • 如果您打算创建主题来接收通知,应具备 pubsub.topics.create 权限。

    • 无论您是打算使用新主题还是现有主题,都应拥有 pubsub.topics.getIamPolicypubsub.topics.setIamPolicy 权限。如果您创建了一个主题,则通常已经具备该主题的权限。以下预定义 IAM 角色同时具有 pubsub.topics.getIamPolicypubsub.topics.setIamPolicy 权限:pubsub.admin。如需了解详情,请参阅 Pub/Sub 访问权限控制

  3. 存在要向其发送通知的 Pub/Sub 主题

通知格式

发送到 Pub/Sub 主题的通知由以下两部分组成:

  • 特性:一组键值对,用于描述事件。
  • 载荷:包含已更改对象的元数据的字符串。

属性

特性是指 BigQuery Data Transfer Service 发送到 Pub/Sub 主题的所有通知中包含的键值对。无论通知的载荷如何,通知始终包含下列键值对:

特性名称 示例 说明
eventType TRANSFER_RUN_FINISHED 刚刚发生的事件的类型。TRANSFER_RUN_FINISHED 是唯一的可能值。
payloadFormat JSON_API_V1 对象载荷的格式。JSON_API_V1 是唯一的可能值。

载荷

负载是一个字符串,其中包含传输运行的元数据。目前无法配置负载类型,提供此类型的目的在于适应未来的 API 版本变更。

载荷类型 说明
JSON_API_V1 载荷将是一个采用 UTF-8 编码的 JSON 序列化字符串,其中包含 TransferRun 的资源表示形式

电子邮件通知

转移作业运行失败时,电子邮件通知会发送人类可读的电子邮件。这些邮件会发送给转移管理员(即设置转移作业的账号)的电子邮件地址。您无法配置邮件内容,也无法配置邮件收件人。

如果您使用服务账号对转移作业配置进行身份验证,则可能无权访问电子邮件地址来接收转移作业运行电子邮件通知。在这种情况下,我们建议您设置 Pub/Sub 通知以接收转移作业运行通知。

如需向更多用户发送转移作业运行电子邮件通知,请设置电子邮件转发规则以分发邮件。如果您使用的是 Gmail,则可以自动将 Gmail 邮件转发到另一个账号

电子邮件通知由 BigQuery Data Transfer Service 发送,其中包含有关转移作业配置、转移作业运行以及与失败运行相关的运行历史记录链接的详细信息。例如:

From: bigquery-data-transfer-service-noreply@google.com
To: TRANSFER_ADMIN
Title: BigQuery Data Transfer Service — Transfer Run Failure —
DISPLAY_NAME

Transfer Configuration
Display Name: DISPLAY_NAME
Source: DATA_SOURCE
Destination: PROJECT_ID

Run Summary
Run: RUN_NAME
Schedule Time: SCHEDULE_TIME
Run Time: RUN_TIME
View Run History


Google LLC 1600 Amphitheatre Parkway, Mountain View, CA 94043

This email was sent because you indicated you are willing to receive Run
Notifications from the BigQuery Data Transfer Service. If you do not wish to
receive such emails in the future, click View Transfer Configuration and
un-check the "Send E-mail Notifications" option.

开启或修改通知

如需开启通知或修改现有通知,请选择以下选项之一:

控制台

  1. 前往 Google Cloud 控制台中的 BigQuery 页面。

    转到 BigQuery 页面

  2. 点击导航菜单中的数据传输

  3. 如需为新转移作业开启通知,请点击 创建转移作业。如需调整现有转移作业的通知,请点击转移作业的名称,然后点击修改

  4. 通知选项部分,点击要启用的通知类型旁边的切换开关。

    • 电子邮件通知:启用此选项后,转移作业管理员会在转移作业运行失败时收到电子邮件通知。
    • Pub/Sub 通知:启用此选项时,请选择您的主题名称或点击创建主题。此选项用于为您的转移作业配置 Pub/Sub 运行通知

Java

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Java 设置说明进行操作。 如需了解详情,请参阅 BigQuery Java API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.bigquery.datatransfer.v1.CreateTransferConfigRequest;
import com.google.cloud.bigquery.datatransfer.v1.DataTransferServiceClient;
import com.google.cloud.bigquery.datatransfer.v1.ProjectName;
import com.google.cloud.bigquery.datatransfer.v1.TransferConfig;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Sample to get run notification
public class RunNotification {

  public static void main(String[] args) throws IOException {
    // TODO(developer): Replace these variables before running the sample.
    final String projectId = "MY_PROJECT_ID";
    final String datasetId = "MY_DATASET_ID";
    final String pubsubTopicName = "MY_TOPIC_NAME";
    final String query =
        "SELECT CURRENT_TIMESTAMP() as current_time, @run_time as intended_run_time, "
            + "@run_date as intended_run_date, 17 as some_integer";
    Map<String, Value> params = new HashMap<>();
    params.put("query", Value.newBuilder().setStringValue(query).build());
    params.put(
        "destination_table_name_template",
        Value.newBuilder().setStringValue("my_destination_table_{run_date}").build());
    params.put("write_disposition", Value.newBuilder().setStringValue("WRITE_TRUNCATE").build());
    params.put("partitioning_field", Value.newBuilder().build());
    TransferConfig transferConfig =
        TransferConfig.newBuilder()
            .setDestinationDatasetId(datasetId)
            .setDisplayName("Your Scheduled Query Name")
            .setDataSourceId("scheduled_query")
            .setParams(Struct.newBuilder().putAllFields(params).build())
            .setSchedule("every 24 hours")
            .setNotificationPubsubTopic(pubsubTopicName)
            .build();
    runNotification(projectId, transferConfig);
  }

  public static void runNotification(String projectId, TransferConfig transferConfig)
      throws IOException {
    try (DataTransferServiceClient dataTransferServiceClient = DataTransferServiceClient.create()) {
      ProjectName parent = ProjectName.of(projectId);
      CreateTransferConfigRequest request =
          CreateTransferConfigRequest.newBuilder()
              .setParent(parent.toString())
              .setTransferConfig(transferConfig)
              .build();
      TransferConfig config = dataTransferServiceClient.createTransferConfig(request);
      System.out.println(
          "\nScheduled query with run notification created successfully :" + config.getName());
    } catch (ApiException ex) {
      System.out.print("\nScheduled query with run notification was not created." + ex.toString());
    }
  }
}

Python

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 BigQuery Python API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为客户端库设置身份验证

transfer_config_name = "projects/1234/locations/us/transferConfigs/abcd"
pubsub_topic = "projects/PROJECT-ID/topics/TOPIC-ID"
from google.cloud import bigquery_datatransfer
from google.protobuf import field_mask_pb2

transfer_client = bigquery_datatransfer.DataTransferServiceClient()

transfer_config = bigquery_datatransfer.TransferConfig(name=transfer_config_name)
transfer_config.notification_pubsub_topic = pubsub_topic
update_mask = field_mask_pb2.FieldMask(paths=["notification_pubsub_topic"])

transfer_config = transfer_client.update_transfer_config(
    {"transfer_config": transfer_config, "update_mask": update_mask}
)

print(f"Updated config: '{transfer_config.name}'")
print(f"Notification Pub/Sub topic: '{transfer_config.notification_pubsub_topic}'")

运行通知价格

如果您配置了 Pub/Sub 运行通知,则需要支付 Pub/Sub 费用。如需了解详情,请参阅 Pub/Sub 价格页面。

后续步骤