Migrar do App Engine MapReduce para o Apache Beam e o Dataflow

Este tutorial é voltado a usuários do App Engine MapReduce. Nele, você verá como migrar do App Engine MapReduce para o Apache Beam e o Dataflow.

Por que migrar?

O App Engine MapReduce é um modelo de programação que processa grandes quantidades de dados de modo paralelo e distribuído. Ele é útil para tarefas grandes e de longa duração que não são processadas dentro do escopo de uma única solicitação, como:

  • analisar registros de aplicativos;
  • agregar dados relacionados de fontes externas;
  • transformar dados de um formato para outro;
  • exportar dados para análise externa.

No entanto, o App Engine MapReduce é uma biblioteca de código aberto mantida pela comunidade, criada com base nos serviços do App Engine e não é mais compatível com o Google.

O Dataflow, por outro lado, é totalmente compatível com o Google e oferece mais funcionalidades em comparação com o App Engine MapReduce.

Casos de migração

Veja a seguir alguns exemplos de como se beneficiar da migração do App Engine MapReduce para o Apache Beam e o Dataflow:

  • Armazene os dados de aplicativo do banco de dados do Datastore em um armazenamento de dados do BigQuery para processamento analítico usando SQL.
  • Use o Dataflow como uma alternativa ao App Engine MapReduce para manutenção e/ou atualizações do conjunto de dados do Datastore.
  • Faça backup de partes do seu banco de dados do Datastore para o Cloud Storage.

O que são o Dataflow e o Apache Beam?

O Dataflow é um serviço gerenciado para executar uma ampla variedade de padrões de processamento de dados. O Apache Beam é um modelo de programação unificado que fornece SDKs para definir fluxos de trabalho de processamento de dados. Use o Apache Beam para criar pipelines complexos para lotes e streaming e para executá-los no Dataflow.

Primeiros passos com o Dataflow e o Apache Beam

Para começar, siga o guia de início rápido de sua escolha:

Como criar e executar um canal

Ao usar o App Engine MapReduce, crie classes de processamento de dados, adicione a biblioteca do MapReduce e, assim que a especificação e as configurações do job estiverem definidas, crie e inicie o job em uma etapa usando o método start() estático na classe de job apropriada.

Em jobs de Map, crie as classes Input e Output e a classe Map que faz o mapeamento. Em jobs do App Engine MapReduce, crie as classes Input e Output e defina as classes Mapper e Reducer para transformações de dados.

Com o Apache Beam, faça as coisas de maneira um pouco diferente: crie um pipeline. Use conectores de entrada e saída para ler e gravar a partir de suas fontes de dados e coletores. Use as transformações de dados predefinidas (ou grave sua própria) para implementar as transformações de dados. Depois que o código estiver pronto, implante o pipeline no serviço do Dataflow.

Como converter jobs do App Engine MapReduce em pipelines do Apache Beam

A tabela a seguir apresenta os equivalentes do Apache Beam nas etapas mapear, reproduzir aleatoriamente e reduzir do modelo do App Engine MapReduce.

Java

App Engine MapReduce Equivalente ao Apache Beam
Mapa MapElements<InputT,OutputT>
Embaralhamento GroupByKey<K,V>
Reduzir Combine.GroupedValues<K,InputT,OutputT>

Uma prática comum é usar Combine.PerKey<K,InputT,OutputT> em vez de GroupByKey e CombineValues.

Python

App Engine MapReduce Equivalente ao Apache Beam
Mapa beam.Map
Embaralhamento beam.GroupByKey
Reduzir beam.CombineValues

Uma prática comum é usar beam.CombinePerKey em vez de beam.GroupByKey e beam.CombineValues.

Go

App Engine MapReduce Equivalente ao Apache Beam
Mapa beam.ParDo
Embaralhamento beam.GroupByKey
Reduzir beam.Combine


No exemplo de código a seguir, você verá como implementar o modelo do App Engine MapReduce no Apache Beam:

Java

Extraído do MinimalWordCount.java:
p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))

 // Apply a ParDo that returns a PCollection, where each element is an
 // individual word in Shakespeare's texts.
 .apply("ExtractWords", ParDo.of(new DoFn<String, String>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                     for (String word : c.element().split(ExampleUtils.TOKENIZER_PATTERN)) {
                       if (!word.isEmpty()) {
                         c.output(word);
                       }
                     }
                   }
                 }))

 // Apply the Count transform that returns a new PCollection of key/value pairs,
 // where each key represents a unique word in the text.
 .apply(Count.perElement())

 // Apply a MapElements transform that formats our PCollection of word counts
 // into a printable string, suitable for writing to an output file.
 .apply("FormatResults", MapElements.via(new SimpleFunction<KV<String, Long>, String>() {
                   @Override
                   public String apply(KV<String, Long> input) {
                     return input.getKey() + ": " + input.getValue();
                   }
                 }))

 // Apply a write transform that writes the contents of the PCollection to a
 // series of text files.
 .apply(TextIO.write().to("wordcounts"));

Python

Extraído do wordcount_minimal.py:
# Read the text file[pattern] into a PCollection.
lines = p | ReadFromText(known_args.input)

# Count the occurrences of each word.
counts = (
    lines
    | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
                  .with_output_types(unicode))
    | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
    | 'GroupAndSum' >> beam.CombinePerKey(sum))

# Format the counts into a PCollection of strings.
output = counts | 'Format' >> beam.Map(lambda (w, c): '%s: %s' % (w, c))

# Write the output using a "Write" transform that has side effects.
# pylint: disable=expression-not-assigned
output | WriteToText(known_args.output)

Go

Extraído do minimal_wordcount.go:
// beam.Init() is an initialization hook that must be called on startup.
beam.Init()

// Create the Pipeline object and root scope.
p := beam.NewPipeline()
s := p.Root()
lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")

// Invoke a ParDo transform on our PCollection of text lines.
// This ParDo invokes a DoFn (defined in-line) on each element that
// tokenizes the text line into individual words. The ParDo returns a
// PCollection of type string, where each element is an individual word in
// Shakespeare's collected texts.
words := beam.ParDo(s, func(line string, emit func(string)) {
    for _, word := range wordRE.FindAllString(line, -1) {
        emit(word)
    }
}, lines)

// Invoke the stats.Count transform on our PCollection of
// individual words. The Count transform returns a new PCollection of
// key/value pairs, where each key represents a unique word in the text.
// The associated value is the occurrence count for that word.
counted := stats.Count(s, words)

// Use a ParDo to format our PCollection of word counts into a printable
// string, suitable for writing to an output file. When each element
// produces exactly one element, the DoFn can simply return it.
formatted := beam.ParDo(s, func(w string, c int) string {
    return fmt.Sprintf("%s: %v", w, c)
}, counted)

// Invoke textio.Write at the end of the pipeline to write
// the contents of a PCollection (in this case, our PCollection of
// formatted strings) to a text file.
textio.Write(s, "wordcounts.txt", formatted)

Outros benefícios do Apache Beam e do Dataflow

Se você optar por migrar os jobs do App Engine MapReduce para os pipelines do Apache Beam, você se beneficiará de vários recursos que o Apache Beam e o Dataflow têm a oferecer.

Como programar jobs do Cloud Dataflow

Se você estiver familiarizado com as filas de tarefas do App Engine, programe os jobs recorrentes usando o Cron. Este exemplo demonstra como usar o Cron do App Engine para programar seus canais do Apache Beam.

Há diversas outras formas de programar a execução do seu canal. É possível:

Como monitorar jobs do Cloud Dataflow

Para monitorar os jobs do App Engine MapReduce, você depende de um URL hospedado em appspot.com.

Ao executar os pipelines usando o serviço gerenciado do Dataflow, é possível monitorar os pipelines usando a conveniente interface do usuário de monitoramento baseada na Web do Dataflow. Use também o Cloud Monitoring para informações adicionais sobre os pipelines.

Como ler e gravar

No Apache Beam, os leitores e gravadores do MapReduce do App Engine são chamados de fontes de dados e coletores ou conectores de E/S.

O Apache Beam tem muitos conectores de E/S para vários serviços do Google Cloud, como Bigtable, BigQuery, Datastore, Cloud SQL e outros. Além disso, há conectores de E/S criados pelos colaboradores do Apache Beam para serviços que não pertencem ao Google, como Apache Cassandra e MongoDB.

Próximas etapas