Dataflow 구성요소

Dataflow 구성요소를 사용하면 Apache Beam 작업을 실행할 Dataflow에 제출할 수 있습니다. Dataflow에서 Job 리소스는 Dataflow 작업을 나타냅니다.

Google Cloud 파이프라인 구성요소 SDK에는 Job 리소스를 만들고 실행을 모니터링하기 위한 다음 연산자가 포함됩니다.

또한 Google Cloud 파이프라인 구성요소 SDK에는 WaitGcpResourcesOp 구성요소가 포함되어 있으며 Dataflow 작업을 실행하는 동안 이 구성요소를 사용하여 비용을 줄일 수 있습니다.

DataflowFlexTemplateJobOp

DataflowFlexTemplateJobOp 연산자를 사용하면 Dataflow Flex 템플릿을 실행할 Vertex AI Pipelines 구성요소를 만들 수 있습니다.

Dataflow에서 LaunchFlexTemplateParameter 리소스는 실행할 Flex 템플릿을 나타냅니다. 이 구성요소는 LaunchFlexTemplateParameter 리소스를 만든 후 Dataflow에 템플릿을 실행하여 작업을 만들도록 요청합니다. 템플릿이 성공적으로 실행되면 Dataflow는 Job 리소스를 반환합니다.

Dataflow Flex 템플릿 구성요소는 Dataflow에서 Job 리소스를 수신하면 종료됩니다. 이 구성요소는 job_id직렬화된 gcp_resources proto로 출력합니다. 개발자는 이 매개변수를 WaitGcpResourcesOp 구성요소에 전달하여 Dataflow 작업이 완료될 때까지 기다릴 수 있습니다.

DataflowPythonJobOp

DataflowPythonJobOp 연산자를 사용하면 Python 기반 Apache Beam 작업을 실행할 Dataflow에 제출하여 데이터를 준비하는 Vertex AI Pipelines 구성요소를 만들 수 있습니다.

Apache Beam 작업의 Python 코드는 Dataflow Runner에서 실행됩니다. Dataflow 서비스로 파이프라인을 실행할 때 실행기는 실행 가능한 코드를 python_module_path 매개변수로 지정한 장소로 업로드하고 종속 항목을 Cloud Storage 버킷(temp_location으로 지정됨)에 업로드한 다음, Google Cloud의 관리형 리소스에서 Apache Beam 파이프라인을 실행하는 Dataflow 작업을 만듭니다.

Dataflow Runner에 대해 자세히 알아보려면 Dataflow Runner 사용을 참조하세요.

Dataflow Python 구성요소에는 Beam Runner를 사용해 Apache Beam 코드로 전달되는 인수 목록이 사용됩니다. 이러한 인수는 args로 지정됩니다. 예를 들어 이러한 인수를 사용하여 Dataflow 작업을 실행할 때 네트워크, 서브네트워크, 고객 관리 암호화 키(CMEK), 기타 옵션을 지정하도록 apache_beam.options.pipeline_options를 설정할 수 있습니다.

WaitGcpResourcesOp

Dataflow 작업은 종종 완료하는 데 시간이 오래 걸릴 수 있습니다. busy-wait 컨테이너(Dataflow 작업을 실행하고 결과를 기다리는 컨테이너)의 비용이 높을 수도 있습니다.

Beam 실행기를 사용해 Dataflow 작업을 제출한 후 DataflowPythonJobOp 구성요소는 즉시 종료되고 직렬화된 gcp_resources proto로서 job_id 출력 매개변수를 반환합니다. 개발자는 이 매개변수를 WaitGcpResourcesOp 구성요소에 전달하여 Dataflow 작업이 완료될 때까지 기다릴 수 있습니다.

    dataflow_python_op = DataflowPythonJobOp(
        project=project_id,
        location=location,
        python_module_path=python_file_path,
        temp_location = staging_dir,
        requirements_file_path = requirements_file_path,
        args = ['--output', OUTPUT_FILE],
    )
  
    dataflow_wait_op =  WaitGcpResourcesOp(
        gcp_resources = dataflow_python_op.outputs["gcp_resources"]
    )

Vertex AI Pipelines는 서버 리스 방식으로 실행하도록 WaitGcpResourcesOp를 최적화하고, 제로 비용을 포함합니다.

DataflowPythonJobOpDataflowFlexTemplateJobOp가 요구사항을 충족하지 않으면 gcp_resources 매개변수를 출력하는 자체 구성요소를 만들고 이를 WaitGcpResourcesOp 구성요소에 전달할 수도 있습니다.

gcp_resources 출력 매개변수를 만드는 방법에 대한 자세한 내용은 Google Cloud 콘솔 링크를 표시하는 구성요소 작성을 참조하세요.

API 참조

튜토리얼

버전 기록 및 출시 노트

Google Cloud 파이프라인 구성요소 SDK의 버전 기록 및 변경사항을 자세히 알아보려면 Google Cloud 파이프라인 구성요소 SDK 출시 노트를 참조하세요.

기술 지원 담당자

궁금한 점이 있으면 kubeflow-pipelines-components@google.com으로 문의해 주세요.