具有 Python UDF 的 Pub/Sub to BigQuery 模板

具有 Python UDF 的 Pub/Sub to BigQuery 模板是一种流处理流水线,可从 Pub/Sub 读取 JSON 格式的消息并将其写入 BigQuery 表中。或者,您可以提供用 Python 编写的用户定义的函数 (UDF) 来处理收到的消息。

流水线要求

  • BigQuery 表必须存在且具有架构。
  • Pub/Sub 消息数据必须使用 JSON 格式,或者您必须提供将消息数据转换为 JSON 的 UDF。JSON 数据必须与 BigQuery 表架构匹配。例如,如果 JSON 载荷的格式为 {"k1":"v1", "k2":"v2"},则 BigQuery 表必须具有两个名为 k1k2 的字符串列。
  • 指定 inputSubscriptioninputTopic 参数,但不能同时指定这两者。

模板参数

参数 说明
outputTableSpec 要写入的 BigQuery 表,格式为 "PROJECT_ID:DATASET_NAME.TABLE_NAME"
inputSubscription 可选:要读取的 Pub/Sub 订阅,格式为 "projects/PROJECT_ID/subscriptions/SUBCRIPTION_NAME"
inputTopic 可选:要读取的 Pub/Sub 主题,格式为 "projects/PROJECT_ID/topics/TOPIC_NAME"
outputDeadletterTable 未能到达输出表的消息的 BigQuery 表,格式为 "PROJECT_ID:DATASET_NAME.TABLE_NAME"。 如果该表不存在,则系统会在流水线运行时创建该表。如果您未指定此参数,则系统会改为使用值 "OUTPUT_TABLE_SPEC_error_records"
pythonExternalTextTransformGcsPath 可选:Python 代码文件的 Cloud Storage URI,该文件用于定义您要使用的用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.py
pythonExternalTextTransformFunctionName 可选:您要使用的 Python 用户定义的函数 (UDF) 的名称。
useStorageWriteApi 可选:如果为 true,则流水线使用 BigQuery Storage Write API。默认值为 false。如需了解详情,请参阅使用 Storage Write API
useStorageWriteApiAtLeastOnce 可选:使用 Storage Write API 时,指定写入语义。如需使用“至少一次”语义,请将此参数设置为 true。如需使用“正好一次”语义,请将参数设置为 false。仅当 useStorageWriteApitrue 时,此参数才适用。默认值为 false
numStorageWriteApiStreams 可选:使用 Storage Write API 时,指定写入流的数量。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。
storageWriteApiTriggeringFrequencySec 可选:使用 Storage Write API 时,指定触发频率(以秒为单位)。如果 useStorageWriteApitrueuseStorageWriteApiAtLeastOncefalse,则必须设置此参数。

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:Pub/Sub 消息数据字段,序列化为 JSON 字符串。
  • 输出:与 BigQuery 目标表的架构匹配的 JSON 字符串。
  • 运行模板

    控制台

    1. 转到 Dataflow 基于模板创建作业页面。
    2. 转到“基于模板创建作业”
    3. 作业名称字段中,输入唯一的作业名称。
    4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

      如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

    5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to BigQuery with Python UDF template。
    6. 在提供的参数字段中,输入您的参数值。
    7. 可选:如需从“正好一次”处理切换到“至少一次”流处理模式,请选择至少一次
    8. 点击运行作业

    gcloud

    在 shell 或终端中,运行模板:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    替换以下内容:

    • JOB_NAME:您选择的唯一性作业名称
    • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION:您要使用的模板的版本

      您可使用以下值:

    • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
    • TOPIC_NAME:您的 Pub/Sub 主题名称
    • DATASET:您的 BigQuery 数据集
    • TABLE_NAME:您的 BigQuery 表名称

    API

    如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang",
       }
    }

    替换以下内容:

    • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
    • JOB_NAME:您选择的唯一性作业名称
    • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
    • VERSION:您要使用的模板的版本

      您可使用以下值:

    • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
    • TOPIC_NAME:您的 Pub/Sub 主题名称
    • DATASET:您的 BigQuery 数据集
    • TABLE_NAME:您的 BigQuery 表名称

    后续步骤