Dataflow コンポーネント

Dataflow コンポーネントを使用すると、Apache Beam ジョブを Dataflow に送信して実行できます。Dataflow では、Job リソースは Dataflow ジョブを表します。

Google Cloud Pipeline Components SDK には、Job リソースを作成してその実行をモニタリングするための次の演算子が含まれています。

また、Google Cloud Pipeline Components SDK には WaitGcpResourcesOp コンポーネントが含まれています。このコンポーネントを使用すると、Dataflow ジョブの実行中にコストを削減できます。

DataflowFlexTemplateJobOp

DataflowFlexTemplateJobOp 演算子を使用すると、Vertex AI Pipelines コンポーネントを作成して Dataflow Flex テンプレートを起動できます。

Dataflow では、LaunchFlexTemplateParameter リソースは起動する Flex テンプレートを表します。このコンポーネントは LaunchFlexTemplateParameter リソースを作成し、テンプレートを起動してジョブを作成するよう Dataflow にリクエストします。テンプレートが正常に起動すると、Dataflow は Job リソースを返します。

Dataflow Flex テンプレート コンポーネントは、Dataflow から Job リソースを受け取ると終了します。コンポーネントは、シリアル化された gcp_resources protoとして job_id を出力します。このパラメータを WaitGcpResourcesOp コンポーネントに渡して、Dataflow ジョブが完了するまで待機できます。

DataflowPythonJobOp

The DataflowPythonJobOp 演算子を使用すると、Python ベースの Apache Beam ジョブを Dataflow に送信して、実行可能な Vertex AI Pipelines コンポーネントを作成できます。

Apache Beam ジョブの Python コードは Dataflow Runner で実行されます。Dataflow サービスでパイプラインを実行すると、ランナーは python_module_path パラメータで指定された場所に実行可能コードをアップロードし、依存関係を Cloud Storage バケット(temp_location で指定)にアップロードします。さらに、Google Cloud のマネージド リソースで Apache Beam パイプラインを実行する Dataflow ジョブを作成します。

Dataflow Runner の詳細については、Dataflow Runner の使用をご覧ください。

Dataflow Python コンポーネントは、Beam Runner を介して Apache Beam コードに渡される引数のリストを受け取ります。これらの引数は、args によって指定されます。これらの引数を使用すると、apache_beam.options.pipeline_options を設定してネットワーク、サブネットワーク、顧客管理の暗号鍵(CMEK)、Dataflow ジョブの実行時のその他のオプションを指定できます。

WaitGcpResourcesOp

Dataflow ジョブの完了までに時間がかかることがよくあります。このため、busy-wait コンテナ(Dataflow ジョブを起動して結果を待機するコンテナ)の費用が増加する可能性があります。

Beam ランナーを介して Dataflow ジョブを送信すると、DataflowPythonJobOp コンポーネントは直ちに終了し、job_id 出力パラメータをシリアル化された gcp_resources proto として返します。このパラメータを WaitGcpResourcesOp コンポーネントに渡して、Dataflow ジョブが完了するまで待機できます。

    dataflow_python_op = DataflowPythonJobOp(
        project=project_id,
        location=location,
        python_module_path=python_file_path,
        temp_location = staging_dir,
        requirements_file_path = requirements_file_path,
        args = ['--output', OUTPUT_FILE],
    )
  
    dataflow_wait_op =  WaitGcpResourcesOp(
        gcp_resources = dataflow_python_op.outputs["gcp_resources"]
    )

Vertex AI Pipelines は、WaitGcpResourcesOp をサーバーレスで実行するように最適化するため、コストはかかりません。

DataflowPythonJobOpDataflowFlexTemplateJobOp が要件を満たしていない場合は、gcp_resources パラメータを出力して WaitGcpResourcesOp コンポーネントに渡す独自のコンポーネントを作成することもできます。

gcp_resources 出力パラメータの作成方法については、Google Cloud コンソール リンクを表示するコンポーネントを作成するをご覧ください。

API リファレンス

チュートリアル

変更履歴とリリースノート

Google Cloud パイプライン コンポーネント SDK の変更履歴と変更点については、Google Cloud パイプライン コンポーネント SDK リリースノートをご覧ください。

テクニカル サポートの連絡先

不明な点がございましたら、kubeflow-pipelines-components@google.com までお問い合わせください。