Cómo migrar desde el SDK de Dataflow 1.x para Java

Este documento destaca los principales cambios entre el SDK de Dataflow para las versiones 1.x de Java y las versiones 2.x.

Aviso de baja del SDK de Dataflow: Dataflow SDK 2.5.0 es la última versión del SDK de Dataflow que es independiente de las versiones del SDK de Apache Beam. El servicio de Dataflow es totalmente compatible con las versiones oficiales del SDK de Apache Beam. El servicio de Dataflow también es compatible con los SDK de Apache Beam lanzados anteriormente a partir de la versión 2.0.0 y posteriores. Consulte la página de asistencia de Dataflow para conocer el estado de compatibilidad de varios SDK. La página Descargas de Apache Beam contiene notas de las versiones de los SDK de Apache Beam.

Migración de 1.x a 2.x

A fin de instalar y usar el SDK de Apache Beam 2.x para Java, consulta la guía de instalación del SDK de Apache Beam.

Cambios principales de 1.x a 2.x

NOTA: Todos los usuarios deben conocer estos cambios si quieren actualizarse a las versiones 2.x.

Renombramiento y reestructuramiento de paquetes

Como parte del proceso de generalización de Apache Beam para que trabaje bien con entornos más allá Google Cloud Platform, el código del SDK se volvió a nombrar y a estructurar.

Se cambió el nombre "com.google.cloud.dataflow" a "org.apache.beam".

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-78

El SDK ahora se declara en el paquete org.apache.beam en lugar de com.google.cloud.dataflow. Necesitas actualizar todas tus declaraciones de importación con este cambio.

Nuevos subpaquetes: runners.dataflow, runners.direct y io.gcp

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-77

Los corredores se reorganizaron en sus propios paquetes, por lo que muchos elementos de com.google.cloud.dataflow.sdk.runners pasaron a ser org.apache.beam.runners.direct o org.apache.beam.runners.dataflow.

Las opciones de canalización específicas para la ejecución en el servicio de Dataflow pasaron de com.google.cloud.dataflow.sdk.options a org.apache.beam.runners.dataflow.options.

La mayoría de los conectores de E/S a servicios de Google Cloud Platform se movieron a subpaquetes. Por ejemplo, BigQueryIO se movió de com.google.cloud.dataflow.sdk.io a org.apache.beam.sdk.io.gcp.bigquery.

La mayoría de los IDE podrán ayudarte a identificar las ubicaciones nuevas. Para verificar la nueva ubicación de archivos específicos, puedes usar t para buscar el código en GitHub. Los lanzamientos del SDK 1.x para Java de Dataflow se crean desde el repositorio GoogleCloudPlatform/DataflowJavaSDK (rama master-1.x). Las versiones del SDK 2.x de Java de Dataflow corresponden al código del repositorio apache/beam.

Ejecutores

Se quitó Pipeline de los nombres de Runner

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1185

Se acortaron los nombres de todos los corredores quitando Pipeline de los nombres. Por ejemplo, DirectPipelineRunner ahora es DirectRunner y DataflowPipelineRunner ahora es DataflowRunner.

Requerir configuración --tempLocation a una ruta de Google Cloud Storage

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-430

En lugar de permitirte especificar solo uno de --stagingLocation o --tempLocation y luego Dataflow inferir el otro, el servicio de Dataflow ahora requiere que --gcpTempLocation se establezca en una ruta de Google Cloud Storage, pero se puede inferir de el más general --tempLocation. A menos que se anule, esto también se usará para el --stagingLocation.

Se quitó InProcessPipelineRunner

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-243

El DirectRunner continúa ejecutándose en la máquina local de un usuario, pero ahora también admite ejecución de subprocesos múltiples, PCollections ilimitadas y activadores para resultados especulativos y tardíos. Se alinea de manera más próxima al modelo de Beam documentado y puede causar (de forma correcta) fallas de prueba de unidades adicionales.

Como esta funcionalidad ahora está en el DirectRunner, se quitó el InProcessPipelineRunner (SDK 1.6+ de Dataflow para Java).

Se reemplazó BlockingDataflowPipelineRunner por PipelineResult.waitToFinish()

Usuarios afectados: Todos | Impacto: Error de compilación

Se quitó BlockingDataflowPipelineRunner. Si su código espera ejecutar una canalización programáticamente y esperar hasta que finalice, debe usar DataflowRunner y llamar explícitamente a pipeline.run().waitToFinish().

Si usó --runner BlockingDataflowPipelineRunner en la línea de comandos para inducir de manera interactiva a su programa principal a bloquear hasta que la canalización haya finalizado, esto es una preocupación del programa principal; debe proporcionar una opción como --blockOnRun que lo induzca a llamar waitToFinish().

Se reemplazó TemplatingDataflowPipelineRunner por --templateLocation

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-551

La funcionalidad de TemplatingDataflowPipelineRunner (Dataflow SDK 1.9+ para Java) se reemplazó por --templateLocation con DataflowRunner.

ParDo y DoFn

DoFn s usa anotaciones en lugar de anulaciones de métodos

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-37

Para permitir una mayor flexibilidad y personalización, DoFn ahora usa anotaciones de método para personalizar el procesamiento en lugar de requerir que los usuarios anulen métodos específicos.

Las diferencias entre la nueva DoFn y la anterior se demuestran en el siguiente ejemplo de código. Anteriormente (con el SDK de Dataflow 1.x para Java), su código se vería así:

new DoFn<Foo, Baz>() {
      @Override
      public void processElement(ProcessContext c) { … }
    }
    

Ahora (con el SDK de Apache Beam 2.x para Java), tu código se verá de la siguiente manera:

new DoFn<Foo, Baz>() {
      @ProcessElement   // <-- This is the only difference
      public void processElement(ProcessContext c) { … }
    }
    

Si DoFn accediste ProcessContext#window(), hay otro cambio. En lugar de esto:

public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
      @Override
      public void processElement(ProcessContext c) {
        … (MyWindowType) c.window() …
      }
    }
    

escribirás esto:

public class MyDoFn extends DoFn<Foo, Baz> {
      @ProcessElement
      public void processElement(ProcessContext c, MyWindowType window) {
        … window …
      }
    }
    

o:

return new DoFn<Foo, Baz>() {
      @ProcessElement
      public void processElement(ProcessContext c, MyWindowType window) {
        … window …
      }
    }
    

El entorno de ejecución proporcionará automáticamente la ventana a tu DoFn.

DoFn s se reutilizan en varios paquetes

Usuarios afectados: Todos | Impacto: Puede causar resultados inesperados de forma silenciosa | Problema de JIRA: BEAM-38

Para permitir mejoras en el rendimiento, ahora se puede volver a usar el mismo DoFn para procesar varios paquetes de elementos, en lugar de garantizar una instancia nueva por paquete. Cualquier DoFn que mantenga el estado local (p.Ej. Variables de instancia) más allá del final de un paquete puede experimentar cambios de comportamiento, ya que el próximo paquete comenzará con ese estado en lugar de una copia nueva.

Para administrar el ciclo de vida, se agregaron nuevos métodos @Setup y @Teardown. El ciclo de vida completo es como se detalla a continuación (si bien una falla puede truncar el ciclo de vida en cualquier punto):

  • @Setup: Inicialización por instancia del DoFn, como la apertura de conexiones reutilizables.
  • Cualquier número de la siguiente secuencia:
    • @StartBundle: Inicialización por paquete, como restablecer el estado de DoFn.
    • @ProcessElement: El procesamiento de elementos habitual.
    • @FinishBundle: pasos de conclusión de paquetes, como la eliminación de efectos secundarios.
  • @Teardown: Desmantelamiento por instancia de los recursos que posee el DoFn, como el cierre de conexiones reutilizables.

Nota: Se espera que este cambio tenga un impacto limitado en la práctica. Sin embargo, no genera un error de tiempo de compilación y tiene el potencial de causar resultados inesperados de forma silenciosa.

Se cambió el orden de los parámetros cuando se especifican entradas o salidas adicionales

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1422

Ahora, DoFn siempre debe especificarse primero cuando se aplica un `ParDo. En lugar de esto:

foos.apply(ParDo
        .withSideInputs(sideA, sideB)
        .withOutputTags(mainTag, sideTag)
        .of(new MyDoFn()))
    

escribirás esto:

foos.apply(ParDo
        .of(new MyDoFn())
        .withSideInputs(sideA, sideB)
        .withOutputTags(mainTag, sideTag))
    

PTransforms

Se quitó .named()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-370

Elimina los métodos .named() de PTransforms y subclases. En cambio, usa PCollection.apply(“name”, PTransform).

Se cambió el nombre "PTransform.apply()" a "PTransform.expand()".

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-438

Se cambió el nombre de PTransform.apply() a PTransform.expand() para evitar confusiones con PCollection.apply(). Todas las transformaciones compuestas escritas por el usuario deberán cambiar el nombre del método apply() reemplazado por expand(). No hay cambios respecto a cómo se construyen las canalizaciones.

Cambios rotundos adicionales

La siguiente es una lista de cambios rotundos adicionales y cambios a futuro.

Cambios individuales de la API

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-725

Se eliminaron los siguientes GcpOptions: TokenServerUrl, CredentialDir, CredentialId, SecretsFile, ServiceAccountName, ServiceAccountKeyFile.

Usa GoogleCredentials.fromStream(InputStream for credential). La transmisión contiene un archivo de claves de la cuenta de servicio en el formato JSON de Google Developers Console o una credencial de usuario almacenada con el formato compatible con el SDK de Cloud.

Se cambió --enableProfilingAgent por --saveProfilesToGcs

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1122

Se movió --update a DataflowPipelineOptions

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-81

Mueve --update PipelineOption a DataflowPipelineOptions desde DataflowPipelineDebugOptions.

Se quitó BoundedSource.producesSortedKeys()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1201

Quitar producesSortedKeys() de BoundedSource.

API PubsubIO modificada

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-974, BEAM-1415

A partir de 2.0.0-beta2, PubsubIO.Read y PubsubIO.Write deben crearse instancias con PubsubIO.<T>read() y PubsubIO.<T>write() en lugar de los métodos estáticos de fábrica, como PubsubIO.Read.topic(String).

Se cambió el nombre de los métodos de configuración PubsubIO. Por ejemplo, PubsubIO.read().topic(String) se cambió por PubsubIO.read().fromTopic(). Del mismo modo: subscription() a fromSubscription(), timestampLabel y idLabel respectivamente a withTimestampAttribute y withIdAttribute, PubsubIO.write().topic() a PubsubIO.write().to().

En lugar de especificar Coder para analizar la carga útil del mensaje, PubsubIO expone funciones para leer y escribir strings, Avro messages y Protobuf messages, por ejemplo PubsubIO.readStrings(), PubsubIO.writeAvros(). Para leer y escribir tipos personalizados, usa PubsubIO.read/writeMessages() (y PubsubIO.readMessagesWithAttributes si se deben incluir atributos de mensaje) y usa ParDo o MapElements para convertir tu tipo personalizado a PubsubMessage.

Se quitó la compatibilidad de DatastoreIO para la API de v1beta2 no compatible

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-354

Ahora, DatastoreIO está basado en la API de Cloud Datastore v1.

modificado DisplayData.Builder

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-745

DisplayData.Builder.include(..) tiene un nuevo parámetro de ruta obligatorio para registrar datos de visualización de subcomponentes. Las API de Builder ahora muestran un DisplayData.ItemSpec<>, en lugar de DisplayData.Item.

Obligatorio FileBasedSink.getWriterResultCoder()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-887

Se transformó FileBasedSink.getWriterResultCoder en un método abstracto, que debe proporcionarse.

Se cambió el nombre "Filter.byPredicate()" a "Filter.by()".

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-342

Se quitó IntraBundleParallelization

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-414

Se cambió el nombre "RemoveDuplicates" a "Distinct"

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-239

Se cambió TextIO para usar una sintaxis diferente y operar solo en strings

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1354

TextIO.Read.from() se cambia a TextIO.read().from(); del mismo modo, TextIO.Write.to() se cambia a TextIO.write().to().

TextIO.Read ahora siempre muestra un PCollection<String> y no toma .withCoder() para analizar las strings. En su lugar, analiza las strings mediante la aplicación de un ParDo o MapElements a la colección. Del mismo modo, TextIO.Write ahora siempre lleva un PCollection<String>, y para escribir algo más para TextIO, convertirlo a String con un ParDo o MapElements.

Se cambió AvroIO para usar una sintaxis diferente.

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1402

Para leer y escribir los tipos generados Avro, AvroIO.Read.from().withSchema(Foo.class) se cambia a AvroIO.read(Foo.class).from(), del mismo modo AvroIO.Write.

Para leer y escribir registros genéricos Avro con un esquema especificado, AvroIO.Read.from().withSchema(Schema or String) se cambia a AvroIO.readGenericRecords().from(), del mismo modo AvroIO.Write.

Cambió KafkaIO para especificar parámetros de tipo de forma explícita y usar serializadores/deserializadores de Kafka.

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1573, BEAM-2221

En KafkaIO, ahora debe especificar los parámetros de clave y tipo de valor de forma explícita, por ejemplo, KafkaIO.<Foo, Bar>read() y KafkaIO.<Foo, Bar>write().

En lugar de usar Coder para interpretar los bytes clave y de valor, usa las clases Kafka Serializer y Deserializer estándar. Por ejemplo, en lugar de KafkaIO.read().withKeyCoder(StringUtf8Coder.of()), use KafkaIO.read().withKeyDeserializer(StringDeserializer.class), del mismo modo para KafkaIO.write().

Sintaxis modificada para BigQueryIO

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1427

En lugar de BigQueryIO.Read.from() y BigQueryIO.Write.to(), usa BigQueryIO.read().from() y BigQueryIO.write().to().

Sintaxis modificada para KinesisIO.Read

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1428

En lugar de KinesisIO.Read.from().using(), usa KinesisIO.read().from().withClientProvider().

Sintaxis modificada para TFRecordIO

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1913

En lugar de TFRecordIO.Read.from() y TFRecordIO.Write.to(), usa TFRecordIO.read().from() y TFRecordIO.write().to().

Consolidado XmlSource y XmlSink bajo XmlIO

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1914

En lugar de usar XmlSource y XmlSink directamente, usa XmlIO.

Por ejemplo: en lugar de Read.from(XmlSource.from()), usa XmlIO.read().from(); en lugar de Write.to(XmlSink.writeOf()), usa XmlIO.write().to().

Cambió el nombre de CountingInput a GenerateSequence y se generalizó.

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1414

En lugar de CountingInput.unbounded(), usa GenerateSequence.from(0). En lugar de CountingInput.upTo(n), usa GenerateSequence.from(0).to(n).

modificado Count, Latest, Sample

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1417, BEAM-1421, BEAM-1423

Clases Count.PerElement , Count.PerKey , Count.Globally ahora son privadas, por lo que debe usar las funciones de fábrica, como Count.perElement() (mientras que antes podías usar new Count.PerElement() ). Además, si desea, por ejemplo, .withHotKeyFanout() en el resultado de la transformación, entonces no se puede hacer eso directamente en un resultado de .apply(Count.perElement()) y eso ya no es así. Count expone su función de combinación como Count.combineFn() y debe solicitar Combine.globally(Count.combineFn()) usted mismo.

Se aplican cambios similares a las transformaciones Latest y Sample.

Se cambió el orden de los parámetros para MapElements y FlatMapElements

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1418

Cuando se usa MapElements y FlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor), ahora el descriptor debe especificarse primero, por ejemplo, FlatMapElements.into(descriptor).via(fn).

Se modificó Window al configurar parámetros adicionales.

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1425

Cuando uses Window para configurar algo que no sea WindowFn (Window.into()), usa Window.configure(). Por ejemplo: en lugar de Window.triggering(...), usa Window.configure().triggering(...).

Se cambió el nombre "Write.Bound" a "Write".

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1416

La clase Write.Bound ahora es simplemente Write. Esto solo importa si extraes aplicaciones de Write.to(Sink) en una variable; su tipo solía ser Write.Bound<...>, ahora será Write<...>.

Se cambió el nombre de las clases de transformación Flatten

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1419

Las clases Flatten.FlattenIterables y Flatten.FlattenPCollectionList se renombran respectivamente a Flatten.Iterables y Flatten.PCollections.

Dividir GroupByKey.create(boolean) en dos métodos

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1420

GroupByKey.create(boolean fewKeys) ahora es simplemente GroupByKey.create() y GroupByKey.createWithFewKeys().

modificado SortValues

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1426

Los métodos BufferedExternalSorter.Options setter se renombran de setSomeProperty a withSomeProperty.

Dependencia adicional de la API de Google

A partir de la versión 2.0.0 del SDK, también debes habilitar la API de Cloud Resource Manager.

Actualizaciones de dependencias

La versión 2.x actualiza las versiones fijadas de la mayoría de las dependencias, como Avro, protobuf y gRPC. Es posible que algunas de estas dependencias hayan hecho cambios de última hora, lo que puede causar problemas si el código también depende directamente de la dependencia. Las versiones utilizadas en 2.0.0 se pueden encontrar en pom.xml o mediante mvn dependency:tree.

Refactorizaciones internas

Hubo cambios significativos en la estructura interna del SDK. Aquellos usuarios que se basaban en elementos más allá de la API pública (como las clases o métodos terminados en Internal o en paquetes util) quizás vean que varios elementos cambiaron de manera significativa.

Si usabas StateInternals o TimerInternals: estas API internas se han eliminado. Ahora puedes usar las State y las Timer API experimentales para DoFn.