
Real-time data processing with IoT Core

Author(s): @teppeiy, Published: 2018-04-10

Contributed by the Google Cloud community. Not official Google documentation.

The setup described in this tutorial addresses the following scenario: at industrial facilities, sensors are installed to monitor the equipment on site. Sensor data is continuously streamed to the cloud, where it is handled by different components for various purposes, such as real-time monitoring and alerting, and long-term data storage for analysis, performance improvement, and model training.

Typical scenarios include:

  • Geographically dispersed facilities with a centralized monitoring system.
  • Monitoring of remote unmanned sites, such as power transformer stations and mobile base stations.

In this tutorial, the sensors are simulated by a Java application that continuously generates random measurement points and sends them to the cloud.

This tutorial focuses on two aspects of the monitoring application setup:

  • Using IoT Core, a managed cloud service for IoT, to enforce structured handling of sensor devices' security keys and metadata, and to secure the delivery of measurement data between the sensors and the cloud.
  • In-stream data handling in the cloud, where two parallel processing pipelines separate real-time monitoring and alerting from the less time-critical data storage and analysis.

Technical overview

This tutorial demonstrates how to push updates from Message Queueing Telemetry Transport (MQTT) devices to Google Cloud and process them in real time.

The tutorial includes sample code to show two kinds of data processing approaches that use Google Cloud products:

  1. A function deployed in Cloud Functions transforms data and logs it to Cloud Logging.
  2. A streaming application deployed in Dataflow transforms data and inserts it into BigQuery.

In both cases, sample temperature data generated by simulated devices is collected, transformed into other data formats, and passed to another Google Cloud product for further processing and analysis. Cloud Functions is suitable for simple extract/transform/load (ETL) processing, while Dataflow can handle more sophisticated data pipelines that involve multiple transformations, joins, windowing, and so on.

IoT Core can not only receive data from MQTT clients but also send configuration data to them, which can be used to control the behavior of devices or their surrounding environment.
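
For example, after you register a device later in this tutorial, a configuration update can be pushed to it from Cloud Shell with a command along the following lines (the device ID and the configuration payload are placeholders, and the sample client in this tutorial is not required to act on them):

    gcloud iot devices configs update \
        --region=[REGION_NAME] \
        --registry=[CLOUD_IOT_CORE_REGISTRY_ID] \
        --device=[DEVICE_ID] \
        --config-data='{"target_temperature": 20}'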

Data structure

The sample MQTT client simulates devices and generates sample data with the following attributes:

  • DeviceId: A unique identifier for individual devices.
  • Timestamp: A timestamp for when a temperature is measured.
  • Temperature: The measured temperature from the device.
  • Coordinates: The longitude and latitude of the device.
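
For example, a single simulated measurement could be represented by a JSON payload like the following. The field names shown here are illustrative; the exact message format is defined by the sample client in this repository:

    {
      "deviceId": "device-0",
      "timestamp": 1523340000,
      "temperature": 23.4,
      "latitude": 35.68,
      "longitude": 139.76
    }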

Architecture

The sample MQTT client simulates a device and sends sample data to IoT Core, which forwards it to a Pub/Sub topic. After the data is published to Pub/Sub, it is retrieved by two subscribers: a function in Cloud Functions and a streaming job running in Dataflow.

This tutorial shows how data is transformed and processed in Cloud Functions and Dataflow.

Architecture

Objectives

This tutorial demonstrates how to:

  • Deploy a function to Cloud Functions that transforms temperature data into JSON format and logs it to Cloud Logging.
  • Deploy a streaming application to Dataflow that transforms temperature data into BigQuery row format and inserts it into BigQuery.
  • Run an MQTT client that generates simulated temperature and coordinates, and then submits the data to IoT Core.

Cost

This tutorial uses billable components of Google Cloud, including IoT Core, Pub/Sub, Cloud Functions, Dataflow, BigQuery, Datastore, and Cloud Storage.

You can use the Pricing Calculator to generate a cost estimate that is based on your projected usage.

Before you begin

Install software and download sample code

Make sure you have the following software installed: git, the gcloud command-line tool, a Java JDK with Apache Maven, and Python with virtualenv. If you run all of the commands in this tutorial in Cloud Shell, these tools are already installed.

Clone the following repository and change into the directory that contains this tutorial's code:

git clone https://github.com/GoogleCloudPlatform/community.git
cd community/tutorials/cloud-iot-rtdp

The tutorials/cloud-iot-rtdp directory has the following structure:

  • bin/: shell and Python helper scripts
  • function/: the JavaScript source for the Cloud Function
  • streaming/: the Java streaming application for Dataflow

Configure a Google Cloud project and enable APIs

  1. Create or select a Google Cloud project.
  2. Enable billing for your project.
  3. Enable the following APIs:

    1. IoT Core
    2. Cloud Functions
    3. Dataflow
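
If you prefer the command line, you can enable the same APIs from Cloud Shell. The service names below are the standard API identifiers; Pub/Sub is added because the tutorial also publishes device data to a Pub/Sub topic:

    gcloud services enable \
        cloudiot.googleapis.com \
        cloudfunctions.googleapis.com \
        dataflow.googleapis.com \
        pubsub.googleapis.com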

Create a Cloud Storage bucket

  1. Open the Cloud Storage console.
  2. Create a Cloud Storage bucket.

    Storage

    The bucket name must be unique across Cloud Storage.

  3. Click Create folder, enter a name for a temporary folder, and then click Create.

    folder
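
Alternatively, you can create the bucket from Cloud Shell with gsutil. The region and bucket name below are placeholders and should match the values that you set in the next section; because Cloud Storage folders are only object name prefixes, creating the temporary folder in the console as described above is sufficient:

    gsutil mb -l [REGION_NAME] gs://[BUCKET_NAME]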

Set environment variables

To make it easier to run commands, you can set environment variables so that you don't have to supply options for some values that you’ll use repeatedly. You will create the corresponding resources in later steps.

  1. Open Cloud Shell
  2. Set the following environment variables:

    export PROJECT=[PROJECT_ID]
    export REGION=[REGION_NAME]
    export ZONE=[ZONE_NAME]
    export BUCKET=[BUCKET_NAME]
    export REGISTRY=[CLOUD_IOT_CORE_REGISTRY_ID]
    export TOPIC=[CLOUD_PUBSUB_TOPIC_NAME]
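
Optionally, you can also make these values the gcloud defaults so that later commands pick them up automatically:

    gcloud config set project $PROJECT
    gcloud config set compute/region $REGION
    gcloud config set compute/zone $ZONE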
    

Configure IoT Core

In this section, you create a topic in Pub/Sub and configure IoT Core to receive data from MQTT clients. The steps use the console; equivalent gcloud commands are shown at the end of this section.

  1. Open the Pub/Sub console
  2. In the left navigation menu, click the Topics menu.
  3. Click Create a topic. In the Name box, enter the topic name that you assigned earlier to the environment variable, and then click Create.

    topic

  4. Open the IoT Core console.

  5. Click Create device registry.

  6. In the Registry ID box, enter the registry ID that you assigned to the REGISTRY environment variable (for example, myregistry). Select a Google Cloud region close to you, and select the Pub/Sub topic that you just created.

    registry

  7. When you're done, click Create.

  8. In the Grant permission to service account dialog box, click Continue.

  9. In Cloud Shell, generate a new public/private key pair, which overwrites the checked-in pair:

    cd bin
    ./create_cert.sh
    cd ..
    
  10. In Cloud Shell, register devices in the device registry:

    bin/register.sh
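
As an alternative to the console steps in this section, you can create the topic and the device registry from Cloud Shell with commands along the following lines (flag names reflect the gcloud IoT commands and can vary between gcloud versions):

    gcloud pubsub topics create $TOPIC
    gcloud iot registries create $REGISTRY \
        --region=$REGION \
        --event-notification-config=topic=$TOPIC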
    

Create threshold values in Datastore

In this section, you insert threshold values into Datastore for each of the devices registered in the IoT Core device manager.

  1. In Cloud Shell, run a Python script to insert the device objects into Datastore:

    export GCLOUD_PROJECT=$PROJECT
    virtualenv env && source env/bin/activate
    pip install google-cloud-datastore
    cd bin
    python create_temp_alert_store.py
    deactivate
    cd ..
    
  2. Open the Datastore console.

  3. Confirm that the device entities have been created with the corresponding threshold temperature value:

    data_store_confirm

Deploy a Cloud Function

In this section, you set up a function that logs the data sent to IoT Core and retrieved through Pub/Sub. The function also compares the received temperature against the threshold value stored in Datastore; if the threshold is exceeded, an error is logged.

  1. In Cloud Shell, deploy a function to Cloud Functions:

    cd function
    gcloud beta functions deploy iot --stage-bucket $BUCKET --trigger-topic $TOPIC
    cd ..
    

    You see results similar to the following:

    / [1 files][  292.0 B/  292.0 B]
    Operation completed over 1 objects/292.0 B.
    Deploying function (may take a while - up to 2 minutes)...done.
    availableMemoryMb: 256
    entryPoint: iot
    eventTrigger:
    ...
    
  2. Open the Cloud Functions console.

  3. Confirm that you created a function:

    function_confirm
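
You can also verify the deployment from Cloud Shell; both commands below show the function named iot that you deployed in the previous step:

    gcloud functions list
    gcloud functions describe iot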

Deploy a streaming application to Dataflow

In this section, you deploy a Java-based streaming application that transforms data that is retrieved from Pub/Sub and loads it into a BigQuery table.

  1. In Cloud Shell, build and submit a streaming job:

    cd bin
    ./job.sh
    cd ..
    

    The results look similar to the following:

    [INFO] Scanning for projects...
    [INFO]
    [INFO] ------------------------------------------------------------------------
    [INFO] Building cloud-iot-rtdp 0.0.1-SNAPSHOT
    [INFO] ------------------------------------------------------------------------
    [INFO]
    [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ cloud-iot-rtdp ---
    ...
    
  2. Open the Dataflow console.

  3. Confirm that a streaming job is running:

    stream_confirm
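
You can also list active Dataflow jobs from Cloud Shell; depending on your gcloud version, the --region flag may or may not be required:

    gcloud dataflow jobs list --status=active --region=$REGION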

Generate simulated temperature and coordinates data

Now you can run an MQTT client that generates simulated data on temperature and coordinates and then submits it to IoT Core.

  1. In Cloud Shell, run an MQTT client to generate simulated data:

    cd bin
    ./run.sh
    

    You see results similar to the following:

    [INFO] Scanning for projects...
    [INFO]
    [INFO] ------------------------------------------------------------------------
    [INFO] Building cloud-iot-rtdp 0.0.1-SNAPSHOT
    [INFO] ------------------------------------------------------------------------
    [INFO]
    [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ cloud-iot-rtdp ---
    ...
    
  2. Open the Cloud Functions console.

  3. To confirm that a function is processing data, click the More options icon on the right side of your function, and then click View logs:

    view_logs

    You see results similar to the following:

    logs

  4. Open the Dataflow console.

  5. To confirm that a streaming Dataflow job is processing data, click the job ID:

    view_df

  6. Open BigQuery.

  7. Click the Compose Query button to open the query editor.

  8. To confirm that the temperature data is stored in a BigQuery table, run the following query in the editor, replacing [PROJECT_ID] with your project ID:

    SELECT count(*) from [[PROJECT_ID]:iotds.temp_sensor]
    

    bq_editor

    If everything is working, you should see a single row in the results that displays a count of all the records that have been processed.
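
The query above uses BigQuery legacy SQL syntax. If you prefer the command line, a roughly equivalent check with the bq tool and standard SQL, assuming the same dataset and table names, is:

    bq query --use_legacy_sql=false \
        'SELECT COUNT(*) FROM `[PROJECT_ID].iotds.temp_sensor`'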

Handling alerts

Temperature measurements that are above the configured threshold for each device are logged as errors by the Cloud Function. You can view and analyze these errors in the Error Reporting console.

To activate error notifications, follow the documentation on Error Reporting notifications.

error_console

Next steps

You can learn more about IoT, data processing, and visualization in the documentation for IoT Core, Pub/Sub, Dataflow, and BigQuery.

