MQTT to Pub/Sub テンプレート

MQTT to Pub/Sub テンプレートは、MQTT トピックからメッセージを読み取り、Pub/Sub に書き込むストリーミング パイプラインです。これには、MQTT サーバーで認証が必要な場合は、オプションのパラメータ usernamepassword が含まれます。

パイプラインが 90 分以上 MQTT トピックからメッセージを受信しなかった場合は、StackOverflowError が発生します。回避策として、ワーカーの数を 90 分ごとに変更できます。ジョブを停止せずにワーカーの数を変更する方法については、処理中のジョブ オプションの更新をご覧ください。

パイプラインの要件

  • Pub/Sub 出力トピック名が存在していること。
  • MQTT ホスト IP が存在し、ワーカーマシンが MQTT ホストにアクセスするための適切なネットワークが構成されていること。
  • データを抽出する MQTT トピックに名前が付けられていること。

テンプレートのパラメータ

パラメータ 説明
brokerServer MQTT ブローカー サーバーの IP またはホスト。例: tcp://10.0.0.1:1883
inputTopic データが読み取られる MQTT トピックの名前。
outputTopic データが書き込まれる Pub/Sub の出力トピックの名前。
username (省略可)MQTT サーバーでの認証に使用するユーザー名。
password (省略可)指定したユーザー名に関連付けられているパスワード。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ MQTT to Pub/Sub template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MQTT_To_Pubsub \
    --parameters \
brokerServer=MQTT_SERVER,\
inputTopic=INPUT_TOPIC,\
outputTopic=OUTPUT_TOPIC,\
username=USERNAME,\
password=PASSWORD
  

このサンプルでは、次のように値を置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • は、Dataflow リージョン名に置き換えます。例: us-central1
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? に一致する必要があります。
  • INPUT_TOPIC は、MQTT サーバーの入力トピックの名前に置き換えます。例: testtopic
  • MQTT_SERVER は、MQTT サーバーのアドレスに置き換えます。例: tcp://10.128.0.62:1883
  • OUTPUT_TOPIC は、Pub/Sub の出力トピックの名前に置き換えます。例: projects/myproject/topics/testoutput
  • USERNAME は、MQTT サーバーのユーザー名に置き換えます。例: testuser
  • PASSWORD は、MQTT サーバーで使用するユーザー名のパスワードに置き換えます。

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "brokerServer": "MQTT_SERVER",
          "inputTopic": "INPUT_TOPIC",
          "outputTopic": "OUTPUT_TOPIC",
          "username": "USERNAME",
          "password": "PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MQTT_To_Pubsub",
   }
}
  

このサンプルでは、次のように値を置き換える必要があります。

  • YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • は、Dataflow リージョン名に置き換えます。例: us-central1
  • JOB_NAME は、任意のジョブ名に置き換えます。ジョブ名は正規表現 [a-z]([-a-z0-9]{0,38}[a-z0-9])? に一致する必要があります。
  • INPUT_TOPIC は、MQTT サーバーの入力トピックの名前に置き換えます。例: testtopic
  • MQTT_SERVER は、MQTT サーバーのアドレスに置き換えます。例: tcp://10.128.0.62:1883
  • OUTPUT_TOPIC は、Pub/Sub の出力トピックの名前に置き換えます。例: projects/myproject/topics/testoutput
  • USERNAME は、MQTT サーバーのユーザー名に置き換えます。例: testuser
  • PASSWORD は、MQTT サーバーで使用するユーザー名のパスワードに置き換えます。

次のステップ