Dataflow コンポーネントを使用すると、Apache Beam ジョブを Dataflow に送信して実行できます。Dataflow では、Job
リソースは Dataflow ジョブを表します。
Google Cloud Pipeline Components SDK には、Job
リソースを作成してその実行をモニタリングするための次の演算子が含まれています。
また、Google Cloud Pipeline Components SDK には WaitGcpResourcesOp
コンポーネントが含まれています。このコンポーネントを使用すると、Dataflow ジョブの実行中にコストを削減できます。
DataflowFlexTemplateJobOp
DataflowFlexTemplateJobOp
演算子を使用すると、Vertex AI Pipelines コンポーネントを作成して Dataflow Flex テンプレートを起動できます。
Dataflow では、LaunchFlexTemplateParameter
リソースは起動する Flex テンプレートを表します。このコンポーネントは LaunchFlexTemplateParameter
リソースを作成し、テンプレートを起動してジョブを作成するよう Dataflow にリクエストします。テンプレートが正常に起動すると、Dataflow は Job
リソースを返します。
Dataflow Flex テンプレート コンポーネントは、Dataflow から Job
リソースを受け取ると終了します。コンポーネントは、シリアル化された gcp_resources
protoとして job_id
を出力します。このパラメータを WaitGcpResourcesOp
コンポーネントに渡して、Dataflow ジョブが完了するまで待機できます。
DataflowPythonJobOp
The DataflowPythonJobOp
演算子を使用すると、Python ベースの Apache Beam ジョブを Dataflow に送信して、実行可能な Vertex AI Pipelines コンポーネントを作成できます。
Apache Beam ジョブの Python コードは Dataflow Runner で実行されます。Dataflow サービスでパイプラインを実行すると、ランナーは python_module_path
パラメータで指定された場所に実行可能コードをアップロードし、依存関係を Cloud Storage バケット(temp_location
で指定)にアップロードします。さらに、Google Cloud のマネージド リソースで Apache Beam パイプラインを実行する Dataflow ジョブを作成します。
Dataflow Runner の詳細については、Dataflow Runner の使用をご覧ください。
Dataflow Python コンポーネントは、Beam Runner を介して Apache Beam コードに渡される引数のリストを受け取ります。これらの引数は、args
によって指定されます。これらの引数を使用すると、apache_beam.options.pipeline_options
を設定してネットワーク、サブネットワーク、顧客管理の暗号鍵(CMEK)、Dataflow ジョブの実行時のその他のオプションを指定できます。
WaitGcpResourcesOp
Dataflow ジョブの完了までに時間がかかることがよくあります。このため、busy-wait
コンテナ(Dataflow ジョブを起動して結果を待機するコンテナ)の費用が増加する可能性があります。
Beam ランナーを介して Dataflow ジョブを送信すると、DataflowPythonJobOp
コンポーネントは直ちに終了し、job_id
出力パラメータをシリアル化された gcp_resources
proto として返します。このパラメータを WaitGcpResourcesOp
コンポーネントに渡して、Dataflow ジョブが完了するまで待機できます。
dataflow_python_op = DataflowPythonJobOp( project=project_id, location=location, python_module_path=python_file_path, temp_location = staging_dir, requirements_file_path = requirements_file_path, args = ['--output', OUTPUT_FILE], ) dataflow_wait_op = WaitGcpResourcesOp( gcp_resources = dataflow_python_op.outputs["gcp_resources"] )
Vertex AI Pipelines は、WaitGcpResourcesOp
をサーバーレスで実行するように最適化するため、コストはかかりません。
DataflowPythonJobOp
と DataflowFlexTemplateJobOp
が要件を満たしていない場合は、gcp_resources
パラメータを出力して WaitGcpResourcesOp
コンポーネントに渡す独自のコンポーネントを作成することもできます。
gcp_resources
出力パラメータの作成方法については、Google Cloud コンソール リンクを表示するコンポーネントを作成するをご覧ください。
API リファレンス
コンポーネントのリファレンスについては、Dataflow コンポーネントの Google Cloud Pipeline コンポーネント SDK リファレンスをご覧ください。
Dataflow リソースのリファレンスについては、次の API リファレンス ページをご覧ください。
Job
リソース
チュートリアル
- Dataflow Flex テンプレート コンポーネントを使ってみる
- Dataflow Python ジョブ コンポーネントを使ってみる
- ネットワークとサブネットワークを指定する
- 顧客管理の暗号鍵(CMEK)の使用
変更履歴とリリースノート
Google Cloud パイプライン コンポーネント SDK の変更履歴と変更点については、Google Cloud パイプライン コンポーネント SDK リリースノートをご覧ください。
テクニカル サポートの連絡先
不明な点がございましたら、kubeflow-pipelines-components@google.com までお問い合わせください。