Pub/Sub Proto to BigQuery 模板

Pub/Sub proto to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。

可以提供 JavaScript 用户定义函数 (UDF) 来转换数据。可以将在执行 UDF 期间发生的错误发送到单独的 Pub/Sub 主题或与 BigQuery 错误相同的未处理主题。

在为此场景运行 Dataflow 流水线之前,请考虑是否使用 Pub/Sub BigQuery 订阅UDF 满足您的要求。

流水线要求

  • 用作输入来源的 Pub/Sub 订阅必须存在。
  • Proto 记录的架构文件必须存在于 Cloud Storage 中。
  • 输出 Pub/Sub 主题必须存在。
  • 用作输出目标的 BigQuery 数据集必须已存在。
  • 如果 BigQuery 表存在,则无论 createDisposition 值如何,该表都必须具有与 proto 数据匹配的架构。

模板参数

必需参数

  • protoSchemaPath(Proto 架构文件的 Cloud Storage 路径):独立描述符集文件的 Cloud Storage 路径。示例:gs://MyBucket/schema.pb。通过将 --descriptor_set_out=schema.pb 添加到编译 proto 的 protoc 命令中,可以生成 schema.pb--include_imports 标志可用于确保文件是独立的。
  • fullMessageName(完整 Proto 消息名称):完整消息名称(例如:package.name.MessageName)。如果消息嵌套在另一个消息中,则使用“.”分隔符包含所有消息(例如:package.name.OuterMessage.InnerMessage)。“package.name”应来自 package 语句,而不是 java_package 语句。
  • inputSubscription(Pub/Sub 输入订阅):要从中读取输入的 Pub/Sub 订阅,格式为“projects/your-project-id/subscriptions/your-subscription-name”(示例:projects/your-project-id/subscriptions/your-subscription-name)。
  • outputTableSpec(BigQuery 输出表):要将输出写入到的 BigQuery 表位置。该名称应采用 <project>:<dataset>.<table_name> 格式。表的架构必须与输入对象匹配。
  • outputTopic(输出 Pub/Sub 主题):应将数据发布到的主题的名称,格式为“projects/your-project-id/topics/your-topic-name”(示例:projects/your-project-id/topics/your-topic-name)。

可选参数

  • preserveProtoFieldNames(保留 Proto 字段名称):用于控制是应保留 Proto 字段名称还是将其转换为 lowerCamelCase 的标志。如果表已存在,则应根据与表架构匹配的内容来确定此值。否则,它将确定所创建表的列名。设置为 true 可保留 proto snake_case。设置为 False 会将字段转换为 lowerCamelCase。(默认值:false)。
  • bigQueryTableSchemaPath(BigQuery 表架构路径):BigQuery 架构 JSON 文件的 Cloud Storage 路径。如果未设置此值,则系统将根据 Proto 架构推断架构。(例如:gs://MyBucket/bq_schema.json)。
  • udfOutputTopic(用于 UDF 失败的 Pub/Sub 输出主题):要将 UDF 失败发送到的可选输出主题。如果未设置此选项,则失败将写入到与 BigQuery 失败相同的主题。(示例:projects/your-project-id/topics/your-topic-name)。
  • writeDisposition(要用于 BigQuery 的写入处置方式):BigQuery WriteDisposition。例如,WRITE_APPEND、WRITE_EMPTY 或 WRITE_TRUNCATE。默认值为:WRITE_APPEND。
  • createDisposition(要用于 BigQuery 的创建处置方式):BigQuery CreateDisposition。例如,CREATE_IF_NEEDED、CREATE_NEVER。默认值为:CREATE_IF_NEEDED。
  • javascriptTextTransformGcsPath(JavaScript UDF 来源的 Cloud Storage 路径):包含用户定义的函数的 JavaScript 代码的 Cloud Storage 路径模式。(示例:gs://your-bucket/your-function.js)。
  • javascriptTextTransformFunctionName(UDF JavaScript 函数名称):要从 JavaScript 文件调用的函数的名称。只能使用字母、数字和下划线。(示例:“transform”或“transform_udf1”)。
  • javascriptTextTransformReloadIntervalMinutes(JavaScript UDF 自动重新加载间隔 [分钟]):定义工作器检查 JavaScript UDF 更改以重新加载文件的间隔。默认值为 0。
  • useStorageWriteApi(使用 BigQuery Storage Write API):如果为 true,则流水线会在将数据写入 BigQuery 时使用 Storage Write API(请参阅 https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api)。默认值为 false。以“正好一次”模式使用 Storage Write API 时,您必须设置以下参数:“BigQuery Storage Write API 的流数量”和“BigQuery Storage Write API 的触发频率(以秒为单位)”。如果您启用 Dataflow“至少一次”模式或将 useStorageWriteApiAtLeastOnce 参数设置为 true,则无需设置流数量或触发频率。
  • useStorageWriteApiAtLeastOnce(在 BigQuery Storage Write API 中使用“至少一次”语义):此参数仅在启用了“使用 BigQuery Storage Write API”时有效。如果启用,则系统会将“至少一次”语义用于 Storage Write API,否则会使用“正好一次”语义。默认值为:false。
  • numStorageWriteApiStreams(BigQuery Storage Write API 的流数量):流数量定义了 BigQueryIO 的写入转换的并行性,大致对应于流水线将使用的 Storage Write API 的流数量。如需了解建议的值,请参阅 https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api。默认值为 0。
  • storageWriteApiTriggeringFrequencySec(BigQuery Storage Write API 的触发频率 [以秒为单位]):触发频率将决定数据需要多久才能在 BigQuery 中可供查询。如需了解建议的值,请参阅 https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:Pub/Sub 消息数据字段,序列化为 JSON 字符串。
  • 输出:与 BigQuery 目标表的架构匹配的 JSON 字符串。
  • 运行模板

    控制台

    1. 转到 Dataflow 基于模板创建作业页面。
    2. 转到“基于模板创建作业”
    3. 作业名称字段中,输入唯一的作业名称。
    4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

      如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

    5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Proto to BigQuery template。
    6. 在提供的参数字段中,输入您的参数值。
    7. 点击运行作业

    gcloud

    在 shell 或终端中,运行模板:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    替换以下内容:

    • JOB_NAME:您选择的唯一性作业名称
    • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION:您要使用的模板的版本

      您可使用以下值:

    • SCHEMA_PATH:Proto 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.pb
    • PROTO_MESSAGE_NAME:Proto 消息名称(例如 package.name.MessageName
    • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
    • BIGQUERY_TABLE:BigQuery 输出表名称
    • UNPROCESSED_TOPIC:要用于未处理的队列的 Pub/Sub 主题

    API

    如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
    • JOB_NAME:您选择的唯一性作业名称
    • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION:您要使用的模板的版本

      您可使用以下值:

    • SCHEMA_PATH:Proto 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.pb
    • PROTO_MESSAGE_NAME:Proto 消息名称(例如 package.name.MessageName
    • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
    • BIGQUERY_TABLE:BigQuery 输出表名称
    • UNPROCESSED_TOPIC:要用于未处理的队列的 Pub/Sub 主题

    后续步骤