为了提升数据流水线的性能,您可以将一些转换操作推送到 BigQuery 而不是 Apache Spark。转换下推是一种设置,可让 Cloud Data Fusion 数据流水线中的操作作为执行引擎推送到 BigQuery。因此,操作及其数据将转移到 BigQuery 并在 BigQuery 中执行。
转换下推可提升具有多个复杂 JOIN
操作或其他受支持的转换的流水线的性能。在 BigQuery 中执行一些转换可能比在 Spark 中执行这些转换更快。
不支持的转换和所有预览转换都在 Spark 中执行。
支持的转换
Cloud Data Fusion 6.5.0 及更高版本提供转换下推功能,但以下某些转换仅适用于更高版本。
JOIN
次操作
转换下推适用于 Cloud Data Fusion 6.5.0 及更高版本中的
JOIN
操作。支持基本(on-keys)和高级
JOIN
操作。联接必须正好有两个输入阶段,才能在 BigQuery 中执行。
配置为将一个或多个输入加载到内存中的联接将在 Spark(而不是 BigQuery)中执行,但以下情况除外:
- 联接的任何输入已下推。
- 您将联接配置为在 SQL Engine 中执行(请参阅强制执行阶段选项)。
BigQuery 接收器
转换下推适用于 Cloud Data Fusion 6.7.0 及更高版本中的 BigQuery 接收器。
当 BigQuery 接收器遵循已在 BigQuery 中执行的阶段时,将记录写入 BigQuery 的操作直接在 BigQuery 中执行。
如需提高此接收器的性能,您需要以下各项:
- 该服务帐号必须有权在 BigQuery 接收器使用的数据集中创建和更新表。
- 用于转换下推的数据集和 BigQuery 接收器必须存储在同一位置。
- 操作必须为以下项之一:
Insert
(不支持Truncate Table
选项)Update
Upsert
GROUP BY
个汇总
转换下推适用于 Cloud Data Fusion 6.7.0 及更高版本中的 GROUP BY
聚合。
BigQuery 中的 GROUP BY
聚合可用于以下操作:
Avg
Collect List
(从输出数组中移除 null 值)Collect Set
(从输出数组中移除 null 值)Concat
Concat Distinct
Count
Count Distinct
Count Nulls
Logical And
Logical Or
Max
Min
Standard Deviation
Sum
Sum of Squares
Corrected Sum of Squares
Variance
Shortest String
Longest String
在以下情况下,会在 BigQuery 中执行 GROUP BY
聚合:
- 它位于已下推的阶段。
- 您已将其配置为在 SQL Engine 中执行(请参阅强制执行阶段选项)。
删除重复的汇总数据
转换下推可用于在 Cloud Data Fusion 6.7.0 及更高版本中执行以下操作来删除重复汇总:
- 未指定过滤器操作
ANY
(所需字段的非 null 值)MIN
(指定字段的最小值)MAX
(指定字段的最大值)
不支持以下操作:
FIRST
LAST
在以下情况下,去重汇总会在 SQL 引擎中执行:
- 它位于已下推的阶段。
- 您已将其配置为在 SQL Engine 中执行(请参阅强制执行阶段选项)。
BigQuery 来源下推
BigQuery 来源下推在 Cloud Data Fusion 6.8.0 及更高版本中提供。
当 BigQuery 来源遵循与 BigQuery 下推兼容的阶段时,流水线可以执行 BigQuery 中所有兼容的阶段。
Cloud Data Fusion 会复制在 BigQuery 中执行流水线所需的记录。
当您使用 BigQuery 源下推时,系统会保留表分区和聚类属性,这样您就可以使用这些属性来优化进一步操作,例如联接。
其他要求
如需使用 BigQuery 来源下推功能,必须满足以下要求:
为 BigQuery 转换下推配置的服务帐号必须具有读取 BigQuery 来源数据集中的表的权限。
BigQuery 来源中使用的数据集和针对转换下推配置的数据集必须存储在同一位置。
窗口汇总
转换下推适用于 Cloud Data Fusion 6.9 及更高版本中的窗口聚合。BigQuery 中支持以下操作的时间窗口聚合:
Rank
Dense Rank
Percent Rank
N tile
Row Number
Median
Continuous Percentile
Lead
Lag
First
Last
Cumulative distribution
Accumulate
在以下情况下,窗口聚合在 BigQuery 中执行:
- 它位于已下推的阶段。
- 您已将其配置为在 SQL Engine 中执行(请参阅强制下推的阶段选项)。
Wrangler 过滤器下推
Cloud Data Fusion 6.9 版及更高版本支持 Wrangler 过滤器下推功能。
使用 Wrangler 插件时,您可以推送过滤条件(称为 Precondition
操作),以便在 BigQuery(而不是 Spark)中执行。
只有使用前提条件的 SQL 模式才支持过滤器下推,该版本也在 6.9 版中发布了。在此模式下,插件可接受 ANSI 标准 SQL 的前提条件表达式。
如果使用 SQL 模式作为前提条件,系统会为 Wrangler 插件停用指令和用户定义的指令,因为 SQL 模式不支持前提条件。
启用转换下推后,具有多个输入的 Wrangler 插件不支持前提条件的 SQL 模式。如果与多个输入配合使用,则此带有 SQL 过滤条件的 Wrangler 阶段将在 Spark 中执行。
在以下情况下,系统会在 BigQuery 中执行过滤条件:
- 它位于已下推的阶段。
- 您已将其配置为在 SQL Engine 中执行(请参阅强制下推的阶段选项)。
指标
如需详细了解 Cloud Data Fusion 针对在 BigQuery 中执行的流水线部分提供的指标,请参阅 BigQuery 下推流水线指标。
何时使用转换下推
在 BigQuery 中执行转换涉及以下操作:
- 针对流水线中受支持的阶段将记录写入 BigQuery。
- 在 BigQuery 中执行受支持的阶段。
- 在执行支持的转换后从 BigQuery 读取记录,除非后续有 BigQuery 接收器。
根据数据集的大小,可能存在相当大的网络开销,这可能会对启用“转换下推”功能时的整体流水线执行时间产生负面影响。
由于网络开销,我们建议在以下情况下使用转换下推:
- 系统会按顺序执行多个受支持的操作(各阶段之间无需步骤)。
- 通过 BigQuery 执行转换(相对于 Spark)获得的性能提升效果,超过了将数据移入或移出 BigQuery 的延迟时间。
运作方式
当您运行使用转换下推的流水线时,Cloud Data Fusion 会在 BigQuery 中执行受支持的转换阶段。流水线中的所有其他阶段都在 Spark 中执行。
执行转换时:
Cloud Data Fusion 将记录写入 Cloud Storage,然后执行 BigQuery 加载作业,以将输入数据集加载到 BigQuery 中。
然后,使用 SQL 语句将
JOIN
操作和受支持的转换作为 BigQuery 作业执行。如果在执行作业后需要进一步处理,则可以将记录从 BigQuery 导出到 Spark。但是,如果启用了尝试直接复制到 BigQuery 接收器选项,并且 BigQuery 接收器遵循在 BigQuery 中执行的阶段,则系统会将记录直接写入目标 BigQuery 接收器表。
下图显示了“转换下推”如何在 BigQuery(而非 Spark)中执行受支持的转换。
最佳实践
调整集群和执行程序大小
如需优化流水线中的资源管理,请执行以下操作:
为工作负载使用适当数量的集群工作器(节点)。换句话说,您可以通过完全使用实例的可用 CPU 和内存来充分利用预配的 Dataproc 集群,同时从处理大型作业的 BigQuery 执行速度中受益。
使用自动扩缩集群来提高流水线的并行性。
在流水线执行期间从 BigQuery 推送或拉取记录的各个阶段调整资源配置。
建议:尝试增加执行程序资源的 CPU 核心数量(最多可增加工作器节点使用的 CPU 核心数量)。执行器会在数据进出 BigQuery 时优化序列化和反序列化步骤中的 CPU 使用。如需了解详情,请参阅集群大小调整。
在 BigQuery 中执行转换的一个好处是流水线可以在较小的 Dataproc 集群上运行。如果联接是流水线中资源最密集的操作,您可以尝试使用较小的集群大小,因为繁重的 JOIN
操作现在是在 BigQuery 中执行的,这样可以降低总体计算费用。
使用 BigQuery Storage Read API 更快地检索数据
BigQuery 执行转换后,您的流水线可能会在 Spark 中执行额外的阶段。在 Cloud Data Fusion 6.7.0 及更高版本中,转换下推支持 BigQuery Storage Read API,该 API 缩短了延迟时间并加快了对 Spark 的读取操作。可以缩短流水线的整体执行时间。
该 API 会并行读取记录,因此我们建议您相应地调整执行器的大小。如果在 BigQuery 中执行资源密集型操作,请减少执行程序的内存分配,以提高流水线运行时的并行性(请参阅调整集群和执行程序大小)。
BigQuery Storage Read API 默认处于停用状态。您可以在安装了 Scala 2.12 的执行环境(包括 Dataproc 2.0 和 Dataproc 1.5)中启用该功能。
考虑数据集大小
在 JOIN
操作中考虑数据集的大小。对于生成大量输出记录的 JOIN
操作(例如类似于跨 JOIN
操作的内容),生成的数据集大小可能比输入数据集大几个数量级。此外,请考虑在整体流水线性能背景下,发生对这些记录进行额外的 Spark 处理(例如转换或接收器)时,将这些记录拉取回 Spark 的开销。
减少数据偏差
针对严重偏差的数据执行 JOIN
操作可能会导致 BigQuery 作业超出资源利用率限制,从而导致 JOIN
操作失败。为防止出现这种情况,请转到“连接符”插件设置,然后在 Skewed Input Stage 字段中确定倾斜输入。这样,Cloud Data Fusion 能够以可降低 BigQuery 语句超出限制的风险的方式排列输入。