Como migrar do SDK 1.x do Dataflow para Java

Este documento destaca as principais alterações entre as versões 1.x e 2.x do SDK do Dataflow para Java.

Aviso de suspensão da utilização do SDK do Dataflow: o SDK do Dataflow 2.5.0 é a última versão do SDK do Dataflow separada das versões do SDK do Apache Beam. O serviço Dataflow é totalmente compatível com versões oficiais do SDK do Apache Beam. O serviço Dataflow também é compatível com SDKs do Apache Beam lançados anteriormente, começando com a versão 2.0.0 e superior. Consulte a página de suporte do Dataflow para ver o status de suporte de vários SDKs. A página de downloads do Apache Beam contém notas das versões do SDK do Apache Beam.

Como migrar de 1.x para 2.x

Para instalar e usar o SDK do Apache Beam para Java 2.x, consulte o guia de instalação do SDK do Apache Beam.

Principais mudanças da versão 1.x para 2.x

OBSERVAÇÃO: todos os usuários precisam estar cientes dessas mudanças para fazer upgrade para versões 2.x.

Renomeação e reestruturação de pacotes

Como parte da generalização do Apache Beam para que funcione bem em ambientes fora do Google Cloud Platform, o código do SDK foi renomeado e reestruturado.

O item com.google.cloud.dataflow foi renomeado como org.apache.beam.

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-78 (em inglês)

O SDK agora é declarado no pacote org.apache.beam em vez de com.google.cloud.dataflow. É necessário atualizar todas as declarações de importação com essa mudança.

Novos subpacotes: runners.dataflow , runners.direct e io.gcp

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-77 (em inglês)

Os executores foram reorganizados nos próprios pacotes, então muitas coisas de com.google.cloud.dataflow.sdk.runners mudaram para org.apache.beam.runners.direct ou org.apache.beam.runners.dataflow.

As opções de pipeline específicas para execução no serviço Dataflow foram movidas de com.google.cloud.dataflow.sdk.options para org.apache.beam.runners.dataflow.options.

A maioria dos conectores de E/S para serviços do Google Cloud Platform foi movida para subpacotes. Por exemplo, BigQueryIO foi movido de com.google.cloud.dataflow.sdk.io para org.apache.beam.sdk.io.gcp.bigquery.

A maioria das IDEs ajuda a identificar as novas localizações. Para verificar o novo local para arquivos específicos, use t para pesquisar o código no GitHub. O SDK 1.x do Dataflow para versões Java é criado a partir do repositório GoogleCloudPlatform/DataflowJavaSDK (ramificação master-1.x). As versões do SDK 2.x para Java do Dataflow correspondem ao código do repositório apache/beam.

Executores

Pipeline removido dos nomes de executor

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1185 (em inglês)

Os nomes de todos os executores foram reduzidos removendo Pipeline dos nomes. Por exemplo, DirectPipelineRunner agora é DirectRunner e DataflowPipelineRunner agora é DataflowRunner.

Exigir a configuração de --tempLocation para um caminho do Google Cloud Storage

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-430 (em inglês)

Em vez de permitir que você especifique apenas --stagingLocation ou --tempLocation e Dataflow inferindo o outro, o serviço do Dataflow exige que --gcpTempLocation seja definido como um caminho do Google Cloud Storage, mas ele pode ser inferido a partir do--tempLocation mais geral. A menos que substituído, ele também será usado para --stagingLocation.

InProcessPipelineRunner removido

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-243 (em inglês)

O DirectRunner continua sendo executado na máquina local de um usuário, mas agora também suporta execução multissegmentada, PCollections ilimitadas e acionadores para saídas especulativas e atrasadas. Ele se alinha mais ao modelo documentado do Beam e pode causar (corretamente) falhas dos outros testes de unidade.

Como essa funcionalidade agora está em DirectRunner, o InProcessPipelineRunner (SDK do Dataflow 1.6+ para Java) foi removido.

"BlockingDataflowPipelineRunner" substituído por "PipelineResult.waitToFinish()"

Usuários afetados: todos | Impacto: erro de compilação

O BlockingDataflowPipelineRunner foi removido. Se seu código programaticamente espera executar um pipeline e aguardar até que ele seja encerrado, ele deve usar o DataflowRunner e chamar explicitamente pipeline.run().waitToFinish().

Se você usou --runner BlockingDataflowPipelineRunner na linha de comando para induzir interativamente seu programa principal a bloquear até que o pipeline seja encerrado, isso é uma preocupação do programa principal. Ele deve fornecer uma opção, como --blockOnRun que fará com que ele chame waitToFinish().

"TemplatingDataflowPipelineRunner" substituído por "--templateLocation"

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-551 (em inglês)

A funcionalidade em TemplatingDataflowPipelineRunner (SDK do fluxo de dados 1.9+ para Java) foi substituída por --templateLocation com DataflowRunner.

ParDo e DoFn

DoFns usam anotações em vez de substituições de métodos

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-37(em inglês)

Para permitir mais flexibilidade e personalização, DoFn agora usa anotações de método para personalizar o processamento em vez de exigir que os usuários modifiquem métodos específicos.

As diferenças entre o novo DoFn e o antigo são demonstradas no exemplo de código a seguir. Antes, com o SDK 1.x do Dataflow para Java, o código seria assim:

new DoFn<Foo, Baz>() {
      @Override
      public void processElement(ProcessContext c) { … }
    }
    

Agora, com o SDK 2.x do Apache Beam para Java, seu código será assim:

new DoFn<Foo, Baz>() {
      @ProcessElement   // <-- This is the only difference
      public void processElement(ProcessContext c) { … }
    }
    

Se seu DoFn acessou ProcessContext#window(), haverá uma alteração adicional. Em vez disto:

public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
      @Override
      public void processElement(ProcessContext c) {
        … (MyWindowType) c.window() …
      }
    }
    

escreva isto:

public class MyDoFn extends DoFn<Foo, Baz> {
      @ProcessElement
      public void processElement(ProcessContext c, MyWindowType window) {
        … window …
      }
    }
    

ou

return new DoFn<Foo, Baz>() {
      @ProcessElement
      public void processElement(ProcessContext c, MyWindowType window) {
        … window …
      }
    }
    

O ambiente de execução fornecerá automaticamente a janela DoFn.

DoFns são reutilizados em vários pacotes

Usuários afetados: todos | Impacto: pode causar resultados inesperados | Problema no JIRA: BEAM-38 (em inglês)

Para permitir melhorias de desempenho, o mesmo DoFn agora pode ser reutilizado para processar vários pacotes de elementos, em vez de garantir uma nova instância por pacote. Qualquer DoFn que mantém o estado local (por exemplo, variáveis de instância) além do final de um pacote pode encontrar alterações comportamentais, pois o próximo pacote agora começará com esse estado em vez de uma nova cópia.

Para gerenciar o ciclo de vida, novos métodos @Setup e @Teardown foram adicionados. Este é o ciclo de vida completo, embora uma falha o trunque em qualquer ponto:

  • @Setup: inicialização por instância de DoFn, como a abertura de conexões reutilizáveis.
  • Qualquer número da sequência:
    • @StartBundle: inicialização por pacote, como a redefinição do estado de DoFn.
    • @ProcessElement: o processamento normal do elemento.
    • @FinishBundle: etapas finais por agrupamento, como efeitos colaterais de liberação.
  • @Teardown: desmontagem por instância dos recursos retidos por DoFn, como o fechamento de conexões reutilizáveis.

Observação: espera-se que essa mudança tenha impacto limitado na prática. No entanto, ela não gera um erro de tempo de compilação e pode causar resultados inesperados.

Alteração da ordem de parâmetros ao especificar entradas ou saídas laterais

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1422 (em inglês)

Agora o DoFn precisa ser especificado primeiro ao aplicar um ParDo. Em vez disto:

foos.apply(ParDo
        .withSideInputs(sideA, sideB)
        .withOutputTags(mainTag, sideTag)
        .of(new MyDoFn()))
    

escreva isto:

foos.apply(ParDo
        .of(new MyDoFn())
        .withSideInputs(sideA, sideB)
        .withOutputTags(mainTag, sideTag))
    

PTransforms

.named() removido

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-370 (em inglês)

Remova os métodos .named() de PTransforms e subclasses. Em vez disso, use PCollection.apply(“name”, PTransform).

O item PTransform.apply() foi renomeado como PTransform.expand().

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-438 (em inglês)

PTransform.apply() foi renomeado como PTransform.expand() para evitar confusão com PCollection.apply(). Todas as transformações compostas gravadas pelo usuário precisarão renomear o método apply() substituído para expand(). Não há mudanças no modo como os pipelines são construídos.

Grandes mudanças adicionais

Veja a seguir uma lista de mudanças adicionais significativas e as próximas mudanças.

Mudanças de API individuais

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-725 (em inglês)

Os seguintes GcpOptions: TokenServerUrl, CredentialDir, CredentialId, SecretsFile, ServiceAccountName, ServiceAccountKeyFile foram removidos.

Use GoogleCredentials.fromStream(InputStream for credential). O stream pode conter um arquivo de chave da conta de serviço em formato JSON do Google Developers Console ou uma credencial do usuário armazenada usando o formato aceito pelo SDK do Cloud.

--enableProfilingAgent alterado para --saveProfilesToGcs

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1122 (em inglês)

--updatefoi movido para DataflowPipelineOptions.

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-81 (em inglês)

Mova --update PipelineOption de DataflowPipelineDebugOptions para DataflowPipelineOptions.

BoundedSource.producesSortedKeys() removido

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1201 (em inglês)

Remover producesSortedKeys() de BoundedSource

API PubsubIO alterada

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-974 e BEAM-1415 (links em inglês)

Começando com 2.0.0-beta2, PubsubIO.Read e PubsubIO.Write devem ser instanciados usando PubsubIO.<T>read() e PubsubIO.<T>write() em vez dos métodos estáticos de fábrica, como PubsubIO.Read.topic(String).

Os métodos para configurar PubsubIO foram renomeados. Por exemplo, PubsubIO.read().topic(String) foi renomeado para PubsubIO.read().fromTopic(). Da mesma forma: subscription() a fromSubscription(), timestampLabel e idLabel respectivamente para withTimestampAttribute e withIdAttribute, PubsubIO.write().topic() a PubsubIO.write().to().

Em vez de especificar Coder para analisar o payload da mensagem, PubsubIO expõe funções para leitura e escrita de strings, Avro messages e Protobuf messages, eg PubsubIO.readStrings(), PubsubIO.writeAvros(). Para ler e gravar tipos personalizados, use PubsubIO.read/writeMessages() (e PubsubIO.readMessagesWithAttributes se os atributos da mensagem precisarem ser incluídos) e use ParDo ou MapElements para converter entre seu tipo personalizado e PubsubMessage.

Remoção do suporte a DatastoreIO para a API v1beta2 incompatível

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-354 (em inglês)

Agora, o DatastoreIO é baseado na Cloud Datastore API v1.

DisplayData.Builder alterado

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-745 (em inglês)

DisplayData.Builder.include(..) tem um novo parâmetro de caminho obrigatório para registrar dados de exibição de subcomponentes. Agora, as APIs do Google Builder retornam um DisplayData.ItemSpec<>, em vez de DisplayData.Item.

FileBasedSink.getWriterResultCoder() obrigatório

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-887 (em inglês)

Transformou FileBasedSink.getWriterResultCoder em um método abstrato, que deve ser fornecido.

O item Filter.byPredicate() foi renomeado como Filter.by().

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-342 (em inglês)

IntraBundleParallelization removido

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-414 (em inglês)

O item RemoveDuplicates foi renomeado como Distinct.

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-239 (em inglês)

TextIO alterado para usar uma sintaxe diferente e operar somente em strings

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1354 (em inglês)

TextIO.Read.from() é alterado para TextIO.read().from(). Da mesma forma, TextIO.Write.to() é alterado para TextIO.write().to().

TextIO.Read agora sempre retorna um PCollection<String> e não precisa de .withCoder() para analisar as sequências. Em vez disso, analise as strings aplicando um ParDo ou MapElements à coleção. Da mesma forma, TextIO.Write agora sempre leva um PCollection<String>, e para escrever outra coisa para TextIOé preciso convertê-lo em String usando um ParDo ou MapElements.

Alterou AvroIO para usar uma sintaxe diferente

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1402

Para leitura e gravação de tipos gerados pelo Avro, AvroIO.Read.from().withSchema(Foo.class) é alterado para AvroIO.read(Foo.class).from(), da mesma forma AvroIO.Write.

Para ler e gravar registros genéricos do Avro usando um esquema especificado, AvroIO.Read.from().withSchema(Schema or String) é alterado para AvroIO.readGenericRecords().from(), da mesma forma AvroIO.Write.

Alteração de KafkaIO para especificar parâmetros de tipo explicitamente e usar serializadores/desserializadores Kafka

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1573 e BEAM-2221 (links em inglês)

No KafkaIO, agora você precisa especificar explicitamente os parâmetros de tipo key e value, por exemplo, KafkaIO.<Foo, Bar>read() e KafkaIO.<Foo, Bar>write().

Em vez de usar Coder para interpretar bytes de chave e valor, use as classes padrão Serializer e Deserializer do Kafka. Por exemplo: em vez de KafkaIO.read().withKeyCoder(StringUtf8Coder.of()), use KafkaIO.read().withKeyDeserializer(StringDeserializer.class), da mesma forma para KafkaIO.write().

Sintaxe alterada para BigQueryIO

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1427

Em vez de BigQueryIO.Read.from() e BigQueryIO.Write.to(), use BigQueryIO.read().from() e BigQueryIO.write().to().

Sintaxe alterada para KinesisIO.Read

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1428 (em inglês)

Em vez de KinesisIO.Read.from().using(), use KinesisIO.read().from().withClientProvider().

Sintaxe alterada para TFRecordIO

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1913 (em inglês)

Em vez de TFRecordIO.Read.from() e TFRecordIO.Write.to(), use TFRecordIO.read().from() e TFRecordIO.write().to().

XmlSource e XmlSink consolidadas em XmlIO

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1914 (em inglês)

Em vez de usar XmlSource e XmlSink diretamente, use XmlIO.

Por exemplo: em vez de Read.from(XmlSource.from()), use XmlIO.read().from(); em vez de Write.to(XmlSink.writeOf()), use XmlIO.write().to().

CountingInput renomeado para GenerateSequence e generalizado

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1414 (em inglês)

Em vez de CountingInput.unbounded(), use GenerateSequence.from(0). Em vez de CountingInput.upTo(n), use GenerateSequence.from(0).to(n).

Count, Latest e Sample alterados

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1417, BEAM-1421 e BEAM-1423 (links em inglês)

Classes Count.PerElement, Count.PerKey e Count.Globally agora são particulares, então você tem que usar as funções de fábrica, como Count.perElement() (embora anteriormente você pudesse usar new Count.PerElement()). Além disso, se você quiser, por exemplo, usar .withHotKeyFanout() no resultado da transformação, você não pode fazer isso diretamente em um resultado de .apply(Count.perElement()) e mais. Em vez disso, Count expõe a função de combinação como Count.combineFn() e você deve inscrever Combine.globally(Count.combineFn()) você mesmo.

Alterações semelhantes são aplicadas às transformações Latest e Sample.

Alteração da ordem dos parâmetros para MapElements e FlatMapElements

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1418 (em inglês)

Ao usar MapElements e FlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor), agora o descritor deve ser especificado primeiro: por exemplo, FlatMapElements.into(descriptor).via(fn).

Alteração de Window ao configurar parâmetros adicionais

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1425 (em inglês)

Ao usar Window para configurar algo diferente de WindowFn propriamente dito (Window.into()), use Window.configure(). Por exemplo: em vez de Window.triggering(...), use Window.configure().triggering(...).

O item Write.Bound foi renomeado como Write.

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1416 (em inglês)

Classe Write.Bound agora é simplesmente Write. Isso é importante apenas se você estiver extraindo aplicativos de Write.to(Sink) em uma variável. O tipo costumava ser Write.Bound<...>, agora será Write<...>.

Classes de transformação Flatten renomeadas

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1419 (em inglês)

As classes Flatten.FlattenIterables e Flatten.FlattenPCollectionList são renomeadas, respectivamente, como Flatten.Iterables e Flatten.PCollections.

Dividir GroupByKey.create(boolean) em dois métodos

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1420 (em inglês)

GroupByKey.create(boolean fewKeys) agora é apenas GroupByKey.create() e GroupByKey.createWithFewKeys().

SortValues alterado

Usuários afetados: todos | Impacto: erro de compilação | Problema no JIRA: BEAM-1426 (em inglês)

Os métodos "setter" BufferedExternalSorter.Options são renomeados de setSomeProperty a withSomeProperty.

Dependência adicional da API do Google

A partir do SDK versão 2.0.0, também é necessário habilitar a Cloud Resource Manager API.

Upgrades de dependência

A versão 2.x atualiza as versões fixadas da maioria das dependências, incluindo Avro, protobuf e gRPC. Algumas dessas dependências podem ter criado alterações significativas por conta própria, o que pode causar problemas se seu código também depender diretamente da dependência. As versões usadas na versão 2.0.0 podem ser encontradas no pom.xml ou usando mvn dependency:tree.

Refatorações internas

Houve mudanças significativas na estrutura interna do SDK. Os usuários que se baseavam em fatores além da API pública como classes ou métodos que terminam em pacotes Internal ou util, por exemplo, podem constatar isso.

Se você estava usando StateInternals ou TimerInternals: essas APIs internas foram removidas. Agora você pode usar as APIs State e Timer experimentais para DoFn.