Datastream to BigQuery (Stream) 模板

Datastream to BigQuery 模板是一种流处理流水线,可读取 Datastream 数据并将其复制到 BigQuery 中。该模板使用 Pub/Sub 通知从 Cloud Storage 中读取数据,并将其复制到时间分区的 BigQuery 暂存表中。复制后,该模板会在 BigQuery 中执行 MERGE,以将所有更改数据捕获 (CDC) 更改插入/更新到源表的副本中。

此模板处理创建和更新通过复制管理的 BigQuery 表。当需要数据定义语言 (DDL) 时,对 Datastream 的回调将提取源表架构并将其转换为 BigQuery 数据类型。支持的操作包括下列操作:

  • 在插入数据时创建新表。
  • 在 BigQuery 表中添加新列,且初始值为 null。
  • 在 BigQuery 中忽略丢弃的列,且未来的值为 null。
  • 重命名的列将作为新列添加到 BigQuery 中。
  • 类型更改不会传播到 BigQuery。

建议使用“至少一次”流处理模式运行此流水线,因为模板在将临时 BigQuery 表中的数据合并到主 BigQuery 表时执行去重操作。流水线中的此步骤意味着使用“正好一次”流处理模式没有其他优势。

流水线要求

  • 已准备好或已在复制数据的 Datastream 数据流。
  • 已经为 Datastream 数据启用 Cloud Storage Pub/Sub 通知
  • BigQuery 目标数据集已创建,而 Compute Engine 服务账号已获授予管理员的访问权限。
  • 若要创建目标副本表,源表中必须有主键。
  • MySQL 或 Oracle 源数据库。不支持 PostgreSQL 数据库。

模板参数

参数 说明
inputFilePattern Cloud Storage 中要复制的 Datastream 文件的位置。此文件位置通常是数据流的根路径。
gcsPubSubSubscription 包含 Datastream 文件通知的 Pub/Sub 订阅。例如 projects/my-project-id/subscriptions/my-subscription-id
inputFileFormat Datastream 生成的输出文件的格式。例如:avro,json。默认值:avro
outputStagingDatasetTemplate 包含暂存表的现有数据集的名称。您可以将模板 {_metadata_dataset} 包括为占位符,它会被替换为源数据集/架构的名称(例如 {_metadata_dataset}_log)。
outputDatasetTemplate 包含副本表的现有数据集的名称。您可以将模板 {_metadata_dataset} 包括为占位符,它会被替换为源数据集/架构的名称(例如 {_metadata_dataset})。
deadLetterQueueDirectory 用于存储任何未处理消息以及无法处理原因的文件路径。默认值为 Dataflow 作业的临时位置下的目录。在大多数情况下,默认值就可以了。
outputStagingTableNameTemplate 可选:暂存表的名称模板。默认值为 {_metadata_table}_log。如果您要复制多个架构,建议使用 {_metadata_schema}_{_metadata_table}_log
outputTableNameTemplate 可选:副本表名称的模板。默认值:{_metadata_table}。如果您要复制多个架构,建议使用 {_metadata_schema}_{_metadata_table}
outputProjectId 可选:BigQuery 数据集的项目,用于将数据输出到其中。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
streamName 可选:用于轮询架构信息的数据流的名称或模板。默认值:{_metadata_stream}
mergeFrequencyMinutes 可选:合并给定表格的时间间隔(分钟)。默认值:5。
dlqRetryMinutes 可选:死信队列 (DLQ) 重试之间的分钟数。默认值:10。
javascriptTextTransformGcsPath 可选:.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName 可选: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
useStorageWriteApi 可选:如果为 true,则流水线使用 BigQuery Storage Write API。默认值为 false。如需了解详情,请参阅使用 Storage Write API
useStorageWriteApiAtLeastOnce 可选:使用 Storage Write API 时,请指定写入语义。如需使用“至少一次”语义,请将此参数设置为 true。如需使用“正好一次”语义,请将参数设置为 false。仅当 useStorageWriteApitrue 时,此参数才适用。默认值为 false
numStorageWriteApiStreams 可选:使用 Storage Write API 时,指定写入流的数量。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。
storageWriteApiTriggeringFrequencySec 可选:使用 Storage Write API 时,指定触发频率(以秒为单位)。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。
applyMerge 可选:指定模板在将数据复制到暂存表后是否在 BigQuery 中执行 MERGE 语句。默认值:true.
fileReadConcurrency 可选:要同时读取的 Datastream 文件的数量。默认值:10。
mergeConcurrency 可选:并发 BigQuery MERGE 语句的数量。默认值:30。
partitionRetentionDays 可选:运行 BigQuery MERGE 语句时用于分区保留的天数。默认值:1。
rfcStartDateTime 可选:从 Cloud Storage 读取文件的开始时间(以 RFC 3339 日期时间值表示)。默认值:1970-01-01T00:00:00.00Z

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:CDC 数据,序列化为 JSON 字符串。
  • 输出:与 BigQuery 目标表的架构匹配的 JSON 字符串。
  • 运行模板

    控制台

    1. 转到 Dataflow 基于模板创建作业页面。
    2. 转到“基于模板创建作业”
    3. 作业名称字段中,输入唯一的作业名称。
    4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

      如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

    5. Dataflow 模板下拉菜单中,选择 the Datastream to BigQuery template。
    6. 在提供的参数字段中,输入您的参数值。
    7. 可选:如需从“正好一次”处理切换到“至少一次”流处理模式,请选择至少一次
    8. 点击运行作业

    gcloud

    在 shell 或终端中,运行模板:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    请替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
    • JOB_NAME:您选择的唯一性作业名称
    • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如:projects/my-project-id/subscriptions/my-subscription-id
    • BIGQUERY_DATASET:您的 BigQuery 数据集名称。
    • BIGQUERY_TABLE:您的 BigQuery 表模板。例如 {_metadata_schema}_{_metadata_table}_log

    API

    如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    请替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
    • JOB_NAME:您选择的唯一性作业名称
    • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如:projects/my-project-id/subscriptions/my-subscription-id
    • BIGQUERY_DATASET:您的 BigQuery 数据集名称。
    • BIGQUERY_TABLE:您的 BigQuery 表模板。例如 {_metadata_schema}_{_metadata_table}_log

    后续步骤