Dataflow components


The DataflowPythonJobOp allows you to create a Vertex AI Pipelines component that prepares data by submitting an Apache Beam job written in Python to Dataflow for execution.

The Python Beam code is run with the Dataflow Runner. When you run your pipeline with the Dataflow service, the runner uploads your executable code (specified by the python_module_path parameter) and its dependencies to a Cloud Storage bucket (specified by temp_location), and creates a Dataflow job that executes your Apache Beam pipeline on managed resources in Google Cloud.

To learn more about the Dataflow Runner, see Using the Dataflow Runner.

The Dataflow Python component accepts a list of arguments that are passed through the Beam Runner to your Apache Beam code. These arguments are specified by args. For example, you can use these arguments to set the apache_beam.options.pipeline_options to specify a network, a subnetwork, a customer-managed encryption key (CMEK), and other options when you run Dataflow jobs.
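
For example, a minimal sketch of such an args list; the network, subnetwork, and key names below are placeholders, and the flags are standard Dataflow pipeline options:

    # Placeholder values; substitute your own network, subnetwork, and CMEK key.
    dataflow_args = [
        '--output', OUTPUT_FILE,
        '--network', 'my-vpc-network',
        '--subnetwork', 'regions/us-central1/subnetworks/my-subnetwork',
        '--dataflow_kms_key',
        'projects/my-project/locations/us-central1/keyRings/my-key-ring/cryptoKeys/my-key',
    ]

A list like this is passed as the args parameter of DataflowPythonJobOp, as shown in the example below.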

WaitGcpResourcesOp

Dataflow jobs can often take a long time to complete. The cost of a busy-wait container - a container that launches the Dataflow job and waits for the result - can become expensive.

Instead, after submitting the Dataflow job through the Beam runner, the DataflowPythonJobOp component terminates immediately. The component returns the job_id as a serialized gcp_resources proto; this output can be passed to a WaitGcpResourcesOp component to wait for the Dataflow job to complete.

    from google_cloud_pipeline_components.v1.dataflow import DataflowPythonJobOp
    from google_cloud_pipeline_components.v1.wait_gcp_resources import WaitGcpResourcesOp

    # Submit the Apache Beam job to Dataflow; this component returns as soon
    # as the job has been launched.
    dataflow_python_op = DataflowPythonJobOp(
        project=project_id,
        location=location,
        python_module_path=python_file_path,
        temp_location=staging_dir,
        requirements_file_path=requirements_file_path,
        args=['--output', OUTPUT_FILE],
    )

    # Wait for the launched Dataflow job to complete.
    dataflow_wait_op = WaitGcpResourcesOp(
        gcp_resources=dataflow_python_op.outputs["gcp_resources"]
    )

Vertex AI Pipelines optimizes the WaitGcpResourcesOp to execute it in a serverless fashion, so it has zero cost.

If DataflowPythonJobOp doesn't meet your requirements (for example, if you need to preprocess data before calling the Beam Runner, or use a different language such as Beam Java), you can also create your own component that outputs the gcp_resources parameter and pass it to the WaitGcpResourcesOp component. For details on how to create the gcp_resources output parameter, see Write a component to show a Google Cloud console link.
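
That linked guide covers the details; as a rough sketch, such a component only needs to write a serialized gcp_resources JSON to an output parameter named gcp_resources. The component below assumes KFP v2, and the launch logic, project, location, and job ID are placeholders:

    from kfp import dsl

    @dsl.component(base_image='python:3.9')
    def launch_beam_job(gcp_resources: dsl.OutputPath(str)):
        """Illustrative component that launches a job and emits gcp_resources."""
        import json

        # ... launch the Dataflow job here (for example, with the Java runner)
        # and capture its job ID ...
        job_id = 'my-dataflow-job-id'  # placeholder

        # Serialized gcp_resources proto that points at the Dataflow job, so a
        # downstream WaitGcpResourcesOp can poll it for completion.
        resources = {
            'resources': [{
                'resourceType': 'DataflowJob',
                'resourceUri': ('https://dataflow.googleapis.com/v1b3/projects/'
                                'my-project/locations/us-central1/jobs/' + job_id),
            }]
        }
        with open(gcp_resources, 'w') as f:
            f.write(json.dumps(resources))

The gcp_resources output of such a component can then be passed to WaitGcpResourcesOp exactly as in the earlier example.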