파이프라인 구성

Dataflow 프로그램은 데이터 처리 파이프라인을 처음부터 끝까지 표현합니다. 이 섹션에서는 Dataflow SDK의 클래스를 사용하여 파이프라인을 구성하는 방식에 대해 설명합니다. Dataflow SDK의 클래스를 사용하여 파이프라인을 구성하려면 프로그램에서 다음과 같은 일반 단계를 수행해야 합니다.

  • Pipeline 객체를 만듭니다.
  • Read 또는 Create 변환을 사용하여 파이프라인 데이터에 대한 PCollection을 한 개 이상 만듭니다.
  • PCollection변환을 적용합니다. 변환은 PCollection의 요소를 변경, 필터링, 그룹화, 분석하거나 기타 방식으로 처리할 수 있습니다. 각 변환은 새 출력 PCollection을 생성하며, 처리가 완료될 때까지 여기에 추가 변환을 적용할 수 있습니다.
  • 최종적으로 변환된 PCollection작성하거나 기타 방식으로 출력합니다.
  • 파이프라인을 실행합니다.

각 일반 단계를 보여주는 전체 예제는 아래의 간단한 예제 파이프라인을 참조하세요.

파이프라인 객체 만들기

Dataflow 프로그램은 대개 Pipeline 객체를 만드는 것으로 시작됩니다.

Dataflow SDK에서 각 파이프라인은 Pipeline 유형의 명시적 객체로 표현됩니다. 각 Pipeline 객체는 파이프라인이 작업을 수행하는 데이터와 해당 데이터에 적용되는 변환을 모두 캡슐화하는 독립된 항목입니다.

자바

파이프라인을 만들려면 Pipeline 객체를 선언하고 이 객체에 몇 가지 구성 옵션을 전달합니다. 구성 옵션을 전달하려면 PipelineOptions 유형의 객체를 생성합니다. 이는 정적 메소드 PipelineOptionsFactory.create()를 사용하여 빌드할 수 있습니다.

// Start by defining the options for the pipeline.
PipelineOptions options = PipelineOptionsFactory.create();

// Then create the pipeline.
Pipeline p = Pipeline.create(options);

파이프라인 옵션 구성

파이프라인 옵션을 사용하여 파이프라인의 여러 측면을 구성할 수 있습니다. 여기에는 다음이 포함될 수 있습니다.

  • 파이프라인이 실행되는 위치
  • 파이프라인 작업이 파일을 준비하는 위치
  • 파이프라인과 연결되는 Cloud Platform 프로젝트
  • 파이프라인이 작업자로 사용하는 Compute Engine 인스턴스 수

파이프라인 옵션 속성에는 프로젝트 ID 및 Cloud Storage 스테이징 위치와 같이 Cloud Dataflow 서비스에 필요한 Cloud Platform 프로젝트에 대한 정보가 포함되어 있습니다. 또한 파이프라인 옵션을 사용하면 Dataflow 서비스가 파이프라인 작업에 할당해야 하는 작업자 수와 파이프라인 작업의 상태 메시지를 전달할 위치를 제어할 수도 있습니다.

파이프라인이 실행되는 위치(Cloud Dataflow 서비스 또는 로컬)를 결정하는 파이프라인 옵션의 주요 속성은 파이프라인 실행자입니다. 파이프라인 실행자 속성은 파이프라인이 비동기식 또는 블로킹식으로 실행되는지 여부도 지정합니다.

자바

setter 메소드(PipelineOptions.set[OptionName])를 사용하여 파이프라인 프로그램 내에서 PipelineOptions 객체의 속성을 직접 설정할 수 있지만 명령줄 옵션을 사용하여 값을 전달하는 것이 가장 좋습니다. 자바용 Dataflow SDK는 파이프라인에 전달된 명령줄 옵션을 파싱하고 유효성 검사하는 PipelineOptionsFactory 클래스를 제공합니다. 명령줄 옵션을 사용하여 런타임에서 PipelineRunnerPipelineOptions의 기타 필드를 결정하면 동일한 코드를 사용하여 로컬 및 클라우드에서 파이프라인을 구성하고 실행할 수 있습니다.

클라우드 또는 로컬 모드 실행을 위해 파이프라인 옵션을 프로그래밍 방식으로 설정하는 방법에 대한 자세한 내용은 실행 매개변수 지정을 참조하세요. WordCount 예제 파이프라인에서는 또한 명령줄 옵션을 사용하여 런타임에 파이프라인 옵션을 설정하는 방법도 보여줍니다.

파이프라인으로 데이터 읽어오기

파이프라인의 초기 PCollection을 만들려면 파이프라인 객체에 루트 변환을 적용합니다. 루트 변환은 외부 데이터 소스 또는 지정한 일부 로컬 데이터에서 PCollection을 만듭니다.

자바

Dataflow 자바 SDK에는 두 가지 종류의 루트 변환인 ReadCreate가 있습니다. Read 변환은 외부 소스(예: BigQuery 또는 Google Cloud Storage의 텍스트 파일)에서 데이터를 읽습니다. Create 변환은 PCollection을 메모리 내 java.util.Collection에서 만듭니다.

다음 예제 코드는 TextIO.Read 루트 변환을 apply하여 Google Cloud Storage의 텍스트 파일에서 데이터를 읽는 방법을 보여줍니다. 변환은 Pipeline 객체 p에 적용되며 파이프라인 데이터 세트를 PCollection<String> 형식으로 반환합니다.

PCollection<String> lines = p.apply(
  TextIO.Read.named("ReadMyFile").from("gs://some/inputData.txt"));

프로세스 파이프라인 데이터에 변환 적용

파이프라인에서 변환을 사용하려면 변환할 PCollection에 변환을 적용합니다.

자바

변환을 적용하려면 처리할 각 PCollection에서 apply 메소드를 호출하고, 원하는 변환 객체를 인수로 전달합니다.

Dataflow SDK에는 파이프라인의 PCollection에 적용할 수 있는 다양한 변환이 포함되어 있습니다. 여기에는 ParDo 또는 Combine과 같은 범용 핵심 변환이 포함됩니다. SDK에는 미리 작성된 복합 변환도 포함되어 있습니다. 이는 핵심 변환 하나 이상을 유용한 처리 패턴으로 결합한 것입니다(예: 컬렉션에 있는 요소를 계수 또는 결합). 파이프라인의 사용 사례에 맞게 나만의 더 복잡한 복합 변환을 정의할 수도 있습니다.

자바

Dataflow 자바 SDK에서 각 변환은 기본 클래스 PTransform의 하위 클래스입니다. PCollection에서 apply를 호출하면 사용할 PTransform을 인수로 전달합니다.

다음 코드는 문자열의 PCollection에 변환을 apply하는 방법을 보여줍니다. 변환은 사용자 정의된 커스텀 변환이며, 각 문자열의 내용을 역전하고 역전된 문자열을 포함하는 새로운 PCollection을 출력합니다.

입력은 words라는 PCollection<String>입니다. 이 코드는 ReverseWords라는 PTransform 객체의 인스턴스를 apply에 전달하고 반환 값을 reversedWords라는 PCollection<String>으로 저장합니다.

PCollection<String> words = ...;

PCollection<String> reversedWords = words.apply(new ReverseWords());

최종 파이프라인 데이터 쓰기 또는 출력

일반적으로 파이프라인이 모든 변환을 적용하면 결과를 출력해야 합니다. 파이프라인의 최종 PCollection을 출력하려면 해당 PCollectionWrite 변환을 적용합니다. Write 변환은 PCollection의 요소를 Google Cloud Storage의 파일 또는 BigQuery 테이블과 같은 외부 데이터 싱크에 출력할 수 있습니다. 일반적으로 파이프라인의 끝에서 데이터를 출력하지만 Write를 사용하여 언제든 파이프라인에서 PCollection을 출력할 수 있습니다.

자바

다음 예제 코드는 TextIO.Write 변환을 apply하여 Google Cloud Storage의 텍스트 파일에 StringPCollection을 쓰는 방법을 보여줍니다.

PCollection<String> filteredWords = ...;
filteredWords.apply(TextIO.Write.named("WriteMyFile").to("gs://some/outputData.txt"));

파이프라인 실행

파이프라인을 구성한 후에는 run 메소드를 사용하여 파이프라인을 실행합니다. 파이프라인은 비동기식으로 실행됩니다. 생성된 프로그램에서 파이프라인 사양을 파이프라인 실행자로 보내면 실제 일련의 파이프라인 작업이 구성 및 실행됩니다. 파이프라인 실행 시, 테스트 및 디버깅을 위해 로컬에서 실행하거나 Cloud Dataflow 관리형 서비스에서 실행할 수 있습니다. 파이프라인 실행자, 파이프라인 옵션 구성, 로컬 및 클라우드 실행에 대한 자세한 내용은 실행 매개변수 지정을 참조하세요.

Dataflow SDK에서는 Pipeline 객체를 만들 때 파이프라인 옵션에 PipelineRunner를 지정합니다. 파이프라인 구성을 완료했으면 다음과 같이 파이프라인 객체에서 run을 호출합니다.

자바

p.run();
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.