Dataflow components

The DataflowPythonJobOp component lets you create a Vertex AI Pipelines step that prepares data by submitting an Apache Beam job, written in Python, to Dataflow for execution.

The Python Beam code runs with the Dataflow Runner. When you run your pipeline on the Dataflow service, the runner uploads your executable code (specified by the python_module_path parameter) and its dependencies to a Cloud Storage bucket (specified by temp_location), and creates a Dataflow job that executes your Apache Beam pipeline on managed resources in Google Cloud.

To learn more about the Dataflow Runner, see Using the Dataflow Runner.

The Dataflow Python component accepts a list of arguments that are passed through the Beam Runner to your Apache Beam code. These arguments are specified by args. For example, you can use these arguments to set apache_beam.options.pipeline_options and specify a network, a subnetwork, a customer-managed encryption key (CMEK), and other options when you run Dataflow jobs.
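
As a hedged sketch, such an args list might look like the following. The network, subnetwork, and key names are placeholders rather than values from this guide; --network, --subnetwork, and --dataflow_kms_key are standard Apache Beam pipeline options that your Beam program picks up through apache_beam.options.pipeline_options.PipelineOptions.

    # Placeholder resource names for illustration only; replace with values
    # from your own project. These flags are parsed by your Beam program via
    # apache_beam.options.pipeline_options.PipelineOptions.
    args = [
        '--output', OUTPUT_FILE,
        '--network', 'my-network',
        '--subnetwork', 'regions/us-central1/subnetworks/my-subnet',
        '--dataflow_kms_key',
        'projects/my-project/locations/us-central1/keyRings/my-ring/cryptoKeys/my-key',
    ]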

WaitGcpResourcesOp

Dataflow jobs often take a long time to complete. A busy-wait container (a container that launches the Dataflow job and then waits for the result) can become expensive to run.

Instead, after submitting the Dataflow job through the Beam runner, the DataflowPythonJobOp component terminates immediately. The component returns the job_id as a serialized gcp_resources proto. You can pass this output to a WaitGcpResourcesOp component to wait for the Dataflow job to complete.

    # Import paths shown for the google-cloud-pipeline-components v1 namespace;
    # earlier, experimental releases used a different module path.
    from google_cloud_pipeline_components.v1.dataflow import DataflowPythonJobOp
    from google_cloud_pipeline_components.v1.wait_gcp_resources import WaitGcpResourcesOp

    # Submit the Beam job to Dataflow. This task returns as soon as the job
    # is launched and emits a serialized gcp_resources proto as output.
    dataflow_python_op = DataflowPythonJobOp(
        project=project_id,
        location=location,
        python_module_path=python_file_path,
        temp_location=staging_dir,
        requirements_file_path=requirements_file_path,
        args=['--output', OUTPUT_FILE],
    )

    # Wait for the Dataflow job referenced by gcp_resources to complete.
    dataflow_wait_op = WaitGcpResourcesOp(
        gcp_resources=dataflow_python_op.outputs["gcp_resources"]
    )

Vertex AI Pipelines optimizes WaitGcpResourcesOp to execute it in a serverless fashion, so waiting incurs zero cost.
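
As a usage sketch, the snippet above might be wired into a complete pipeline definition as follows; the pipeline name, parameter list, and compile step are illustrative assumptions based on the KFP v2 SDK rather than part of this guide.

    from kfp import compiler, dsl
    from google_cloud_pipeline_components.v1.dataflow import DataflowPythonJobOp
    from google_cloud_pipeline_components.v1.wait_gcp_resources import WaitGcpResourcesOp

    # Hypothetical pipeline that chains the two components together.
    @dsl.pipeline(name='dataflow-python-example')
    def dataflow_pipeline(
        project_id: str,
        location: str,
        python_file_path: str,
        staging_dir: str,
        requirements_file_path: str,
        output_file: str,
    ):
        # Launch the Beam job; this step finishes once the job is submitted.
        dataflow_python_op = DataflowPythonJobOp(
            project=project_id,
            location=location,
            python_module_path=python_file_path,
            temp_location=staging_dir,
            requirements_file_path=requirements_file_path,
            args=['--output', output_file],
        )

        # Serverless wait for the Dataflow job to reach a terminal state.
        WaitGcpResourcesOp(
            gcp_resources=dataflow_python_op.outputs['gcp_resources']
        )

    # Compile to a pipeline spec that can be submitted to Vertex AI Pipelines.
    compiler.Compiler().compile(dataflow_pipeline, 'dataflow_pipeline.json')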

If DataflowPythonJobOp doesn't meet your requirements (for example, if you need to preprocess data before calling the Beam runner, or to use a different language such as Beam Java), you can create your own component that outputs a gcp_resources parameter and use it with the WaitGcpResourcesOp component, as in the sketch below. For details on how to create the gcp_resources output parameter, see the GCP Resource Proto section of the Kubeflow Pipelines repository.
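
As a rough sketch of that approach, a lightweight KFP component might write the serialized gcp_resources JSON itself. The component name and inputs below are hypothetical, and the JSON shape follows the gcp_resources proto documented in the Kubeflow Pipelines repository (a DataflowJob resource identified by its resourceUri).

    from kfp import dsl

    # Hypothetical component: given the ID of a Dataflow job launched by other
    # means, write the serialized gcp_resources JSON that WaitGcpResourcesOp
    # expects as input.
    @dsl.component(base_image='python:3.10')
    def emit_dataflow_gcp_resources(
        project: str,
        location: str,
        job_id: str,
        gcp_resources: dsl.OutputPath(str),
    ):
        import json

        payload = {
            'resources': [
                {
                    'resourceType': 'DataflowJob',
                    'resourceUri': (
                        f'https://dataflow.googleapis.com/v1b3/projects/{project}'
                        f'/locations/{location}/jobs/{job_id}'
                    ),
                }
            ]
        }
        with open(gcp_resources, 'w') as f:
            json.dump(payload, f)

The gcp_resources output of such a component can then be passed to WaitGcpResourcesOp exactly as shown earlier.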

API reference

For component reference, see the google_cloud_pipeline_components SDK reference.

Tutorials

Version history and changelog

Date      Version     Notes
11/2021   GCPC v0.2   Experimental release of the components.

Technical Support Contacts

If you have any questions, please reach out to kubeflow-pipelines-components@google.com.