Locally connected microcontrollers and real-time analytics (part 2 of 2)
Contributed by Google employees.
This two-part tutorial demonstrates how to control an Arduino microcontroller with a Raspberry Pi, connect the devices to IoT Core, post sensor data from the devices, and analyze the data in real time. Part 1 of the tutorial created a hybrid device that combines the strengths of a Linux-based microprocessor, which provides internet connectivity and a TLS stack, with a constrained microcontroller for analog I/O.
Part 2 objectives
- Process sensor data from Pub/Sub using Dataflow.
- Store processed sensor data in BigQuery.
- Create a report dashboard using Google Data Studio.
- Create a notebook on Datalab.
Before you begin
This tutorial uses billable components of Google Cloud, including the following:
- IoT Core
Enable Dataflow for your project
Perform all of the steps in the "Before you begin" section of the Dataflow Quickstart, up to and including creating a Cloud Storage bucket, on your local development environment (e.g., a laptop).
Enable BigQuery for your project
Perform all of the steps in the "Before you begin" section of the BigQuery Quickstart.
Install environment dependencies and the Cloud SDK
Clone the source repository:
$ git clone https://github.com/GoogleCloudPlatform/community.git
Change to the directory for this tutorial:
$ cd community/tutorials/ardu-pi-serial-part-2
Create and activate the virtual environment:
$ virtualenv my_virtual_env
$ . ./my_virtual_env/bin/activate
$ pip install -r beam-requirements.txt
Follow the steps in this guide to install the Cloud SDK.
Create a BigQuery dataset
BigQuery is Google's fully managed, serverless, and highly scalable enterprise data warehouse solution.
A BigQuery dataset contains tables and views in a single specified region or in a geography containing multiple regions. Follow the instructions to create a dataset in your project. The dataset location can be specified only when you create the dataset. More details are available here.
Start the Dataflow job
Select your preferred Dataflow service region.
Run the command below to start the Apache Beam pipeline on the Dataflow runner.
$ python -m beam-solarwind --project [project_name] \
    --topic [pub_sub_topic_name (e.g., projects/my-project/topics/my-topic)] \
    --temp_location gs://[cloud_storage_bucket]/tmp \
    --setup_file ./setup.py \
    --region [your_preferred_region] \
    --runner DataflowRunner \
    --output "[bigquery_table_dataset].[table_name]" \
    --output_avg "[bigquery_average_table_dataset].[table_avg]"
Go to the Dataflow interface in the GCP Console and select your newly created Dataflow job to see your pipeline.
The following diagram shows an example Dataflow pipeline:
The first part of the Dataflow job sets up the pipeline options from the required parameters, which are passed as command-line arguments, as shown above. The streaming mode option is also enabled. To allow access to the modules available in the main session, the save_main_session flag is set. After this, the Beam pipeline object is created.
args, pipeline_args = parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = True
options.view_as(StandardOptions).streaming = True
p = beam.Pipeline(options=options)
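As a rough sketch of how this argument split might work, the following standalone example uses only argparse. The flag names mirror the command shown earlier, but the `build_parser` helper, the example values, and the `us-central1` region are assumptions for illustration, not code taken from the tutorial's repository. Flags that the parser recognizes end up in `args`; everything else is left in `pipeline_args`, which is what would be handed to PipelineOptions.

```python
import argparse


def build_parser():
    """Hypothetical parser for the job-specific flags used in this tutorial."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--topic', required=True,
                        help='Pub/Sub topic, e.g. projects/my-project/topics/my-topic')
    parser.add_argument('--output', required=True,
                        help='Raw BigQuery table as dataset.table')
    parser.add_argument('--output_avg', required=True,
                        help='Average BigQuery table as dataset.table')
    return parser


# Illustrative argument list; all values are placeholders.
argv = [
    '--topic', 'projects/my-project/topics/my-topic',
    '--output', 'my_dataset.my_table',
    '--output_avg', 'my_dataset.my_avg_table',
    '--runner', 'DataflowRunner',
    '--region', 'us-central1',
]

# Known flags go to args; unknown ones are returned untouched in pipeline_args.
args, pipeline_args = build_parser().parse_known_args(argv)
print(args.topic)
print(pipeline_args)
```

Keeping the job-specific flags separate from the runner flags means the same script can be pointed at a different runner or region without touching the parser.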
The first two steps of the Dataflow pipeline read incoming events from Pub/Sub and then parse the JSON text:
records = (p
           | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=args.topic)
           | 'Parse JSON to Dict' >> beam.Map(json.loads))
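To make the parsing step concrete, here is a minimal sketch of what json.loads does to a single message body. Pub/Sub delivers the payload as bytes. The payload below is hypothetical: the field names follow the metrics mentioned in this tutorial, but the values and the timestamp format are assumptions.

```python
import json

# Hypothetical message body; field names and values are illustrative only.
payload = (b'{"clientid": "device-1", "timestamp": 1546300800, '
           b'"temperature": 21.5, "pressure": 1013.2}')

# json.loads accepts UTF-8 encoded bytes as well as str (Python 3.6+).
record = json.loads(payload)
print(record['clientid'], record['temperature'])
```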
There are two branches at the next step. The one on the right in the figure above writes the incoming stream of events to the raw BigQuery table. The table is created if it does not exist.
# Write to the raw table
records | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
    args.output,
    schema=Schema.get_warehouse_schema(),
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
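The schema passed to WriteToBigQuery is what allows the table to be created on first write. One form that Beam accepts is a single comma-separated string of `name:TYPE` pairs. The sketch below builds such a string from a list of fields. The field list and the `to_schema_string` helper are assumptions for illustration; the actual schema is defined by the tutorial's Schema class and may differ.

```python
# Hypothetical field list; the real schema lives in the tutorial's Schema class.
FIELDS = [
    ('clientid', 'STRING'),
    ('timestamp', 'TIMESTAMP'),
    ('temperature', 'FLOAT'),
    ('pressure', 'FLOAT'),
]


def to_schema_string(fields):
    """Join (name, type) pairs into the 'name:TYPE,name:TYPE' form."""
    return ','.join('{}:{}'.format(name, bq_type) for name, bq_type in fields)


schema = to_schema_string(FIELDS)
print(schema)
```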
The one on the left aggregates the events and writes them to the BigQuery average table.
The timestamp in the event object is emitted together with the object. This timestamp is then used to assign each event to sliding windows that are 300 seconds long, with a new window starting every 60 seconds.
(records
 | 'Add timestamp' >> beam.ParDo(AddTimestampToDict())
 | 'Window' >> beam.WindowInto(beam.window.SlidingWindows(300, 60, offset=0))
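With the parameters passed to SlidingWindows in the code above (a size of 300 seconds and a period of 60 seconds), each event falls into 300 / 60 = 5 overlapping windows. The following pure-Python sketch illustrates that assignment for integer timestamps in seconds. The `sliding_window_starts` function is a hypothetical helper written for this explanation, not part of Beam or of the tutorial's code.

```python
def sliding_window_starts(timestamp, size=300, period=60, offset=0):
    """Return the start times of all sliding windows that contain timestamp."""
    # Start of the most recent window that begins at or before the timestamp.
    last_start = timestamp - ((timestamp - offset) % period)
    # Step back one period at a time while the window still covers the timestamp.
    return list(range(last_start, timestamp - size, -period))


starts = sliding_window_starts(130)
# Each window is the half-open interval [start, start + size).
windows = [(start, start + 300) for start in starts]
print(windows)
```

Because the windows overlap, each event contributes to several consecutive averages, which smooths the aggregated series at the cost of writing more rows.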
At the next stage, the record is emitted as a key-value tuple, in which the clientid is the key and the object is the value.
 | 'Dict to KeyValue' >> beam.ParDo(AddKeyToDict())
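The keying step can be pictured as a small function that pairs each record with its client ID. This is only a sketch: `to_key_value` is a hypothetical stand-in, and the implementation of AddKeyToDict in the repository may differ.

```python
def to_key_value(record):
    """Pair a record with its client ID so that it can be grouped by key."""
    return (record['clientid'], record)


# Illustrative record; the values are placeholders.
record = {'clientid': 'device-1', 'temperature': 21.5}
key, value = to_key_value(record)
print(key, value)
```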
The elements are grouped by the clientid key, and the averages of all the metrics (temperature, pressure, etc.) are calculated.
 | 'Group by Key' >> beam.GroupByKey()
 | 'Average' >> beam.ParDo(CountAverages())
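Outside of Beam, the grouping and averaging logic for one window can be sketched in plain Python as follows. The `average_by_key` function, the metric names, and the sample values are assumptions for illustration; the actual CountAverages implementation may compute additional fields.

```python
from collections import defaultdict


def average_by_key(pairs, metrics=('temperature', 'pressure')):
    """Group (key, record) pairs by key and average the given metrics."""
    grouped = defaultdict(list)
    for key, record in pairs:
        grouped[key].append(record)

    averages = {}
    for key, records in grouped.items():
        averages[key] = {
            metric: sum(r[metric] for r in records) / len(records)
            for metric in metrics
        }
    return averages


# Illustrative input: the records of one window, already keyed by client ID.
pairs = [
    ('device-1', {'temperature': 20.0, 'pressure': 1000.0}),
    ('device-1', {'temperature': 22.0, 'pressure': 1010.0}),
    ('device-2', {'temperature': 18.0, 'pressure': 990.0}),
]
result = average_by_key(pairs)
print(result)
```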
The calculated average values are written to the BigQuery average table. The table is created if it does not exist.
 | 'Write Avg to BigQuery' >> beam.io.WriteToBigQuery(
     args.output_avg,
     schema=Schema.get_avg_schema(),
     create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=BigQueryDisposition.WRITE_APPEND))
View results in BigQuery
- Ensure that the client from Part 1 is running and posting data to Pub/Sub through IoT Core.
- Go to the BigQuery UI in the GCP Console.
- In the BigQuery menu, select your project.
- Select the dataset.
- Select the table.
- View the table schema.
- See table details.
Run the following queries to see the latest data:
Select the latest 20 records from the raw table:
select * from [my_dataset].[my_table] order by timestamp DESC limit 20;
The average table adds a single row for each time window. Run the query below to select the latest 20 records.
select * from [my_dataset].[my_avg_table] order by timestamp DESC limit 20;
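The same queries can also be run from Python. The sketch below assumes that the google-cloud-bigquery package is installed and that credentials are configured. The `latest_rows_query` and `run_latest_rows` helpers and the dataset and table names are hypothetical. The client library is imported lazily so that the query builder can be used on its own.

```python
def latest_rows_query(dataset, table, limit=20):
    """Build the 'latest N rows' query shown above for a given table."""
    return 'select * from {}.{} order by timestamp DESC limit {}'.format(
        dataset, table, int(limit))


def run_latest_rows(project, dataset, table, location, limit=20):
    """Run the query and return the rows; needs google-cloud-bigquery."""
    # Imported here so that latest_rows_query works without the package.
    from google.cloud import bigquery

    # The location must match the location of the dataset being queried.
    client = bigquery.Client(project=project, location=location)
    return list(client.query(latest_rows_query(dataset, table, limit)).result())


sql = latest_rows_query('my_dataset', 'my_table')
print(sql)
```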
BigQuery table schema:
BigQuery table preview:
Create a Data Studio report
Data Studio is a managed tool that allows creation and sharing of dashboards and reports.
- Go to the Data Studio interface.
- Click the + button to create a new blank report.
Add a new Data Source:
- Click the + Create New Data Source button.
- Select the BigQuery by Google connector.
- Select the BigQuery project, the dataset (my_dataset), and the table (my_table), and then click Connect.
- All the schema fields (clientid, temperature, pressure, etc.) are selected automatically.
- Click Add to report and confirm by clicking the button in the popup.
Create a new chart:
- Click Add a Chart in the menu bar.
- Select a line chart.
- Select the timestamp column for the Date and Time range dimensions.
- Select the clientid field as the breakdown dimension.
- Select a metric (e.g., temperature) and aggregation (e.g., AVG).
- Add a Text Box and label the chart.
- Repeat the steps above for additional metrics.
Data Studio report:
Create a Datalab notebook
Datalab is an interactive tool for data exploration that is built on Jupyter. The Jupyter Notebook is an open-source web application that allows you to create and share documents that contain live code, equations, visualizations, and narrative text.
- Go to the Datalab quickstart and perform all of the steps in the "Before you begin" section.
- Go to the notebooks page.
- Click the Upload button to add
- Click the notebook to open and edit it.
- Set the project ID (e.g.,
- Set the dataset name (e.g.,
- Set the raw table name (e.g.,
- Set the average table name (e.g.,
- Set the location (e.g., my_location). Important: The location must match that of the datasets referenced in the query.
- Set the client id (e.g.,
- Set the project ID (e.g.,
- From the Kernel menu in the menu bar, select python3.
- Click Run in the menu bar to execute the notebook.
Cleaning up
- Clean up the Datalab environment.
- Delete the Data Studio report:
- Go to the Data Studio interface.
- In the menu section, click the three-dot menu next to the report name.
- Select Remove.
- Stop the Dataflow job.
Delete the Cloud Storage bucket:
$ gsutil rm -r gs://[cloud_storage_bucket]
Delete the BigQuery dataset:
$ bq rm -r [my_dataset]
rm: remove dataset '[my_project_id]:[my_dataset]'? (y/N) y
To delete a project, do the following:
- In the GCP Console, go to the Projects page.
- In the project list, select the project you want to delete and click Delete project.
- In the dialog, type the project ID, and then click Shut down to delete the project.
What's next
- Check out the new tutorial on using the Sigfox Sens'it Discovery V3 device with this integration. It shows how to encode and decode the device's binary data and configuration payloads, and how to store the data in real time in BigQuery.
- Learn more about IoT on Google Cloud.
- Learn more about big data analytics on Google Cloud to turn your IoT data into actionable insights.
- Try out other GCP features for yourself. Have a look at our tutorials.