O modelo MQTT para Pub/Sub é um pipeline de streaming que lê mensagens de um tópico MQTT e as grava no Pub/Sub.
Ele inclui os parâmetros opcionais username
e password
caso a autenticação seja exigida pelo servidor MQTT.
Se o pipeline não receber nenhuma mensagem do tópico MQTT por mais de 90 minutos, ocorrerá uma StackOverflowError
.
Como solução alternativa, é possível mudar o número de workers a cada 90 minutos.
Para mais informações sobre como alterar o número de workers sem interromper o job, consulte Atualização de opções de jobs em trânsito.
Requisitos do pipeline
- O nome do tópico de saída do Pub/Sub precisa existir.
- O IP do host MQTT precisa existir e ter a configuração de rede adequada para que as máquinas de worker alcancem o host MQTT.
- O tópico do MQTT do qual os dados são extraídos precisa ter um nome.
Parâmetros do modelo
Parâmetros obrigatórios
- inputTopic : o nome do tópico do MQTT do qual os dados são lidos. (Exemplo: tópico).
- outputTopic : o nome do tópico de saída do Pub/Sub em que os dados são gravados. (Exemplo: projects/your-project-id/topics/your-topic-name).
- username : o nome de usuário a ser usado na autenticação no servidor MQTT. (Exemplo: sampleusername).
- password : a senha associada ao nome de usuário fornecido. (Exemplo: samplepassword).
Parâmetros opcionais
- brokerServer : O IP ou host do servidor do corretor MQTT. (Exemplo: tcp://host:1883).
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione MQTT to Pub/Sub template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --project=YOUR_PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Mqtt_to_PubSub \ --parameters \ brokerServer=MQTT_SERVER,\ inputTopic=INPUT_TOPIC,\ outputTopic=OUTPUT_TOPIC,\ username=USERNAME,\ password=PASSWORD
Substitua os valores a seguir neste exemplo:
- Substitua YOUR_PROJECT_ID pelo ID do projeto.
- Substitua pelo nome da região do Dataflow. Por exemplo,
us-central1
. - Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular
[a-z]([-a-z0-9]{0,38}[a-z0-9])?
para ser válido. - Substitua INPUT_TOPIC pelo nome do tópico de entrada do servidor MQTT. Por exemplo,
testtopic
. - Substitua MQTT_SERVER pelos endereços do servidor MQTT. Por exemplo:
tcp://10.128.0.62:1883
- Substitua OUTPUT_TOPIC pelo nome do tópico de saída do Pub/Sub. Por exemplo,
projects/myproject/topics/testoutput
. - Substitua USERNAME pelo nome de usuário do servidor MQTT. Por exemplo,
testuser
. - Substitua PASSWORD pela senha correspondente ao nome de usuário usado no servidor MQTT.
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "brokerServer": "MQTT_SERVER", "inputTopic": "INPUT_TOPIC", "outputTopic": "OUTPUT_TOPIC", "username": "USERNAME", "password": "PASSWORD" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Mqtt_to_PubSub", } }
Substitua os valores a seguir neste exemplo:
- Substitua YOUR_PROJECT_ID pelo ID do projeto.
- Substitua pelo nome da região do Dataflow. Por exemplo,
us-central1
. - Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular
[a-z]([-a-z0-9]{0,38}[a-z0-9])?
para ser válido. - Substitua INPUT_TOPIC pelo nome do tópico de entrada do servidor MQTT. Por exemplo,
testtopic
. - Substitua MQTT_SERVER pelos endereços do servidor MQTT. Por exemplo:
tcp://10.128.0.62:1883
- Substitua OUTPUT_TOPIC pelo nome do tópico de saída do Pub/Sub. Por exemplo,
projects/myproject/topics/testoutput
. - Substitua USERNAME pelo nome de usuário do servidor MQTT. Por exemplo,
testuser
. - Substitua PASSWORD pela senha correspondente ao nome de usuário usado no servidor MQTT.
A seguir
- Saiba mais sobre os modelos do Dataflow.
- Confira a lista de modelos fornecidos pelo Google.