Dataflow コンポーネント

DataflowPythonJobOp を使用すると、Python で記述された Apache Beam ジョブを Dataflow に送信して、実行可能な Vertex AI Pipelines コンポーネントを作成できます。

Python Beam コードは Dataflow Runner で実行されます。Dataflow サービスでパイプラインを実行すると、ランナーは実行可能コード(python_module_path で指定)のパラメータと依存関係を Cloud Storage バケット(temp_location で指定)にアップロードします。さらに、Google Cloud のマネージド リソースで Apache Beam パイプラインを実行する Dataflow ジョブを作成します。

Dataflow Runner の詳細については、Dataflow Runner の使用をご覧ください。

Dataflow Python コンポーネントは、Beam Runner を介して Apache Beam コードに渡される引数のリストを受け取ります。これらの引数は、args によって指定されます。これらの引数を使用すると、apache_beam.options.pipeline_options を設定してネットワーク、サブネットワーク、顧客管理の暗号鍵(CMEK)、Dataflow ジョブの実行時のその他のオプションを指定できます。

WaitGcpResourcesOp

Dataflow ジョブの完了までに時間がかかることがよくあります。このため、busy-wait コンテナ(Dataflow ジョブを起動して結果を待機するコンテナ)の費用が増加する可能性があります。

Beam ランナーを介して Dataflow ジョブを送信すると、DataflowPythonJobOp コンポーネントはすぐに終了します。コンポーネントからシリアル化された gcp_resources proto として job_id が返されます。この出力を WaitGcpResourcesOp コンポーネントに渡して、Dataflow ジョブの完了を待機できます。

    dataflow_python_op = DataflowPythonJobOp(
        project=project_id,
        location=location,
        python_module_path=python_file_path,
        temp_location = staging_dir,
        requirements_file_path = requirements_file_path,
        args = ['--output', OUTPUT_FILE],
    )
  
    dataflow_wait_op =  WaitGcpResourcesOp(
        gcp_resources = dataflow_python_op.outputs["gcp_resources"]
    )

Vertex AI Pipelines は、WaitGcpResourcesOp をサーバーレスで実行するように最適化するため、コストはかかりません。

DataflowPythonJobOp が要件を満たしていない場合(たとえば、Beam ランナーを呼び出す前に前処理が必要な場合や、Beam Java のような別の言語を使用する場合)は、gcp_resources パラメータを出力する独自のコンポーネントを作成して、WaitGcpResourcesOp コンポーネントを利用することもできます。gcp_resources 出力パラメータの作成方法については、Kubeflow Pipelines リポジトリの GCP Resource Proto セクションをご覧ください。

API リファレンス

コンポーネントのリファレンスについては、google_cloud_pipeline_components SDK リファレンスをご覧ください。

チュートリアル

バージョン履歴と変更履歴

日付 バージョン メモ
11/2021 GCPC v0.2 コンポーネントの試験運用版リリース。

テクニカル サポートの連絡先

ご不明な点がございましたら、kubeflow-pipelines-components@google.com にお問い合わせください。