在 BigQuery 中执行转换

本页面介绍了如何在 Cloud Data Fusion 中执行向 BigQuery 而不是 Spark 的转换。

如需了解详情,请参阅转换下推概览

准备工作

转换下推在 6.5.0 及更高版本中提供。如果您的流水线在较早的环境中运行,则可以将实例升级到最新版本。

为流水线启用转换下推

控制台

如需在已部署的流水线中启用转换下推,请执行以下操作:

  1. 转到您的实例:

    1. 在 Google Cloud 控制台中,转到 Cloud Data Fusion 页面。

    2. 如需在 Cloud Data Fusion 网页界面中打开实例,请点击实例,然后点击查看实例

      转到实例

  2. 依次点击 菜单 > 列表

    系统会打开已部署的流水线标签页。

  3. 点击所需的已部署流水线,在 Pipeline Studio 中将其打开。

  4. 依次点击配置 > 转换下推

    启用转换下推。

  5. 点击启用转换下推

  6. 数据集字段中,输入 BigQuery 数据集名称。

    可选:若要使用宏,请点击 M。如需了解详情,请参阅数据集

  7. 可选:根据需要配置选项。

  8. 点击保存

可选配置

.
属性 支持宏 支持的 Cloud Data Fusion 版本 说明
使用连接 6.7.0 及更高版本 是否使用现有连接。
Connection 6.7.0 及更高版本 连接的名称。此连接提供项目和服务帐号信息。
可选:使用函数 ${conn(connection_name)} 宏。
数据集项目 ID 6.5.0 如果数据集与 BigQuery 作业运行所在的项目不同,请输入数据集的项目 ID。如果未指定任何值,则默认使用运行作业的项目 ID。
项目 ID 6.5.0 Google Cloud 项目 ID。
服务账号类型 6.5.0 从下列选项中选择一项:
  • 文件路径:服务帐号的文件路径。
  • JSON:服务帐号的 JSON 内容。
默认值为 JSON
服务账号文件路径 6.5.0 本地文件系统上用于授权的服务帐号密钥的路径。在 Dataproc 集群中运行时,此参数设置为 auto-detect。在其他集群上运行时,该文件必须存在于集群中的每个节点上。默认值为 auto-detect
服务账号 JSON 6.5.0 服务帐号 JSON 文件的内容。
临时存储分区名称 6.5.0 存储临时数据的 Cloud Storage 存储桶。 如果该映像不存在,系统会自动创建该映像,但不会将其自动删除。Cloud Storage 数据在加载到 BigQuery 后会被删除。如果未提供此值,系统会创建一个唯一的存储桶,然后在流水线运行完成后将其删除。该服务帐号必须有权在配置的项目中创建存储分区。
位置 6.5.0 创建 BigQuery 数据集的位置。 如果数据集或临时存储桶已存在,则系统会忽略此值。默认值为 US 多区域。
加密密钥名称 6.5.1/0.18.1 客户管理的加密密钥 (CMEK),用于加密写入插件创建的任何存储桶、数据集或表的数据。如果存储桶、数据集或表已存在,则系统会忽略此值。
完成后保留 BigQuery 表 6.5.0 是否保留在流水线运行期间创建的所有 BigQuery 临时表,以进行调试和验证。默认值为 No
临时表 TTL(以小时为单位) 6.5.0 为 BigQuery 临时表设置表 TTL(以小时为单位)。在流水线被取消且清理过程中断时(例如,如果执行集群突然关闭),这可以用作故障安全。将此值设置为 0 会停用表 TTL。默认值为 72(3 天)。
作业优先级 6.5.0 用于执行 BigQuery 作业的优先级。选择以下选项之一:
  1. 批量:批量作业一旦有空闲资源可用就会加入队列并启动,这通常在几分钟内。如果作业未在三小时内启动,其优先级将切换为交互式。
  2. 交互式:交互式作业会尽快执行,并计入并发速率限制和每日速率限制。
默认值为 Batch
强制下推的阶段 6.7.0 始终在 BigQuery 中执行的支持的阶段。 每个阶段名称必须独占一行。
需要跳过下推式广告素材的阶段 6.7.0 支持永不在 BigQuery 中执行的阶段。每个阶段名称必须独占一行。
使用 BigQuery Storage Read API 6.7.0 在流水线执行期间从 BigQuery 提取记录时,是否使用 BigQuery Storage Read API。此选项可以提高转换下推的性能,但会产生额外费用。这需要在执行环境中安装 Scala 2.12。

在日志中监控性能变化

流水线运行时日志包含显示 BigQuery 中运行的 SQL 查询的消息。您可以监控流水线中的哪些阶段会被推送到 BigQuery。

以下示例展示了流水线开始执行时的日志条目。日志表明流水线中的 JOIN 操作已下推 BigQuery 以供执行:

  INFO  [Driver:i.c.p.g.b.s.BigQuerySQLEngine@190] - Validating join for stage 'Users' can be executed on BigQuery: true
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@131] - Starting push for dataset 'UserProfile'
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@131] - Starting push for dataset 'UserDetails'
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@292] - Starting join for dataset 'Users'
  INFO  [Driver:i.c.p.g.b.s.BigQuerySQLEngine@190] - Validating join for stage 'UserPurchases' can be executed on BigQuery: true
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@131] - Starting push for dataset 'Purchases'
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@292] - Starting join for dataset 'UserPurchases'
  INFO  [Driver:i.c.p.g.b.s.BigQuerySQLEngine@190] - Validating join for stage 'MostPopularNames' can be executed on BigQuery: true
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@131] - Starting push for dataset 'FirstNameCounts'
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@292] - Starting join for dataset 'MostPopularNames'
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@193] - Starting pull for dataset 'MostPopularNames'

以下示例展示了将为下推执行过程中涉及的每个数据集分配的表名称:

  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@145] - Executing Push operation for dataset Purchases stored in table <TABLE_ID>
  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@145] - Executing Push operation for dataset UserDetails stored in table <TABLE_ID>
  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@145] - Executing Push operation for dataset FirstNameCounts stored in table <TABLE_ID>
  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@145] - Executing Push operation for dataset UserProfile stored in table <TABLE_ID>

随着执行继续,日志会显示推送阶段的完成,并最终执行 JOIN 操作。例如:

  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@133] - Completed push for dataset 'UserProfile'
  DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@133] - Completed push for dataset 'UserDetails'
  DEBUG [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@235] - Executing join operation for dataset Users
  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQueryJoinDataset@118] - Creating table `<TABLE_ID>` using job: <JOB_ID> with SQL statement: SELECT `UserDetails`.id AS `id` , `UserDetails`.first_name AS `first_name` , `UserDetails`.last_name AS `last_name` , `UserDetails`.email AS `email` , `UserProfile`.phone AS `phone` , `UserProfile`.profession AS `profession` , `UserProfile`.age AS `age` , `UserProfile`.address AS `address` , `UserProfile`.score AS `score` FROM `your_project.your_dataset.<DATASET_ID>` AS `UserProfile` LEFT JOIN `your_project.your_dataset.<DATASET_ID>` AS `UserDetails` ON `UserProfile`.id = `UserDetails`.id
  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQueryJoinDataset@151] - Created BigQuery table `<TABLE_ID>
  INFO  [batch-sql-engine-adapter:i.c.p.g.b.s.BigQuerySQLEngine@245] - Executed join operation for dataset Users

所有阶段均完成后,系统会显示一条消息,提示 Pull 操作已完成。这表示 BigQuery 导出过程已触发,并且此导出作业开始后将开始将记录读入流水线。例如:

DEBUG [batch-sql-engine-adapter:i.c.c.e.s.b.BatchSQLEngineAdapter@196] - Completed pull for dataset 'MostPopularNames'

如果流水线执行遇到错误,日志中会记录这些错误。

如需详细了解 BigQuery JOIN 操作的执行情况(例如资源利用率、执行时间和错误原因),您可以使用作业日志中显示的作业 ID 查看 BigQuery 作业数据。

查看流水线指标

如需详细了解 Cloud Data Fusion 针对在 BigQuery 中执行的流水线部分提供的指标,请参阅 BigQuery 下推流水线指标

后续步骤