具有 Python UDF 的 Pub/Sub Proto to BigQuery 模板

Pub/Sub proto to BigQuery 模板是一种流处理流水线,用于将 Pub/Sub 订阅中的 Avro 数据提取到 BigQuery 表中。向 BigQuery 表写入数据时发生的任何错误都会流式传输到 Pub/Sub 未处理的主题。

可以提供 Python 用户定义的函数 (UDF) 来转换数据。可以将在执行 UDF 期间发生的错误发送到单独的 Pub/Sub 主题或与 BigQuery 错误相同的未处理主题。

流水线要求

  • 用作输入来源的 Pub/Sub 订阅必须存在。
  • Proto 记录的架构文件必须存在于 Cloud Storage 中。
  • 输出 Pub/Sub 主题必须存在。
  • 用作输出目标的 BigQuery 数据集必须已存在。
  • 如果 BigQuery 表存在,则无论 createDisposition 值如何,该表都必须具有与 proto 数据匹配的架构。

模板参数

参数 说明
protoSchemaPath 独立的 proto 架构文件的 Cloud Storage 位置。例如 gs://path/to/my/file.pb。 您可以使用 protoc 命令的 --descriptor_set_out 标志生成此文件。--include_imports 标志可确保文件是独立的。
fullMessageName 完整的 proto 消息名称。例如 package.name.MessageName,其中 package.name 是为 package 语句(而不是 java_package 语句)提供的值。
inputSubscription 要读取的 Pub/Sub 输入订阅。例如 projects/<project>/subscriptions/<subscription>
outputTopic 要用于未处理的记录的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 输出表位置。例如 my-project:my_dataset.my_table。 根据指定的 createDisposition,系统可能会使用输入架构文件自动创建输出表。
preserveProtoFieldNames 可选:true 用于保留 JSON 中的原始 Proto 字段名称。false 用于使用更多标准 JSON 名称。例如,false 会将 field_name 更改为 fieldName。(默认:false
bigQueryTableSchemaPath 可选:BigQuery 架构路径到 Cloud Storage 路径。例如 gs://path/to/my/schema.json。如果未提供,则根据 Proto 架构推断架构。
pythonExternalTextTransformGcsPath 可选:Python 代码文件的 Cloud Storage URI,该文件用于定义您要使用的用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.py
pythonExternalTextTransformFunctionName 可选:您要使用的 Python 用户定义的函数 (UDF) 的名称。
udfOutputTopic 可选:存储 UDF 错误的 Pub/Sub 主题。例如 projects/<project-id>/topics/<topic-name>。如果未提供,则会将 UDF 错误发送到 outputTopic 所在的主题。
writeDisposition 可选:BigQuery WriteDisposition。例如 WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。默认值:WRITE_APPEND
createDisposition 可选:BigQuery CreateDisposition。例如 CREATE_IF_NEEDEDCREATE_NEVER。默认值为 CREATE_IF_NEEDED
useStorageWriteApi 可选:如果为 true,则流水线使用 BigQuery Storage Write API。默认值为 false。如需了解详情,请参阅使用 Storage Write API
useStorageWriteApiAtLeastOnce 可选:使用 Storage Write API 时,指定写入语义。如需使用“至少一次”语义,请将此参数设置为 true。如需使用“正好一次”语义,请将参数设置为 false。仅当 useStorageWriteApitrue 时,此参数才适用。默认值为 false
numStorageWriteApiStreams 可选:使用 Storage Write API 时,指定写入流的数量。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。
storageWriteApiTriggeringFrequencySec 可选:使用 Storage Write API 时,指定触发频率(以秒为单位)。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:Pub/Sub 消息数据字段,序列化为 JSON 字符串。
  • 输出:与 BigQuery 目标表的架构匹配的 JSON 字符串。
  • 运行模板

    控制台

    1. 转到 Dataflow 基于模板创建作业页面。
    2. 转到“基于模板创建作业”
    3. 作业名称字段中,输入唯一的作业名称。
    4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

      如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

    5. Dataflow 模板下拉菜单中,选择 the Pub/Sub Proto to BigQuery with Python UDF template。
    6. 在提供的参数字段中,输入您的参数值。
    7. 点击运行作业

    gcloud

    在 shell 或终端中,运行模板:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    替换以下内容:

    • JOB_NAME:您选择的唯一性作业名称
    • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION:您要使用的模板的版本

      您可使用以下值:

    • SCHEMA_PATH:Proto 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.pb
    • PROTO_MESSAGE_NAME:Proto 消息名称(例如 package.name.MessageName
    • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
    • BIGQUERY_TABLE:BigQuery 输出表名称
    • UNPROCESSED_TOPIC:要用于未处理的队列的 Pub/Sub 主题

    API

    如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
    • JOB_NAME:您选择的唯一性作业名称
    • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION:您要使用的模板的版本

      您可使用以下值:

    • SCHEMA_PATH:Proto 架构文件的 Cloud Storage 路径(例如 gs://MyBucket/file.pb
    • PROTO_MESSAGE_NAME:Proto 消息名称(例如 package.name.MessageName
    • SUBSCRIPTION_NAME:Pub/Sub 输入订阅名称
    • BIGQUERY_TABLE:BigQuery 输出表名称
    • UNPROCESSED_TOPIC:要用于未处理的队列的 Pub/Sub 主题

    后续步骤