このページでは、Dataflow で自動スケーリング対象のバッチ パイプラインに対して Flexible Resource Scheduling(FlexRS)を有効にする方法を説明します。
FlexRS
FlexRS は、高度なスケジューリング技術、Dataflow Shuffle サービス、プリエンプティブル仮想マシン(VM)インスタンスと通常の VM の組み合わせを使用することで、バッチ処理コストを削減します。プリエンプティブル VM と通常の VM を並列実行することで、システム イベント中に Compute Engine がプリエンプティブル VM インスタンスを停止した場合の Dataflow のユーザー エクスペリエンスが向上します。FlexRS は、プリエンプティブル VM が Compute Engine によってプリエンプトされたときに、パイプラインの処理を続行して以前の作業を確実に保持するのに効果的です。
FlexRS を使用するジョブでは、参加とグループ化のためにサービスベースの Dataflow Shuffle を使用します。その結果、FlexRS ジョブでは一時的な計算結果を保存するために Persistent Disk リソースを使用しません。Dataflow Shuffle を使用すると、Dataflow サービスで残りのワーカーにデータを再分配する必要がないため、FlexRS ではワーカー VM のプリエンプションの処理が向上します。各 Dataflow ワーカーでは、マシンイメージと一時ログを保存するために、さらに 25 GB の Persistent Disk ボリュームが必要です。
FlexRS ジョブではスケジューリングが遅延します。そのため、FlexRS は、特定の時間枠内に完了できる毎日行われるジョブや毎週行われるジョブなど、時間の制約が厳しくないワークロードに最適です。
遅延したスケジューリング
FlexRS ジョブを送信すると、Dataflow サービスでそのジョブがキューに入れられ、ジョブの作成から 6 時間以内に実行のために送信されます。Dataflow では、利用可能な容量やその他の要因に基づいて、時間枠内のジョブを開始するのに最適な時刻が決定されます。
FlexRS ジョブを送信すると、Dataflow サービスで次の手順を実行します。
- ジョブの送信直後にジョブ ID を返します。
- 早期検証を実行します。
早期検証の結果を使用して、次のステップを決定します。
- 成功すると、遅延している開始を待機するためにジョブをキューに入れます。
- それ以外の場合、ジョブは失敗し、Dataflow サービスでエラーが報告されます。
そのため、FlexRS ジョブを送信した後、ジョブが成功すると Dataflow モニタリング インターフェースに [キューに格納済み] の ID とステータスが表示され、それ以外の場合は [失敗] と表示されます。
早期検証
FlexRS ジョブは送信後すぐには開始されません。早期検証では、Dataflow サービスで実行パラメータと IAM のロールやネットワーク構成などの Google Cloud 環境設定が検証されます。Dataflow によって、ジョブの送信時に可能な限りジョブが検証され、潜在的なエラーが報告されます。この早期検証プロセスに対しては請求されません。
早期検証ステップでは、ユーザーコードは実行されません。Apache Beam Direct Runner または FlexRS 以外のジョブを使用して問題を確認するには、コードを検証する必要があります。ジョブの作成からそのジョブの遅延されたスケジューリングまでの間に Google Cloud 環境の変更がある場合、そのジョブは検証の初期段階で成功する可能性がありますが、それでも開始時に失敗する可能性があります。
FlexRS を有効にする
FlexRS ジョブを作成すると、そのジョブのステータスがキューに格納済みであっても、同時ジョブの割り当てが取得されます。初期検証プロセスでは、他の割り当ての確認や予約は行われません。したがって、FlexRS を有効にする前に、ジョブを開始するために十分な Google Cloud プロジェクト リソースの割り当てがあることを確認してください。パブリック IP パラメータをオフにしない限り、割り当てにプリエンプティブル CPU、通常の CPU、IP アドレスの追加割り当てが含まれます。
十分な割り当てがない場合は、FlexRS ジョブのデプロイ時にアカウントに十分なリソースがない可能性があります。Dataflow では、デフォルトでワーカープール内の 90% のワーカーにプリエンプティブル VM が選択されます。CPU 割り当てを計画するときは、十分なプリエンプティブル VM 割り当てがあることを確認してください。デフォルトでは、プリエンプティブル VM 専用の割り当てはありません。明示的にプリエンプティブル VM 割り当てをリクエストする必要があります。そうしないと、FlexRS ジョブを遅延なく実行するためのリソースが不足します。
料金
FlexRS ジョブでは、次のリソースに対して請求されます。
- 通常の CPU とプリエンプティブル CPU
- メモリリソース
- Dataflow Shuffle リソース
- ワーカーあたり 25 GB の Persistent Disk リソース
Dataflow ではプリエンプティブル ワーカーと通常のワーカーの両方を使用して FlexRS ジョブを実行しますが、ワーカーのタイプにかかわらず料金は均一であり、通常の Dataflow の料金と比べると割安です。Dataflow Shuffle と Persistent Disk のリソースの料金は割引されません。
詳しくは、Dataflow の料金の詳細ページをご覧ください。
FlexRS の要件
FlexRS には以下の機能が必要です。
- Apache Beam SDK for Java 2.12.0 以降、Apache Beam SDK for Python 2.12.0 以降、任意のバージョンの Apache Beam SDK for Go。
- Dataflow Shuffle。FlexRS をオンにすると、Dataflow Shuffle も自動的に有効になります。
パイプライン オプション
Java
FlexRS ジョブを有効にするには、パイプライン オプション --flexRSGoal=COST_OPTIMIZED
を使用します。コスト最適化目標は、Dataflow サービスで使用可能な割引リソースを選択するか、実行時間を短縮するために最適化する --flexRSGoal=SPEED_OPTIMIZED
を選択します。
FlexRS ジョブは以下の実行パラメータに影響します。
numWorkers
は初期ワーカー数のみを設定します。ただし、コスト管理のためにmaxNumWorkers
を設定できます。autoscalingAlgorithm=NONE
は設定できません。- FlexRS ジョブには
zone
フラグを指定できません。Dataflow サービスでは、region
パラメータで指定したリージョン内のすべての FlexRS ジョブに対してゾーンを選択します。 region
として Dataflow リージョン エンドポイントを選択する必要があります。- デフォルトの
n1-standard-2
を使用するか、workerMachineType
に対してn1-highmem-16
を選択する必要があります。
次の例は、FlexRS を使用するために通常のパイプライン パラメータにパラメータを追加する方法を示しています。
--flexRSGoal=COST_OPTIMIZED \ --region=europe-west1 \ --maxNumWorkers=10 \ --workerMachineType=n1-highmem-16
region
、maxNumWorkers
、workerMachineType
を省略した場合、Dataflow サービスによりデフォルト値が決定されます。
Python
FlexRS ジョブを有効にするには、パイプライン オプション --flexrs_goal=COST_OPTIMIZED
を使用します。コスト最適化目標は、Dataflow サービスで使用可能な割引リソースを選択するか、実行時間を短縮するために最適化する --flexrs_goal=SPEED_OPTIMIZED
を選択します。
FlexRS ジョブは以下の実行パラメータに影響します。
num_workers
は初期ワーカー数のみを設定します。ただし、コスト管理のためにmax_num_workers
を設定できます。autoscaling_algorithm=NONE
は設定できません。- FlexRS ジョブには
zone
フラグを指定できません。Dataflow サービスでは、region
パラメータで指定したリージョン内のすべての FlexRS ジョブに対してゾーンを選択します。 region
として Dataflow リージョン エンドポイントを選択する必要があります。- デフォルトの
n1-standard-2
を使用するか、machine_type
に対してn1-highmem-16
を選択する必要があります。
次の例は、FlexRS を使用するために通常のパイプライン パラメータにパラメータを追加する方法を示しています。
--flexrs_goal=COST_OPTIMIZED \ --region=europe-west1 \ --max_num_workers=10 \ --machine_type=n1-highmem-16
region
、max_num_workers
、machine_type
を省略した場合、Dataflow サービスによりデフォルト値が決定されます。
Go
FlexRS ジョブを有効にするには、パイプライン オプション --flexrs_goal=COST_OPTIMIZED
を使用します。コスト最適化目標は、Dataflow サービスで使用可能な割引リソースを選択するか、実行時間を短縮するために最適化する --flexrs_goal=SPEED_OPTIMIZED
を選択します。
FlexRS ジョブは以下の実行パラメータに影響します。
num_workers
は初期ワーカー数のみを設定します。ただし、コスト管理のためにmax_num_workers
を設定できます。autoscaling_algorithm=NONE
は設定できません。- FlexRS ジョブには
zone
フラグを指定できません。Dataflow サービスでは、region
パラメータで指定したリージョン内のすべての FlexRS ジョブに対してゾーンを選択します。 region
として Dataflow リージョン エンドポイントを選択する必要があります。- デフォルトの
n1-standard-2
を使用するか、worker_machine_type
に対してn1-highmem-16
を選択する必要があります。
次の例は、FlexRS を使用するために通常のパイプライン パラメータにパラメータを追加する方法を示しています。
--flexrs_goal=COST_OPTIMIZED \ --region=europe-west1 \ --max_num_workers=10 \ --machine_type=n1-highmem-16
region
、max_num_workers
、machine_type
を省略した場合、Dataflow サービスによりデフォルト値が決定されます。
FlexRS ジョブのモニタリング
Google Cloud コンソールの 2 つの場所で、FlexRS ジョブのステータスをモニタリングできます。
- すべてのジョブが表示される [ジョブ] ページ。
- 送信したジョブの [Monitoring Interface] ページ。
[ジョブ] ページで、開始されていないジョブのステータスは [キューに格納済み] になります。

モニタリング インターフェース ページの [ジョブグラフ] タブに、キュー内で待機しているジョブが「ジョブ開始後にグラフが表示されます」というメッセージを表示します。
