使用 Datastream 和 Dataflow 将数据流式传输到 BigQuery

在本页中,您可以找到有关使用 Datastream 和 Dataflow 将数据流式传输到 BigQuery 的最佳实践。

根据用户定义的键对副本数据集进行分区

BigQuery 中的暂存数据集会自动分区。不过,默认情况下,副本数据集不会分区,因为副本表的分区键必须根据特定业务逻辑进行定义,而不是由 Datastream 和 Dataflow 强制执行。

对于副本数据集中需要分区的每个表:

  1. 停止并排空 Dataflow 作业

  2. 使用 BigQuery 中的 SQL 编辑器,针对副本数据集中的每个表运行以下 SQL 脚本。在本例中,datastream_cdc 数据集中的 actor 表包含一个 last_update 列,我们希望将其设置为分区键。通过运行脚本,您可以使用正确的分区键重新创建表。

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
  3. 使用 Datastream to BigQuery 模板重新创建 Dataflow 作业。

运行用户定义的函数来处理事件数据

您可以使用 Datastream to BigQuery 模板运行 JavaScript 用户定义的函数。为此,请先将包含函数的文件放置在 Cloud Storage 中的特定位置。之后,执行以下操作:

  • 在模板中使用 javascriptTextTransformGcsPath 参数指定 Cloud Storage 中包含用户定义的函数的文件的位置。
  • 使用 javascriptTextTransformFunctionName 参数指定要作为用户定义的函数调用的 JavaScript 函数的名称。

例如,您可以运行用户定义的函数,以便在 BigQuery 中保留副本数据集表中的已删除记录。此过程称为“软删除”。

为此,请创建一个函数,用于将 _metadata_deleted 列的值复制到名为 is_deleted 的新列中,然后将 _metadata_deleted 列值重置为 false。这会导致 Dataflow 作业在 BigQuery 中更新副本数据集时忽略删除事件并保留已删除的记录。

以下是此用户定义的函数的示例代码:

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

设置合并频率

使用 Datastream to BigQuery 模板mergeFrequencyMinutes 参数设置合并频率。这是 BigQuery 中副本数据集中给定表之间的合并间隔时间(以分钟为单位)。在回填历史数据时,我们建议您将合并频率保持在较低水平(12 或 24 小时),以控制费用。

例如,如果您将此参数的值设置为 10 分钟,则 Dataflow 将每 10 分钟运行一次使用该模板的作业。不过,作业首次运行时会延迟 5 分钟。在本例中,如果作业在上午 9:14 运行,则首次合并将在上午 9:29 发生(合并需要 10 分钟,延迟需要 5 分钟)。第二次合并将在 9:39 上午进行,所有后续合并将以 10 分钟的间隔进行(9:49 上午、9:59 上午、10:09 上午等)。

如果您将合并频率设置为 60 分钟,则作业将在每小时的初始运行延迟 5 分钟后运行。如果作业安排在 10:00 运行,则实际上会在 10:05 运行,因为有 5 分钟的延迟。所有后续合并都将以 60 分钟的间隔进行(上午 11:05、下午 12:05、下午 1:05 等)。

无论是出于控制成本还是其他原因,您可能无法以符合业务需求的频率执行合并。您可能没有最新的数据。如需访问最新的数据,请在 BigQuery 中基于暂存区数据集和副本数据集的表创建视图,该视图会模拟合并。此视图会作为一个逻辑表(适用于暂存区数据集和副本数据集)进行创建。如果合并频率较低,并且您需要更快地访问数据,请使用视图。