具有 Python UDF 的 Cloud Storage Text to BigQuery (Stream) 模板

Cloud Storage Text to BigQuery 流水线是一种流处理流水线,用于流式传输 Cloud Storage 中存储的文本文件,使用您提供的 Python 用户定义的函数 (UDF) 转换这些文件,然后将结果附加到 BigQuery。

流水线无限期运行,需要通过取消而非排空手动终止,原因是其使用 Watch 转换,该转换是不支持排空的可拆分 DoFn

流水线要求

  • 创建一个用于描述 BigQuery 中的输出表架构的 JSON 文件。

    确保有一个名为 fields 的顶级 JSON 数组,且该数组的内容遵循 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 格式。例如:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • 使用 UDF 函数(该函数提供转换文本行的逻辑)创建一个 Python (.py) 文件。您的函数必须返回一个 JSON 字符串。

    以下示例将拆分 CSV 文件的每一行,使用相应值创建 JSON 对象,并返回 JSON 字符串:

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

模板参数

参数 说明
pythonExternalTextTransformGcsPath Python 代码文件的 Cloud Storage URI,该文件用于定义您要使用的用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.py
pythonExternalTextTransformFunctionName 您要使用的 Python 用户定义的函数 (UDF) 的名称。
JSONPath 您的 BigQuery 架构文件的 Cloud Storage 位置,以 JSON 格式描述。例如:gs://path/to/my/schema.json
outputTable 完全限定的 BigQuery 表,例如 my-project:dataset.table
inputFilePattern 您要处理的文本的 Cloud Storage 位置,例如:gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录,例如:gs://my-bucket/my-files/temp_dir
outputDeadletterTable 无法到达输出表的消息表。例如 my-project:dataset.my-unprocessed-table。如果该表不存在,则系统会在流水线执行期间创建它。如果未指定此参数,则系统会改用 <outputTableSpec>_error_records

用户定义的函数

此模板需要使用 UDF 来解析输入文件,如流水线要求中所述。该模板会为每个输入文件中的每一行文本调用 UDF。如需详细了解如何创建 UDF,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:来自输入文件的一行文本。
  • 输出:与 BigQuery 目标表的架构匹配的 JSON 字符串。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Storage Text to BigQuery (Stream) with Python UDF template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • PYTHON_FUNCTION:您要使用的 Python 用户定义的函数 (UDF) 的名称。
  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_PYTHON_UDF_FILE:Python 代码文件的 Cloud Storage URI,该文件用于定义您要使用的用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.py
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • BIGQUERY_UNPROCESSED_TABLE:未处理消息的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • PYTHON_FUNCTION:您要使用的 Python 用户定义的函数 (UDF) 的名称。
  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_PYTHON_UDF_FILE:Python 代码文件的 Cloud Storage URI,该文件用于定义您要使用的用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.py
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • BIGQUERY_UNPROCESSED_TABLE:未处理消息的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径

后续步骤