具有 Python UDF 的 Cloud Storage Text to BigQuery 模板

具有 Python UDF 的 Cloud Storage Text to BigQuery 流水线是一种批处理流水线,用于读取 Cloud Storage 中存储的文本文件,使用 Python 用户定义的函数 (UDF) 转换这些文件,然后将结果附加到 BigQuery 表。

流水线要求

  • 创建一个用于描述 BigQuery 架构的 JSON 文件。

    确保有一个名为 BigQuery Schema 的顶级 JSON 数组,且该数组的内容遵循 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 格式。

    Cloud Storage Text to BigQuery 批处理模板不支持将数据导入目标 BigQuery 表中的 STRUCT(记录)字段。

    下面的 JSON 描述了一个 BigQuery 架构示例:

    {
      "BigQuery Schema": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        },
      ]
    }
  • 使用 UDF 函数(该函数提供转换文本行的逻辑)创建一个 Python (.py) 文件。您的函数必须返回一个 JSON 字符串。

    例如,以下函数将拆分 CSV 文件的每行文本,并通过转换值返回 JSON 字符串。

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

模板参数

参数 说明
JSONPath 用于定义 BigQuery 架构的 JSON 文件(存储在 Cloud Storage 中)的 gs:// 路径。例如 gs://path/to/my/schema.json
pythonExternalTextTransformGcsPath Python 代码文件的 Cloud Storage URI,该文件用于定义您要使用的用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.py
pythonExternalTextTransformFunctionName 您要使用的 Python 用户定义的函数 (UDF) 的名称。
inputFilePattern Cloud Storage 中待处理文本的 gs:// 路径。例如 gs://path/to/my/text/data.txt
outputTable 要创建用以存储已处理数据的 BigQuery 表名称。如果您重复使用现有 BigQuery 表,则数据将被附加到目标表。例如 my-project-name:my-dataset.my-table
bigQueryLoadingTemporaryDirectory BigQuery 加载进程的临时目录。例如 gs://my-bucket/my-files/temp_dir
useStorageWriteApi 可选:如果为 true,则流水线使用 BigQuery Storage Write API。默认值为 false。如需了解详情,请参阅使用 Storage Write API
useStorageWriteApiAtLeastOnce 可选:使用 Storage Write API 时,指定写入语义。如需使用“至少一次”语义,请将此参数设置为 true。如需使用“正好一次”语义,请将参数设置为 false。仅当 useStorageWriteApitrue 时,此参数才适用。默认值为 false

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:来自 Cloud Storage 输入文件的一行文本。
  • 输出:与 BigQuery 目标表的架构匹配的 JSON 字符串。

运行模板

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --parameters \
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • PYTHON_FUNCTION:您要使用的 Python 用户定义的函数 (UDF) 的名称。
  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_PYTHON_UDF_FILE:Python 代码文件的 Cloud Storage URI,该文件用于定义您要使用的用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.py
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
        "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
        "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
        "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
        "inputFilePattern":"PATH_TO_TEXT_DATA",
        "outputTable":"BIGQUERY_TABLE",
        "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang",
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • PYTHON_FUNCTION:您要使用的 Python 用户定义的函数 (UDF) 的名称。
  • PATH_TO_BIGQUERY_SCHEMA_JSON:包含架构定义的 JSON 文件的 Cloud Storage 路径
  • PATH_TO_PYTHON_UDF_FILE:Python 代码文件的 Cloud Storage URI,该文件用于定义您要使用的用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.py
  • PATH_TO_TEXT_DATA:文本数据集的 Cloud Storage 路径
  • BIGQUERY_TABLE:您的 BigQuery 表名称
  • PATH_TO_TEMP_DIR_ON_GCS:临时目录的 Cloud Storage 路径
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiBatchOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.TransformTextViaJavascript;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.PythonExternalTextTransformerOptions;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.StreamUtils;
import org.apache.beam.sdk.values.PCollection;
import org.json.JSONArray;
import org.json.JSONObject;

/**
 * Templated pipeline to read text from TextIO, apply a javascript UDF to it, and write it to GCS.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_GCS_Text_to_BigQuery_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "GCS_Text_to_BigQuery_Flex",
      category = TemplateCategory.BATCH,
      displayName = "Text Files on Cloud Storage to BigQuery with BigQuery Storage API support",
      description =
          "The Cloud Storage Text to BigQuery pipeline is a batch pipeline that allows you to read text files stored in "
              + "Cloud Storage, transform them using a JavaScript User Defined Function (UDF) that you provide, and append the result to a BigQuery table.",
      optionsClass = TextIOToBigQuery.Options.class,
      skipOptions = {
        "javascriptTextTransformReloadIntervalMinutes",
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-bigquery",
      flexContainerName = "text-to-bigquery",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
            + "    <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
            + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
            + "    <p>The following JSON describes an example BigQuery schema:</p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + "  \"BigQuery Schema\": [\n"
            + "    {\n"
            + "      \"name\": \"location\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"color\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"coffee\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    }\n"
            + "  ]\n"
            + "}\n"
            + "</pre>",
        "Create a JavaScript (<code>.js</code>) file with your UDF function that supplies the logic\n"
            + "    to transform the lines of text. Your function must return a JSON string.\n"
            + "    <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + "      transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "function transform(line) {\n"
            + "var values = line.split(',');\n"
            + "\n"
            + "var obj = new Object();\n"
            + "obj.location = values[0];\n"
            + "obj.name = values[1];\n"
            + "obj.age = values[2];\n"
            + "obj.color = values[3];\n"
            + "obj.coffee = values[4];\n"
            + "var jsonString = JSON.stringify(obj);\n"
            + "\n"
            + "return jsonString;\n"
            + "}</pre>"
      }),
  @Template(
      name = "GCS_Text_to_BigQuery_Xlang",
      category = TemplateCategory.BATCH,
      displayName =
          "Text Files on Cloud Storage to BigQuery with BigQuery Storage API & Python UDF support",
      type = Template.TemplateType.XLANG,
      description =
          "The Cloud Storage Text to BigQuery pipeline is a batch pipeline that allows you to read text files stored in "
              + "Cloud Storage, transform them using a Python User Defined Function (UDF) that you provide, and append the result to a BigQuery table.",
      optionsClass = TextIOToBigQuery.Options.class,
      skipOptions = {
        "javascriptTextTransformReloadIntervalMinutes",
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName"
      },
      optionalOptions = {"javascriptTextTransformGcsPath", "javascriptTextTransformFunctionName"},
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-bigquery",
      flexContainerName = "text-to-bigquery-xlang",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
            + "    <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
            + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
            + "    <p>The following JSON describes an example BigQuery schema:</p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + "  \"BigQuery Schema\": [\n"
            + "    {\n"
            + "      \"name\": \"location\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"color\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"coffee\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    }\n"
            + "  ]\n"
            + "}\n"
            + "</pre>",
        "Create a JavaScript (<code>.js</code>) file with your UDF function that supplies the logic\n"
            + "    to transform the lines of text. Your function must return a JSON string.\n"
            + "    <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + "      transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "function transform(line) {\n"
            + "var values = line.split(',');\n"
            + "\n"
            + "var obj = new Object();\n"
            + "obj.location = values[0];\n"
            + "obj.name = values[1];\n"
            + "obj.age = values[2];\n"
            + "obj.color = values[3];\n"
            + "obj.coffee = values[4];\n"
            + "var jsonString = JSON.stringify(obj);\n"
            + "\n"
            + "return jsonString;\n"
            + "}</pre>"
      })
})
public class TextIOToBigQuery {

  /** Options supported by {@link TextIOToBigQuery}. */
  public interface Options
      extends DataflowPipelineOptions,
          PythonExternalTextTransformerOptions,
          BigQueryStorageApiBatchOptions {
    @TemplateParameter.GcsReadFile(
        order = 1,
        groupName = "Source",
        optional = false,
        description = "The GCS location of the text you'd like to process",
        helpText = "The gs:// path to the text in Cloud Storage you'd like to process.",
        example = "gs://your-bucket/your-file.txt")
    String getInputFilePattern();

    void setInputFilePattern(String value);

    @TemplateParameter.GcsReadFile(
        order = 2,
        optional = false,
        description = "JSON file with BigQuery Schema description",
        helpText =
            "The gs:// path to the JSON file that defines your BigQuery schema, stored in Cloud Storage.",
        example = "gs://your-bucket/your-schema.json")
    String getJSONPath();

    void setJSONPath(String value);

    @TemplateParameter.BigQueryTable(
        order = 3,
        optional = false,
        groupName = "Target",
        description = "Output table to write to",
        helpText =
            "The location of the BigQuery table to use to store the processed data. If you reuse an existing table, it is overwritten.",
        example = "<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>")
    String getOutputTable();

    void setOutputTable(String value);

    @TemplateParameter.JavascriptUdfFile(
        order = 4,
        optional = false,
        description = "GCS path to javascript fn for transforming output",
        helpText =
            "The Cloud Storage URI of the `.js` file that defines the JavaScript user-defined function (UDF) you want to use.",
        example = "gs://your-bucket/your-transforms/*.js")
    String getJavascriptTextTransformGcsPath();

    void setJavascriptTextTransformGcsPath(String jsTransformPath);

    @TemplateParameter.Text(
        order = 5,
        optional = false,
        regexes = {"[a-zA-Z0-9_]+"},
        description = "UDF Javascript Function Name",
        helpText =
            "The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is `myTransform(inJson) { /*...do stuff...*/ }`, then the function name is `myTransform`. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)",
        example = "transform_udf1")
    String getJavascriptTextTransformFunctionName();

    void setJavascriptTextTransformFunctionName(String javascriptTextTransformFunctionName);

    @Validation.Required
    @TemplateParameter.GcsWriteFolder(
        order = 6,
        optional = false,
        description = "Temporary directory for BigQuery loading process",
        helpText = "Temporary directory for BigQuery loading process.",
        example = "gs://your-bucket/your-files/temp-dir")
    String getBigQueryLoadingTemporaryDirectory();

    void setBigQueryLoadingTemporaryDirectory(String directory);
  }

  private static final String BIGQUERY_SCHEMA = "BigQuery Schema";

  private static final String NAME = "name";
  private static final String TYPE = "type";
  private static final String MODE = "mode";
  private static final String RECORD_TYPE = "RECORD";
  private static final String FIELDS_ENTRY = "fields";

  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options, () -> writeToBQTransform(options));
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @param writeToBQ the transform that outputs {@link TableRow}s to BigQuery.
   * @return The result of the pipeline execution.
   */
  @VisibleForTesting
  static PipelineResult run(Options options, Supplier<Write<TableRow>> writeToBQ) {
    BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);

    Pipeline pipeline = Pipeline.create(options);

    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }

    PCollection<String> source =
        pipeline.apply("Read from source", TextIO.read().from(options.getInputFilePattern()));
    PCollection<TableRow> udfOut;

    if (usePythonUdf) {
      udfOut =
          source
              .apply(
                  "MapToRecord",
                  PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
                      .stringMappingFunction())
              .setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)
              .apply(
                  "InvokeUDF",
                  PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                      .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                      .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                      .build())
              .apply(
                  "MapToTableRowElements",
                  ParDo.of(new PythonExternalTextTransformer.RowToTableRowElementFn()));
    } else {
      udfOut =
          source
              .apply(
                  TransformTextViaJavascript.newBuilder()
                      .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                      .setFunctionName(options.getJavascriptTextTransformFunctionName())
                      .setReloadIntervalMinutes(
                          options.getJavascriptTextTransformReloadIntervalMinutes())
                      .build())
              .apply(
                  MapElements.via(
                      new SimpleFunction<String, TableRow>() {
                        @Override
                        public TableRow apply(String json) {
                          return BigQueryConverters.convertJsonToTableRow(json);
                        }
                      }));
    }

    udfOut.apply("Insert into Bigquery", writeToBQ.get());

    return pipeline.run();
  }

  /** Create the {@link Write} transform that outputs the collection to BigQuery. */
  @VisibleForTesting
  static Write<TableRow> writeToBQTransform(Options options) {
    return BigQueryIO.writeTableRows()
        .withSchema(parseSchema(options.getJSONPath()))
        .to(options.getOutputTable())
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withCustomGcsTempLocation(
            StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory()));
  }

  /** Parse BigQuery schema from a Json file. */
  private static TableSchema parseSchema(String jsonPath) {
    TableSchema tableSchema = new TableSchema();
    List<TableFieldSchema> fields = new ArrayList<>();

    JSONObject jsonSchema = parseJson(jsonPath);

    JSONArray bqSchemaJsonArray = jsonSchema.getJSONArray(BIGQUERY_SCHEMA);

    for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
      JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
      fields.add(convertToTableFieldSchema(inputField));
    }
    tableSchema.setFields(fields);

    return tableSchema;
  }

  /**
   * Convert a JSONObject from the Schema JSON to a TableFieldSchema. In case of RECORD, it handles
   * it recursively.
   *
   * @param inputField Input field to convert.
   * @return TableFieldSchema instance to populate the schema.
   */
  private static TableFieldSchema convertToTableFieldSchema(JSONObject inputField) {
    TableFieldSchema field =
        new TableFieldSchema()
            .setName(inputField.getString(NAME))
            .setType(inputField.getString(TYPE));

    if (inputField.has(MODE)) {
      field.setMode(inputField.getString(MODE));
    }

    if (inputField.getString(TYPE) != null && inputField.getString(TYPE).equals(RECORD_TYPE)) {
      List<TableFieldSchema> nestedFields = new ArrayList<>();
      JSONArray fieldsArr = inputField.getJSONArray(FIELDS_ENTRY);
      for (int i = 0; i < fieldsArr.length(); i++) {
        JSONObject nestedJSON = fieldsArr.getJSONObject(i);
        nestedFields.add(convertToTableFieldSchema(nestedJSON));
      }
      field.setFields(nestedFields);
    }

    return field;
  }

  /**
   * Parses a JSON file and returns a JSONObject containing the necessary source, sink, and schema
   * information.
   *
   * @param pathToJson the JSON file location so we can download and parse it
   * @return the parsed JSONObject
   */
  private static JSONObject parseJson(String pathToJson) {
    try {
      // accessing GCS needs to be done after the pipeline create call, otherwise FileSystems
      // doesn't know about GCS.
      ReadableByteChannel readableByteChannel =
          FileSystems.open(FileSystems.matchNewResource(pathToJson, false));
      String json =
          new String(
              StreamUtils.getBytesWithoutClosing(Channels.newInputStream(readableByteChannel)),
              StandardCharsets.UTF_8);
      return new JSONObject(json);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

后续步骤