Como migrar do SDK do Dataflow para Java, versão 1.x

Este documento destaca as principais alterações entre as versões 1.x e 2.x do SDK do Dataflow para Java.

Aviso de suspensão de uso do SDK do Dataflow: o SDK do Dataflow 2.5.0 é a última versão do SDK do Dataflow separada das versões do SDK do Apache Beam. O serviço Dataflow é totalmente compatível com versões oficiais do SDK do Apache Beam. O serviço Dataflow também é compatível com SDKs do Apache Beam lançados anteriormente, a partir da versão 2.0.0. Consulte a página de suporte do Dataflow para ver o status de suporte de vários SDKs. A página de downloads do Apache Beam contém notas das versões do SDK do Apache Beam.

Como migrar de 1.x para 2.x

Para instalar e usar o SDK do Apache Beam para Java versão 2.x, consulte o guia de instalação do SDK do Apache Beam.

Principais mudanças da versão 1.x para 2.x

OBSERVAÇÃO: todos os usuários precisam estar cientes dessas mudanças para fazer upgrade para versões 2.x.

Renomeação e reestruturação de pacotes

Como parte da generalização do Apache Beam para que ele funcione bem em ambientes fora do Google Cloud Platform, o código do SDK foi renomeado e reestruturado.

O item com.google.cloud.dataflow foi renomeado como org.apache.beam.

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-78

O SDK agora é declarado no pacote org.apache.beam em vez de com.google.cloud.dataflow. É necessário atualizar todas as declarações de importação com essa mudança.

Novos subpacotes: runners.dataflow, runners.direct e io.gcp

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-77

Os executores foram reorganizados nos seus próprios pacotes, então muitos aspectos de com.google.cloud.dataflow.sdk.runners mudaram para org.apache.beam.runners.direct ou org.apache.beam.runners.dataflow.

As opções de pipeline específicas para execução no serviço Dataflow foram movidas de com.google.cloud.dataflow.sdk.options para org.apache.beam.runners.dataflow.options.

A maioria dos conectores de E/S para serviços do Google Cloud Platform foi movida para subpacotes. Por exemplo, BigQueryIO passou de com.google.cloud.dataflow.sdk.io para org.apache.beam.sdk.io.gcp.bigquery.

A maioria das IDEs ajuda a identificar as novas localizações. Para verificar a nova localização de arquivos específicos, use t para pesquisar o código no GitHub. As versões do SDK 1.x do Dataflow para Java são criadas a partir do repositório GoogleCloudPlatform/DataflowJavaSDK (ramificação master-1.x). As versões do SDK 2.x do Dataflow para Java correspondem ao código do repositório apache/beam.

Executores

Pipeline removido dos nomes de executores

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1185

Os nomes de todos os executores foram reduzidos com a remoção de Pipeline. Por exemplo, DirectPipelineRunner agora é DirectRunner e DataflowPipelineRunner agora é DataflowRunner.

Exigir a configuração de --tempLocation para um caminho do Google Cloud Storage

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-430

Em vez de permitir que você especifique apenas --stagingLocation ou --tempLocation e que o Dataflow deduza o outro, o serviço do Dataflow exige que --gcpTempLocation seja definido como um caminho do Google Cloud Storage, mas ele pode ser inferido a partir do --tempLocation mais geral. A menos que seja substituído, ele também será usado para --stagingLocation.

InProcessPipelineRunner removido

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-243

O DirectRunner continua sendo executado na máquina local de um usuário, mas agora também suporta execução multithread, PCollections ilimitadas e gatilhos para saídas especulativas e atrasadas. Ele se alinha mais ao modelo documentado do Beam e pode causar (corretamente) falhas dos outros testes de unidade.

Como essa funcionalidade agora está no DirectRunner, o InProcessPipelineRunner (SDK do Dataflow para Java versão 1.6+) foi removido.

"BlockingDataflowPipelineRunner" substituído por "PipelineResult.waitToFinish()"

Usuários afetados: todos | Impacto: erro de compilação

O BlockingDataflowPipelineRunner foi removido. Se seu código espera, por programação, executar um pipeline e aguardar até que ele seja encerrado, ele deve usar o DataflowRunner e chamar explicitamente pipeline.run().waitToFinish().

Se você usou --runner BlockingDataflowPipelineRunner na linha de comando para induzir interativamente o bloqueio do seu programa principal até que o pipeline seja encerrado, essa será uma preocupação do programa principal. Ele deve fornecer uma opção, como --blockOnRun, que fará com que ele chame waitToFinish().

"TemplatingDataflowPipelineRunner" substituído por "--templateLocation"

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-551

A funcionalidade em TemplatingDataflowPipelineRunner (SDK do Dataflow para Java versão 1.9+) foi substituída ao usar --templateLocation com DataflowRunner.

ParDo e DoFn

DoFns usam anotações em vez de substituições de métodos

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-37

Para permitir mais flexibilidade e personalização, DoFn agora usa anotações de método para personalizar o processamento em vez de exigir que os usuários modifiquem métodos específicos.

As diferenças entre o novo DoFn e o antigo são demonstradas no exemplo de código a seguir. Antes, com o SDK 1.x do Dataflow para Java, seu código seria assim:

new DoFn<Foo, Baz>() {
  @Override
  public void processElement(ProcessContext c) { … }
}

Agora, com o SDK 2.x do Apache Beam para Java, seu código será assim:

new DoFn<Foo, Baz>() {
  @ProcessElement   // <-- This is the only difference
  public void processElement(ProcessContext c) { … }
}

Se o DoFn acessou ProcessContext#window(), haverá mais uma alteração. Em vez disto:

public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    … (MyWindowType) c.window() …
  }
}

escreva isto:

public class MyDoFn extends DoFn<Foo, Baz> {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

ou:

return new DoFn<Foo, Baz>() {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

O ambiente de execução fornecerá automaticamente a janela para DoFn.

DoFns são reutilizados em vários pacotes

Usuários afetados: todos | Impacto: pode causar resultados inesperados | Problema no JIRA: BEAM-38

Para permitir melhorias de desempenho, o mesmo DoFn agora pode ser reutilizado para processar vários pacotes de elementos, em vez de garantir uma nova instância por pacote. Qualquer DoFn que mantenha o estado local (por exemplo, variáveis de instância) além do final de um pacote pode encontrar alterações comportamentais, pois o próximo pacote agora vai começar com esse estado em vez de uma nova cópia.

Para gerenciar o ciclo de vida, novos métodos @Setup e @Teardown foram adicionados. Este é o ciclo de vida completo, embora uma falha possa causar interrupções em qualquer ponto:

  • @Setup: inicialização por instância de DoFn, como a abertura de conexões reutilizáveis.
  • Qualquer número da sequência:
    • @StartBundle: inicialização por pacote, como a redefinição do estado de DoFn.
    • @ProcessElement: o processamento normal do elemento.
    • @FinishBundle: etapas finais por pacote, como efeitos colaterais da limpeza.
  • @Teardown: desmontagem por instância dos recursos retidos por DoFn, como o fechamento de conexões reutilizáveis.

Nota: espera-se que essa mudança tenha impacto limitado na prática. No entanto, ela não gera um erro de tempo de compilação e pode causar resultados inesperados.

Alteração da ordem de parâmetros ao especificar entradas ou saídas laterais

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1422

Agora o DoFn precisa ser especificado primeiro ao aplicar um ParDo. Em vez de:

foos.apply(ParDo
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag)
    .of(new MyDoFn()))

escreva isto:

foos.apply(ParDo
    .of(new MyDoFn())
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag))

PTransforms

.named() removido

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-370

Remova os métodos .named() de PTransforms e subclasses. Use PCollection.apply(“name”, PTransform).

O item PTransform.apply() foi renomeado como PTransform.expand().

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-438

PTransform.apply() foi renomeado como PTransform.expand() para evitar confusão com PCollection.apply(). Todas as transformações compostas gravadas pelo usuário precisarão renomear o método apply() substituído como expand(). O modo como os pipelines são construídos não muda.

Grandes mudanças adicionais

Veja a seguir uma lista de mudanças adicionais significativas e as próximas mudanças.

Alterações de API individuais

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-725

Os GcpOptions a seguir foram removidos: TokenServerUrl, CredentialDir, CredentialId, SecretsFile, ServiceAccountName, ServiceAccountKeyFile.

Use GoogleCredentials.fromStream(InputStream for credential). O stream pode conter um arquivo de chave da conta de serviço em formato JSON do Google Developers Console ou uma credencial do usuário armazenada com o formato aceito pelo SDK do Cloud.

--enableProfilingAgent alterado para --saveProfilesToGcs

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1122

--update foi movido para DataflowPipelineOptions

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-81

Mova --update PipelineOption de DataflowPipelineDebugOptions para DataflowPipelineOptions.

BoundedSource.producesSortedKeys() removido

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1201

Remova producesSortedKeys() de BoundedSource

API PubsubIO alterada

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-974 e BEAM-1415

Começando com a versão 2.0.0-beta2, PubsubIO.Read e PubsubIO.Write precisam ser instanciados usando PubsubIO.<T>read() e PubsubIO.<T>write(), e não os métodos estáticos de fábrica, como PubsubIO.Read.topic(String).

Os métodos para configurar PubsubIO foram renomeados. Por exemplo, PubsubIO.read().topic(String) foi renomeado como PubsubIO.read().fromTopic(). Da mesma forma: subscription() como fromSubscription(), timestampLabel e idLabel respectivamente como withTimestampAttribute e withIdAttribute, PubsubIO.write().topic() como PubsubIO.write().to().

Em vez de especificar Coder para analisar o payload da mensagem, PubsubIO expõe funções para leitura e gravação de strings e mensagens do Avro e do Protobuf, por exemplo, PubsubIO.readStrings(), PubsubIO.writeAvros(). Para ler e gravar tipos personalizados, use PubsubIO.read/writeMessages() (e PubsubIO.readMessagesWithAttributes se precisar incluir atributos da mensagem) e ParDo ou MapElements para fazer conversões entre seu tipo personalizado e PubsubMessage.

Remoção do suporte a DatastoreIO para a API v1beta2 incompatível

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-354

Agora, o DatastoreIO é baseado na API do Cloud Datastore v1.

DisplayData.Builder alterado

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-745

DisplayData.Builder.include(..) tem um novo parâmetro de caminho obrigatório para registrar dados de exibição de subcomponentes. Agora, as APIs do Google Builder retornam DisplayData.ItemSpec<>, em vez de DisplayData.Item.

FileBasedSink.getWriterResultCoder() obrigatório

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-887

Transformou FileBasedSink.getWriterResultCoder em um método abstrato, que precisa ser fornecido.

O item Filter.byPredicate() foi renomeado como Filter.by().

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-342

IntraBundleParallelization removido

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-414

O item RemoveDuplicates foi renomeado como Distinct

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-239

TextIO alterado para usar uma sintaxe diferente e operar somente em strings

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1354

TextIO.Read.from() é alterado para TextIO.read().from(). Da mesma forma, TextIO.Write.to() é alterado para TextIO.write().to().

Agora, TextIO.Read sempre retorna um PCollection<String> e não precisa de .withCoder() para analisar strings. Como alternativa, analise as strings aplicando ParDo ou MapElements à coleção. Da mesma forma, agora TextIO.Write sempre exige PCollection<String>, e para gravar outra coisa em TextIOé preciso convertê-lo em String usando ParDo ou MapElements.

AvroIO alterado para usar uma sintaxe diferente

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1402

Para leitura e gravação de tipos gerados pelo Avro, AvroIO.Read.from().withSchema(Foo.class) é alterado para AvroIO.read(Foo.class).from(), da mesma forma que AvroIO.Write.

Para ler e gravar registros genéricos do Avro usando um esquema especificado, AvroIO.Read.from().withSchema(Schema or String) é alterado para AvroIO.readGenericRecords().from(), da mesma forma que AvroIO.Write.

Alteração de KafkaIO para especificar parâmetros de tipo explicitamente e usar serializadores/desserializadores Kafka

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1573 e BEAM-2221

No KafkaIO, agora você precisa especificar de maneira explícita os parâmetros de tipo key e value, por exemplo, KafkaIO.<Foo, Bar>read() e KafkaIO.<Foo, Bar>write().

Em vez de usar Coder para interpretar bytes de key e value, use as classes padrão Serializer e Deserializer do Kafka. Por exemplo: em vez de KafkaIO.read().withKeyCoder(StringUtf8Coder.of()), use KafkaIO.read().withKeyDeserializer(StringDeserializer.class), da mesma forma para KafkaIO.write().

Sintaxe alterada para BigQueryIO

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1427

Em vez de BigQueryIO.Read.from() e BigQueryIO.Write.to(), use BigQueryIO.read().from() e BigQueryIO.write().to().

Sintaxe alterada para KinesisIO.Read

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1428

Em vez de KinesisIO.Read.from().using(), use KinesisIO.read().from().withClientProvider().

Sintaxe alterada para TFRecordIO

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1913

Em vez de TFRecordIO.Read.from() e TFRecordIO.Write.to(), use TFRecordIO.read().from() e TFRecordIO.write().to().

XmlSource e XmlSink consolidados em XmlIO

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1914

Em vez de usar XmlSource e XmlSink diretamente, use XmlIO.

Por exemplo: em vez de Read.from(XmlSource.from()), use XmlIO.read().from(); em vez de Write.to(XmlSink.writeOf()), use XmlIO.write().to().

CountingInput renomeado como GenerateSequence e generalizado

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1414

Em vez de CountingInput.unbounded(), use GenerateSequence.from(0). Em vez de CountingInput.upTo(n), use GenerateSequence.from(0).to(n).

Alterações em Count, Latest, Sample

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1417, BEAM-1421 e BEAM-1423

Agora as classes Count.PerElement, Count.PerKey e Count.Globally são particulares, então é preciso usar as funções de fábrica, como Count.perElement() (enquanto antes você poderia usar new Count.PerElement()). Além disso, se você quiser, por exemplo, usar .withHotKeyFanout() no resultado da transformação, não poderá fazer isso diretamente em um resultado de .apply(Count.perElement()) e semelhantes. Em vez disso, Count expõe a função de combinação como Count.combineFn() e você precisará aplicar Combine.globally(Count.combineFn()) por conta própria.

Alterações semelhantes são aplicadas às transformações Latest e Sample.

Alteração da ordem dos parâmetros para MapElements e FlatMapElements

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1418

Ao usar MapElements e FlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor), agora o descritor precisa ser especificado primeiro: por exemplo, FlatMapElements.into(descriptor).via(fn).

Alteração de Window ao configurar parâmetros adicionais

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1425

Ao usar Window para configurar algo diferente de WindowFn (Window.into()), use Window.configure(). Por exemplo: em vez de Window.triggering(...), use Window.configure().triggering(...).

O item Write.Bound foi renomeado como Write.

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1416

A classe Write.Bound agora é simplesmente Write. Isso só é importante quando você está extraindo aplicativos de Write.to(Sink) para uma variável. O tipo costumava ser Write.Bound<...>, agora será Write<...>.

Classes de transformação Flatten renomeadas

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1419

As classes Flatten.FlattenIterables e Flatten.FlattenPCollectionList são renomeadas, respectivamente, como Flatten.Iterables e Flatten.PCollections.

Dividir GroupByKey.create(boolean) em dois métodos

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1420

GroupByKey.create(boolean fewKeys) agora é simplesmente GroupByKey.create() e GroupByKey.createWithFewKeys().

SortValues alterado

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1426

Os métodos "setter" BufferedExternalSorter.Options são renomeados de setSomeProperty como withSomeProperty.

Dependência adicional da API do Google

A partir da versão 2.0.0 do SDK, também é necessário ativar a API do Cloud Resource Manager.

Upgrades de dependência

A versão 2.x faz upgrade das versões fixadas da maioria das dependências, incluindo Avro, protobuf e gRPC. Algumas dessas dependências podem ter criado suas próprias alterações interruptivas. Isso pode causar problemas quando seu código também depender diretamente dela. As versões usadas na 2.0.0 podem ser encontradas em pom.xml ou usando mvn dependency:tree.

Refatorações internas

Houve mudanças significativas na estrutura interna do SDK. Os usuários que se baseavam em fatores além da API pública, como classes ou métodos que terminam em pacotes Internal ou util, por exemplo, podem constatar isso.

Se você estava usando StateInternals ou TimerInternals: essas APIs internas foram removidas. Agora é possível usar as APIs State e Timer experimentais para DoFn.