使用 Datastream 和 Dataflow 将数据流式传输到 BigQuery

在此页面中,您将找到有关如何使用 Datastream 和 Dataflow 将数据流式传输到 BigQuery 的最佳实践。

根据用户定义的键对复制的数据集进行分区

BigQuery 中的临时数据集会自动分区。不过,默认情况下,副本数据集不会进行分区,因为副本表的分区键必须根据特定的业务逻辑来定义,而不是由 Datastream 和 Dataflow 强制执行。

对于需要分区的副本数据集中的每个表:

  1. 停止并排空 Dataflow 作业

  2. 使用 BigQuery 中的 SQL 编辑器,针对复制数据集中的每个表运行以下 SQL 脚本。在此示例中,datastream_cdc 数据集中的 actor 表格具有一个 last_update 列,我们希望将其设置为分区键。运行该脚本后,您将使用正确的分区键重新创建表。

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
  3. 使用 Datastream to BigQuery 模板重新创建 Dataflow 作业。

运行用户定义的函数来处理事件数据

您可以使用 Datastream to BigQuery 模板运行 JavaScript 用户定义的函数。为此,请先将包含该函数的文件放置在 Cloud Storage 中的特定位置。之后,执行以下操作:

  • 使用模板中的 javascriptTextTransformGcsPath 参数指定 Cloud Storage 中包含用户定义函数的文件所在的位置。
  • 使用 javascriptTextTransformFunctionName 参数指定要作为用户定义的函数调用的 JavaScript 函数的名称。

例如,您可以运行用户定义的函数,以在 BigQuery 中保留副本数据集的表中的已删除记录。此过程称为软删除。

为此,请创建一个函数,将 _metadata_deleted 列的值复制到名为 is_deleted 的新列中,然后将 _metadata_deleted 列的值重置为 false。这会导致 Dataflow 作业忽略删除事件,并在更新 BigQuery 中的副本数据集时保留已删除的记录。

以下是此用户定义的函数的示例代码:

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

设置合并频率

使用 Datastream to BigQuery 模板mergeFrequencyMinutes 参数设置合并频率。这是 BigQuery 中副本数据集内指定表的合并间隔时间(以分钟为单位)。在回填历史数据的过程中,我们建议您将合并频率保持在较低水平(12 或 24 小时),以控制费用。

例如,如果您将此参数的值设置为 10 分钟,则 Dataflow 将每 10 分钟运行一次使用相应模板的作业。不过,作业首次运行时会有 5 分钟的延迟。在此示例中,如果作业在上午 9:14 运行,则第一次合并将在上午 9:29 进行(10 分钟用于合并,5 分钟用于延迟)。第二次合并将在上午 9:39 进行,之后的所有合并都将以 10 分钟为间隔进行(上午 9:49、上午 9:59、上午 10:09,依此类推)。

如果您将合并频率设置为 60 分钟,则作业将在整点运行,但首次运行作业时会有 5 分钟的延迟。如果作业安排在上午 10 点运行,则实际上会因 5 分钟的延迟而在上午 10:05 运行。之后的所有合并都将以 60 分钟为间隔进行(上午 11:05、下午 12:05、下午 1:05 等)。

无论是为了控制成本还是出于其他原因,您可能无法以满足业务需求的频率执行合并。您可能没有最新的数据。如需访问最新鲜的数据,请在 BigQuery 中基于过渡数据集和副本数据集的表创建视图,其中该视图会模拟合并。此视图会创建为一个逻辑表(适用于临时数据集和副本数据集)。如果合并频率较低,并且您需要更快地访问数据,请使用视图。