Pub/Sub Avro to BigQuery template

The Pub/Sub Avro to BigQuery template is a streaming pipeline that ingests Avro data from a Pub/Sub subscription into a BigQuery table. Errors that occur while writing to the BigQuery table are streamed to a Pub/Sub topic for unprocessed records.

Pipeline requirements

  • The input Pub/Sub subscription must exist.
  • The schema file for the Avro records must exist on Cloud Storage.
  • The unprocessed Pub/Sub topic must exist.
  • The output BigQuery dataset must exist.
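
The requirements above can be checked from a shell before launching the job. The following is a minimal sketch, not part of the template itself: all resource names are hypothetical placeholders, and the `run` helper and `DRY_RUN` switch are conveniences introduced here so the commands can be reviewed before they are executed.

```shell
#!/bin/sh
# Hypothetical resource names; replace them with your own.
PROJECT_ID="my-project"
SUBSCRIPTION="projects/${PROJECT_ID}/subscriptions/my-subscription"
DEADLETTER_TOPIC="projects/${PROJECT_ID}/topics/my-unprocessed-topic"
SCHEMA_PATH="gs://my-bucket/schemas/record.avsc"
DATASET="${PROJECT_ID}:my_dataset"

# Print each command by default; set DRY_RUN=0 to execute it instead.
DRY_RUN="${DRY_RUN:-1}"
run() {
  if [ "$DRY_RUN" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# One check per requirement. When executed, each command exits
# non-zero if the resource is missing, which stops the chain.
check_prerequisites() {
  run gcloud pubsub subscriptions describe "$SUBSCRIPTION" &&
  run gcloud storage ls "$SCHEMA_PATH" &&
  run gcloud pubsub topics describe "$DEADLETTER_TOPIC" &&
  run bq show "$DATASET"
}

check_prerequisites
```

Running the script with `DRY_RUN=0` executes the checks, which assumes that the gcloud CLI and the bq tool are installed and authenticated.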

Template parameters

  • schemaPath: The Cloud Storage location of the Avro schema file. For example, gs://path/to/my/schema.avsc.
  • inputSubscription: The Pub/Sub input subscription to read from. For example, projects/<project>/subscriptions/<subscription>.
  • outputTopic: The Pub/Sub topic to use for unprocessed records. For example, projects/<project-id>/topics/<topic-name>.
  • outputTableSpec: The BigQuery output table location. For example, <my-project>:<my-dataset>.<my-table>. Depending on the createDisposition specified, the output table may be created automatically using the user provided Avro schema.
  • writeDisposition: Optional. The BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE. Default: WRITE_APPEND
  • createDisposition: Optional. The BigQuery CreateDisposition. For example, CREATE_IF_NEEDED, CREATE_NEVER. Default: CREATE_IF_NEEDED
  • useStorageWriteApi: Optional. If true, the pipeline uses the BigQuery Storage Write API. The default value is false. For more information, see Using the Storage Write API.
  • useStorageWriteApiAtLeastOnce: Optional. When using the Storage Write API, specifies the write semantics. To use at-least-once semantics, set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
  • numStorageWriteApiStreams: Optional. When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
  • storageWriteApiTriggeringFrequencySec: Optional. When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
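
As an illustration of how the Storage Write API parameters combine, the sketch below assembles a parameter string for exactly-once semantics, where numStorageWriteApiStreams and storageWriteApiTriggeringFrequencySec are both required. The resource names, the job name, the stream count of 3, the frequency of 10 seconds, and the `latest` template version are assumptions chosen for the example, not recommended values. The script only prints the resulting gcloud command so that it can be reviewed first.

```shell
#!/bin/sh
# Hypothetical values; replace them with your own.
REGION="us-central1"

# Required template parameters.
PARAMS="schemaPath=gs://my-bucket/schemas/record.avsc"
PARAMS="${PARAMS},inputSubscription=projects/my-project/subscriptions/my-subscription"
PARAMS="${PARAMS},outputTopic=projects/my-project/topics/my-unprocessed-topic"
PARAMS="${PARAMS},outputTableSpec=my-project:my_dataset.my_table"

# Storage Write API with exactly-once semantics: the stream count and
# the triggering frequency must both be set in this mode.
PARAMS="${PARAMS},useStorageWriteApi=true"
PARAMS="${PARAMS},useStorageWriteApiAtLeastOnce=false"
PARAMS="${PARAMS},numStorageWriteApiStreams=3"
PARAMS="${PARAMS},storageWriteApiTriggeringFrequencySec=10"

# Print the command for review instead of running it.
echo gcloud dataflow flex-template run my-avro-job \
  --region="$REGION" \
  --template-file-gcs-location="gs://dataflow-templates-${REGION}/latest/flex/PubSub_Avro_to_BigQuery" \
  --parameters "$PARAMS"
```

For at-least-once semantics, set useStorageWriteApiAtLeastOnce=true instead; the two stream-related parameters are then no longer mandatory according to the descriptions above.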

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub Avro to BigQuery template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
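  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      • the version name, like 2023-09-12-00_RELEASE, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/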

  • SCHEMA_PATH: the Cloud Storage path to the Avro schema file (for example, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: the Pub/Sub input subscription name
  • BIGQUERY_TABLE: the BigQuery output table name
  • DEADLETTER_TOPIC: the Pub/Sub topic to use for the unprocessed queue
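
SCHEMA_PATH must point to an Avro schema file in Cloud Storage. As a rough sketch of what such a file might look like, the script below writes a small schema locally and prints the command that would upload it. The record name, namespace, fields, and bucket path are all invented for illustration; the real schema must match the Avro records published to the subscription.

```shell
#!/bin/sh
# Hypothetical local file and Cloud Storage destination.
SCHEMA_FILE="${TMPDIR:-/tmp}/record.avsc"
SCHEMA_PATH="gs://my-bucket/schemas/record.avsc"

# A minimal Avro record schema with three illustrative fields.
cat > "$SCHEMA_FILE" <<'EOF'
{
  "type": "record",
  "name": "SensorReading",
  "namespace": "com.example",
  "fields": [
    {"name": "sensor_id", "type": "string"},
    {"name": "temperature", "type": "double"},
    {"name": "recorded_at", "type": "long"}
  ]
}
EOF

# Print the upload command for review; run it once the bucket exists.
echo gcloud storage cp "$SCHEMA_FILE" "$SCHEMA_PATH"
```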

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
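  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
      • the version name, like 2023-09-12-00_RELEASE, to use a specific version of the template, which is nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/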

  • SCHEMA_PATH: the Cloud Storage path to the Avro schema file (for example, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: the Pub/Sub input subscription name
  • BIGQUERY_TABLE: the BigQuery output table name
  • DEADLETTER_TOPIC: the Pub/Sub topic to use for the unprocessed queue
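
One possible way to send the request above is with curl, using an access token from the gcloud CLI. This is a sketch under stated assumptions: the project, job, and resource names are hypothetical, `latest` is assumed as the template version, and the `SEND` switch is a convenience introduced here so that the request is only printed unless explicitly enabled.

```shell
#!/bin/sh
# Hypothetical values; replace them with your own.
PROJECT_ID="my-project"
LOCATION="us-central1"
JOB_NAME="my-avro-job"

URL="https://dataflow.googleapis.com/v1b3/projects/${PROJECT_ID}/locations/${LOCATION}/flexTemplates:launch"

# Request body with the same fields as the example request above.
REQUEST_BODY=$(cat <<EOF
{
  "launch_parameter": {
    "jobName": "${JOB_NAME}",
    "containerSpecGcsPath": "gs://dataflow-templates-${LOCATION}/latest/flex/PubSub_Avro_to_BigQuery",
    "parameters": {
      "schemaPath": "gs://my-bucket/schemas/record.avsc",
      "inputSubscription": "projects/${PROJECT_ID}/subscriptions/my-subscription",
      "outputTableSpec": "${PROJECT_ID}:my_dataset.my_table",
      "outputTopic": "projects/${PROJECT_ID}/topics/my-unprocessed-topic"
    }
  }
}
EOF
)

# Send the request only when SEND=1; otherwise print it for review.
if [ "${SEND:-0}" = "1" ]; then
  curl -X POST \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    -H "Content-Type: application/json" \
    -d "$REQUEST_BODY" \
    "$URL"
else
  echo "POST $URL"
  echo "$REQUEST_BODY"
fi
```

Sending the request with `SEND=1` assumes that curl and the gcloud CLI are installed and that the active account is allowed to launch Dataflow jobs in the project.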

What's next