Streaming Avro records into BigQuery using Dataflow

This tutorial describes storing Avro SpecificRecord objects in BigQuery using Dataflow by automatically generating the table schema and transforming the input elements. This tutorial also showcases the usage of Avro-generated classes to materialize or transmit intermediate data between workers in your Dataflow pipeline.

Apache Avro is a serialization system that relies on schemas to structure data. Because the schema is always present when Avro data is read or written, the serialization is both fast and small. The performance benefits make it a popular choice for passing messages between systems, such as an app sending events to an analytics system through a message broker. You can use the Avro schema to manage your BigQuery data warehouse schema. Converting the Avro schema to BigQuery table structure requires custom code, which is demonstrated in this tutorial.
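
To make the schema-driven serialization concrete, the following minimal Java sketch (not part of the tutorial's sample code) parses a schema, binary-encodes a record, and decodes it again. The SimpleOrder schema and its fields are illustrative placeholders, not the orderdetails.avsc schema used later in this tutorial.

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class AvroRoundTrip {
  // Placeholder schema for illustration only.
  private static final String SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"SimpleOrder\",\"fields\":["
          + "{\"name\":\"id\",\"type\":\"string\"},"
          + "{\"name\":\"timestamp\",\"type\":\"long\"}]}";

  public static void main(String[] args) throws Exception {
    Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

    // Build a record that conforms to the schema.
    GenericRecord order = new GenericData.Record(schema);
    order.put("id", "order-0001");
    order.put("timestamp", System.currentTimeMillis());

    // Serialize: only the field values are written, because both sides share the schema.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(order, encoder);
    encoder.flush();
    byte[] payload = out.toByteArray();

    // Deserialize by using the same schema.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(payload, null);
    GenericRecord decoded = new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
    System.out.printf("Encoded %d bytes, decoded id=%s%n", payload.length, decoded.get("id"));
  }
}

Because the encoded payload carries only field values and not field names, the messages stay small, which is what makes Avro attractive for high-volume event streams.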

This tutorial is intended for developers and architects who are interested in using the Avro schema to manage their BigQuery data warehouse schema. This tutorial assumes that you're familiar with Avro and Java.

The following diagram illustrates the high-level architecture of this tutorial.

Architecture of an Avro schema managing your BigQuery data warehouse schema.

This tutorial uses a simple order-processing system with the following steps to demonstrate this architectural pattern:

  • An online app generates events when your customer makes a purchase.
  • An order object contains a unique identifier, the list of items purchased, and a timestamp.
  • A Dataflow pipeline reads the OrderDetails SpecificRecord Avro messages from a Pub/Sub topic.
  • The Dataflow pipeline writes the records to Cloud Storage as Avro files.
  • The corresponding BigQuery table schema is automatically generated from the OrderDetails class.
  • The OrderDetails objects are written to BigQuery using a generic transform function.

Objectives

  • Ingest JSON strings from a Pub/Sub data stream using Dataflow.
  • Transform the JSON objects to objects of Avro-generated classes.
  • Generate BigQuery table schema from the Avro schema.
  • Write the Avro records to a file in Cloud Storage.
  • Write the Avro records to BigQuery.

Costs

This tutorial uses the following billable components of Google Cloud:

  • BigQuery
  • Cloud Storage
  • Dataflow
  • Pub/Sub

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish this tutorial, you can avoid continued billing by deleting the resources you created. For more information, see Clean up.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  4. Enable the BigQuery, Cloud Storage, and Dataflow APIs.

    Enable the APIs

  5. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

Setting up your environment

  1. In Cloud Shell, clone the source repository:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/bigquery-ingest-avro-dataflow-sample.git
    
  2. Open the env.sh file:

    cd $HOME/bigquery-ingest-avro-dataflow-sample
    nano env.sh
    
  3. The env.sh file contains preset default values that you can use for this tutorial, but you can modify the file for your environment.

    # pubsub topic
    MY_TOPIC="avro-records"
    
    # Cloud Storage Bucket
    MY_BUCKET="${GOOGLE_CLOUD_PROJECT}_avro_beam"
    
    # Avro file Cloud Storage output path
    AVRO_OUT="${MY_BUCKET}/out/"
    
    # Region for Cloud Pub/Sub and Cloud Dataflow
    REGION="us-central1"
    
    # Region for BigQuery
    BQ_REGION="US"
    
    # BigQuery dataset name
    BQ_DATASET="sales"
    
    # BigQuery table name
    BQ_TABLE="orders"
    

    Replace the following:

    • avro-records: The name for your Pub/Sub topic.
    • ${GOOGLE_CLOUD_PROJECT}_avro_beam: The name of your Cloud Storage bucket, which is generated from your Cloud project ID.
    • ${MY_BUCKET}/out/: The Cloud Storage path that contains your Avro output.
    • us-central1: The region that you use for Pub/Sub and Dataflow. For more information about regions, see Geography and regions.
    • US: The region for BigQuery. For more information about locations, see Dataset locations.
    • sales: The BigQuery dataset name.
    • orders: The BigQuery table name.
  4. Set the environment variables:

     . ./env.sh
    

Creating resources

  1. In Cloud Shell, create a Pub/Sub topic:

    gcloud pubsub topics create "${MY_TOPIC}"
    
  2. Create a Cloud Storage bucket:

    gsutil mb -l "${REGION}" -c regional "gs://${MY_BUCKET}"
    

    The Cloud Storage bucket backs up the raw events generated by the app. The bucket can also serve as an alternate source for offline analytics and validation by using Spark and Hadoop jobs run on Dataproc.

  3. Create a BigQuery dataset:

    bq --location="${BQ_REGION}" mk --dataset "${GOOGLE_CLOUD_PROJECT}:${BQ_DATASET}"
    

    A BigQuery dataset contains tables and views in a single region or a geography containing multiple regions. For more information, see Creating datasets.

Starting the Beam Dataflow app

  1. In Cloud Shell, deploy and run the pipeline on the Dataflow runner:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/BeamAvro
    
    mvn clean package
    
    java -cp target/BeamAvro-bundled-1.0-SNAPSHOT.jar \
    com.google.cloud.solutions.beamavro.AvroToBigQuery \
    --runner=DataflowRunner \
    --project="${GOOGLE_CLOUD_PROJECT}" \
    --stagingLocation="gs://${MY_BUCKET}/stage/" \
    --tempLocation="gs://${MY_BUCKET}/temp/" \
    --inputPath="projects/${GOOGLE_CLOUD_PROJECT}/topics/${MY_TOPIC}" \
    --workerMachineType=n1-standard-1 \
    --region="${REGION}" \
    --dataset="${BQ_DATASET}" \
    --bigQueryTable="${BQ_TABLE}" \
    --outputPath="gs://${MY_BUCKET}/out/" \
    --jsonFormat=false \
    --avroSchema="$(<../orderdetails.avsc)"
    

    The output contains your Dataflow job ID. Make a note of the job ID because you use it in the next step to find your pipeline.

  2. In the Google Cloud console, go to Dataflow.

    Go to Dataflow

  3. To view the pipeline status, click your job ID. The pipeline status is displayed as a graph.

    Graph of pipeline status.

Review the code

In the AvroToBigQuery.java file, the pipeline options are set from the required parameters that are passed on the command line, and streaming mode is enabled. The BigQuery table schema is automatically generated from the Avro schema by using Beam schemas with BigQuery I/O.
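
The following sketch shows what such an options interface can look like in Beam. The option names mirror the command-line flags used earlier in this tutorial, but this is an assumption for illustration; the exact interface declared in AvroToBigQuery.java may differ.

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.Description;

// Hypothetical options interface; the sample's actual interface may declare
// additional or differently named options.
public interface AvroToBigQueryOptions extends DataflowPipelineOptions {
  @Description("Pub/Sub topic to read from, such as projects/PROJECT/topics/TOPIC")
  String getInputPath();
  void setInputPath(String value);

  @Description("BigQuery dataset name")
  String getDataset();
  void setDataset(String value);

  @Description("BigQuery table name")
  String getBigQueryTable();
  void setBigQueryTable(String value);

  @Description("Cloud Storage path for the Avro output files")
  String getOutputPath();
  void setOutputPath(String value);

  @Description("Whether the incoming Pub/Sub messages are JSON instead of Avro")
  boolean getJsonFormat();
  void setJsonFormat(boolean value);

  @Description("Avro schema as a JSON string")
  String getAvroSchema();
  void setAvroSchema(String value);
}

// In the pipeline's main method (sketch):
//   AvroToBigQueryOptions options =
//       PipelineOptionsFactory.fromArgs(args).withValidation().as(AvroToBigQueryOptions.class);
//   options.setStreaming(true); // run as a streaming pipeline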

If the input format is Avro, the records are read from Pub/Sub as Avro objects. If the input format is JSON, the events are read as strings and transformed into Avro objects.

Schema avroSchema = new Schema.Parser().parse(options.getAvroSchema());

if (options.getJsonFormat()) {
  return input
      .apply("Read Json", PubsubIO.readStrings().fromTopic(options.getInputPath()))
      .apply("Make GenericRecord", MapElements.via(JsonToAvroFn.of(avroSchema)));
} else {
  return input.apply("Read GenericRecord", PubsubIO.readAvroGenericRecords(avroSchema)
      .fromTopic(options.getInputPath()));
}

The pipeline branches out to perform two separate writes:

  • The Avro records are written as Avro files to Cloud Storage.
  • The Avro objects are written to BigQuery. BigQueryIO internally transforms the objects to TableRow objects by using Beam schemas. For the conversion rules, refer to the mapping between Avro data types and BigQuery data types.
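
A minimal sketch of these two branching writes is shown below. It assumes that records is the PCollection<GenericRecord> returned by the read step above and that avroSchema is the parsed schema; the transform names, window size, and shard count are illustrative choices rather than the sample's exact code.

// (Imports omitted; Window, FixedWindows, AvroIO, and BigQueryIO come from the Beam SDK,
// and Duration comes from Joda-Time.)

// Window the unbounded stream so that file writes can be committed per window.
PCollection<GenericRecord> windowed = records.apply("Fixed windows",
    Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(5))));

// Branch 1: write windowed Avro files to Cloud Storage.
windowed.apply("Write Avro files",
    AvroIO.writeGenericRecords(avroSchema)
        .to(options.getOutputPath())
        .withSuffix(".avro")
        .withWindowedWrites()
        .withNumShards(1));

// Branch 2: write to BigQuery, letting Beam schemas drive the TableRow conversion.
windowed.apply("Write to BigQuery",
    BigQueryIO.<GenericRecord>write()
        .to(String.format("%s:%s.%s",
            options.getProject(), options.getDataset(), options.getBigQueryTable()))
        .useBeamSchema()
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));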

View results in BigQuery

To test the pipeline, run the gen.py script. This script simulates the generation of order events and publishes them to the Pub/Sub topic.
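
When you pass -f avro, the generator publishes binary Avro-encoded messages, which is the format that the pipeline's PubsubIO.readAvroGenericRecords step expects. As a rough illustration of how any producing app could do the same in Java, the following sketch (not part of the sample) encodes a record and publishes it with the Pub/Sub client library; the SimpleOrder schema and its field are placeholders, not the orderdetails.avsc schema.

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class PublishAvroEvent {
  // Placeholder schema; a real producer would use the same schema as the pipeline.
  private static final String SCHEMA_JSON =
      "{\"type\":\"record\",\"name\":\"SimpleOrder\","
          + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";

  public static void main(String[] args) throws Exception {
    String project = args[0]; // your Cloud project ID
    String topic = args[1];   // for example, avro-records

    Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
    GenericRecord record = new GenericData.Record(schema);
    record.put("id", "order-0001");

    // Binary-encode the record with its schema.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();

    // Publish the encoded bytes to the Pub/Sub topic.
    Publisher publisher = Publisher.newBuilder(TopicName.of(project, topic)).build();
    try {
      PubsubMessage message = PubsubMessage.newBuilder()
          .setData(ByteString.copyFrom(out.toByteArray()))
          .build();
      publisher.publish(message).get(); // block until the publish completes
    } finally {
      publisher.shutdown();
    }
  }
}

In this tutorial, you don't need to write a producer yourself; the following steps use the provided Python generator instead.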

  1. In Cloud Shell, change to the sample event generator script directory and run the script:

    cd $HOME/bigquery-ingest-avro-dataflow-sample/generator
    python3 -m venv env
    . ./env/bin/activate
    pip install -r requirements.txt
    python3 gen.py -p $GOOGLE_CLOUD_PROJECT -t $MY_TOPIC -n 100 -f avro
    
  2. In the Google Cloud console, go to BigQuery.

    Go to BigQuery

  3. To view the table schema, click the sales dataset, and then select the orders table. If you modified the default environment variables in env.sh, the dataset and table names might be different.

    Table schema of the `orders` table.

  4. To view some sample data, run a query in the Query Editor:

    SELECT * FROM sales.orders LIMIT 5
    

    Query result of sample data.

    The BigQuery table schema is auto-generated from the Avro records and the data is automatically converted to the BigQuery table structure.

Clean up

Delete the project

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete the individual resources

  1. Follow these instructions to stop the Dataflow job.

  2. Delete the Cloud Storage bucket:

    gsutil rm -r gs://$MY_BUCKET
    

What's next