The Pub/Sub Avro to BigQuery template is a streaming pipeline that ingests Avro data from a Pub/Sub subscription into a BigQuery table. Any errors which occur while writing to the BigQuery table are streamed into a Pub/Sub unprocessed topic.
Pipeline requirements
- The input Pub/Sub subscription must exist.
- The schema file for the Avro records must exist on Cloud Storage.
- The unprocessed Pub/Sub topic must exist.
- The output BigQuery dataset must exist.
Template parameters
Required parameters
- schemaPath: The Cloud Storage location of the Avro schema file. For example,
gs://path/to/my/schema.avsc
. - inputSubscription: The Pub/Sub input subscription to read from. For example,
projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_ID>
. - outputTableSpec: The BigQuery output table location to write the output to. For example,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
.Depending on thecreateDisposition
specified, the output table might be created automatically using the user provided Avro schema. - outputTopic: The Pub/Sub topic to use for unprocessed records. For example,
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
.
Optional parameters
- useStorageWriteApiAtLeastOnce: When using the Storage Write API, specifies the write semantics. To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true. To use exactly-once semantics, set the parameter to
false
. This parameter applies only whenuseStorageWriteApi
istrue
. The default value isfalse
. - writeDisposition: The BigQuery WriteDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload) value. For example,
WRITE_APPEND
,WRITE_EMPTY
, orWRITE_TRUNCATE
. Defaults toWRITE_APPEND
. - createDisposition: The BigQuery CreateDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). For example,
CREATE_IF_NEEDED
andCREATE_NEVER
. Defaults toCREATE_IF_NEEDED
. - useStorageWriteApi: If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is
false
. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: When using the Storage Write API, specifies the number of write streams. If
useStorageWriteApi
istrue
anduseStorageWriteApiAtLeastOnce
isfalse
, then you must set this parameter. Defaults to: 0. - storageWriteApiTriggeringFrequencySec: When using the Storage Write API, specifies the triggering frequency, in seconds. If
useStorageWriteApi
istrue
anduseStorageWriteApiAtLeastOnce
isfalse
, then you must set this parameter.
Run the template
Console
- Go to the Dataflow Create job from template page. Go to Create job from template
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default
region is
us-central1
.For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Pub/Sub Avro to BigQuery template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
Replace the following:
JOB_NAME
: a unique job name of your choiceREGION_NAME
: the region where you want to deploy your Dataflow job—for example,us-central1
VERSION
: the version of the template that you want to useYou can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: the Cloud Storage path to the Avro schema file (for example,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: the Pub/Sub input subscription nameBIGQUERY_TABLE
: the BigQuery output table nameDEADLETTER_TOPIC
: the Pub/Sub topic to use for the unprocessed queue
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
Replace the following:
JOB_NAME
: a unique job name of your choiceLOCATION
: the region where you want to deploy your Dataflow job—for example,us-central1
VERSION
: the version of the template that you want to useYou can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
SCHEMA_PATH
: the Cloud Storage path to the Avro schema file (for example,gs://MyBucket/file.avsc
)SUBSCRIPTION_NAME
: the Pub/Sub input subscription nameBIGQUERY_TABLE
: the BigQuery output table nameDEADLETTER_TOPIC
: the Pub/Sub topic to use for the unprocessed queue
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.