実行中のパイプラインの停止

Dataflow ジョブは削除できません。停止のみ可能です。

Dataflow ジョブを停止するには、Google Cloud ConsoleCloud ShellCloud SDK がインストールされているローカル ターミナル、または Dataflow REST API のいずれかを使用します。

次の 2 つの方法で Dataflow ジョブを停止できます。

  • ジョブをキャンセルする。この方法は、ストリーミング パイプラインとバッチ パイプラインの両方に適用されます。ジョブをキャンセルすると、Dataflow サービスはバッファデータなどのデータの処理を停止します。詳細については、ジョブをキャンセルするをご覧ください。

  • ジョブをドレインする。この方法は、ストリーミング パイプラインにのみ適用されます。ジョブをドレインすると、Dataflow サービスはバッファ内のデータの処理を完了すると同時に、新しいデータの取り込みを中止できます。詳細については、ジョブをドレインするをご覧ください。

ジョブをキャンセルする

ジョブをキャンセルすると、Dataflow サービスはジョブを直ちに停止します。

ジョブをキャンセルすると、次の処理が行われます。

  1. Dataflow サービスが、すべてのデータの取り込みと処理を停止します。

  2. Dataflow サービスが、ジョブに接続している Google Cloud リソースのクリーンアップを開始します。

    これらのリソースには、Compute Engine ワーカー インスタンスのシャットダウンと、I/O ソースまたはシンクへのアクティブな接続の終了が含まれることがあります。

ジョブのキャンセルに関する重要な情報

  • ジョブをキャンセルすると、パイプラインの処理が直ちに停止します。

  • ジョブのキャンセルにより、処理中のデータが失われる可能性があります。処理中のデータとは、すでに読み込まれているがパイプラインによって処理中のデータを指します。

  • ジョブをキャンセルする前にパイプラインから出力シンクに書き込まれたデータは、出力シンクでまだアクセス可能な場合があります。

  • データ損失が問題にならない場合は、ジョブをキャンセルすることで、ジョブに関連する Google Cloud リソースをできるだけ早くシャットダウンするようにします。

ジョブをドレインする

ジョブをドレインすると、Dataflow サービスは現在の状態でジョブを終了します。ストリーミング パイプラインの停止時にデータの消失を防ぐには、ジョブのドレインが最適な方法です。

ジョブをドレインすると、次の処理が行われます。

  1. ドレイン リクエストの受信後すぐに(通常、数分以内)入力ソースからの新しいデータの取り込みを停止します。

  2. Dataflow サービスは、既存のリソース(ワーカー インスタンスなど)を保持し、パイプラインのバッファ内のデータの処理と書き込みを終了します。

  3. 保留中の処理と書き込みオペレーションがすべて完了すると、Dataflow サービスはジョブに関連付けられた Google Cloud リソースをシャットダウンします。

ジョブのドレインに関する重要な情報

  • バッチ パイプラインではジョブのドレインはサポートされていません。

  • すべての処理と書き込みが完了するまで、パイプラインには、関連付けられた Google Cloud リソースの維持コストが発生します。

  • ストリーミング パイプライン コードにループタイマーが含まれている場合、ジョブをドレインできません。

  • ストリーミング パイプラインに Splittable DoFn が含まれている場合、ドレイン オプションの実行前に結果を切り捨てる必要があります。Splittable DoFn の切り捨ての詳細については、Apache Beam のドキュメントをご覧ください。

  • ドレイン中のパイプラインは更新できます。

  • パイプラインに大量のバッファ内データがある場合など、ジョブのドレインが完了するまでに非常に時間がかかることがあります。

  • ドレイン中のジョブをキャンセルできます。

  • 場合によっては、Dataflow ジョブでドレイン オペレーションを完了できないことがあります。ジョブのログを確認して根本原因を特定し、適切な措置を講じることができます。

ジョブのドレインの影響

ストリーミング パイプラインをドレインすると、Dataflow は処理中のウィンドウを直ちに終了し、すべてのトリガーを呼び出します。

システムは、ドレイン オペレーションで未処理の時間ベースのウィンドウの終了を待機しません

たとえば、ジョブのドレイン時にパイプラインが 2 時間のウィンドウまで残り 10 分の場合、Dataflow はウィンドウ終了までの残り時間を待機しません。部分的な結果でウィンドウを即時に終了します。Dataflow は、システム ウォーターマークを無期限まで進めることで、開いているウィンドウを終了します。この機能は、カスタム データソースでも機能します。

カスタム データソース クラスを使用するパイプラインをドレインすると、Dataflow は新規データのリクエスト発行を停止し、システム ウォーターマークを無期限まで進めて、最後のチェックポイントでソースの finalize() メソッドを呼び出します。

Google Cloud Console で、パイプラインの変換の詳細を確認できます。次の図は、処理中のドレイン オペレーションの影響を示しています。ウォーターマークは最大値まで進められていることに注意してください。

ドレイン オペレーションのステップビュー。

図 1. ドレイン オペレーションのステップビュー。

ジョブを停止する

ジョブを停止する前に、ジョブのキャンセルまたはドレインの影響を理解する必要があります。

Console

  1. Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. 停止するジョブをクリックします。

    ジョブを停止するには、ジョブのステータスが「実行中」でなければなりません。

  3. ジョブの詳細ページで、[停止] をクリックします。

  4. 次のいずれかを行います。

    • バッチ パイプラインの場合は、[キャンセル] をクリックします。

    • ストリーミング パイプラインの場合は、[キャンセル] または [ドレイン] をクリックします。

  5. 選択を確定するには、[ジョブの停止] をクリックします。

gcloud

Dataflow ジョブをドレインまたはキャンセルするには、Cloud Shell または Cloud SDK がインストールされたローカル ターミナルで gcloud dataflow jobs コマンドを使用します。

  1. シェルにログインします。

  2. 現在実行中の Dataflow ジョブのジョブ ID を一覧表示し、停止するジョブのジョブ ID をメモします。

    gcloud dataflow jobs list
    

    --region フラグが設定されていない場合、使用可能なすべてのリージョンの Dataflow ジョブが表示されます。

  3. 次のいずれかを行います。

    • ストリーミング ジョブをドレインするには:

       gcloud dataflow jobs drain JOB_ID
      

      JOB_ID を、先ほどコピーしたジョブ ID に置き換えます。

    • バッチまたはストリーミング ジョブをキャンセルするには:

      gcloud dataflow jobs cancel JOB_ID
      

      JOB_ID を、先ほどコピーしたジョブ ID に置き換えます。

API

Dataflow REST API を使用してジョブをキャンセルまたはドレインするには、projects.locations.jobs.update または projects.jobs.update を選択します。リクエストの本文で、選択した API のジョブ インスタンスの requestedState フィールドに必要なジョブの状態を渡します。

  • ジョブをキャンセルするには、ジョブのステータスを JOB_STATE_CANCELLED に設定します。

  • ジョブをドレインするには、ジョブのステータスを JOB_STATE_DRAINED に設定します。

次のステップ