MQTT to Pub/Sub テンプレートは、MQTT トピックからメッセージを読み取り、Pub/Sub に書き込むストリーミング パイプラインです。これには、MQTT サーバーで認証が必要な場合は、オプションのパラメータ username
と password
が含まれます。
パイプラインが 90 分以上 MQTT トピックからメッセージを受信しなかった場合は、StackOverflowError
が発生します。回避策として、ワーカーの数を 90 分ごとに変更できます。ジョブを停止せずにワーカーの数を変更する方法については、処理中のジョブ オプションの更新をご覧ください。
パイプラインの要件
- Pub/Sub 出力トピック名が存在していること。
- MQTT ホスト IP が存在し、ワーカーマシンが MQTT ホストにアクセスするための適切なネットワーク構成が必要です。
- データを抽出する MQTT トピックに名前が付けられていること。
テンプレートのパラメータ
必須パラメータ
- inputTopic: データが読み取られる MQTT トピックの名前。例:
topic
- outputTopic: データが書き込まれる Pub/Sub の出力トピックの名前。例:
projects/your-project-id/topics/your-topic-name
- username: MQTT サーバーでの認証に使用するユーザー名。例:
sampleusername
- password: 指定したユーザー名に関連付けられているパスワード。例:
samplepassword
オプション パラメータ
- brokerServer: MQTT ブローカー サーバーの IP またはホスト。例:
tcp://host:1883
テンプレートを実行する
コンソール
- Dataflow の [テンプレートからジョブを作成] ページに移動します。 [テンプレートからジョブを作成] に移動
- [ジョブ名] フィールドに、固有のジョブ名を入力します。
- (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは
us-central1
です。Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。
- [Dataflow テンプレート] プルダウン メニューから、[ MQTT to Pub/Sub template] を選択します。
- 表示されたパラメータ フィールドに、パラメータ値を入力します。
- [ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルで、テンプレートを実行します。
gcloud dataflow flex-template run JOB_NAME \ --project=YOUR_PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Mqtt_to_PubSub \ --parameters \ brokerServer=MQTT_SERVER,\ inputTopic=INPUT_TOPIC,\ outputTopic=OUTPUT_TOPIC,\ username=USERNAME,\ password=PASSWORD
このサンプルの次の値は置き換える必要があります。
- YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
- は、Dataflow リージョン名に置き換えます。例:
us-central1
。 - JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は有効な正規表現
[a-z]([-a-z0-9]{0,38}[a-z0-9])?
と一致する必要があります。 - INPUT_TOPIC は、MQTT サーバーの入力トピックの名前に置き換えます。例:
testtopic
。 - MQTT_SERVER は、MQTT サーバーのアドレスに置き換えます。例:
tcp://10.128.0.62:1883
- OUTPUT_TOPIC は、Pub/Sub の出力トピックの名前に置き換えます。例:
projects/myproject/topics/testoutput
。 - USERNAME は、MQTT サーバーのユーザー名に置き換えます。例:
testuser
。 - PASSWORD は、MQTT サーバーで使用するユーザー名のパスワードに置き換えます。
API
REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch
をご覧ください。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "brokerServer": "MQTT_SERVER", "inputTopic": "INPUT_TOPIC", "outputTopic": "OUTPUT_TOPIC", "username": "USERNAME", "password": "PASSWORD" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Mqtt_to_PubSub", } }
このサンプルの次の値は置き換える必要があります。
- YOUR_PROJECT_ID は、実際のプロジェクト ID に置き換えます。
- は、Dataflow リージョン名に置き換えます。例:
us-central1
。 - JOB_NAME を任意のジョブ名に置き換えます。ジョブ名は有効な正規表現
[a-z]([-a-z0-9]{0,38}[a-z0-9])?
と一致する必要があります。 - INPUT_TOPIC は、MQTT サーバーの入力トピックの名前に置き換えます。例:
testtopic
。 - MQTT_SERVER は、MQTT サーバーのアドレスに置き換えます。例:
tcp://10.128.0.62:1883
- OUTPUT_TOPIC は、Pub/Sub の出力トピックの名前に置き換えます。例:
projects/myproject/topics/testoutput
。 - USERNAME は、MQTT サーバーのユーザー名に置き換えます。例:
testuser
。 - PASSWORD は、MQTT サーバーで使用するユーザー名のパスワードに置き換えます。
次のステップ
- Dataflow テンプレートについて学習する。
- Google 提供のテンプレートのリストを確認する。