Preston Holmes | Solution Architect | Google

This tutorial demonstrates how to use Cloud IoT Core and Cloud Pub/Sub to provide a secure and scalable ingest layer, combined with a small relay service over private networking to an on-premises IoT solution.

Objectives

  • Configure IoT Core and Cloud Pub/Sub as a fully managed ingest system
  • Configure a stand-in for an on-premises MQTT service on Compute Engine
  • Create a small, scalable relay service that pulls messages from Cloud Pub/Sub

Figure 1. Architecture diagram for tutorial components

Before you begin

This tutorial assumes you already have a Google Cloud Platform account set up and have completed the IoT Core quickstart.

Costs

This tutorial uses billable components of GCP, including the following:

  • Cloud IoT Core
  • Cloud Pub/Sub
  • Compute Engine

This tutorial should not generate any usage that would not be covered by the free tier, but you can use the Pricing Calculator to generate a cost estimate based on your projected production usage.

Introduction

The idea of using fully managed scaling services is common for developers looking to distribute content. A content delivery network (CDN) is often used to provide scale between the content origin and many globally distributed consumers.

Figure 2. CDN pattern

Many IoT-related projects begin with a simple MQTT broker created inside on-premises corporate infrastructure to simplify installation while still allowing internal access. Internal apps use data arriving on this broker, along with access to other on-premises systems, to build business-facing IoT applications. There are many reasons why a company may prefer to pursue hybrid development. For example, hybrid development may make it easier to assess or control costs, or to control data access.

Taking this initial MQTT broker and exposing it externally to production scales of device traffic may pose many challenges to a strictly on-premises development, including the following:

  • Increasing reliability of the network available to devices.
  • Allowing many devices to connect concurrently (MQTT is a stateful connection).
  • Keeping the MQTT broker up and running reliably.
  • Keeping the connection between devices and the cloud secure, while not exposing other on-premises network services.

These requirements resemble those of a CDN, but in reverse.

Figure 3. Global ingest

Let's quickly set up an implementation of a solution that uses Google's fully managed services and scales to address some of these concerns, while still allowing the primary application to be developed on-premises.

Set up the environment

If you do not already have a development environment set up with gcloud, it is recommended that you use Cloud Shell for any command line instructions.

Set the Cloud IoT Core settings you are using as environment variables:

export CLOUD_REGION=us-central1
export CLOUD_ZONE=us-central1-c
export GCLOUD_PROJECT=$(gcloud config list project --format "value(core.project)")
export IOT_TOPIC=[the Cloud Pub/Sub topic ID you set up with your IoT Core registry; the short ID, not the full path]

Though Cloud Shell has the Go language runtime pre-installed, you will need some other libraries. Install them with this command:

go get cloud.google.com/go/pubsub cloud.google.com/go/compute/metadata github.com/eclipse/paho.mqtt.golang

Clone the repository associated with the community tutorials:

git clone https://github.com/GoogleCloudPlatform/community.git

Create the on-premises broker

You will use the project's private networking as a stand-in for a proper hybrid Cloud Interconnect setup.

Start by creating a VM to represent the on-premises broker instance. This will be running a basic version of RabbitMQ.

gcloud beta compute instances create-with-container on-prem-rabbit \
--zone=$CLOUD_ZONE \
--machine-type=g1-small \
--boot-disk-size=10GB \
--boot-disk-type=pd-standard \
--boot-disk-device-name=on-prem-rabbit \
--container-image=cyrilix/rabbitmq-mqtt \
--container-restart-policy=always \
--labels=container-vm=cos-stable-66-10452-89-0 \
--container-env=RABBITMQ_DEFAULT_USER=user,RABBITMQ_DEFAULT_PASS=abc123

This tutorial aims to quickly demonstrate the hybrid nature of this pattern. You therefore expose this VM to the internet, but relay data to it only over the instance's private IP address, which simulates the private network link in the architecture diagram above.

To allow you to connect to this broker to verify traffic flow, allow connections to it with a firewall rule:

gcloud compute firewall-rules create mqtt --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:1883 --source-ranges=0.0.0.0/0
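
Before pointing an MQTT client at the broker, it can help to confirm that the firewall rule took effect. The following is a minimal sketch that is not part of the tutorial repository: `checkBroker` is a hypothetical helper, and the address in `main` is a documentation placeholder that you would replace with the external IP address of the on-prem-rabbit instance. It only tests that the TCP port accepts connections; it does not speak MQTT.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// checkBroker reports whether a TCP connection to addr can be opened within
// the timeout. It only tests reachability; it does not perform an MQTT
// handshake or check credentials.
func checkBroker(addr string, timeout time.Duration) error {
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return fmt.Errorf("broker %s not reachable: %w", addr, err)
	}
	return conn.Close()
}

func main() {
	// Placeholder address (assumption): substitute the external IP address
	// of the on-prem-rabbit instance.
	addr := "203.0.113.10:1883"
	if err := checkBroker(addr, 2*time.Second); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("broker port is reachable")
}
```

If the check fails right after creating the rule, the container on the VM may still be starting, so retrying after a short wait is reasonable.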

Set up the relay

For the relay to receive messages from IoT Core and Cloud Pub/Sub, it will need a dedicated subscription:

gcloud pubsub subscriptions create relay --topic $IOT_TOPIC

The relay demonstrated here is built with Go. The Cloud Shell environment recommended above should already have the tools needed to build the relay. The source for the relay is compact and simple:

package main

import (
    "context"
    "fmt"
    "log"
    "os"

    "cloud.google.com/go/compute/metadata"
    "cloud.google.com/go/pubsub"
    MQTT "github.com/eclipse/paho.mqtt.golang"
)

//define a function for the default message handler
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
    fmt.Printf("TOPIC: %s\n", msg.Topic())
    fmt.Printf("MSG: %s\n", msg.Payload())
}

func main() {
    //create a ClientOptions struct setting the broker address, client ID,
    //credentials, and the default message handler
    opts := MQTT.NewClientOptions().AddBroker("tcp://on-prem-rabbit:1883")
    opts.SetClientID("go-simple")
    opts.SetUsername("user")
    opts.SetPassword("abc123")
    opts.SetDefaultPublishHandler(f)

    //create and start a client using the above ClientOptions
    c := MQTT.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    ctx := context.Background()
    var err error
    proj := os.Getenv("GOOGLE_CLOUD_PROJECT")
    if proj == "" {
        proj, err = metadata.ProjectID()
        if err != nil {
            fmt.Fprintf(os.Stderr, "GOOGLE_CLOUD_PROJECT environment variable must be set.\n")
            os.Exit(1)
        }
    }
    client, err := pubsub.NewClient(ctx, proj)
    if err != nil {
        log.Fatalf("Could not create pubsub Client: %v", err)
    }
    sub := client.Subscription("relay")
    // use async pull to get messages from devices
    err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
        fmt.Printf("Got message: %q\n", string(msg.Data))
        fmt.Println(msg.Attributes["subFolder"])
        // publish the message to the corresponding MQTT topic in the target broker
        token := c.Publish(msg.Attributes["subFolder"], 0, false, msg.Data)
        go func(pubsource *pubsub.Message, token MQTT.Token) {
            token.Wait()
            // ack only after a successful publish; otherwise nack so that
            // Cloud Pub/Sub redelivers the message
            if err := token.Error(); err != nil {
                log.Printf("publish failed for %s: %v", pubsource.ID, err)
                pubsource.Nack()
                return
            }
            fmt.Printf("confirmed %s\n", pubsource.ID)
            pubsource.Ack()
        }(msg, token)
    })

    if err != nil {
        log.Fatal(err)
    }

    c.Disconnect(250)
}

The goal of the relay is to efficiently pull messages from Cloud Pub/Sub and then republish them to the on-premises MQTT broker. It takes advantage of the asynchronous pull of Cloud Pub/Sub (sometimes called streaming pull), which uses streaming gRPC messages to reduce latency and increase throughput.
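
The relay acknowledges each Cloud Pub/Sub message once the MQTT publish token completes. The sketch below isolates that decision so it can be exercised without a broker. It assumes two small interfaces, `publishToken` and `ackNacker`, that mirror the methods of the paho `MQTT.Token` and `*pubsub.Message` types used by the relay. `settle`, `fakeToken`, `fakeMsg`, and `errBroker` are hypothetical names introduced only for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// publishToken is the subset of the paho MQTT.Token interface used here.
type publishToken interface {
	WaitTimeout(time.Duration) bool
	Error() error
}

// ackNacker is the subset of *pubsub.Message used here.
type ackNacker interface {
	Ack()
	Nack()
}

// settle acks msg only if the MQTT publish completed without error within
// timeout. Otherwise it nacks, which asks Cloud Pub/Sub to redeliver.
func settle(tok publishToken, msg ackNacker, timeout time.Duration) bool {
	if !tok.WaitTimeout(timeout) || tok.Error() != nil {
		msg.Nack()
		return false
	}
	msg.Ack()
	return true
}

// Hypothetical stand-ins used only to exercise settle without a broker.
var errBroker = errors.New("broker unavailable")

type fakeToken struct{ err error }

func (t fakeToken) WaitTimeout(time.Duration) bool { return true }
func (t fakeToken) Error() error                   { return t.err }

type fakeMsg struct{ acked, nacked bool }

func (m *fakeMsg) Ack()  { m.acked = true }
func (m *fakeMsg) Nack() { m.nacked = true }

func main() {
	ok, failed := &fakeMsg{}, &fakeMsg{}
	settle(fakeToken{}, ok, time.Second)
	settle(fakeToken{err: errBroker}, failed, time.Second)
	fmt.Println("acked:", ok.acked, "nacked:", failed.nacked)
}
```

One caveat, stated as an assumption rather than a guarantee: the relay publishes at QoS 0, where a completed token typically means the client has handed the message off, not that the broker acknowledged it. If stronger delivery assurance is needed, publishing at QoS 1 is worth evaluating.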

Create a VM to run as the relay:

gcloud compute instances create telemetry-relay \
--zone=$CLOUD_ZONE \
--machine-type=g1-small

To build the relay:

cd community/tutorials/cloud-iot-hybrid/go-relay
bash install.sh

This script builds the binary, installs it on the relay VM, and starts it as a relay service.

Verifying traffic flow

You can use an MQTT client to connect to the stand-in for the on-premises broker. This represents some part of the on-premises IoT application. One simple browser-based tool you can use in Chrome is MQTTLens.

Use the public IP address of the on-prem-rabbit instance, along with a username of user and password of abc123 to connect.

Create a subscription to an MQTT topic of sample.

Now you can use one of the IoT Core quickstart samples to send test messages to this topic: /devices/${deviceId}/events/sample

You can also quickly emulate this by publishing directly to the Cloud Pub/Sub topic with a subFolder attribute:

gcloud pubsub topics publish $IOT_TOPIC --attribute=subFolder=sample --message "hello"

The relay republishes this to the corresponding MQTT topic on the on-premises broker over the private network, in this case using the project-level private network DNS to look up the on-prem-rabbit host. (Refer to the architecture figure above for a refresher of the data flow.)
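
The relay uses the subFolder attribute directly as the MQTT topic. A message that arrives without that attribute, for example one a device publishes to the bare events topic, would produce an empty topic name, which MQTT does not allow. A minimal sketch of a guard follows. `mqttTopic` and the fallback topic name `telemetry` are hypothetical and not part of the tutorial repository.

```go
package main

import "fmt"

// mqttTopic picks the MQTT topic for a relayed message from its Pub/Sub
// attributes. Messages without a subFolder attribute go to fallback, because
// an MQTT topic name must contain at least one character.
func mqttTopic(attrs map[string]string, fallback string) string {
	if sub := attrs["subFolder"]; sub != "" {
		return sub
	}
	return fallback
}

func main() {
	withFolder := map[string]string{"deviceId": "dev-1", "subFolder": "sample"}
	withoutFolder := map[string]string{"deviceId": "dev-1"}

	fmt.Println(mqttTopic(withFolder, "telemetry"))
	fmt.Println(mqttTopic(withoutFolder, "telemetry"))
}
```

In the relay, the result of a helper like this would replace the direct `msg.Attributes["subFolder"]` lookup passed to `c.Publish`.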

Next steps

This relay service could be adapted in a number of ways:

  • Scale it out to consume more messages in parallel from Cloud Pub/Sub, perhaps by deploying it to Google Kubernetes Engine.
  • Add rate limiting to protect the on-premises broker from overload, using the Cloud Pub/Sub subscription as a surge tank.
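
The rate-limiting idea can be sketched with the standard library alone. The example below is an illustration under stated assumptions, not the tutorial's implementation: `throttle` is a hypothetical helper that releases at most one message per ticker tick after the first, and the `publish` callback stands in for the MQTT publish call. In the real relay, messages that are not yet released simply wait in the Cloud Pub/Sub subscription.

```go
package main

import (
	"fmt"
	"time"
)

// throttle calls publish for each message in order, releasing at most one
// message per ticker tick after the first, and returns the total time spent.
func throttle(msgs []string, interval time.Duration, publish func(string)) time.Duration {
	start := time.Now()
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for i, m := range msgs {
		if i > 0 {
			<-ticker.C // wait for the next slot before publishing again
		}
		publish(m)
	}
	return time.Since(start)
}

func main() {
	msgs := []string{"a", "b", "c"}
	elapsed := throttle(msgs, 100*time.Millisecond, func(m string) {
		fmt.Println("relay", m)
	})
	fmt.Println("elapsed:", elapsed)
}
```

With the Pub/Sub client, a similar effect could come from blocking inside the `Receive` callback and bounding in-flight messages with `sub.ReceiveSettings.MaxOutstandingMessages`, so that the backlog stays in the subscription rather than in relay memory.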

You might also consider having the relay pull from Cloud Pub/Sub and write more directly to alternate services on-premises, instead of going through an on-premises MQTT broker. However, relaying over MQTT lets on-premises development proceed unchanged if MQTT was already built into the solution, or if a looser coupling is wanted.

Cleaning up

# Remove the relay subscription - this will not destroy the topic
gcloud pubsub subscriptions delete relay

# Delete the on-prem broker stand-in
gcloud compute instances delete --zone $CLOUD_ZONE on-prem-rabbit

# Delete the relay VM
gcloud compute instances delete --zone $CLOUD_ZONE telemetry-relay

# Delete the firewall rule that exposed the MQTT port
gcloud compute firewall-rules delete mqtt

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see our Site Policies. Java is a registered trademark of Oracle and/or its affiliates.