MQTT para modelo do Pub/Sub

O modelo MQTT para Pub/Sub é um pipeline de streaming que lê mensagens de um tópico MQTT e as grava no Pub/Sub. Ele inclui os parâmetros opcionais username e password caso a autenticação seja exigida pelo servidor MQTT.

Se o pipeline não receber nenhuma mensagem do tópico MQTT por mais de 90 minutos, ocorrerá uma StackOverflowError. Como solução alternativa, é possível mudar o número de workers a cada 90 minutos. Para mais informações sobre como alterar o número de workers sem interromper o job, consulte Atualização de opções de jobs em trânsito.

Requisitos do pipeline

  • O nome do tópico de saída do Pub/Sub precisa existir.
  • O IP do host MQTT precisa existir e ter a configuração de rede adequada para que as máquinas de worker alcancem o host MQTT.
  • O tópico do MQTT do qual os dados são extraídos precisa ter um nome.

Parâmetros do modelo

Parâmetro Descrição
brokerServer O IP ou host do servidor do corretor MQTT. Por exemplo, tcp://10.0.0.1:1883.
inputTopic O nome do tópico do MQTT do qual os dados são lidos.
outputTopic O nome do tópico de saída do Pub/Sub em que os dados são gravados.
username (Opcional) O nome de usuário a ser usado na autenticação no servidor MQTT.
password (Opcional) A senha associada ao nome de usuário fornecido.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione MQTT to Pub/Sub template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MQTT_To_Pubsub \
    --parameters \
brokerServer=MQTT_SERVER,\
inputTopic=INPUT_TOPIC,\
outputTopic=OUTPUT_TOPIC,\
username=USERNAME,\
password=PASSWORD
  

Substitua os valores a seguir neste exemplo:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua pelo nome da região do Dataflow. Por exemplo, us-central1.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua INPUT_TOPIC pelo nome do tópico de entrada do servidor MQTT. Por exemplo, testtopic.
  • Substitua MQTT_SERVER pelos endereços do servidor MQTT. Por exemplo: tcp://10.128.0.62:1883
  • Substitua OUTPUT_TOPIC pelo nome do tópico de saída do Pub/Sub. Por exemplo, projects/myproject/topics/testoutput.
  • Substitua USERNAME pelo nome de usuário do servidor MQTT. Por exemplo, testuser.
  • Substitua PASSWORD pela senha correspondente ao nome de usuário usado no servidor MQTT.

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "brokerServer": "MQTT_SERVER",
          "inputTopic": "INPUT_TOPIC",
          "outputTopic": "OUTPUT_TOPIC",
          "username": "USERNAME",
          "password": "PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MQTT_To_Pubsub",
   }
}
  

Substitua os valores a seguir neste exemplo:

  • Substitua YOUR_PROJECT_ID pelo ID do projeto.
  • Substitua pelo nome da região do Dataflow. Por exemplo, us-central1.
  • Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
  • Substitua INPUT_TOPIC pelo nome do tópico de entrada do servidor MQTT. Por exemplo, testtopic.
  • Substitua MQTT_SERVER pelos endereços do servidor MQTT. Por exemplo: tcp://10.128.0.62:1883
  • Substitua OUTPUT_TOPIC pelo nome do tópico de saída do Pub/Sub. Por exemplo, projects/myproject/topics/testoutput.
  • Substitua USERNAME pelo nome de usuário do servidor MQTT. Por exemplo, testuser.
  • Substitua PASSWORD pela senha correspondente ao nome de usuário usado no servidor MQTT.

A seguir