MQTT to Pub/Sub template

The MQTT to Pub/Sub template is a streaming pipeline that reads messages from an MQTT topic and writes them to Pub/Sub. It includes the optional parameters username and password in case authentication is required by the MQTT server.

If the pipeline doesn't receive any message from the MQTT topic for more than 90 minutes, a StackOverflowError occurs. As a workaround, you can change the number of workers every 90 minutes. For more information about changing the number of workers without stopping your job, see In-flight job option update.

Pipeline Requirements

  • The Pub/Sub output topic name must exist.
  • The MQTT host IP must exist and have the proper network configuration for worker machines to reach the MQTT host.
  • The MQTT topic that data is extracted from must have a name.

Template parameters

Required parameters

  • inputTopic : The name of the MQTT topic that data is read from. (Example: topic).
  • outputTopic : The name of the output Pub/Sub topic that data is written to. (Example: projects/your-project-id/topics/your-topic-name).
  • username : The username to use for authentication on the MQTT server. (Example: sampleusername).
  • password : The password associated with the provided username. (Example: samplepassword).

Optional parameters

  • brokerServer : The MQTT broker server IP or host. (Example: tcp://host:1883).

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select MQTT to Pub/Sub template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Mqtt_to_PubSub \
    --parameters \
brokerServer=MQTT_SERVER,\
inputTopic=INPUT_TOPIC,\
outputTopic=OUTPUT_TOPIC,\
username=USERNAME,\
password=PASSWORD
  

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace with Dataflow region name. For example: us-central1.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace INPUT_TOPIC with the name of the MQTT server input topic. For example: testtopic.
  • Replace MQTT_SERVER with the MQTT server addresses. For example: tcp://10.128.0.62:1883
  • Replace OUTPUT_TOPIC with the name of Pub/Sub output topic. For example: projects/myproject/topics/testoutput.
  • Replace USERNAME with the username for the MQTT server. For example: testuser.
  • Replace PASSWORD with the password that corresponds to the username used with the MQTT server.

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "brokerServer": "MQTT_SERVER",
          "inputTopic": "INPUT_TOPIC",
          "outputTopic": "OUTPUT_TOPIC",
          "username": "USERNAME",
          "password": "PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Mqtt_to_PubSub",
   }
}
  

You must replace the following values in this example:

  • Replace YOUR_PROJECT_ID with your project ID.
  • Replace with Dataflow region name. For example: us-central1.
  • Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
  • Replace INPUT_TOPIC with the name of the MQTT server input topic. For example: testtopic.
  • Replace MQTT_SERVER with the MQTT server addresses. For example: tcp://10.128.0.62:1883
  • Replace OUTPUT_TOPIC with the name of Pub/Sub output topic. For example: projects/myproject/topics/testoutput.
  • Replace USERNAME with the username for the MQTT server. For example: testuser.
  • Replace PASSWORD with the password that corresponds to the username used with the MQTT server.

What's next