在此页面中,您将找到有关如何使用 Datastream 和 Dataflow 将数据流式传输到 BigQuery 的最佳实践。
根据用户定义的键对复制的数据集进行分区
BigQuery 中的临时数据集会自动分区。不过,默认情况下,副本数据集不会进行分区,因为副本表的分区键必须根据特定的业务逻辑来定义,而不是由 Datastream 和 Dataflow 强制执行。
对于需要分区的副本数据集中的每个表:
使用 BigQuery 中的 SQL 编辑器,针对复制数据集中的每个表运行以下 SQL 脚本。在此示例中,
datastream_cdc
数据集中的actor
表格具有一个last_update
列,我们希望将其设置为分区键。运行该脚本后,您将使用正确的分区键重新创建表。create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new'
partition by date(last_update) as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor' drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor' alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor' 使用 Datastream to BigQuery 模板重新创建 Dataflow 作业。
运行用户定义的函数来处理事件数据
您可以使用 Datastream to BigQuery 模板运行 JavaScript 用户定义的函数。为此,请先将包含该函数的文件放置在 Cloud Storage 中的特定位置。之后,执行以下操作:
- 使用模板中的
javascriptTextTransformGcsPath
参数指定 Cloud Storage 中包含用户定义函数的文件所在的位置。 - 使用
javascriptTextTransformFunctionName
参数指定要作为用户定义的函数调用的 JavaScript 函数的名称。
例如,您可以运行用户定义的函数,以在 BigQuery 中保留副本数据集的表中的已删除记录。此过程称为软删除。
为此,请创建一个函数,将 _metadata_deleted
列的值复制到名为 is_deleted
的新列中,然后将 _metadata_deleted
列的值重置为 false
。这会导致 Dataflow 作业忽略删除事件,并在更新 BigQuery 中的副本数据集时保留已删除的记录。
以下是此用户定义的函数的示例代码:
/** * This function changes the behavior of the Datastream to * BigQuery template to allow soft deletes. * @param {string} messageString from DatastreamIO data * @return {string} same as an input message with an added property */ function transform(messageString) { // messageString is a JSON object as a string var messageJson = JSON.parse(messageString); // Moving the deleted flag to a new column will cause the pipeline to soft delete data. messageJson['is_deleted'] = messageJson['_metadata_deleted']; messageJson['_metadata_deleted'] = false; return JSON.stringify(messageJson); }
设置合并频率
使用 Datastream to BigQuery 模板的 mergeFrequencyMinutes
参数设置合并频率。这是 BigQuery 中副本数据集内指定表的合并间隔时间(以分钟为单位)。在回填历史数据的过程中,我们建议您将合并频率保持在较低水平(12 或 24 小时),以控制费用。
例如,如果您将此参数的值设置为 10 分钟,则 Dataflow 将每 10 分钟运行一次使用相应模板的作业。不过,作业首次运行时会有 5 分钟的延迟。在此示例中,如果作业在上午 9:14 运行,则第一次合并将在上午 9:29 进行(10 分钟用于合并,5 分钟用于延迟)。第二次合并将在上午 9:39 进行,之后的所有合并都将以 10 分钟为间隔进行(上午 9:49、上午 9:59、上午 10:09,依此类推)。
如果您将合并频率设置为 60 分钟,则作业将在整点运行,但首次运行作业时会有 5 分钟的延迟。如果作业安排在上午 10 点运行,则实际上会因 5 分钟的延迟而在上午 10:05 运行。之后的所有合并都将以 60 分钟为间隔进行(上午 11:05、下午 12:05、下午 1:05 等)。
无论是为了控制成本还是出于其他原因,您可能无法以满足业务需求的频率执行合并。您可能没有最新的数据。如需访问最新鲜的数据,请在 BigQuery 中基于过渡数据集和副本数据集的表创建视图,其中该视图会模拟合并。此视图会创建为一个逻辑表(适用于临时数据集和副本数据集)。如果合并频率较低,并且您需要更快地访问数据,请使用视图。