Cloud Storage Text to BigQuery (Stream) with Python UDF template

Cloud Storage Text to BigQuery pipeline is a streaming pipeline that streams text files stored in Cloud Storage, transforms them using a Python user-defined function (UDF) that you provide, and appends the result to BigQuery.

The pipeline runs indefinitely and needs to be terminated manually via a cancel and not a drain, due to its use of the Watch transform, which is a splittable DoFn that does not support draining.

Pipeline requirements

  • Create a JSON file that describes the schema of your output table in BigQuery.

    Ensure that there is a top-level JSON array titled fields and that its contents follow the pattern {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. For example:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
    
  • Create a Python (.py) file with your UDF function that supplies the logic to transform the lines of text. Your function must return a JSON string.

    The following example splits each line of a CSV file, creates a JSON object with the values, and returns a JSON string:

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)
    

Template parameters

Parameter Description
pythonExternalTextTransformGcsPath The Cloud Storage URI of the Python code file that defines the user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName The name of the Python user-defined function (UDF) that you want to use.
JSONPath Cloud Storage location of your BigQuery schema file, described as a JSON. For example: gs://path/to/my/schema.json.
outputTable The fully qualified BigQuery table. For example: my-project:dataset.table
inputFilePattern Cloud Storage location of the text you'd like to process. For example: gs://my-bucket/my-files/text.txt.
bigQueryLoadingTemporaryDirectory Temporary directory for BigQuery loading process. For example: gs://my-bucket/my-files/temp_dir
outputDeadletterTable Table for messages that failed to reach the output table. For example: my-project:dataset.my-unprocessed-table. If it doesn't exist, it is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used instead.

User-defined function

This template requires a UDF that parses the input files, as described in Pipeline requirements. The template calls the UDF for every line of text in each input file. For more information about creating UDFs, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: a single line of text from an input file.
  • Output: a JSON string that matches the schema of the BigQuery destination table.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Cloud Storage Text to BigQuery (Stream) with Python UDF template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • PYTHON_FUNCTION: The name of the Python user-defined function (UDF) that you want to use.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
  • PATH_TO_PYTHON_UDF_FILE: The Cloud Storage URI of the Python code file that defines the user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: the Cloud Storage path to your text dataset
  • BIGQUERY_TABLE: your BigQuery table name
  • BIGQUERY_UNPROCESSED_TABLE: the name of your BigQuery table for unprocessed messages
  • PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temp directory

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang",
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • PYTHON_FUNCTION: The name of the Python user-defined function (UDF) that you want to use.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: the Cloud Storage path to the JSON file containing the schema definition
  • PATH_TO_PYTHON_UDF_FILE: The Cloud Storage URI of the Python code file that defines the user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: the Cloud Storage path to your text dataset
  • BIGQUERY_TABLE: your BigQuery table name
  • BIGQUERY_UNPROCESSED_TABLE: the name of your BigQuery table for unprocessed messages
  • PATH_TO_TEMP_DIR_ON_GCS: the Cloud Storage path to the temp directory

What's next