PCollection

Dataflow SDK는 파이프라인의 데이터를 나타내기 위해 PCollection이라는 특수 클래스를 사용합니다. PCollection은 다중 요소 데이터 세트를 나타냅니다.

PCollection을 '파이프라인' 데이터라고 생각하면 됩니다. Dataflow의 변환PCollection을 입력 및 출력으로 사용합니다. 즉, 파이프라인 내의 데이터로 작업하려면 데이터가 PCollection 형식이어야 합니다. 각 PCollection은 특정 Pipeline 객체가 소유하며 해당 Pipeline 객체만 이를 사용할 수 있습니다.

중요: 이 문서에는 제한되지 않은 PCollection윈도우에 대한 정보가 포함되어 있습니다. 이러한 개념은 Dataflow 자바 SDK에만 적용되며 Dataflow Python SDK에서는 아직 적용되지 않습니다.

PCollection 특성

PCollection은 잠재적으로 큰 불변 요소의 '묶음'을 나타냅니다. PCollection에 포함할 수 있는 요소 수에 제한은 없습니다. PCollection은 메모리 크기에 맞출 수 있고 영구 데이터 저장소에 백업된 대용량 데이터 세트를 나타낼 수도 있습니다.

자바

어떠한 유형도 PCollection의 요소가 될 수 있지만 모든 요소는 동일한 유형이어야 합니다. 단 Dataflow는 분산 처리를 지원하기 위해 개별 요소를 바이트 문자열로 인코딩할 수 있어야 합니다. Dataflow SDK는 일반적으로 사용되는 유형에 대한 내장형 인코딩을 포함한 데이터 인코딩 메커니즘을 제공하며 필요에 따라 커스텀 인코딩을 지정할 수 있도록 지원합니다. 임의의 유형에 유효한 인코딩을 작성하는 것은 어려울 수 있지만 단순 구조화 유형에 대한 커스텀 인코딩을 구성할 수 있습니다.

대규모 데이터 처리에서 중요한 데이터 유형은 키-값 쌍입니다. Dataflow SDK는 키-값 쌍을 나타내기 위해 KV<K, V> 클래스를 사용합니다.

Python

대규모 데이터 처리에서 중요한 데이터 유형은 키-값 쌍입니다. Dataflow Python SDK는 키-값 쌍을 나타내기 위해 2-튜플을 사용합니다.

PCollection 제한사항

PCollection에는 일반 컬렉션 클래스와 다른 몇 가지 주요 사항이 있습니다.

  • PCollection은 변경 불가능합니다. 한 번 생성한 다음에는 개별 요소를 추가, 제거, 변경할 수 없습니다.
  • PCollection은 개별 요소에 대한 무작위 액세스를 지원하지 않습니다.
  • PCollection은 자신이 생성된 파이프라인에 종속됩니다. Pipeline 객체 간에 PCollection을 공유할 수 없습니다.

PCollection은 기존 저장소의 데이터로 물리적으로 백업되거나 아직 계산하지 않은 데이터를 나타낼 수 있습니다. 따라서 PCollection의 데이터는 변경 불가능합니다. PCollection은 새 파이프라인 데이터를 새 PCollection으로 생성하는 계산에 사용할 수 있습니다. 단, 일단 생성한 기존 PCollection의 요소는 변경할 수 없습니다.

PCollection은 엄밀히 말하자면 데이터를 저장하지 않습니다. PCollection에는 요소가 너무 많아 Dataflow 프로그램을 실행하는 로컬 메모리로 저장할 수 없을 수 있습니다. PCollection을 생성하거나 변환할 때 데이터는 일반적인 컨테이너 클래스처럼 복사되거나 메모리로 이동되지 않습니다. 대신 PCollection은 클라우드에서 잠재적으로 대용량인 데이터 세트를 나타냅니다.

제한된 PCollection과 제한되지 않은 PCollection

PCollection의 크기는 제한되거나 제한되지 않을 수 있으며, 제한 또는 무제한 여부는 PCollection 생성 시에 결정됩니다. 일부 루트 변환은 제한적인 PCollections을 생성하고, 다른 루트 변환은 제한되지 않은 PCollections을 생성합니다. 이 차이는 입력 데이터의 소스에 따라 결정됩니다.

제한된 PCollection

PCollection은 변경되지 않는 알려진 크기를 가진 고정 데이터 세트를 나타내면 제한적입니다. 고정 데이터 세트의 예로는 '10월 서버 로그' 또는 '지난 주에 처리한 모든 주문'이 있습니다. TextIOBigQueryIO 루트 변환은 제한된 PCollection을 만듭니다.

제한적인 PCollection을 생성하는 데이터 소스는 다음과 같습니다.

자바

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • 커스텀 소스 API를 사용하여 생성하는 제한된 커스텀 데이터 소스

Python

  • TextIO
  • BigQueryIO
  • 커스텀 소스 API를 사용하여 생성하는 제한된 커스텀 데이터 소스

제한된 PCollection을 허용하는 데이터 싱크는 다음과 같습니다.

자바

  • TextIO
  • BigQueryIO
  • DatastoreIO
  • 커스텀 싱크 API를 사용하여 생성하는 제한된 커스텀 데이터 싱크

Python

  • TextIO
  • BigQueryIO
  • 커스텀 싱크 API를 사용하여 생성하는 제한된 커스텀 데이터 싱크

제한되지 않은 PCollection

PCollection지속적으로 업데이트되는 데이터 세트 또는 스트리밍 데이터를 나타내면 제한되지 않습니다. 지속적으로 업데이트되는 데이터 세트의 예에는 '생성된 서버 로그' 또는 '처리되는 모든 새로운 주문'이 있습니다. PubsubIO 루트 변환은 제한되지 않은 PCollection을 만듭니다.

일부 소스, 특히 제한되지 않은 PCollection을 생성하는 소스(예: PubsubIO)는 컬렉션의 각 요소에 타임스탬프를 자동으로 추가합니다.

제한되지 않은 PCollection을 생성하는 데이터 소스는 다음과 같습니다.

  • PubsubIO
  • 커스텀 소스 API를 사용하여 생성하는 제한되지 않은 커스텀 데이터 소스

제한되지 않은 PCollection을 허용하는 데이터 싱크는 다음과 같습니다.

  • PubsubIO
  • BigQueryIO

처리 특성

PCollection의 제한된(또는 제한되지 않은) 특성은 Dataflow가 데이터를 처리하는 방법에 영향을 줍니다. 제한된 PCollection은 전체 데이터 세트를 한 번 읽고 유한 작업으로 처리를 수행하는 배치 작업을 사용하여 처리할 수 있습니다. 제한되지 않은 PCollection은 전체 컬렉션을 하나의 시점에서 처리할 수 없으므로 스트리밍 작업으로 처리해야 합니다.

제한되지 않은 PCollection을 그룹화하는 경우, 지속적으로 업데이트되는 데이터 세트를 유한한 크기의 논리적 윈도우로 분할하기 위해 Dataflow에는 윈도우라는 개념이 필요합니다. Dataflow는 각 기간을 번들로 처리하며 데이터 세트가 생성될 때 처리는 지속됩니다. 자세한 내용은 타임스탬프 및 기간 설정 섹션을 참조하세요.

PCollection 요소 타임스탬프

PCollection의 각 요소에는 연결된 타임스탬프가 있습니다. 타임스탬프는 고유한 시간 개념이 있는 요소를 포함하는 PCollection에 유용합니다. 예를 들어 처리할 주문의 PCollection은 주문이 생성된 시간을 요소 타임스탬프로 사용할 수 있습니다.

각 요소의 타임스탬프는 PCollection을 생성하는 소스에서 처음 지정됩니다. 제한되지 않은 PCollection을 생성하는 소스는 종종 제한되지 않은 PCollection에 새 요소가 추가되었을 때 이 새 요소 각각에 타임스탬프를 지정합니다.

자바

BigQueryIO 또는 TextIO와 같이 고정 데이터 세트를 제공하는 데이터 소스도 각 요소에 타임스탬프를 지정합니다. 단, 이러한 데이터 소스는 일반적으로 각 요소에 동일한 타임스탬프(Long.MIN_VALUE)를 지정합니다.

PCollection의 요소에 타임스탬프를 수동으로 지정할 수 있습니다. 이는 요소에 고유 타임스탬프가 있을 경우 일반적으로 수행하는 작업이지만 해당 타임스탬프를 요소 구조 밖으로 파싱하는 등의 방법으로 계산해야 합니다. 타임스탬프를 수동으로 지정하려면 ParDo 변환을 사용합니다. ParDo 변환 내에서 DoFn은 타임스탬프가 있는 출력 요소를 생성할 수 있습니다. 자세한 내용은 타임스탬프 지정을 참조하세요.

Python

PCollection의 요소에 타임스탬프를 수동으로 지정할 수 있습니다. 이는 요소에 고유 타임스탬프가 있을 경우 일반적으로 수행하는 작업이지만 해당 타임스탬프를 요소 구조 밖으로 파싱하는 등의 방법으로 계산해야 합니다. 타임스탬프를 수동으로 지정하려면 ParDo 변환을 사용합니다. ParDo 변환 내에서 DoFn은 타임스탬프가 있는 출력 요소를 생성할 수 있습니다.

기간 설정

PCollection의 각 요소에 연결된 타임스탬프는 윈도우라는 개념에 사용됩니다. 기간 설정은 PCollection의 요소를 타임스탬프에 따라 분할합니다. 윈도우는 모든 PCollection에 사용할 수 있습니다. 하지만 제한되지 않은 PCollection에서는 지속적인 데이터 스트림을 유한한 단위로 분할해 처리해야 하므로, 이를 계산하기 위해 윈도우가 필요합니다.

파이프라인에서 Dataflow의 기간 설정 개념을 사용하는 방법에 대한 자세한 내용은 기간 설정 섹션을 참조하세요.

PCollection 생성

Cloud Dataflow 파이프라인 내의 데이터 세트로 작업하려면 저장된 위치에 상관없이 데이터를 나타내는 PCollection을 생성해야 합니다. Dataflow SDK는 초기 PCollection을 생성하는 두 가지 주요 방법을 제공합니다.

  • 파일 등의 외부 데이터 소스에서 데이터를 읽을 수 있습니다.
  • 메모리 내 컬렉션 클래스에 저장된 데이터의 PCollection을 생성할 수 있습니다.

외부 데이터 읽기

외부 데이터 소스에서 데이터를 읽는 방법에 대한 자세한 내용은 파이프라인 I/O를 참조하세요.

로컬 메모리 내 데이터에서 PCollection 생성

로컬 메모리 내 데이터에서 PCollection을 생성해 파이프라인 변환 내에서 해당 데이터를 사용할 수 있습니다. 일반적으로 소규모 데이터 세트로 파이프라인을 테스트하고 테스트 중 외부 I/O에 대한 파이프라인의 종속성을 줄이기 위해 로컬 메모리의 데이터를 사용합니다.

자바

메모리 내 자바 Collection에서 PCollection을 생성하려면 Create 변환을 apply합니다. Create는 자바용 Dataflow SDK에서 제공하는 루트 PTransform입니다. CreateCollection 내 요소의 인코딩 방법을 지정하는 자바 CollectionCoder 객체를 허용합니다.

다음 코드 예시는 자바 List에서 개별 텍스트 줄을 나타내는 String PCollection을 생성합니다.

  // Create a Java Collection, in this case a List of Strings.
  static final List<String> LINES = Arrays.asList(
      "To be, or not to be: that is the question: ",
      "Whether 'tis nobler in the mind to suffer ",
      "The slings and arrows of outrageous fortune, ",
      "Or to take arms against a sea of troubles, ");

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of())   // create the PCollection

위 코드는 지정된 요소를 포함하는 PCollection을 생성하는 Create.of를 사용합니다. 파이프라인이 윈도우를 사용하는 경우에는 대신 Create.timestamped를 사용해야 합니다. Create.timestamped는 타임스탬프가 지정된 특정 요소를 포함하는 PCollection을 생성합니다.

Python

PCollection을 생성하려면 Create 변환을 적용합니다. Create는 Dataflow Python SDK에서 제공하는 표준 변환입니다.

with beam.Pipeline(options=pipeline_options) as p:

  lines = (p
           | beam.Create([
               'To be, or not to be: that is the question: ',
               'Whether \'tis nobler in the mind to suffer ',
               'The slings and arrows of outrageous fortune, ',
               'Or to take arms against a sea of troubles, ']))

커스텀 데이터 유형과 함께 PCollection 사용

커스텀 데이터 유형을 요소 유형으로 제공하여 PCollection을 생성할 수 있습니다. 이 기능은 고객의 이름, 주소, 전화번호를 저장하는 자바 클래스와 같이 특정 필드를 가지는 자체 클래스 또는 구조의 컬렉션을 생성해야 할 때 유용합니다.

커스텀 유형 PCollection을 생성할 때 해당 커스텀 유형의 Coder를 제공해야 합니다. Coder는 데이터 세트가 여러 개의 파이프라인 작업자 인스턴스에 병렬화 및 분할됨에 따라 PCollection의 요소를 직렬화 및 역직렬화하는 방법을 Dataflow 서비스에 전달합니다. 자세한 내용은 데이터 인코딩을 참조하세요.

Dataflow는 명시적으로 Coder로 설정하지 않은 PCollection에 대해 Coder를 유추합니다. 커스텀 유형의 기본 CoderSerializableCoder이며, 자바 직렬화를 사용합니다. 하지만 Dataflow는 가능한 경우 CoderAvroCoder를 사용할 것을 권장합니다.

Pipeline 객체의 CoderRegistry를 사용하여 AvroCoder를 데이터 유형의 기본 코더로 등록할 수 있습니다. 클래스에 다음과 같은 주석을 추가합니다.

자바

  @DefaultCoder(AvroCoder.class)
  public class MyClass {
    ...
 }

커스텀 클래스가 AvroCoder와 호환되는지 확인하려면 주석을 추가해야 합니다. 예를 들어, 데이터 유형의 null 필드에 org.apache.avro.reflect.Nullable을 사용하여 주석을 추가해야 합니다. 자세한 내용은 AvroCoder의 자바용 API 참조 문서와 org.apache.avro.reflect의 패키지 문서를 참조하세요.

Dataflow의 TrafficRoutes 예 파이프라인StationSpeed라는 커스텀 클래스를 요소 유형으로 가지는 PCollection을 만듭니다. StationSpeed는 다음과 같이 AvroCoder를 기본 코더로 등록합니다.

자바

  /**
   * This class holds information about a station reading's average speed.
   */
  @DefaultCoder(AvroCoder.class)
  static class StationSpeed {
    @Nullable String stationId;
    @Nullable Double avgSpeed;

    public StationSpeed() {}

    public StationSpeed(String stationId, Double avgSpeed) {
      this.stationId = stationId;
      this.avgSpeed = avgSpeed;
    }

    public String getStationId() {
      return this.stationId;
    }
    public Double getAvgSpeed() {
      return this.avgSpeed;
    }
  }
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.