Datastream to BigQuery (Stream) 模板

Datastream to BigQuery 模板是一种流处理流水线,可读取 Datastream 数据并将其复制到 BigQuery 中。该模板使用 Pub/Sub 通知从 Cloud Storage 中读取数据,并将其复制到时间分区的 BigQuery 暂存表中。复制后,该模板会在 BigQuery 中执行 MERGE,以将所有变更数据捕获 (CDC) 更改插入/更新到源表的副本中。

此模板处理创建和更新通过复制管理的 BigQuery 表。当需要数据定义语言 (DDL) 时,对 Datastream 的回调将提取源表架构并将其转换为 BigQuery 数据类型。支持的操作包括下列操作:

  • 在插入数据时创建新表。
  • 在 BigQuery 表中添加新列,且初始值为 null。
  • 在 BigQuery 中忽略丢弃的列,且未来的值为 null。
  • 重命名的列将作为新列添加到 BigQuery 中。
  • 类型更改不会传播到 BigQuery。

建议使用“至少一次”流处理模式运行此流水线,因为模板在将临时 BigQuery 表中的数据合并到主 BigQuery 表时执行去重操作。流水线中的此步骤意味着使用“正好一次”流处理模式没有其他优势。

流水线要求

  • 已准备好或已在复制数据的 Datastream 数据流。
  • 已经为 Datastream 数据启用 Cloud Storage Pub/Sub 通知
  • 已创建 BigQuery 目标数据集,并且 Compute Engine 服务账号已获得这些数据集的管理员访问权限。
  • 若要创建目标副本表,源表中必须有主键。
  • MySQL 或 Oracle 源数据库。不支持 PostgreSQL 和 SQL Server 数据库。

模板参数

必需参数

  • inputFilePattern:Cloud Storage 中 Datastream 文件输出的文件位置,格式为:gs://<BUCKET_NAME>/<ROOT_PATH>/。
  • inputFileFormat:Datastream 生成的输出文件的格式。值可以是“avro”或“json”。默认值为:avro。
  • gcsPubSubSubscription:Cloud Storage 用于通知 Dataflow 可处理的新文件的 Pub/Sub 订阅,格式为:projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>。
  • outputStagingDatasetTemplate:包含暂存表的数据集的名称。此参数支持模板,例如 {_metadata_dataset}_log 或 my_dataset_log。通常,此参数是数据集名称。默认值为:{_metadata_dataset}。
  • outputDatasetTemplate:包含副本表的数据集的名称。此参数支持模板,例如 {_metadata_dataset} 或 my_dataset。通常,此参数是数据集名称。默认值为:{_metadata_dataset}。
  • deadLetterQueueDirectory:Dataflow 用于写入死信队列输出的路径。此路径不得与 Datastream 文件输出的路径相同。默认值为空。

可选参数

  • streamName:用于轮询架构信息的数据流的名称或模板。默认值为:{_metadata_stream}。默认值通常就足够了。
  • rfcStartDateTime:用于从 Cloud Storage 中提取数据的起始日期时间 (https://tools.ietf.org/html/rfc3339)。默认值为:1970-01-01T00:00:00.00Z。
  • fileReadConcurrency:要读取的并发 DataStream 文件的数量。默认值为 10。
  • outputProjectId:包含要将数据输出到其中的 BigQuery 数据集的 Google Cloud 项目的 ID。此参数的默认项目是 Dataflow 流水线在其中运行的项目。
  • outputStagingTableNameTemplate:用于为暂存表命名的模板。例如,{_metadata_table}。默认值为:{_metadata_table}_log。
  • outputTableNameTemplate:用于副本表名称的模板,例如 {_metadata_table}。默认值为:{_metadata_table}。
  • ignoreFields:BigQuery 中要忽略的字段(以英文逗号分隔)。默认值为:_metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count。(示例:_metadata_stream,_metadata_schema)。
  • mergeFrequencyMinutes:合并给定表之间的分钟数。默认值为:5。
  • dlqRetryMinutes:DLQ 重试之间的分钟数。默认值为:10。
  • dataStreamRootUrl:Datastream API 根网址。默认值为 https://datastream.googleapis.com/
  • applyMerge:是否为作业停用 MERGE 查询。默认值为:true。
  • mergeConcurrency:并发 BigQuery MERGE 查询的次数。仅当 applyMerge 设置为 true 时有效。默认值为:30。
  • partitionRetentionDays:运行 BigQuery 合并时用于分区保留的天数。默认值为:1。
  • useStorageWriteApiAtLeastOnce:此参数仅在启用了“使用 BigQuery Storage Write API”时有效。如果为 true,则系统会将“至少一次”语义用于 Storage Write API。否则,系统会使用“正好一次”语义。默认值为:false。
  • javascriptTextTransformGcsPath:.js 文件的 Cloud Storage URI,用于定义要使用的 JavaScript 用户定义的函数 (UDF)。(示例:gs://my-bucket/my-udfs/my_file.js)。
  • javascriptTextTransformFunctionName:要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。
  • javascriptTextTransformReloadIntervalMinutes:指定重新加载 UDF 的频率(以分钟为单位)。如果值大于 0,则 Dataflow 会定期检查 Cloud Storage 中的 UDF 文件,并在文件修改时重新加载 UDF。此参数可让您在流水线运行时更新 UDF,而无需重启作业。如果值为 0,则停用 UDF 重新加载。默认值为 0。
  • pythonTextTransformGcsPath:包含用户定义的函数的 Python 代码的 Cloud Storage 路径模式。(示例:gs://your-bucket/your-transforms/*.py)。
  • pythonRuntimeVersion:用于 Python UDF 的运行时版本。
  • pythonTextTransformFunctionName:要从 JavaScript 文件调用的函数的名称。只能使用字母、数字和下划线。(示例:transform_udf1)。
  • runtimeRetries:在失败之前重试运行时的次数。默认值为:5。
  • useStorageWriteApi:如果为 true,则流水线使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。默认值为 false。如需了解详情,请参阅“使用 Storage Write API”(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。
  • numStorageWriteApiStreams:使用 Storage Write API 时,指定写入流的数量。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。默认值为 0。
  • storageWriteApiTriggeringFrequencySec:使用 Storage Write API 时,指定触发频率(以秒为单位)。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:CDC 数据,序列化为 JSON 字符串。
  • 输出:与 BigQuery 目标表的架构匹配的 JSON 字符串。
  • 运行模板

    控制台

    1. 转到 Dataflow 基于模板创建作业页面。
    2. 转到“基于模板创建作业”
    3. 作业名称字段中,输入唯一的作业名称。
    4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

      如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

    5. Dataflow 模板下拉菜单中,选择 the Datastream to BigQuery template。
    6. 在提供的参数字段中,输入您的参数值。
    7. 可选:如需从“正好一次”处理切换到“至少一次”流处理模式,请选择至少一次
    8. 点击运行作业

    gcloud

    在 shell 或终端中,运行模板:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
    • JOB_NAME:您选择的唯一性作业名称
    • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如:projects/my-project-id/subscriptions/my-subscription-id
    • BIGQUERY_DATASET:您的 BigQuery 数据集名称。
    • BIGQUERY_TABLE:您的 BigQuery 表模板。例如 {_metadata_schema}_{_metadata_table}_log

    API

    如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
    • JOB_NAME:您选择的唯一性作业名称
    • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH:Datastream 数据的 Cloud Storage 路径。例如 gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME:要从中读取已更改文件的 Pub/Sub 订阅。例如:projects/my-project-id/subscriptions/my-subscription-id
    • BIGQUERY_DATASET:您的 BigQuery 数据集名称。
    • BIGQUERY_TABLE:您的 BigQuery 表模板。例如 {_metadata_schema}_{_metadata_table}_log

    后续步骤