Cloud Storage Text to BigQuery template with Python UDF

The Cloud Storage Text to BigQuery with Python UDF pipeline is a batch pipeline that reads text files stored in Cloud Storage, transforms them using a Python user-defined function (UDF), and appends the result to a BigQuery table.

Pipeline requirements

  • Create a JSON file that describes your BigQuery schema.

    Ensure that there is a top-level JSON array titled BigQuery Schema and that its contents follow the pattern {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

    The Cloud Storage Text to BigQuery batch template doesn't support importing data into STRUCT (Record) fields in the target BigQuery table.

    The following JSON describes an example BigQuery schema:

      {
        "BigQuery Schema": [
          {
            "name": "name",
            "type": "STRING"
          },
          {
            "name": "age",
            "type": "INTEGER"
          }
        ]
      }

  • Create a Python (.py) file with your UDF function that supplies the logic to transform the lines of text. Your function must return a JSON string.

    For example, this function splits each line of a CSV file and returns a JSON string after transforming the values.

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)
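
Before uploading the schema file and the UDF, it can help to check locally that the two agree. The following is a minimal sketch, assuming the example schema and the example `process` function shown above. The helper names `validate_schema` and `column_mismatch` and the sample line `"Alice,30"` are hypothetical and are not part of the template.

```python
import json

# Schema in the layout described above; the columns mirror the example schema.
SCHEMA = {
    "BigQuery Schema": [
        {"name": "name", "type": "STRING"},
        {"name": "age", "type": "INTEGER"},
    ]
}


def process(value):
    """UDF from the example above: one CSV line in, one JSON string out."""
    data = value.split(',')
    obj = {'name': data[0], 'age': int(data[1])}
    return json.dumps(obj)


def validate_schema(schema):
    """Hypothetical helper: list problems found in a schema dict (empty if none)."""
    fields = schema.get("BigQuery Schema")
    if not isinstance(fields, list):
        return ['top-level "BigQuery Schema" array is missing']
    problems = []
    for i, field in enumerate(fields):
        if not isinstance(field, dict) or "name" not in field or "type" not in field:
            problems.append(f"entry {i} needs both 'name' and 'type'")
        elif field["type"] in ("RECORD", "STRUCT"):
            # The template does not support importing into STRUCT (Record) fields.
            problems.append(f"column {field['name']!r} is a STRUCT (Record) field")
    return problems


def column_mismatch(schema, udf_output):
    """Hypothetical helper: (missing, extra) column names for one UDF output."""
    expected = {field["name"] for field in schema["BigQuery Schema"]}
    actual = set(json.loads(udf_output))
    return sorted(expected - actual), sorted(actual - expected)


if __name__ == "__main__":
    row = process("Alice,30")  # hypothetical sample line
    print(validate_schema(SCHEMA))
    print(row)
    print(column_mismatch(SCHEMA, row))
```

An empty problem list and an empty `(missing, extra)` pair suggest that the UDF output lines up with the schema for that sample line. The check covers column names only, not data types.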

Template parameters

  • JSONPath: The gs:// path to the JSON file that defines your BigQuery schema, stored in Cloud Storage. For example, gs://path/to/my/schema.json.
  • pythonExternalTextTransformGcsPath: The Cloud Storage URI of the Python code file that defines the user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/
  • pythonExternalTextTransformFunctionName: The name of the Python user-defined function (UDF) that you want to use.
  • inputFilePattern: The gs:// path to the text in Cloud Storage you'd like to process. For example, gs://path/to/my/text/data.txt.
  • outputTable: The name of the BigQuery table that stores your processed data. If you reuse an existing BigQuery table, the data is appended to the destination table. For example,
  • bigQueryLoadingTemporaryDirectory: The temporary directory for the BigQuery loading process. For example, gs://my-bucket/my-files/temp_dir.
  • useStorageWriteApi: Optional: If true, the pipeline uses the BigQuery Storage Write API. The default value is false. For more information, see Using the Storage Write API.
  • useStorageWriteApiAtLeastOnce: Optional: When using the Storage Write API, specifies the write semantics. To use at-least-once semantics, set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
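
The parameter rules above can be checked before a job is launched. The following is a minimal sketch, assuming that every parameter not marked Optional is required and that values are passed as strings. The function `check_parameters` and the sample values are hypothetical and are not part of the template.

```python
# Parameters that the list above does not mark as Optional (an assumption).
REQUIRED = (
    "JSONPath",
    "pythonExternalTextTransformGcsPath",
    "pythonExternalTextTransformFunctionName",
    "inputFilePattern",
    "outputTable",
    "bigQueryLoadingTemporaryDirectory",
)

# Parameters whose descriptions or examples use a gs:// path.
GCS_PATHS = (
    "JSONPath",
    "pythonExternalTextTransformGcsPath",
    "inputFilePattern",
    "bigQueryLoadingTemporaryDirectory",
)


def check_parameters(params):
    """Hypothetical helper: return a list of problems (empty if none)."""
    problems = []
    for name in REQUIRED:
        if not params.get(name):
            problems.append(f"missing required parameter: {name}")
    for name in GCS_PATHS:
        value = params.get(name, "")
        if value and not value.startswith("gs://"):
            problems.append(f"{name} should be a gs:// path")
    # useStorageWriteApiAtLeastOnce applies only when useStorageWriteApi is true.
    if (params.get("useStorageWriteApiAtLeastOnce") == "true"
            and params.get("useStorageWriteApi", "false") != "true"):
        problems.append(
            "useStorageWriteApiAtLeastOnce applies only when useStorageWriteApi is true")
    return problems


if __name__ == "__main__":
    sample = {  # hypothetical values
        "JSONPath": "gs://path/to/my/schema.json",
        "pythonExternalTextTransformGcsPath": "gs://my-bucket/my-udfs/udf.py",
        "pythonExternalTextTransformFunctionName": "process",
        "inputFilePattern": "gs://path/to/my/text/data.txt",
        "outputTable": "BIGQUERY_TABLE",
        "bigQueryLoadingTemporaryDirectory": "gs://my-bucket/my-files/temp_dir",
    }
    print(check_parameters(sample))
```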

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: a line of text from a Cloud Storage input file.
  • Output: a JSON string that matches the schema of the BigQuery destination table.
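
As an illustration of this specification, the following sketch shows a somewhat more defensive variant of a UDF for the two-column `name`/`age` schema. It assumes that input lines may contain quoted commas and that a malformed line should raise an error. How the pipeline reacts to an exception raised inside the UDF is not described here, so treat that choice as an assumption.

```python
import csv
import json


def process(value):
    """Take one line of text and return a JSON string for the name/age schema."""
    # csv.reader keeps commas inside quoted fields, unlike str.split(',').
    row = next(csv.reader([value]))
    if len(row) != 2:
        raise ValueError(f"expected 2 fields, got {len(row)}: {value!r}")
    name, age = row
    return json.dumps({"name": name.strip(), "age": int(age)})


if __name__ == "__main__":
    print(process('"Doe, Jane",41'))  # hypothetical sample line
```

The standard-library `csv` module is used here only so that quoted fields such as `"Doe, Jane"` stay in one column.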

Run the template

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
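    --parameters \
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS
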
Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • PYTHON_FUNCTION: The name of the Python user-defined function (UDF) that you want to use.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
  • PATH_TO_PYTHON_UDF_FILE: The Cloud Storage URI of the Python code file that defines the user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/
  • PATH_TO_TEXT_DATA: your Cloud Storage path to your text dataset
  • BIGQUERY_TABLE: your BigQuery table name
  • PATH_TO_TEMP_DIR_ON_GCS: your Cloud Storage path to the temp directory
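
When the command is generated by a script, the placeholders can be substituted programmatically. The following is a minimal sketch that only builds the argument list and prints it; it does not run `gcloud`. The function `build_gcloud_command` is hypothetical, and the values passed in are the placeholders from the list above rather than real resources.

```python
import shlex


def build_gcloud_command(job_name, region, version, parameters):
    """Hypothetical helper: build the argument list for the command shown above."""
    template = (
        f"gs://dataflow-templates-{region}/{version}/flex/GCS_Text_to_BigQuery_Xlang"
    )
    # --parameters takes comma-separated key=value pairs. Values that contain
    # commas would need extra escaping, which this sketch does not handle.
    joined = ",".join(f"{key}={value}" for key, value in parameters.items())
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        "--template-file-gcs-location", template,
        "--region", region,
        "--parameters", joined,
    ]


if __name__ == "__main__":
    command = build_gcloud_command(
        "JOB_NAME",
        "REGION_NAME",
        "VERSION",
        {
            "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
            "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
            "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
            "inputFilePattern": "PATH_TO_TEXT_DATA",
            "outputTable": "BIGQUERY_TABLE",
            "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
        },
    )
    # shlex.join quotes each argument so the line can be pasted into a shell.
    print(shlex.join(command))
```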

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
        "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
        "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
        "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
        "inputFilePattern": "PATH_TO_TEXT_DATA",
        "outputTable": "BIGQUERY_TABLE",
        "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang"
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • PYTHON_FUNCTION: The name of the Python user-defined function (UDF) that you want to use.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
  • PATH_TO_PYTHON_UDF_FILE: The Cloud Storage URI of the Python code file that defines the user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/
  • PATH_TO_TEXT_DATA: your Cloud Storage path to your text dataset
  • BIGQUERY_TABLE: your BigQuery table name
  • PATH_TO_TEMP_DIR_ON_GCS: your Cloud Storage path to the temp directory
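
The request body can also be assembled in code, which avoids hand-editing JSON. The following is a minimal sketch that builds and prints the body only; it does not send the request, and the endpoint URL and authorization are out of scope here. The function `build_launch_body` is hypothetical, and the values are the placeholders from the list above.

```python
import json


def build_launch_body(job_name, location, version, parameters):
    """Hypothetical helper: build the launch_parameter body as a dict."""
    return {
        "launch_parameter": {
            "jobName": job_name,
            "parameters": dict(parameters),
            "containerSpecGcsPath": (
                f"gs://dataflow-templates-{location}/{version}"
                "/flex/GCS_Text_to_BigQuery_Xlang"
            ),
        }
    }


if __name__ == "__main__":
    body = build_launch_body(
        "JOB_NAME",
        "LOCATION",
        "VERSION",
        {
            "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
            "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
            "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
            "inputFilePattern": "PATH_TO_TEXT_DATA",
            "outputTable": "BIGQUERY_TABLE",
            "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS",
        },
    )
    # json.dumps guarantees balanced braces and correct quoting.
    print(json.dumps(body, indent=2))
```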

Template source code

/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.StreamUtils;
import org.apache.beam.sdk.values.PCollection;
import org.json.JSONArray;
import org.json.JSONObject;

/**
 * Templated pipeline to read text from TextIO, apply a JavaScript or Python UDF to it, and write
 * it to BigQuery.
 *
 * <p>Check out the README for instructions on how to use or modify this template.
 */
      name = "GCS_Text_to_BigQuery_Flex",
      category = TemplateCategory.BATCH,
      displayName = "Text Files on Cloud Storage to BigQuery with BigQuery Storage API support",
      description =
          "The Cloud Storage Text to BigQuery pipeline is a batch pipeline that allows you to read text files stored in "
              + "Cloud Storage, transform them using a JavaScript User Defined Function (UDF) that you provide, and append the result to a BigQuery table.",
      optionsClass = TextIOToBigQuery.Options.class,
      skipOptions = {
      documentation =
      flexContainerName = "text-to-bigquery",
      contactInformation = "",
      requirements = {
        "Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
            + "    <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
            + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
            + "    <p>The following JSON describes an example BigQuery schema:</p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + "  \"BigQuery Schema\": [\n"
            + "    {\n"
            + "      \"name\": \"location\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"color\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"coffee\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    }\n"
            + "  ]\n"
            + "}\n"
            + "</pre>",
        "Create a JavaScript (<code>.js</code>) file with your UDF function that supplies the logic\n"
            + "    to transform the lines of text. Your function must return a JSON string.\n"
            + "    <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + "      transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "function transform(line) {\n"
            + "var values = line.split(',');\n"
            + "\n"
            + "var obj = new Object();\n"
            + "obj.location = values[0];\n"
            + "obj.name = values[1];\n"
            + "obj.age = values[2];\n"
            + "obj.color = values[3];\n"
            + "obj.coffee = values[4];\n"
            + "var jsonString = JSON.stringify(obj);\n"
            + "\n"
            + "return jsonString;\n"
            + "}</pre>"
      name = "GCS_Text_to_BigQuery_Xlang",
      category = TemplateCategory.BATCH,
      displayName =
          "Text Files on Cloud Storage to BigQuery with BigQuery Storage API & Python UDF support",
      type = Template.TemplateType.XLANG,
      description =
          "The Cloud Storage Text to BigQuery pipeline is a batch pipeline that allows you to read text files stored in "
              + "Cloud Storage, transform them using a Python User Defined Function (UDF) that you provide, and append the result to a BigQuery table.",
      optionsClass = TextIOToBigQuery.Options.class,
      skipOptions = {
      optionalOptions = {"javascriptTextTransformGcsPath", "javascriptTextTransformFunctionName"},
      documentation =
      flexContainerName = "text-to-bigquery-xlang",
      contactInformation = "",
      requirements = {
        "Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
            + "    <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
            + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
            + "    <p>The following JSON describes an example BigQuery schema:</p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + "  \"BigQuery Schema\": [\n"
            + "    {\n"
            + "      \"name\": \"location\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"color\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"coffee\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    }\n"
            + "  ]\n"
            + "}\n"
            + "</pre>",
        "Create a JavaScript (<code>.js</code>) file with your UDF function that supplies the logic\n"
            + "    to transform the lines of text. Your function must return a JSON string.\n"
            + "    <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + "      transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "function transform(line) {\n"
            + "var values = line.split(',');\n"
            + "\n"
            + "var obj = new Object();\n"
            + "obj.location = values[0];\n"
            + "obj.name = values[1];\n"
            + "obj.age = values[2];\n"
            + "obj.color = values[3];\n"
            + "obj.coffee = values[4];\n"
            + "var jsonString = JSON.stringify(obj);\n"
            + "\n"
            + "return jsonString;\n"
            + "}</pre>"
public class TextIOToBigQuery {

  /** Options supported by {@link TextIOToBigQuery}. */
  public interface Options
      extends DataflowPipelineOptions,
          BigQueryStorageApiBatchOptions {
        order = 1,
        groupName = "Source",
        optional = false,
        description = "The GCS location of the text you'd like to process",
        helpText = "The gs:// path to the text in Cloud Storage you'd like to process.",
        example = "gs://your-bucket/your-file.txt")
    String getInputFilePattern();

    void setInputFilePattern(String value);

        order = 2,
        optional = false,
        description = "JSON file with BigQuery Schema description",
        helpText =
            "The gs:// path to the JSON file that defines your BigQuery schema, stored in Cloud Storage.",
        example = "gs://your-bucket/your-schema.json")
    String getJSONPath();

    void setJSONPath(String value);

        order = 3,
        optional = false,
        groupName = "Target",
        description = "Output table to write to",
        helpText =
            "The location of the BigQuery table to use to store the processed data. If you reuse an existing table, it is overwritten.",
        example = "<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>")
    String getOutputTable();

    void setOutputTable(String value);

        order = 4,
        optional = false,
        description = "GCS path to javascript fn for transforming output",
        helpText =
            "The Cloud Storage URI of the `.js` file that defines the JavaScript user-defined function (UDF) you want to use.",
        example = "gs://your-bucket/your-transforms/*.js")
    String getJavascriptTextTransformGcsPath();

    void setJavascriptTextTransformGcsPath(String jsTransformPath);

        order = 5,
        optional = false,
        regexes = {"[a-zA-Z0-9_]+"},
        description = "UDF Javascript Function Name",
        helpText =
            "The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is `myTransform(inJson) { /* stuff...*/ }`, then the function name is `myTransform`. For sample JavaScript UDFs, see UDF Examples (",
        example = "transform_udf1")
    String getJavascriptTextTransformFunctionName();

    void setJavascriptTextTransformFunctionName(String javascriptTextTransformFunctionName);

        order = 6,
        optional = false,
        description = "Temporary directory for BigQuery loading process",
        helpText = "Temporary directory for BigQuery loading process.",
        example = "gs://your-bucket/your-files/temp-dir")
    String getBigQueryLoadingTemporaryDirectory();

    void setBigQueryLoadingTemporaryDirectory(String directory);

  private static final String BIGQUERY_SCHEMA = "BigQuery Schema";

  private static final String NAME = "name";
  private static final String TYPE = "type";
  private static final String MODE = "mode";
  private static final String RECORD_TYPE = "RECORD";
  private static final String FIELDS_ENTRY = "fields";

  public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options, () -> writeToBQTransform(options));
  }

  /**
   * Create the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @param writeToBQ the transform that outputs {@link TableRow}s to BigQuery.
   * @return The result of the pipeline execution.
   */
  static PipelineResult run(Options options, Supplier<Write<TableRow>> writeToBQ) {

    Pipeline pipeline = Pipeline.create(options);

    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }

    PCollection<String> source =
        pipeline.apply("Read from source", TextIO.read().from(options.getInputFilePattern()));
    PCollection<TableRow> udfOut;

    if (usePythonUdf) {
      udfOut =
                  ParDo.of(new PythonExternalTextTransformer.RowToTableRowElementFn()));
    } else {
      udfOut =
                      new SimpleFunction<String, TableRow>() {
                        public TableRow apply(String json) {
                          return BigQueryConverters.convertJsonToTableRow(json);

    udfOut.apply("Insert into Bigquery", writeToBQ.get());

    return pipeline.run();
  }

  /** Create the {@link Write} transform that outputs the collection to BigQuery. */
  static Write<TableRow> writeToBQTransform(Options options) {
    return BigQueryIO.writeTableRows()

  /** Parse BigQuery schema from a Json file. */
  private static TableSchema parseSchema(String jsonPath) {
    TableSchema tableSchema = new TableSchema();
    List<TableFieldSchema> fields = new ArrayList<>();

    JSONObject jsonSchema = parseJson(jsonPath);

    JSONArray bqSchemaJsonArray = jsonSchema.getJSONArray(BIGQUERY_SCHEMA);

    for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
      JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);
      fields.add(convertToTableFieldSchema(inputField));
    }
    tableSchema.setFields(fields);

    return tableSchema;
  }

  /**
   * Convert a JSONObject from the Schema JSON to a TableFieldSchema. In case of RECORD, it handles
   * it recursively.
   *
   * @param inputField Input field to convert.
   * @return TableFieldSchema instance to populate the schema.
   */
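  private static TableFieldSchema convertToTableFieldSchema(JSONObject inputField) {
    TableFieldSchema field =
        new TableFieldSchema()
            .setName(inputField.getString(NAME))
            .setType(inputField.getString(TYPE));

    if (inputField.has(MODE)) {
      field.setMode(inputField.getString(MODE));
    }

    if (inputField.getString(TYPE) != null && inputField.getString(TYPE).equals(RECORD_TYPE)) {
      List<TableFieldSchema> nestedFields = new ArrayList<>();
      JSONArray fieldsArr = inputField.getJSONArray(FIELDS_ENTRY);
      for (int i = 0; i < fieldsArr.length(); i++) {
        JSONObject nestedJSON = fieldsArr.getJSONObject(i);
        // Nested RECORD fields are converted recursively.
        nestedFields.add(convertToTableFieldSchema(nestedJSON));
      }
      field.setFields(nestedFields);
    }

    return field;
  }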

  /**
   * Parses a JSON file and returns a JSONObject containing the necessary source, sink, and schema
   * information.
   *
   * @param pathToJson the JSON file location so we can download and parse it
   * @return the parsed JSONObject
   */
  private static JSONObject parseJson(String pathToJson) {
    try {
      // accessing GCS needs to be done after the pipeline create call, otherwise FileSystems
      // doesn't know about GCS.
      ReadableByteChannel readableByteChannel =
          FileSystems.open(FileSystems.matchNewResource(pathToJson, false));
      String json =
          new String(
              StreamUtils.getBytesWithoutClosing(Channels.newInputStream(readableByteChannel)),
              StandardCharsets.UTF_8);
      return new JSONObject(json);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

What's next