Real-time data processing with IoT Core
Contributed by the Google Cloud community. Not official Google documentation.
The setup described in this tutorial addresses the following scenario: At industrial facilities, sensors are installed to monitor the equipment on site. Sensor data is continuously streamed to the cloud, where it is handled by different components for various purposes, such as real-time monitoring and alerts, long-term data storage for analysis, performance improvement, and model training.
Typical scenarios include:
- Geographically dispersed facilities with a centralized monitoring system.
- Monitoring of remote unmanned sites—power transformer stations, mobile base stations, etc.
In this tutorial, the sensors are simulated by a Java application that continuously generates random measurement points and sends them to the cloud.
This tutorial focuses on two aspects of the monitoring application setup:
- Using IoT Core, a managed cloud service for IoT, to enforce structured handling of sensor devices' security keys and metadata, and secure delivery of measurement data between sensors and the cloud.
- In-stream data handling in the cloud, where two parallel processing pipelines separate real-time monitoring and alerting from the less critical need for data storage and analysis.
Technical overview
This tutorial demonstrates how to push updates from Message Queueing Telemetry Transport (MQTT) devices to Google Cloud and process them in real time.
The tutorial includes sample code to show two kinds of data processing approaches that use Google Cloud products:
- A function deployed in Cloud Functions transforms data and logs it to Cloud Logging.
- A streaming application deployed in Dataflow transforms data and inserts it into BigQuery.
In both cases, sample temperature data generated by simulated devices is collected, transformed into other data formats, and passed to another Google Cloud product for further processing and analysis. Cloud Functions is suitable for simple Extract/Transform/Load (ETL) processing, while Dataflow can handle more sophisticated data pipelines that involve multiple transformations, joins, windowing, and so on.
IoT Core not only receives data from MQTT clients, but can also send configuration data to them, which can be used to control the behavior of devices or their surrounding environment.
Data structure
The sample MQTT client simulates devices and generates sample data with the following attributes:
DeviceId
: A unique identifier for an individual device.

Timestamp
: A timestamp for when a temperature is measured.

Temperature
: The measured temperature from the device.

Coordinates
: The longitude and latitude of the device.
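Concretely, a single measurement point with these attributes might be serialized like the following JSON. This is an illustrative sketch; the exact field names and wire format are defined by the sample Java client, and the values shown here are invented.

```python
import json
from datetime import datetime, timezone

# Illustrative measurement point; field names mirror the attributes above,
# but the actual wire format is defined by the sample Java client.
point = {
    "deviceId": "device-001",
    "timestamp": datetime(2018, 1, 1, 12, 0, tzinfo=timezone.utc).isoformat(),
    "temperature": 21.7,
    "coordinates": {"longitude": 139.77, "latitude": 35.68},
}

payload = json.dumps(point)
print(payload)
```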
Architecture
The sample MQTT client simulates a device and sends sample data to IoT Core, which transforms and redirects requests to a Pub/Sub topic. After the data is stored in Pub/Sub, it is retrieved by two subscribers: a function in Cloud Functions and a streaming job running in Dataflow.
This tutorial shows how data is transformed and processed in Cloud Functions and Dataflow.
Objectives
This tutorial demonstrates how to:
- Deploy a function to Cloud Functions that transforms temperature data into JSON format and logs it to Cloud Logging.
- Deploy a streaming application to Dataflow that transforms temperature data into BigQuery row format and inserts it into BigQuery.
- Run an MQTT client that generates simulated temperature and coordinates, and then submits the data to IoT Core.
Cost
This tutorial uses billable components of Google Cloud, including:
- BigQuery
- Dataflow
- Cloud Functions
- IoT Core
- Pub/Sub
- Cloud Storage
- Compute Engine
- Datastore
You can use the Pricing Calculator to generate a cost estimate that is based on your projected usage.
Before you begin
Install software and download sample code
Make sure you have the following software installed:
Clone the following repository and change into the directory for this tutorial's code:
git clone https://github.com/GoogleCloudPlatform/community.git
cd tutorials/cloud-iot-rtdp
The tutorials/cloud-iot-rtdp directory contains the following structure:

bin/
: script files

function/
: JavaScript files

streaming/
: Java streaming application
Configure a Google Cloud project and enable APIs
- Create or select a Google Cloud project.
- Enable billing for your project.
Enable the following APIs:
- IoT Core
- Cloud Functions
- Dataflow
Create a Cloud Storage bucket
- Open the Cloud Storage console.
Create a Cloud Storage bucket.
The bucket name must be unique across Cloud Storage.
Click Create folder, enter a temporary folder name, and then click Create.
Set environment variables
To make it easier to run commands, you can set environment variables so that you don't have to supply options for some values that you’ll use repeatedly. You will create the corresponding resources in later steps.
- Open Cloud Shell
Set the following environment variables:
export PROJECT=[PROJECT_ID]
export REGION=[REGION_NAME]
export ZONE=[ZONE_NAME]
export BUCKET=[BUCKET_NAME]
export REGISTRY=[CLOUD_IOT_CORE_REGISTRY_ID]
export TOPIC=[CLOUD_PUBSUB_TOPIC_NAME]
Configure IoT Core
In this section, you create a topic in Pub/Sub and configure IoT Core to receive data from MQTT clients.
- Open the Pub/Sub console
- In the left navigation menu, click Topics.
Click Create a topic. In the Name box, enter the topic name that you assigned earlier to the environment variable, and then click Create.
Open the IoT Core console.
Click Create device registry.
In the Registry ID box, type myregistry. Select a Google Cloud region close to you, and select the Pub/Sub topic that you just created.
When you're done, click Create.
In the Grant permission to service account dialog box, click Continue.
In Cloud Shell, generate a new public/private key pair, which will override the checked-in pair:
cd bin
./create_cert.sh
In Cloud Shell, register devices in the device registry:
bin/register.sh
Create threshold values in Datastore
In this section, you insert threshold values into Datastore for each of the devices registered in the IoT Core Device Manager.
In Cloud Shell, run a Python script to insert the device objects into Datastore:
export GCLOUD_PROJECT=$PROJECT
virtualenv env && source env/bin/activate
pip install google-cloud-datastore
cd bin
python create_temp_alert_store.py
deactivate
Open the Datastore console.
Confirm that the device entities have been created with the corresponding threshold temperature value:
Deploy a Cloud Function
In this section, you set up a function that logs data that is sent to IoT Core and is retrieved through Pub/Sub. It also compares the temperature received against the threshold value in Datastore. If the threshold is exceeded, an error is logged.
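The deployed function itself is JavaScript, but its core decision can be sketched in Python. In this sketch, the in-memory THRESHOLDS dict and the message field names are assumptions standing in for the Datastore lookup and the client's actual payload format.

```python
import base64
import json
import logging

logging.basicConfig(level=logging.INFO)

# Hypothetical per-device thresholds, standing in for the Datastore entities.
THRESHOLDS = {"device-001": 30.0}

def handle_message(pubsub_message):
    """Decode a Pub/Sub message and log an error if the threshold is exceeded."""
    point = json.loads(base64.b64decode(pubsub_message["data"]))
    device = point["deviceId"]
    temp = point["temperature"]
    limit = THRESHOLDS.get(device)
    if limit is not None and temp > limit:
        logging.error("Device %s exceeded threshold: %.1f > %.1f",
                      device, temp, limit)
        return "alert"
    logging.info("Device %s OK: %.1f", device, temp)
    return "ok"

# A Pub/Sub push payload carries the message body base64-encoded in "data".
msg = {"data": base64.b64encode(json.dumps(
    {"deviceId": "device-001", "temperature": 31.5}).encode())}
print(handle_message(msg))  # → alert
```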
In Cloud Shell, deploy a function to Cloud Functions:
cd function
gcloud beta functions deploy iot --stage-bucket $BUCKET --trigger-topic $TOPIC
You see results similar to the following:
/ [1 files][ 292.0 B/ 292.0 B]
Operation completed over 1 objects/292.0 B.
Deploying function (may take a while - up to 2 minutes)...done.
availableMemoryMb: 256
entryPoint: iot
eventTrigger:
...
Open the Cloud Functions console.
Confirm that you created a function:
Deploy a streaming application to Dataflow
In this section, you deploy a Java-based streaming application that transforms data that is retrieved from Pub/Sub and loads it into a BigQuery table.
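The real pipeline is a Java/Apache Beam job, but its per-message transform can be sketched in Python. The BigQuery column names in this sketch are assumptions; the actual schema is defined by the streaming application.

```python
import json

# Hypothetical BigQuery schema: deviceId STRING, ts TIMESTAMP,
# temperature FLOAT, longitude FLOAT, latitude FLOAT.
def to_bigquery_row(message_bytes):
    """Flatten one Pub/Sub message body into a BigQuery-style row dict."""
    point = json.loads(message_bytes)
    return {
        "deviceId": point["deviceId"],
        "ts": point["timestamp"],
        "temperature": point["temperature"],
        "longitude": point["coordinates"]["longitude"],
        "latitude": point["coordinates"]["latitude"],
    }

msg = json.dumps({
    "deviceId": "device-001",
    "timestamp": "2018-01-01T12:00:00Z",
    "temperature": 21.7,
    "coordinates": {"longitude": 139.77, "latitude": 35.68},
}).encode()
print(to_bigquery_row(msg))
```

In the Beam job, a transform like this would sit between the Pub/Sub source and the BigQuery sink, with windowing applied as needed.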
In Cloud Shell, build and submit a streaming job:
cd bin
./job.sh
The results look similar to the following:
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building cloud-iot-rtdp 0.0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ cloud-iot-rtdp ---
...
Open the Dataflow console.
Confirm that a streaming job is running:
Generate simulated temperature and coordinates data
Now you can run an MQTT client that generates simulated data on temperature and coordinates and then submits it to IoT Core.
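The simulated data generation can be sketched roughly as follows. This is a pure-Python illustration with invented value ranges; the actual client is the Java application in this repository, which publishes each point to IoT Core over MQTT.

```python
import json
import random

def generate_measurement(device_id, base_lon=139.77, base_lat=35.68):
    """Produce one simulated measurement point (illustrative ranges)."""
    return {
        "deviceId": device_id,
        "temperature": round(random.uniform(15.0, 35.0), 1),
        "coordinates": {
            "longitude": round(base_lon + random.uniform(-0.01, 0.01), 5),
            "latitude": round(base_lat + random.uniform(-0.01, 0.01), 5),
        },
    }

# The real client would publish each point to IoT Core over MQTT.
for i in range(3):
    print(json.dumps(generate_measurement("device-%03d" % i)))
```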
In Cloud Shell, run an MQTT client to generate simulated data:
cd bin
./run.sh
You see results similar to the following:
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building cloud-iot-rtdp 0.0.1-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ cloud-iot-rtdp ---
...
Open the Cloud Functions console.
To confirm that a function is processing data, click the More options icon on the right side of your function, and then click View logs:
You see results similar to the following:
Open the Dataflow console.
To confirm that a streaming Dataflow job is processing data, click the job ID:
Open BigQuery.
Click the Compose Query button to open the query editor.
To confirm that the temperature data is stored in a BigQuery table, run the following query in the editor:
SELECT count(*) from [[PROJECT_ID]:iotds.temp_sensor]
If everything is working, you should see a single row in the results that displays a count of all the records that have been processed.
Handling alerts
Temperature measurements that are above the configured threshold for each device are logged as errors by Cloud Functions. You can view and analyze these in the Error Reporting console.
To activate error notifications, follow the documentation on Error Reporting notifications.
Next steps
You can learn more about IoT, data processing, and visualization from the following links:
Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see our Site Policies. Java is a registered trademark of Oracle and/or its affiliates.