App Engine 맵리듀스에서 Apache Beam 및 Dataflow로 마이그레이션

이 가이드는 App Engine 맵리듀스 사용자를 위해 작성되었으며, App Engine 맵리듀스에서 Apache Beam과 Dataflow로 마이그레이션하는 방법을 보여줍니다.

마이그레이션이 필요한 이유

App Engine MapReduce는 대량의 데이터를 병렬 및 분산 방식으로 처리하기 위한 프로그래밍 모델입니다. 다음과 같이 단일 요청의 범위 내에서 처리할 수 없는 크고 오래 실행되는 작업에 유용합니다.

  • 애플리케이션 로그 분석
  • 외부 소스로부터 관련 데이터 집계
  • 한 형식에서 다른 형식으로 데이터 변환
  • 외부 분석을 위해 데이터 내보내기

하지만 App Engine MapReduce는 App Engine 서비스를 기반으로 커뮤니티에서 관리하는 오픈소스 라이브러리이며 더 이상 Google에서 지원하지 않습니다.

반면 Dataflow는 Google이 완벽하게 지원하며 App Engine 맵리듀스와 비교하여 확장된 기능을 제공합니다.

마이그레이션 사례

다음은 App Engine 맵리듀스에서 Apache Beam 및 Dataflow로 마이그레이션함으로써 이점을 얻을 수 있는 몇 가지 사례입니다.

  • SQL을 사용하는 분석 처리를 위해 Datastore 데이터베이스 애플리케이션 데이터를 BigQuery 데이터웨어 하우스에 저장합니다.
  • Datastore 데이터 세트의 유지보수나 업데이트를 위해 App Engine 맵리듀스 대신 Dataflow를 사용합니다.
  • Datastore 데이터베이스의 일부를 Cloud Storage에 백업합니다.

Dataflow 및 Apache Beam이란 무엇인가요?

Dataflow는 다양한 데이터 처리 패턴을 실행하는 관리형 서비스입니다. Apache Beam은 데이터 처리 워크플로를 정의하기 위한 SDK를 제공하는 통합 프로그래밍 모델입니다. Apache Beam을 사용하면 일괄 처리 및 스트리밍을 위해 복잡한 파이프라인을 생성하고 Dataflow에서 실행할 수 있습니다.

Dataflow 및 Apache Beam 시작하기

시작하려면 다음 중에서 원하는 빠른 시작을 따르세요.

파이프라인 생성 및 실행

App Engine 맵리듀스를 사용할 때는 데이터 처리 클래스를 생성하고 맵리듀스 라이브러리를 추가합니다. 그리고 작업의 사양과 설정이 정의되면 해당 작업 클래스에서 정적 start() 메서드를 사용하여 한 번에 작업을 생성하고 시작합니다.

매핑 작업의 경우 InputOutput 클래스 그리고 매핑을 수행할 Map 클래스를 생성합니다. App Engine 맵리듀스 작업의 경우 InputOutput 클래스를 생성하고 데이터 변환을 위한 MapperReducer 클래스를 정의합니다.

Apache Beam에서는 방법이 약간 다릅니다. 먼저 파이프라인을 생성합니다. 입력 및 출력 커넥터를 사용하여 데이터 소스 및 싱크에서 데이터를 읽고 씁니다. 사전 정의된 데이터 변환을 사용하거나 또는 직접 작성하여 데이터 변환을 구현합니다. 그런 다음 코드가 준비되면 파이프라인을 Dataflow 서비스에 배포합니다.

App Engine 맵리듀스 작업을 Apache Beam 파이프라인으로 전환

다음 테이블은 App Engine 맵리듀스 모델의 매핑, 셔플, 리듀스 단계에 해당하는 Apache Beam 클래스를 보여줍니다.

자바

App Engine 맵리듀스 해당하는 Apache Beam 클래스
지도 MapElements<InputT,OutputT>
셔플 GroupByKey<K,V>
리듀스 Combine.GroupedValues<K,InputT,OutputT>

일반적인 방법은 Combine.PerKey<K,InputT,OutputT>GroupByKeyCombineValues 대신 사용하는 것입니다.

Python

App Engine 맵리듀스 해당하는 Apache Beam 클래스
지도 beam.Map
셔플 beam.GroupByKey
리듀스 beam.CombineValues

일반적인 방법은 beam.CombinePerKeybeam.GroupByKeybeam.CombineValues 대신 사용하는 것입니다.

Go

App Engine 맵리듀스 해당하는 Apache Beam 클래스
지도 beam.ParDo
셔플 beam.GroupByKey
리듀스 beam.Combine


다음 샘플 코드는 Apache Beam에서 App Engine 맵리듀스 모델을 구현하는 방법을 보여줍니다.

자바

MinimalWordCount.java에서 발췌:
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

 // Apply a ParDo that returns a PCollection, where each element is an
 // individual word in Shakespeare's texts.
 .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                       if (!word.isEmpty()) {
                         c.output(word);
                       }
                     }
                   }
                 }))

 // Apply the Count transform that returns a new PCollection of key/value pairs,
 // where each key represents a unique word in the text.
 .apply(Count.perElement())

 // Apply a MapElements transform that formats our PCollection of word counts
 // into a printable string, suitable for writing to an output file.
 .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                   @Override
                   public String apply(KV<String, Long> input) {
                     return input.getKey() + ": " + input.getValue();
                   }
                 }))

 // Apply a write transform that writes the contents of the PCollection to a
 // series of text files.
 .apply(TextIO.write().to("wordcounts"));

Python

wordcount_minimal.py에서 발췌:
# Read the text file[pattern] into a PCollection.
lines = p | ReadFromText(known_args.input)

# Count the occurrences of each word.
counts = (
    lines
    | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))

# Format the counts into a PCollection of strings.
output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))

# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | WriteToText(known_args.output)

Go

minimal_wordcount.go에서 발췌:
// beam.Init() is an initialization hook that must be called on startup.
beam.Init()

// Create the Pipeline object and root scope.
p := beam.NewPipeline()
s := p.Root()
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")

// Invoke a ParDo transform on our PCollection of text lines.
// This ParDo invokes a DoFn (defined in-line) on each element that
// tokenizes the text line into individual words. The ParDo returns a
// PCollection of type string, where each element is an individual word in
// Shakespeare's collected texts.
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}, lines)

// Invoke the stats.Count transform on our PCollection of
// individual words. The Count transform returns a new PCollection of
// key/value pairs, where each key represents a unique word in the text.
// The associated value is the occurrence count for that word.
counted := stats.Count(s, words)

// Use a ParDo to format our PCollection of word counts into a printable
// string, suitable for writing to an output file. When each element
// produces exactly one element, the DoFn can simply return it.
formatted := beam.ParDo(s, func(w string, c int) string {
    return fmt.Sprintf("%s: %v", w, c)
}, counted)

// Invoke textio.Write at the end of the pipeline to write
// the contents of a PCollection (in this case, our PCollection of
// formatted strings) to a text file.
textio.Write(s, "wordcounts.txt", formatted)

Apache Beam 및 Dataflow의 추가적인 이점

App Engine 맵리듀스 작업을 Apache Beam 파이프라인으로 마이그레이션하면 Apache Beam과 Dataflow에서 제공하는 여러 가지 기능을 활용할 수 있습니다.

Cloud Dataflow 작업 예약

App Engine 태스크 큐에 익숙하면 Cron을 사용하여 되풀이 작업을 예약할 수 있습니다. 이 는 App Engine Cron을 사용하여 Apache Beam 파이프라인을 예약하는 방법을 보여줍니다.

파이프라인 실행을 예약할 수 있는 여러 가지 추가적인 방법이 있습니다. 다음과 같은 작업을 수행할 수 있습니다.

Cloud Dataflow 작업 모니터링

App Engine 맵리듀스 작업을 모니터링하려면 appspot.com에서 호스팅되는 URL을 사용합니다.

Dataflow 관리 서비스를 사용하여 파이프라인을 실행할 때는 편리한 Dataflow 웹 기반 모니터링 사용자 인터페이스를 사용하여 파이프라인을 모니터링할 수 있습니다. Cloud Monitoring을 사용하여 파이프라인에 대한 추가 정보를 확인할 수도 있습니다.

읽기 및 쓰기

Apache Beam에서는 App Engine 맵리듀스의 리더 및 작성기를 데이터 소스 및 싱크 또는 I/O 커넥터라고 합니다.

Apache Beam에는 Bigtable, BigQuery, Datastore, Cloud SQL 등 여러 Google Cloud 서비스용 I/O 커넥터가 많이 있습니다. Apache Cassandra와 MongoDB 등 Google 이외 서비스를 위해 Apache Beam 참여자들이 생성한 I/O 커넥터도 있습니다.

다음 단계