Quickstart Using Templates

This page shows you how to create a streaming pipeline using a Google-provided Dataflow template. Specifically, this page uses the Pub/Sub Topic to BigQuery template as an example.

The Pub/Sub Topic to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic and writes them to a BigQuery table.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.

  4. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, and Cloud Resource Manager APIs.

    Enable the APIs

  5. Create a Cloud Storage bucket:
    1. In the Cloud Console, go to the Cloud Storage Browser page.

      Go to Browser

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select Standard.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.
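If you prefer the command line, the bucket can also be created with the gsutil tool. This is a sketch, not part of the console flow above; BUCKET_NAME is a placeholder for your own globally unique bucket name, and the US multi-region matches the dataset location used later in this quickstart.

```shell
# Create a Standard-class bucket in the US multi-region.
# BUCKET_NAME is a placeholder; bucket names are globally unique.
gsutil mb -c standard -l US gs://BUCKET_NAME/
```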

Create a Pub/Sub topic

  1. In the Cloud Console, go to the Pub/Sub Topics page.
    Go to Pub/Sub Topics
  2. Click Create topic.
  3. In the Topic ID field, provide a unique topic name. For example, taxirides-realtime.
  4. Click Create topic.
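The same topic can be created with the gcloud CLI, assuming your project is already set as the active gcloud project:

```shell
# Create the topic; the topic ID must be unique within the project.
gcloud pubsub topics create taxirides-realtime
```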

Create a BigQuery dataset and table

Use the Cloud Console to create a BigQuery dataset and a table with a schema that matches the messages in your Pub/Sub topic.

In this example, the name of the dataset is taxirides and the name of the table is realtime. To create this dataset and table:

  1. In the Cloud Console, go to the BigQuery page.
    Go to BigQuery
  2. In the Explorer panel, select the project where you want to create the dataset.
  3. In the details panel, click Create dataset.
  4. On the Create dataset page:
    1. For Dataset ID, enter taxirides.
    2. For Data location, choose United States (US). Currently, public datasets are stored in the US multi-region location. For simplicity, place your dataset in the same location.
  5. Leave the other default settings in place and click Create dataset.
  6. In the Explorer panel, expand your project.
  7. Next to your dataset, click the view actions icon, then click Open.
  8. In the details panel, click Create table.
  9. On the Create table page:
    1. In the Source section, for Create table from, select Empty table.
    2. In the Destination section, for Table name, enter realtime.
    3. In the Schema section, click the Edit as text toggle and paste the following schema definition into the box:
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. In the Partitioning and cluster settings section, for Partitioning, select the timestamp field.
  10. Leave the other default settings in place and click Create table.
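As a command-line alternative, the same dataset and partitioned table can be created with the bq tool. This sketch assumes the dataset location and schema shown above:

```shell
# Create the dataset in the US multi-region.
bq --location=US mk --dataset taxirides

# Create the table with the schema above, partitioned on the timestamp column.
bq mk --table \
  --time_partitioning_field timestamp \
  taxirides.realtime \
  ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
```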

Run the pipeline

Run a streaming pipeline using the Google-provided Pub/Sub Topic to BigQuery template.

  1. In the Cloud Console, go to the Dataflow Jobs page.
    Go to Dataflow Jobs
  2. Click Create job from template.
  3. Enter a Job name for your Dataflow job.
  4. For Dataflow template, select the Pub/Sub Topic to BigQuery template.
  5. For Input Pub/Sub topic, enter projects/pubsub-public-data/topics/taxirides-realtime. The pipeline gets incoming data from the input topic.

    This public topic is based on the NYC Taxi &amp; Limousine Commission's open dataset, expanded with additional routing information from the Google Maps Directions API and with interpolated timestamps that simulate a real-time scenario. The following is a sample message from this topic, in JSON format:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  6. For BigQuery output table, enter PROJECT_ID:taxirides.realtime. Replace PROJECT_ID with the project ID of the project where you created your BigQuery dataset.
  7. For Temporary location, enter gs://BUCKET_NAME/temp/. Replace BUCKET_NAME with the name of your Cloud Storage bucket. temp is a folder in that bucket for storing temporary files, like the staged pipeline job.
  8. Click Run job.
  9. To view the data written to BigQuery, go to the BigQuery page.
    Go to BigQuery

    You can submit queries using standard SQL. For example, the following query selects all rows that have been added in the past 24 hours:

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    Replace PROJECT_ID with the project ID of the project where you created your BigQuery dataset.
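The console steps above can also be expressed as a single gcloud command. This is a sketch assuming the us-central1 region; JOB_NAME, PROJECT_ID, and BUCKET_NAME are placeholders, and PubSub_to_BigQuery is the Google-provided template's path under gs://dataflow-templates:

```shell
# Launch the Pub/Sub Topic to BigQuery template as a streaming job.
gcloud dataflow jobs run JOB_NAME \
  --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
  --region us-central1 \
  --staging-location gs://BUCKET_NAME/temp \
  --parameters \
inputTopic=projects/pubsub-public-data/topics/taxirides-realtime,\
outputTableSpec=PROJECT_ID:taxirides.realtime
```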

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this quickstart, follow these steps.

  1. In the Cloud Console, go to the Dataflow Jobs page.
    Go to Dataflow Jobs
    1. Select your streaming job from the job list.
    2. In the navigation, click Stop.
    3. In the Stop job dialog, choose to either Cancel or Drain your pipeline, then click Stop job.
  2. In the Cloud Console, go to the BigQuery page.
    Go to BigQuery
    1. In the Explorer panel, expand your project.
    2. Next to your dataset, click the view actions icon, then click Open.
    3. In the details panel, click Delete dataset.
    4. In the Delete dataset dialog, confirm the delete command by typing the name of your dataset (taxirides) and clicking Delete.
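The cleanup steps have command-line equivalents as well. This sketch assumes the job runs in us-central1; JOB_ID and BUCKET_NAME are placeholders taken from your own project:

```shell
# Find and cancel the streaming job (copy JOB_ID from the list output).
gcloud dataflow jobs list --status=active
gcloud dataflow jobs cancel JOB_ID --region us-central1

# Delete the dataset and every table in it (-r), without prompting (-f).
bq rm -r -f taxirides

# Delete the bucket and all of its contents.
gsutil rm -r gs://BUCKET_NAME
```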

What's next