여러 PCollection 처리

일부 Dataflow SDK 변환PCollection 객체 여러 개를 입력으로 적용하거나, PCollection 객체 여러 개를 출력으로 생성할 수 있습니다. Dataflow SDK는 여러 PCollection 객체를 함께 번들링할 수 있는 여러 가지 방법을 제공합니다.

같은 데이터 유형을 저장하는 PCollection 객체의 경우, Dataflow SDK는 Flatten 또는 Partition 변환도 제공합니다. Flatten은 여러 PCollection 객체를 단일 논리 PCollection으로 병합하는 반면, Partition은 단일 PCollection을 고정된 숫자의 작은 컬렉션으로 분할합니다.

같은 유형을 저장하는 PCollection

자바

모두 같은 데이터 유형을 저장하는 여러 개의 PCollection 객체(예: 자바의 PCollection<String>)를 PCollectionList 클래스를 사용하여 캡슐화할 수 있습니다.

PCollectionListString, Integer 등 같은 데이터 유형을 저장하는 PCollection 객체 컬렉션을 나타냅니다. 예를 들어, Flatten 변환은 PCollectionList를 적용하고 모든 컬렉션에 있는 모든 요소를 단일 논리 PCollection으로 결합합니다.

마찬가지로 Partition 변환은 파티션 나누기 함수를 기준으로 단일 PCollectionPCollection 객체 여러 개를 포함하는 PCollectionList로 분할할 수 있습니다(예: 입력을 백분위 그룹으로 분할).

다음 코드 예시는 String 요소를 포함하는 개별 PCollection 객체로부터 PCollectionList를 생성하는 방법을 보여줍니다.

  PCollection<String> pc1 = ...;
  PCollection<String> pc2 = ...;
  PCollection<String> pc3 = ...;

  // Create a PCollectionList with three PCollections:
  PCollectionList<String> pcs = PCollectionList.of(pc1).and(pc2).and(pc3);

다음 코드 샘플에서처럼 색인(색인은 0부터 시작)을 사용하여 PCollectionList에서 개별 PCollection 객체에 액세스할 수 있습니다.

  PCollectionList<String> pcs = ...;

  // Get the first PCollection from the PCollection List.
  PCollection<String> firstPc = pcs.get(0);

PCollectionList에 있는 모든 PCollection 객체는 같은 데이터 유형을 포함해야 하지만, 같은 데이터 인코딩을 사용할 필요는 없습니다. PCollectionList에는 서로 다른 두 개의 코더(예: big endian 및 little endian)를 사용하는 두 개의 PCollection<Integer> 객체가 있을 수 있습니다.

서로 다른 유형을 저장하는 PCollection

일부 고급 변형은 여러 개의 입력을 적용하고 여러 개의 서로 다른 유형의 출력을 생성할 수 있습니다. 예를 들어 부차 출력이 있는 ParDo 변환은 서로 다른 데이터 유형을 포함하는 여러 개의 PCollection 객체를 생성할 수 있습니다.

자바

Dataflow 자바 SDK는 태그된 튜플 시스템을 사용하여 PCollection 객체 컬렉션을 나타냅니다. 태그된 튜플은 유형 안정성을 유지하는 데 도움이 됩니다. PCollection 객체는 PCollectionTuple 클래스에 포함되어 있습니다. 튜플에 있는 각 PCollection에 대해 연관된 TupleTag를 생성해야 합니다. TupleTagPCollectionTuple에서 각 PCollection을 검색하고, 식별하고, 색인을 생성하는 데 사용됩니다.

참고: PCollectionTuple, PCollection와 같은 서로 다른 유형을 저장할 수 있는 PCollection<String> 객체 그룹을 나타내는 경우에는 PCollection<Integer>을 사용해야 합니다. PCollection 객체 모두가 같은 데이터 유형을 포함하고 있는 경우에는 PCollectionList 사용을 고려해 보세요.

Dataflow 자바 SDK에서 각 PCollectionTupleTupleTag에 의해 키가 지정됩니다. TupleTag는 서로 다른 유형의 튜플에 대한 색인을 생성하기 위해 Dataflow 자바 SDK에 특별히 포함된 클래스입니다. 튜플 클래스와 TupleTag 객체를 조합해서 서로 다른 유형의 튜플을 변환의 입력 및 출력으로 사용하면 상당한 수준의 유형 안정성을 제공할 수 있습니다.

PCollectionTuple을 생성할 때, 튜플에 포함된 각 PCollection에 대한 TupleTag도 생성해야 합니다. 각 TupleTag 유형은 튜플에 있는 각 PCollection의 유형과 일치해야 합니다.

TupleTag 유형을 통해 튜플에서 각 PCollection의 정적 유형을 추적할 수 있습니다.

다음 코드 예시는 각각 String, Integer, Iterable<String> 값을 포함하는 세 개의 PCollection 객체가 있는 PCollectionTuple을 생성하는 방법을 보여줍니다.

  // The PCollections to be contained in the tuple.
  PCollection<String> pc1 = ...;
  PCollection<Integer> pc2 = ...;
  PCollection<Iterable<String>> pc3 = ...;

  // Create TupleTags for each of the PCollections to put in the PCollectionTuple.
  TupleTag<String> tag1 = new TupleTag<>();
  TupleTag<Integer> tag2 = new TupleTag<>();
  TupleTag<Iterable<String>> tag3 = new TupleTag<>();

  // Create a PCollectionTuple with the three PCollections and their associated tags.
  PCollectionTuple pcs =
      PCollectionTuple.of(tag1, pc1)
                      .and(tag2, pc2)
                      .and(tag3, pc3);

튜플에서 특정 PCollection을 추출하려면 튜플을 생성할 때 해당 컬렉션에 대해 사용한 TupleTag를 전달하는 PCollectionTuple.get 메소드를 사용합니다.

  // Get PCollections out of a PCollectionTuple, using the tags
  // that were used to put them in.

  PCollection<Integer> pcX = pcs.get(tag2);
  PCollection<String> pcY = pcs.get(tag1);
  PCollection<Iterable<String>> pcZ = pcs.get(tag3);

빈 튜플을 생성해야 하는 경우에는 PCollectionTuple.empty 메소드를 사용할 수 있습니다. 이 메소드는 지정된 파이프라인과 연관된 빈 튜플을 생성합니다.

  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  PCollectionTuple pcTuple = PCollectionTuple.empty(p);

PCollectionTuple.getAll 메소드를 사용하여 튜플에 있는 모든 PCollection 객체의 Map과 관련 TupleTag 객체를 가져올 수 있습니다.

  Map<TupleTag<?>, PCollection<?>> allCollections = pcs.getAll();

PCollectionTuple.has 메소드를 사용하여 해당 TupleTag와 연관된 PCollection이 튜플에 포함되어 있는지를 확인할 수 있습니다.

  TupleTag<String> tag1 = new TupleTag<>();
  boolean hasStringCollection = pcs.has(tag1);

Flatten으로 PCollection 병합

같은 데이터 유형을 포함하는 여러 개의 PCollection 객체가 파이프라인에 있는 경우에는 Flatten 변환을 사용하여 단일 논리 PCollection으로 병합할 수 있습니다.

Flatten 변환 적용

자바

Flatten은 해당 유형의 PCollection 객체에 원하는 만큼 PCollectionList를 적용하고, 해당 목록에 있는 PCollection 객체의 모든 요소를 포함하는 단일 PCollection을 반환합니다.

다음 코드 예시는 Flatten 변환을 apply하여 여러 PCollection<String> 객체를 단일 PCollection<String>으로 병합하는 방법을 보여줍니다. 이 예시는 병합할 모든 PCollection 객체를 포함하는 PCollectionList를 먼저 생성합니다.

  PCollection<String> pc1 = ...;
  PCollection<String> pc2 = ...;
  PCollection<String> pc3 = ...;

  PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);

  PCollection<String> merged = collections.apply(Flatten.<String>pCollections());

자바 7에서는 일반 팩토리 메소드 Flatten.pCollections를 사용할 때 각 입력 PCollection에 포함된 요소의 유형에 해당하는 유형 매개변수를 지정해야 합니다.

병합된 컬렉션의 데이터 인코딩

기본적으로 출력 PCollection코더는 입력 PCollectionList에 있는 첫 번째 PCollection의 코더와 동일합니다. 하지만 입력 PCollection 객체는 각각 다른 코더를 사용할 수 있습니다. 단, 모두가 같은 데이터 유형을 선택된 언어로 포함하고 있어야 합니다.

윈도우 설정된 컬렉션 병합

Flatten을 사용하여 윈도우 전략이 적용된 PCollection 객체를 병합하는 경우에는 병합할 모든 PCollection 객체가 호환되는 윈도우 전략과 윈도우 크기 조정을 사용해야 합니다. 예를 들어 병합 중인 모든 컬렉션은 가설적으로 모두 동일한 5분 고정 윈도우 또는 30초마다 시작하는 4분 슬라이딩 윈도우를 사용해야 합니다.

파이프라인이 Flatten을 사용하여 PCollection 객체를 호환되지 않는 윈도우와 병합하려고 시도하면 파이프라인이 생성될 때 Dataflow에서 IllegalStateException 오류가 발생합니다.

Partition으로 PCollection 분할

Partition 핵심 변환을 사용하여 단일 논리 PCollection에 있는 요소를 N개의 파티션으로 나눌 수 있습니다. 이로 인해 생기는 파티션은 PCollection이며, 해당 파티션은 PCollection 객체 목록으로 함께 번들링됩니다.

Partition을 사용하여 단일 PCollection을 백분위 그룹과 같은 여러 논리 그룹으로 나눌 수 있습니다. 이 방법은 파이프라인이 각각의 백분위 그룹을 서로 다르게 처리해야 하는 경우 등에 유용할 수 있습니다.

Partition 변환 적용

Partition은 제공되는 파티션 나누기 함수에 따라 PCollection의 요소를 나눕니다. 파티션 나누기 함수는 입력 PCollection의 요소를 각각의 결과 파티션 PCollection으로 분할하는 방법을 결정하는 논리를 포함하고 있습니다.

참고: 파티션 수는 그래프 생성 시 결정해야 합니다. 예를 들어 런타임 시 파티션 수를 명령줄 옵션으로 전달할 수는 있지만(이 경우 나중에 파이프라인 그래프를 빌드하는 데 사용), 파이프라인 도중에는 파이프라인 그래프가 생성된 후에 계산되는 데이터 기준으로 파티션 수를 결정할 수 없습니다.

자바

다음 코드 예시는 Student 유형 객체의 PCollection을 백분위 그룹으로 나눕니다.

  PCollection<Student> students = ...;
  // Split students up into 10 partitions, by percentile:
  PCollectionList<Student> studentsByPercentile =
      students.apply(Partition.of(10, new PartitionFn<Student>() {
          public int partitionFor(Student student, int numPartitions) {
              return student.getPercentile()  // 0..99
                   * numPartitions / 100;
          }}));

apply할 인수로 Partition 변환을 전달할 때는 원하는 결과 파티션 수를 나타내는 int 값과 파티션 나누기 함수를 나타내는 PartitionFn을 제공해야 합니다. 이 예시에서는 줄 안에서 PartitionFn을 정의합니다.

apply의 반환 값은 각각의 결과 파티션을 개별 PCollection 객체로 포함하고 있는 PCollectionList입니다. 다음과 같이 get 메소드를 사용하여 PCollectionList에서 각 파티션을 추출할 수 있습니다.

  PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.