This page shows you how to create a streaming pipeline using a Google-provided Dataflow template. Specifically, this page uses the Pub/Sub Topic to BigQuery template as an example.
The Pub/Sub Topic to BigQuery template is a streaming pipeline that reads JSON-formatted messages from a Pub/Sub topic and writes them to a BigQuery table.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- In the Google Cloud Console, on the project selector page, select or create a Google Cloud project.
- Make sure that billing is enabled for your Cloud project. Learn how to confirm that billing is enabled for your project.
- Enable the Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Resource Manager APIs.
- Create a Cloud Storage bucket:
- In the Cloud Console, go to the Cloud Storage Browser page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
- For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
- For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select Standard.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
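If you prefer to script the bucket creation step above instead of using the Cloud Console, the following is a minimal sketch using the google-cloud-storage Python client library. The PROJECT_ID and BUCKET_NAME values are placeholders that you replace with your own.

```python
from google.cloud import storage

# Assumes Application Default Credentials are configured for your account.
client = storage.Client(project="PROJECT_ID")  # replace with your project ID

# Bucket names are globally unique; replace BUCKET_NAME with your own name.
bucket = client.bucket("BUCKET_NAME")
bucket.storage_class = "STANDARD"

# Create the bucket in the US multi-region to match the dataset location
# used later in this quickstart.
new_bucket = client.create_bucket(bucket, location="US")
print(f"Created bucket {new_bucket.name} in {new_bucket.location}")
```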
Create a Pub/Sub topic
- In the Cloud Console, go to the Pub/Sub Topics page.
Go to Pub/Sub Topics
- Click Create topic.
- In the Topic ID field, provide a unique topic name. For example, taxirides-realtime.
- Click Create topic.
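This step can also be scripted. Below is a minimal sketch using the google-cloud-pubsub Python client; the project ID is a placeholder, and the topic name matches the example above.

```python
from google.cloud import pubsub_v1

# Assumes Application Default Credentials are configured.
publisher = pubsub_v1.PublisherClient()

# Replace PROJECT_ID with your project ID.
topic_path = publisher.topic_path("PROJECT_ID", "taxirides-realtime")

topic = publisher.create_topic(request={"name": topic_path})
print(f"Created topic: {topic.name}")
```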
Create a BigQuery dataset and table
Create a BigQuery dataset and table with the appropriate schema for your Pub/Sub topic using the Cloud Console.
In this example, the name of the dataset is taxirides and the name of the table is realtime. To create this dataset and table:
- In the Cloud Console, go to the BigQuery page.
Go to BigQuery
- In the Explorer panel, select the project where you want to create the dataset.
- In the details panel, click Create dataset.
- On the Create dataset page:
- For Dataset ID, enter taxirides.
- For Data location, choose United States (US). Currently, public datasets are stored in the US multi-region location. For simplicity, place your dataset in the same location.
- Leave the other default settings in place and click Create dataset.
- In the Explorer panel, expand your project.
- Next to your dataset, click the view actions icon, then click Open.
- In the details panel, click Create table.
- On the Create table page:
- In the Source section, for Create table from, select Empty table.
- In the Destination section, for Table name, enter realtime.
- In the Schema section, click the Edit as text toggle and paste the following schema definition into the box:
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- In the Partitioning and cluster settings section, for Partitioning, select the timestamp field.
- Leave the other default settings in place and click Create table.
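If you want to create the same dataset and partitioned table from code instead, the following is a minimal sketch using the google-cloud-bigquery Python client. The project ID is a placeholder, and the schema mirrors the definition pasted above.

```python
from google.cloud import bigquery

# Assumes Application Default Credentials are configured.
client = bigquery.Client(project="PROJECT_ID")  # replace with your project ID

# Create the dataset in the US multi-region, as in the console steps.
dataset = bigquery.Dataset(f"{client.project}.taxirides")
dataset.location = "US"
dataset = client.create_dataset(dataset, exists_ok=True)

# Table schema matching the fields pasted in the console step above.
schema = [
    bigquery.SchemaField("ride_id", "STRING"),
    bigquery.SchemaField("point_idx", "INTEGER"),
    bigquery.SchemaField("latitude", "FLOAT"),
    bigquery.SchemaField("longitude", "FLOAT"),
    bigquery.SchemaField("timestamp", "TIMESTAMP"),
    bigquery.SchemaField("meter_reading", "FLOAT"),
    bigquery.SchemaField("meter_increment", "FLOAT"),
    bigquery.SchemaField("ride_status", "STRING"),
    bigquery.SchemaField("passenger_count", "INTEGER"),
]

# Partition the table on the timestamp field, as in the console step.
table = bigquery.Table(f"{client.project}.taxirides.realtime", schema=schema)
table.time_partitioning = bigquery.TimePartitioning(field="timestamp")
table = client.create_table(table, exists_ok=True)
print(f"Created table {table.full_table_id}")
```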
Run the pipeline
Run a streaming pipeline using the Google-provided Pub/Sub Topic to BigQuery template. The following steps use the Cloud Console; a scripted way to launch the same template is sketched after these steps.
- In the Cloud Console, go to the Dataflow Jobs page.
Go to Dataflow Jobs
- Click Create job from template.
- Enter a Job name for your Dataflow job.
- For Dataflow template, select the Pub/Sub Topic to BigQuery template.
- For Input Pub/Sub topic, enter projects/pubsub-public-data/topics/taxirides-realtime. The pipeline gets incoming data from this input topic. It is a public topic based on the NYC Taxi & Limousine Commission's open dataset, expanded with additional routing information from the Google Maps Directions API and interpolated timestamps to simulate a real-time scenario. The following is a sample message from this topic, in JSON format:
{ "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e", "point_idx": 217, "latitude": 40.75399, "longitude": -73.96302, "timestamp": "2021-03-08T02:29:09.66644-05:00", "meter_reading": 6.293821, "meter_increment": 0.029003782, "ride_status": "enroute", "passenger_count": 1 }
- For BigQuery output table, enter PROJECT_ID:taxirides.realtime. Replace PROJECT_ID with the project ID of the project where you created your BigQuery dataset.
- For Temporary location, enter gs://BUCKET_NAME/temp/. Replace BUCKET_NAME with the name of your Cloud Storage bucket. temp is a folder in that bucket for storing temporary files, like the staged pipeline job.
- Click Run job.
- View the data written to BigQuery. Go to the BigQuery page.
Go to BigQuery
You can submit queries using standard SQL. For example, the following query selects all rows that have been added in the past 24 hours:
SELECT * FROM `PROJECT_ID.taxirides.realtime`
WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
LIMIT 1000
Replace PROJECT_ID with the project ID of the project where you created your BigQuery dataset.
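The template can also be launched programmatically. Below is a minimal sketch that calls the Dataflow templates.launch REST method through the google-api-python-client library. It assumes the classic template path gs://dataflow-templates/latest/PubSub_to_BigQuery and its inputTopic and outputTableSpec parameters; PROJECT_ID, BUCKET_NAME, and the region are placeholders you replace with your own values.

```python
from googleapiclient.discovery import build

# Assumes Application Default Credentials with permission to launch Dataflow jobs.
project_id = "PROJECT_ID"   # replace with your project ID
region = "us-central1"      # any supported Dataflow region
bucket = "BUCKET_NAME"      # replace with your Cloud Storage bucket

dataflow = build("dataflow", "v1b3")

request = dataflow.projects().locations().templates().launch(
    projectId=project_id,
    location=region,
    # Assumed path of the Google-provided classic template.
    gcsPath="gs://dataflow-templates/latest/PubSub_to_BigQuery",
    body={
        "jobName": "taxirides-to-bigquery",
        "parameters": {
            "inputTopic": "projects/pubsub-public-data/topics/taxirides-realtime",
            "outputTableSpec": f"{project_id}:taxirides.realtime",
        },
        "environment": {"tempLocation": f"gs://{bucket}/temp/"},
    },
)
response = request.execute()
print(response["job"]["id"])
```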
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this quickstart, follow these steps.
- In the Cloud Console, go to the Dataflow Jobs page.
Go to Dataflow Jobs
- Click your streaming job, click Stop, and then select Cancel to stop the pipeline.
- In the Cloud Console, go to the BigQuery page.
Go to BigQuery
- In the Explorer panel, expand your project.
- Next to your dataset, click the view actions icon, then click Open.
- In the details panel, click Delete dataset.
- In the Delete dataset dialog, confirm the delete command by typing the name of your dataset (taxirides) and clicking Delete.
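If you prefer to remove the dataset from a script rather than the console, the following is a minimal sketch using the google-cloud-bigquery Python client; the project ID is a placeholder.

```python
from google.cloud import bigquery

# Assumes Application Default Credentials are configured.
client = bigquery.Client(project="PROJECT_ID")  # replace with your project ID

# Deletes the taxirides dataset together with the realtime table inside it.
client.delete_dataset("taxirides", delete_contents=True, not_found_ok=True)
print("Deleted dataset taxirides")
```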