The Pub/Sub proto to BigQuery template is a streaming pipeline that ingests proto
data from a Pub/Sub subscription into a BigQuery table.
Any errors that occur while writing to the BigQuery table are streamed into a
Pub/Sub unprocessed topic.
A JavaScript user-defined function (UDF) can be provided to transform data. Errors while executing
the UDF can be sent to either a separate Pub/Sub topic or the same unprocessed topic as
the BigQuery errors.
Pipeline requirements
- The input Pub/Sub subscription must exist.
- The schema file for the Proto records must exist on Cloud Storage.
- The output Pub/Sub topic must exist.
- The output BigQuery dataset must exist.
- If the BigQuery table exists, it must have a schema matching the proto data regardless of the createDisposition value.
Template parameters
Parameter | Description |
---|---|
protoSchemaPath | The Cloud Storage location of the self-contained proto schema file. For example, gs://path/to/my/file.pb. This file can be generated with the --descriptor_set_out flag of the protoc command. The --include_imports flag guarantees that the file is self-contained. |
fullMessageName | The full proto message name. For example, package.name.MessageName, where package.name is the value provided for the package statement and not the java_package statement. |
inputSubscription | The Pub/Sub input subscription to read from. For example, projects/<project>/subscriptions/<subscription>. |
outputTopic | The Pub/Sub topic to use for unprocessed records. For example, projects/<project-id>/topics/<topic-name>. |
outputTableSpec | The BigQuery output table location. For example, my-project:my_dataset.my_table. Depending on the createDisposition specified, the output table might be created automatically using the input schema file. |
preserveProtoFieldNames | (Optional) true to preserve the original Proto field name in JSON. false to use more standard JSON names. For example, false would change field_name to fieldName. Default: false. |
bigQueryTableSchemaPath | (Optional) The Cloud Storage path to the BigQuery schema file. For example, gs://path/to/my/schema.json. If this is not provided, then the schema is inferred from the Proto schema. |
javascriptTextTransformGcsPath | (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js. |
javascriptTextTransformFunctionName | (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples. |
udfOutputTopic | (Optional) The Pub/Sub topic storing the UDF errors. For example, projects/<project-id>/topics/<topic-name>. If this is not provided, UDF errors are sent to the same topic as outputTopic. |
writeDisposition | (Optional) The BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Default: WRITE_APPEND. |
createDisposition | (Optional) The BigQuery CreateDisposition. For example, CREATE_IF_NEEDED or CREATE_NEVER. Default: CREATE_IF_NEEDED. |
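To make the effect of preserveProtoFieldNames concrete, the following is a minimal sketch of the snake_case to lowerCamelCase conversion that the table describes for the false setting. The helper name toJsonName is hypothetical, and the function only approximates the naming rule; it is not the template's own code.

```javascript
// Hypothetical helper: approximates how a proto field name such as
// field_name becomes fieldName when preserveProtoFieldNames is false.
function toJsonName(protoFieldName) {
  var out = '';
  var upperNext = false;
  for (var i = 0; i < protoFieldName.length; i++) {
    var ch = protoFieldName.charAt(i);
    if (ch === '_') {
      // Drop the underscore and capitalize the character that follows it.
      upperNext = true;
    } else if (upperNext) {
      out += ch.toUpperCase();
      upperNext = false;
    } else {
      out += ch;
    }
  }
  return out;
}

console.log(toJsonName('field_name')); // fieldName
```

With preserveProtoFieldNames set to true, the name would be left as field_name, so the BigQuery column names need to match whichever form you choose.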
User-defined function
Optionally, you can extend this template by writing a user-defined function (UDF) in JavaScript. The template calls the UDF for each input element. Element payloads are serialized as JSON strings.
To use a UDF, upload the JavaScript file to Cloud Storage and set the following template parameters:
Parameter | Description |
---|---|
javascriptTextTransformGcsPath | The Cloud Storage location of the JavaScript file. |
javascriptTextTransformFunctionName | The name of the JavaScript function. |
For more information, see Create user-defined functions for Dataflow templates.
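As an illustration, here is a minimal sketch of a UDF, assuming the element arrives as a JSON string and the function returns a JSON string. The field names user_id and processed are hypothetical and not part of the template; the function name myTransform matches the example used in the parameter table.

```javascript
/**
 * Hypothetical UDF sketch. Parses the JSON payload, normalizes one field,
 * adds a marker field, and returns the result as a JSON string.
 * The fields user_id and processed are illustrative assumptions.
 */
function myTransform(inJson) {
  var record = JSON.parse(inJson);

  // Reject records that lack the (hypothetical) required field. A thrown
  // error is expected to surface as a UDF error for that element.
  if (record.user_id === undefined || record.user_id === null) {
    throw new Error('Missing user_id');
  }

  // Normalize the identifier and tag the record.
  record.user_id = String(record.user_id).trim().toLowerCase();
  record.processed = true;

  return JSON.stringify(record);
}
```

To use a function like this, you would set javascriptTextTransformFunctionName to myTransform and javascriptTextTransformGcsPath to the Cloud Storage URI of the uploaded file. Any fields the UDF adds must also exist in the BigQuery table schema.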
Function specification
The UDF has the following specification:
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default regional endpoint is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Pub/Sub Proto to BigQuery template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
  --region=REGION_NAME \
  --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery \
  --parameters \
protoSchemaPath=SCHEMA_PATH,\
fullMessageName=PROTO_MESSAGE_NAME,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=UNPROCESSED_TOPIC
Replace the following:
- JOB_NAME: a unique job name of your choice
- REGION_NAME: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-04-18-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- SCHEMA_PATH: the Cloud Storage path to the Proto schema file (for example, gs://MyBucket/file.pb)
- PROTO_MESSAGE_NAME: the Proto message name (for example, package.name.MessageName)
- SUBSCRIPTION_NAME: the Pub/Sub input subscription name
- BIGQUERY_TABLE: the BigQuery output table name
- UNPROCESSED_TOPIC: the Pub/Sub topic to use for the unprocessed queue
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery",
    "parameters": {
      "protoSchemaPath": "SCHEMA_PATH",
      "fullMessageName": "PROTO_MESSAGE_NAME",
      "inputSubscription": "SUBSCRIPTION_NAME",
      "outputTableSpec": "BIGQUERY_TABLE",
      "outputTopic": "UNPROCESSED_TOPIC"
    }
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- LOCATION: the regional endpoint where you want to deploy your Dataflow job, for example us-central1
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-04-18-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- SCHEMA_PATH: the Cloud Storage path to the Proto schema file (for example, gs://MyBucket/file.pb)
- PROTO_MESSAGE_NAME: the Proto message name (for example, package.name.MessageName)
- SUBSCRIPTION_NAME: the Pub/Sub input subscription name
- BIGQUERY_TABLE: the BigQuery output table name
- UNPROCESSED_TOPIC: the Pub/Sub topic to use for the unprocessed queue
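The REST request above can also be assembled programmatically. The following is a rough sketch in JavaScript, not an official client. The helper names buildLaunchRequest and launchTemplate are hypothetical. It assumes a runtime with a global fetch (for example, Node.js 18 or later), an OAuth 2.0 access token obtained separately, and that the template bucket region is the same as the job location. The parameter keys are passed through unchanged and should follow the Template parameters table.

```javascript
// Hypothetical helper: builds the URL and JSON body for the
// flexTemplates:launch request. All values are caller-supplied placeholders.
function buildLaunchRequest(opts) {
  var url = 'https://dataflow.googleapis.com/v1b3/projects/' +
      encodeURIComponent(opts.projectId) + '/locations/' +
      encodeURIComponent(opts.location) + '/flexTemplates:launch';
  var body = {
    launch_parameter: {
      jobName: opts.jobName,
      // Assumption: the template bucket region equals the job location.
      containerSpecGcsPath: 'gs://dataflow-templates-' + opts.location + '/' +
          opts.version + '/flex/PubSub_Proto_to_BigQuery',
      parameters: opts.parameters
    }
  };
  return { url: url, body: body };
}

// Hypothetical helper: sends the request with a bearer token and returns
// the parsed JSON response, or throws on a non-2xx status.
async function launchTemplate(opts, accessToken) {
  var request = buildLaunchRequest(opts);
  var response = await fetch(request.url, {
    method: 'POST',
    headers: {
      'Authorization': 'Bearer ' + accessToken,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify(request.body)
  });
  if (!response.ok) {
    throw new Error('Launch failed with HTTP ' + response.status);
  }
  return response.json();
}
```

Keeping request construction separate from sending makes it possible to inspect or log the body before launching a job.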