Dataflow は、ストリーミング ジョブに対して 2 つのモードをサポートしています。
- 1 回限りのモード。このモードは、すべての Dataflow ストリーミング ジョブのデフォルトです。このモードの Dataflow では、データがパイプラインを移動する際にレコードの削除や重複が発生しません。
- 1 回以上のモード。このモードでは、レコードが少なくとも 1 回処理されます(入力レコードが失われることはありません)。ただし、このモードでは重複するレコードが発生する可能性があります。重複を許容できるユースケースの場合、1 回以上のモードを使用すると、ジョブのコストとレイテンシを大幅に削減できます。
使用するストリーミング モードを選択する
パイプラインからの正確な結果と予測可能なセマンティクスを確保する必要がある場合は、1 回限りのモードを選択してください。次に例を示します。
- カウント、合計、平均などの集計を使用するパイプライン。
- レコードが 1 回だけ処理されるビジネス クリティカルなユースケース。たとえば、不正行為の検出、ネットワーク脅威の検出、e コマース インベントリのダッシュボードなどです。
ワークロードが重複レコードを許容でき、費用やレイテンシの削減のメリットを得られる可能性がある場合は、1 回以上のストリーミング モードを選択します。次に例を示します。
- Dataflow のダウンストリームで重複除去が実行されるワークロード。たとえば、BigQuery や SQL データストアに書き込むパイプラインなどです。
- 集計なしのマップのみのパイプライン。たとえば、ログ処理、変更データ キャプチャ、抽出、変換、読み込み(ETL)ジョブなどです。これらのジョブでは、パイプラインはスキーマ変換など、要素ごとの変換のみを実行します。
- Pub/Sub など、出力シンクが 1 回限りの配信を保証できないパイプライン。その場合、パイプライン内での重複除去が不要になり、1 回以上のストリーミング モードによるコストとレイテンシの削減の恩恵を受けることができます。
- Pub/Sub から読み取るパイプライン。Pub/Sub からの読み取りは、1 回以上のモードを使用すると大幅に最適化されます。
その他の考慮事項
1 回以上のモードでは、パイプラインのコストとレイテンシを大幅に削減できます。正確な影響はパイプラインの詳細によって異なります。現実的な負荷で 1 回以上のストリーミングをテストして、影響を評価します。
1 回以上のモードを使用する場合、レコードの重複率は再試行回数によって異なります。ベースラインのレートは通常低くなります(1% 未満)。ただし、ワーカーノードで障害が発生した場合や、他の条件によって RPC 呼び出しが繰り返されると、急増することがあります。
ストリーミング モードは、Streaming Engine がレコードを処理する方法に影響しますが、I/O コネクタのセマンティクスは変更されません。I/O セマンティクスをストリーミング モードに合わせることをおすすめします。たとえば、BigQuery I/O コネクタで 1 回以上のストリーミング モードを使用する場合は、書き込みモードを
STORAGE_API_AT_LEAST_ONCE
に設定します。1 回以上のストリーミングを使用すると、Google 提供の Dataflow テンプレートでこのオプションが自動的に有効になります。Map
などの要素全体の変換は、常にべき等であるとは限りません。たとえば、メッセージを受け取って現在のタイムスタンプを付加する関数について考えてみましょう。この場合、重複するレコードから異なる出力が複数生成される可能性があります。このパイプラインには、1 回以上のモードは適さない場合があります。
ストリーミング モードを設定する
1 回限りの処理は、すべての Dataflow ジョブのデフォルト設定です。1 回以上のストリーミング モードを有効にするには、streaming_mode_at_least_once
サービス オプションを設定します。
Java
--dataflowServiceOptions=streaming_mode_at_least_once
Python
--dataflow_service_options=streaming_mode_at_least_once
Go
--dataflow_service_options=streaming_mode_at_least_once
streaming_mode_at_least_once
オプションを指定しない場合、Dataflow は 1 回限りのストリーミング モードを使用します。
streaming_mode_at_least_once
オプションを設定すると、Dataflow はリソースベースの課金で Streaming Engine を自動的に有効にします。
実行中のジョブのストリーミング モードを更新するには、既存のジョブを停止して置換ジョブを実行します。詳細については、置換ジョブを起動するをご覧ください。
テンプレートのストリーミング モードを選択する
Dataflow ストリーミング テンプレートの実行時にストリーミング モードを選択するには、次の手順を行います。
コンソール
Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
[テンプレートからジョブを作成] をクリックします。
[Dataflow テンプレート] プルダウン メニューから、実行するテンプレートを選択します。
[ストリーミング モード] でストリーミング モードを選択します。テンプレートが 1 つのモードのみをサポートしている場合、このオプションは無効になります。
gcloud
1 回以上モードを有効にするには、additional-experiments
フラグで streaming_mode_at_least_once
オプションを設定します。
--additional-experiments=streaming_mode_at_least_once
1 回限りモードを有効にするには、additional-experiments
フラグで streaming_mode_exactly_once
オプションを設定します。
--additional-experiments=streaming_mode_exactly_once
この 2 つのオプションは相互に排他的です。これらのオプションのいずれも設定しない場合、テンプレートは、テンプレート メタデータによって決定されるストリーミング モードにデフォルト設定されます。詳細については、カスタム テンプレートをご覧ください。
REST
FlexTemplateRuntimeEnvironment
(Flex テンプレート)または RuntimeEnvironment
(クラシック テンプレート)のオブジェクトの additionalExperiments
フィールドを使用します。
{
additionalExperiments : ["streaming_mode_at_least_once"]
...
}
カスタム テンプレート
1 回以上の処理をサポートするカスタム テンプレートを作成する場合は、テンプレート メタデータ ファイルに次の最上位フィールドを追加します。
{
"streaming": true,
"supportsAtLeastOnce": true,
"supportsExactlyOnce": true,
"defaultStreamingMode": "AT_LEAST_ONCE"
}
これらのメタデータ フィールドを使用すると、ユーザーは Google Cloud コンソールでテンプレートをデプロイするときにストリーミング モードを選択できます。defaultStreamingMode
フィールドは省略可能であり、テンプレートのデフォルトのストリーミング モードを指定します。defaultStreamingModeg
を指定せず、テンプレートが両方のモードをサポートしている場合は、1 回限りモードがデフォルト設定です。
詳細については、Dataflow テンプレート ドキュメントの次のセクションをご覧ください。
- Flex テンプレート: メタデータ
- クラシック テンプレート: パイプライン コードでメタデータを使用する
ジョブのストリーミング モードを表示する
ジョブのストリーミング モードを表示するには、Google Cloud コンソールの [ジョブ] ページに移動します。
ストリーミング モードは、[ジョブの詳細] ページの [ジョブ情報] パネルにも表示されます。
制限事項
1 回以上のストリーミング モードでは、リソースベースの課金を使用する Streaming Engine が必要です。
料金
1 回以上モードでは、常にリソースベースの課金が使用されます。ジョブで使用される合計リソースに対して課金されます。
Streaming Engine コンピューティング単位の単位あたりの費用は、ストリーミング モードに関係なく同じです。ただし、ほとんどの場合、1 回以上モードを使用すると、パイプラインの合計リソースの消費量が大幅に減少します。
次のステップ
- 1 回限りの処理の詳細を確認する。