Dataflow の Streaming Engine は、パイプラインの実行をワーカー仮想マシン(VM)から Dataflow サービスのバックエンドに移動します。ストリーミング ジョブに Streaming Engine を使用しない場合、Dataflow ランナーはストリーミング パイプラインのステップをすべてワーカー VM で実行し、ワーカーの CPU、メモリ、永続ディスク ストレージを消費します。
次のパイプラインでは Streaming Engine がデフォルトで有効になっています。
- Apache Beam Python SDK バージョン 2.21.0 以降と Python 3 を使用するストリーミング パイプライン。
- Apache Beam Go SDK バージョン 2.33.0 以降を使用するストリーミング パイプライン。
Streaming Engine の実装の詳細については、Streaming Engine: 高スケーラビリティ、低レイテンシのデータ処理の実行モデルをご覧ください。
利点
Streaming Engine モデルには、次の利点があります。
- ワーカー VM の CPU、メモリ、永続ディスク ストレージ リソースの消費量が削減されます。Streaming Engine は、より小さなワーカー マシンタイプで最適に動作します(
n1-standard-4
よりもn1-standard-2
)。小さいワーカーのブートディスクを超える永続ディスクを必要としないため、リソースと割り当ての消費量が少なくなります。 - 着信データ量の変動に対応する、レスポンスに優れた水平自動スケーリング。Streaming Engine によって、ワーカーのスケーリングがよりスムーズできめ細かになります。
- サービスの更新を適用するためにパイプラインを再デプロイする必要がないので、サポート性が向上しました。
ワーカー リソースの削減のほとんどは、処理を Dataflow サービスにオフロードすることによって実現されます。そのため、Streaming Engine の使用に関する料金がかかります。
サポートと制限事項
- Java SDK の場合、Streaming Engine には Apache Beam SDK バージョン 2.10.0 以降が必要です。
- Python SDK の場合、Streaming Engine には Apache Beam SDK バージョン 2.16.0 以降が必要です。
- Go SDK の場合、Streaming Engine には Apache Beam SDK バージョン 2.33.0 以降が必要です。
- Streaming Engine を使用してすでに実行されているパイプラインを更新することはできません。本番環境でパイプラインが Streaming Engine を使用せずに実行されていて、Streaming Engine を使用したい場合は、Dataflow のドレイン オプションを使用してパイプラインを停止します。その後、Streaming Engine パラメータを指定してパイプラインを再実行します。
- Streaming Engine を使用するジョブの場合、開いているウィンドウの集計入力データにキーあたり 60 GB の上限が適用されます。集計入力データには、バッファリングされた要素とカスタム状態の両方が含まれます。パイプラインがこの上限を超えると、パイプラインのシステムラグが長くなり、上限を超えたことを示すメッセージがジョブログに記録されます。結果としてキーのサイズが大きくなるパイプライン設計は避けることをおすすめします。詳細については、スケーラビリティを考慮した Dataflow パイプラインの記述をご覧ください。
- 顧客管理の暗号鍵(CMEK)をサポートします。
Streaming Engine を使用する
この機能は、Dataflow がサポートされているすべてのリージョンで使用できます。使用可能なロケーションを確認するには、Dataflow のロケーションをご覧ください。
Java
Streaming Engine には、Apache Beam Java SDK バージョン 2.10.0 以降が必要です。
ストリーミング パイプラインに Streaming Engine を使用するには、次のパラメータを指定します。
- Apache Beam SDK for Java バージョン 2.11.0 以降を使用している場合は、
--enableStreamingEngine
。 - Apache Beam SDK for Java バージョン 2.10.0 を使用している場合は、
--experiments=enable_streaming_engine
。
パイプラインに Dataflow Streaming Engine を使用する場合は、--zone
パラメータを指定しないでください。代わりに、--region
パラメータを指定し、その値をサポートされているリージョンに設定します。Dataflow は、指定されたリージョンのゾーンを自動的に選択します。--zone
パラメータを指定する場合、使用可能なリージョンの外部にあるゾーンにそれが設定されると、Dataflow はエラーを報告します。
Streaming Engine は、小規模なコア ワーカー マシンタイプで最適に動作します。ジョブタイプを使用して、ハイメモリ ワーカー マシンタイプを使用するかどうかを決定します。推奨するマシンタイプの例には --workerMachineType=n1-standard-2
と --workerMachineType=n1-highmem-2
があります。また、Streaming Engine はワーカーのブートイメージ用とローカルログ用の容量だけを必要とするので、--diskSizeGb=30
を設定することもできます。これらの値はデフォルト値です。
Python
Streaming Engine には、Apache Beam Python SDK バージョン 2.16.0 以降が必要です。
新しい Dataflow ストリーミング パイプラインでは、次の条件が満たされると、Streaming Engine がデフォルトで有効になります。
- パイプラインは、Apache Beam Python SDK バージョン 2.21.0 以降と Python 3 を使用します。
- 顧客管理の暗号鍵は使用されていません。
- Dataflow ワーカーは、Dataflow ジョブと同じリージョンにあります。
Python SDK バージョン 2.45.0 以降では、ストリーミング パイプライン用に Streaming Engine を無効にすることはできません。Python SDK バージョン 2.44.0 以前で Streaming Engine を無効にするには、次のパラメータを指定します。
--experiments=disable_streaming_engine
Python 2 を使用している場合、Streaming Engine を有効にするには、次のパラメータを指定します。
--enable_streaming_engine
パイプラインで Dataflow Streaming Engine を使用する場合は、--zone
パラメータを指定しないでください。代わりに、--region
パラメータを指定し、その値をサポートされているリージョンに設定します。Dataflow は、指定されたリージョンのゾーンを自動的に選択します。--zone
パラメータを指定する場合、使用可能なリージョンの外部にあるゾーンにそれが設定されると、Dataflow はエラーを報告します。
Streaming Engine は、小規模なコア ワーカー マシンタイプで最適に動作します。ジョブタイプを使用して、ハイメモリ ワーカー マシンタイプを使用するかどうかを決定します。推奨するマシンタイプの例には --workerMachineType=n1-standard-2
と --workerMachineType=n1-highmem-2
があります。また、Streaming Engine はワーカーのブートイメージ用とローカルログ用の容量だけを必要とするので、--disk_size_gb=30
を設定することもできます。これらの値はデフォルト値です。
Go
Streaming Engine には、Apache Beam Go SDK バージョン 2.33.0 以降が必要です。
Apache Beam Go SDK を使用する新しい Dataflow ストリーミング パイプラインでは、Streaming Engine がデフォルトで有効になっています。
Go ストリーミング パイプラインで Streaming Engine を無効にするには、次のパラメータを指定します。このパラメータは、Streaming Engine を無効にするたびに指定する必要があります。
--experiments=disable_streaming_engine
パイプラインで Dataflow Streaming Engine を使用する場合は、--zone
パラメータを指定しないでください。代わりに、--region
パラメータを指定し、その値をサポートされているリージョンに設定します。Dataflow は、指定されたリージョンのゾーンを自動的に選択します。--zone
パラメータを指定する場合、使用可能なリージョンの外部にあるゾーンにそれが設定されると、Dataflow はエラーを報告します。
Streaming Engine は、小規模なコア ワーカー マシンタイプで最適に動作します。ジョブタイプを使用して、ハイメモリ ワーカー マシンタイプを使用するかどうかを決定します。推奨するマシンタイプの例には --workerMachineType=n1-standard-2
と --workerMachineType=n1-highmem-2
があります。また、Streaming Engine はワーカーのブートイメージ用とローカルログ用の容量だけを必要とするので、--disk_size_gb=30
を設定することもできます。これらの値はデフォルト値です。
gcloud CLI
gcloud dataflow jobs run
コマンドまたは gcloud dataflow flex-template run
コマンドを使用してパイプラインを実行します。ストリーミングを有効にする場合は次のフラグを使用します。
--enable-streaming-engine
Streaming Engine を無効にするには、次のフラグを使用します。
--additional-experiments=disable_streaming_engine
REST
REST API で projects.locations.jobs.create
メソッドを使用してパイプラインを実行する場合は、Job
リソースを使用して Streaming Engine を有効または無効にします。Streaming Engine を有効にするには、environment
で experiments
フィールドを enable_streaming_engine
に設定します。
"environment": {
"experiments": "enable_streaming_engine"
}
Streaming Engine を無効にするには、environment
で experiments
フィールドを disable_streaming_engine
に設定します。
"environment": {
"experiments": "disable_streaming_engine"
}
料金
Dataflow Streaming Engine は、リソースベースの課金モデルで、ジョブで使用されたリソースの合計に対して課金されます。リソースベースの課金では、ジョブによって使用された Streaming Engine リソースが、Streaming Engine コンピューティング単位数で測定されます。ワーカー CPU、ワーカーメモリ、Streaming Engine コンピューティング単位数に対して課金されます。
リソースベースの課金を使用する
リソースベースの課金を使用するには、ジョブを開始または更新するときに次の Dataflow サービス オプションを含めます。
Java
--dataflowServiceOptions=enable_streaming_engine_resource_based_billing
Python
--dataflow_service_options=enable_streaming_engine_resource_based_billing
Go
--dataflow_service_options=enable_streaming_engine_resource_based_billing
データ処理課金(従来型)
リソースベースの課金を有効にしない限り、ジョブは従来のデータ処理課金を使用して課金されます。
課金モデルを確認する
Dataflow Prime を使用する場合を除き、リソースベースの課金を使用するジョブがある場合、請求書には SKU Streaming Engine Compute Unit
が記載されます。データ処理課金を使用するジョブがある場合、請求書には SKU Streaming Engine data processed
が記載されます。リソースベースの課金を使用するジョブとデータ処理課金を使用するジョブがある場合、請求書には両方の SKU が記載されます。
リソースベースの課金で Dataflow Prime を使用する場合、Data Compute Unit(DCU)SKU が使用されます。
ジョブで使用される料金モデルを確認するには、Dataflow モニタリング インターフェースでジョブを選択します。ジョブでリソースベースの課金を使用している場合、[ジョブ情報] サイドパネルに [Streaming Engine コンピューティング単位数] フィールドが表示されます。
課金についてご不明な点がございましたら、Cloud カスタマーケアまでお問い合わせください。