在 BigQuery 中执行转换

本页面介绍了如何执行到 BigQuery 的转换 而不是 Cloud Data Fusion 中的 Spark。

如需了解详情,请参阅将转换推送到 BigQuery

准备工作

转换推送在 6.5.0 及更高版本中可用。如果您的流水线在较早的环境中运行,则可以将实例升级到最新版本。

在流水线上启用转换推送功能

控制台

要在已部署的流水线上启用转换下推,请执行以下操作: 以下:

  1. 前往您的实例:

    1. 在 Google Cloud 控制台中,转到 Cloud Data Fusion 页面。

    2. 如需在 Cloud Data Fusion Studio 中打开实例,请点击实例,然后点击查看实例

      转到实例

  2. 点击 菜单 > 列表

    此时将打开已部署的流水线标签页。

  3. 点击所需的已部署流水线,以在 Pipeline Studio 中将其打开。

  4. 依次点击配置 > 转换推送

    启用转换推送。

  5. 点击启用转换下推

  6. 数据集字段中,输入 BigQuery 数据集名称。

    可选:要使用宏,请点击 M。如需了解详情,请参阅 数据集

  7. 可选:根据需要配置选项。

  8. 点击保存

可选配置

属性 支持宏 支持的 Cloud Data Fusion 版本 说明
使用连接 6.7.0 及更高版本 是否使用现有连接。
连接 6.7.0 及更高版本 连接的名称。此连接会提供项目和服务账号信息。
可选:使用宏函数 ${conn(connection_name)}
数据集项目 ID 6.5.0 如果数据集位于与运行 BigQuery 作业的项目不同的项目中,请输入数据集的项目 ID。如果未指定任何值,则默认使用运行作业的项目 ID。
项目 ID 6.5.0 Google Cloud 项目 ID。
服务账号类型 6.5.0 从下列选项中选择一项:
  • 文件路径:服务账号的文件路径。
  • JSON:服务账号的 JSON 内容。
默认值为 JSON
服务账号文件路径 6.5.0 用于授权的服务账号密钥在本地文件系统中的路径。在 Dataproc 集群上运行时,该参数会设置为 auto-detect。在其他集群上运行时,该文件必须存在于集群中的每个节点上。默认值为 auto-detect
服务账号 JSON 6.5.0 服务账号 JSON 文件的内容。
临时存储分区名称 6.5.0 用于存储临时数据的 Cloud Storage 存储桶。 如果它不存在,它会自动创建,但不存在 自动删除Cloud Storage 数据加载到 BigQuery 后将被删除。如果未提供此值, 创建唯一的存储桶,并在流水线运行后将其删除 结束。该服务账号必须有权在配置的项目中创建存储桶。
位置 6.5.0 BigQuery 数据集的创建位置。 如果数据集或临时存储桶已存在,则系统会忽略此值。默认值为 US 多区域位置
加密密钥名称 6.5.1/0.18.1 用于对数据进行加密的客户管理的加密密钥 (CMEK) 写入任何存储桶、数据集或表。如果存储桶、数据集或表已存在,系统会忽略此值。
在完成后保留 BigQuery 表 6.5.0 是否保留所有 BigQuery 临时表 在流水线运行期间创建的用于调试和 进行验证。默认值为 No
临时表 TTL(以小时为单位) 6.5.0 为 BigQuery 临时表设置表 TTL(以小时为单位)。在流水线取消且清理过程中断时(例如,在执行集群突然关闭时),可将其用作故障安全机制。将此值设置为 0 会停用表 TTL。默认值为 72(3 天)。
作业优先级 6.5.0 用于执行 BigQuery 作业的优先级。选择 以下选项之一:
  1. 批量:批量作业会先行排队,一旦有闲置资源可用就会启动,通常在几分钟之内。如果作业未在三小时内启动,则其 优先级切换为互动式
  2. 交互式:交互式作业会尽快执行,并计入并发速率限制和每日速率限制。
默认值为批量
强制下推的阶段 6.7.0 支持始终在 BigQuery 中执行的阶段。 每个阶段名称都必须单独列为一行。
要跳过推送的阶段 6.7.0 永远不会在 BigQuery 中执行的支持的阶段。每个阶段名称都必须单独列为一行。
使用 BigQuery Storage Read API 6.7.0 提取记录时是否使用 BigQuery Storage Read API 从 BigQuery 提取。此选项可以提高转换下推的性能,但会产生额外费用。这需要在 执行环境

在日志中监控性能变化

流水线运行时日志包含显示 BigQuery 中运行的 SQL 查询的消息。您可以监控流水线中的哪些阶段会被推送到 BigQuery。

以下示例展示了流水线执行开始时的日志条目。这些日志表明流水线中的 JOIN 操作已推送到 BigQuery 以执行:

  INFO  [Driver:i.c.p.g.b.s.BigQuerySQLEngine@190] - Validating join for stage 'Users' can be executed on BigQuery: true
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@131] - Starting push for dataset 'UserProfile'
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@131] - Starting push for dataset 'UserDetails'
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@292] - Starting join for dataset 'Users'
  INFO  [Driver:i.c.p.g.b.s.BigQuerySQLEngine@190] - Validating join for stage 'UserPurchases' can be executed on BigQuery: true
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@131] - Starting push for dataset 'Purchases'
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@292] - Starting join for dataset 'UserPurchases'
  INFO  [Driver:i.c.p.g.b.s.BigQuerySQLEngine@190] - Validating join for stage 'MostPopularNames' can be executed on BigQuery: true
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@131] - Starting push for dataset 'FirstNameCounts'
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@292] - Starting join for dataset 'MostPopularNames'
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@193] - Starting pull for dataset 'MostPopularNames'

以下示例展示了将要为每个表分配的 下推执行涉及的数据集:

  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@145] - Executing Push operation for dataset Purchases stored in table <TABLE_ID>
  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@145] - Executing Push operation for dataset UserDetails stored in table <TABLE_ID>
  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@145] - Executing Push operation for dataset FirstNameCounts stored in table <TABLE_ID>
  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@145] - Executing Push operation for dataset UserProfile stored in table <TABLE_ID>

随着执行过程的继续,日志会显示推送阶段的完成 JOIN 操作的执行。例如:

  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@133] - Completed push for dataset 'UserProfile'
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@133] - Completed push for dataset 'UserDetails'
  DEBUG [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@235] - Executing join operation for dataset Users
  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQueryJoinDataset@118] - Creating table `<TABLE_ID>` using job: <JOB_ID> with SQL statement: SELECT `UserDetails`.id AS `id` , `UserDetails`.first_name AS `first_name` , `UserDetails`.last_name AS `last_name` , `UserDetails`.email AS `email` , `UserProfile`.phone AS `phone` , `UserProfile`.profession AS `profession` , `UserProfile`.age AS `age` , `UserProfile`.address AS `address` , `UserProfile`.score AS `score` FROM `your_project.your_dataset.<DATASET_ID>` AS `UserProfile` LEFT JOIN `your_project.your_dataset.<DATASET_ID>` AS `UserDetails` ON `UserProfile`.id = `UserDetails`.id
  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQueryJoinDataset@151] - Created BigQuery table `<TABLE_ID>
  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@245] - Executed join operation for dataset Users

所有阶段都完成后,系统会显示一条消息,告知您 Pull 操作 已完成。这表示 BigQuery 导出过程已触发,并且在此导出作业开始后,记录将开始读取到流水线中。例如:

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@196] - Completed pull for dataset 'MostPopularNames'

如果流水线执行遇到错误,日志中会说明这些错误。

如需详细了解 BigQuery JOIN的执行, 资源利用率、执行时间和错误原因等操作, 可以使用作业 ID 查看 BigQuery 作业数据,作业 ID 显示在 和作业日志

查看流水线指标

如需详细了解 Cloud Data Fusion 提供的指标, 有关在 BigQuery 中执行的流水线的部分,请参阅 BigQuery 下推流水线指标

后续步骤