Migra desde el SDK de Dataflow 1.x para Java

En este documento, se resaltan los cambios principales entre las versiones 1.x y las versiones 2.x del SDK de Dataflow para Java.

Aviso de baja del SDK de Dataflow: El SDK de Dataflow 2.5.0 es la última versión del SDK de Dataflow que es independiente de las versiones del SDK de Apache Beam. El servicio de Dataflow es totalmente compatible con las actualizaciones oficiales del SDK de Apache Beam. El servicio de Dataflow también es compatible con los SDK de Apache Beam de actualizaciones anteriores a partir de la versión 2.0.0 y posteriores. Consulta la página de asistencia de Dataflow para obtener el estado de compatibilidad de varios SDK. La página de descargas de Apache Beam contiene notas de las versiones de los SDK de Apache Beam.

Migración de 1.x a 2.x

A fin de instalar y usar el SDK de Apache Beam 2.x para Java, consulta la Guía de instalación del SDK de Apache Beam.

Cambios principales de 1.x a 2.x

NOTA: Todos los usuarios deben conocer estos cambios si quieren actualizarse a las versiones 2.x.

Renombramiento y reestructuramiento de paquetes

Como parte del proceso de generalización de Apache Beam para que trabaje bien con entornos más allá Google Cloud Platform, el código del SDK se volvió a estructurar y a cambiar de nombre.

Se cambió el nombre de com.google.cloud.dataflow a org.apache.beam

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-78

El SDK ahora se declara en el paquete org.apache.beam en lugar de com.google.cloud.dataflow. Necesitas actualizar todas tus declaraciones de importación con este cambio.

Subpaquetes nuevos: runners.dataflow, runners.directio.gcp

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-77

Los corredores se reorganizaron en sus propios paquetes, por lo que muchos elementos de com.google.cloud.dataflow.sdk.runners se movieron a org.apache.beam.runners.direct o org.apache.beam.runners.dataflow.

Las opciones de canalización específicas de las ejecuciones en el servicio de Dataflow se movieron de com.google.cloud.dataflow.sdk.options a org.apache.beam.runners.dataflow.options.

La mayoría de los conectores de E/S a servicios de Google Cloud Platform se movieron a subpaquetes. Por ejemplo, BigQueryIO se movió de com.google.cloud.dataflow.sdk.io a org.apache.beam.sdk.io.gcp.bigquery.

La mayoría de los IDE podrán ayudarte a identificar las ubicaciones nuevas. Para verificar la ubicación nueva de los archivos específicos, puedes usar t a fin de buscar el código en GitHub. Las versiones del SDK de Dataflow 1.x para Java se crean a partir del repositorio GoogleCloudPlatform/DataflowJavaSDK (rama master-1.x). Las versiones del SDK de Dataflow 2.x para Java corresponden al código del repositorio apache/beam.

Ejecutores

Se quitó Pipeline de los nombres de ejecutores

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1185

Se quitó Pipeline de los nombres de todos los ejecutores para acortarlos. Por ejemplo, DirectPipelineRunner ahora es DirectRunner y DataflowPipelineRunner ahora es DataflowRunner.

Se requiere establecer --tempLocation en una ruta de acceso de Google Cloud Storage

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-430

En lugar de que debas especificar solo --stagingLocation o --tempLocation y, luego, Dataflow infiera el otro, el servicio de Dataflow ahora requiere que --gcpTempLocation se establezca en una ruta de acceso de Google Cloud Storage del --tempLocation más general. A no ser que se anule, también se usará para el --stagingLocation.

Se quitó InProcessPipelineRunner

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-243

DirectRunner todavía se ejecuta en la máquina local de un usuario, pero ahora también es compatible con ejecuciones multiproceso, PCollections no delimitadas y activadores para resultados especulativos y tardíos. Se alinea de manera más próxima al modelo de Beam documentado y puede causar (de forma correcta) fallas de prueba de unidades adicionales.

Como esta funcionalidad ahora está en DirectRunner, se quitó InProcessPipelineRunner (el SDK de Dataflow 1.6+ para Java).

Se reemplazó BlockingDataflowPipelineRunner por PipelineResult.waitToFinish()

Usuarios afectados: Todos | Impacto: Error de compilación

Ahora se quitó BlockingDataflowPipelineRunner. Si tu código pretende ejecutar una canalización de manera programática y esperar hasta que finalice, debería usar DataflowRunner y llamar de forma explícita a pipeline.run().waitToFinish().

Si usaste --runner BlockingDataflowPipelineRunner en la línea de comandos a fin de inducir de forma interactiva que tu programa principal se bloquee hasta que finalice la canalización, esto es una preocupación del programa principal; debería proporcionar una opción como --blockOnRun que la inducirá para llamar a waitToFinish().

Se reemplazó TemplatingDataflowPipelineRunner por --templateLocation

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-551

La funcionalidad en TemplatingDataflowPipelineRunner (el SDK de Dataflow 1.9+ para Java) se reemplazó mediante --templateLocation con DataflowRunner.

ParDo y DoFn

Los DoFn usan anotaciones en lugar de anulaciones de métodos

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-37

A fin de permitir una mayor flexibilidad y personalización, DoFn ahora usa anotaciones de método para personalizar el procesamiento en lugar de requerir que los usuarios anulen métodos específicos.

Las diferencias entre el DoFn nuevo y el anterior se demuestran en el siguiente código de ejemplo. Antes (con el SDK de Dataflow 1.x para Java), tu código se vería de la siguiente manera:

new DoFn<Foo, Baz>() {
  @Override
  public void processElement(ProcessContext c) { … }
}

Ahora (con el SDK de Apache Beam 2.x para Java), tu código se verá de la siguiente manera:

new DoFn<Foo, Baz>() {
  @ProcessElement   // <-- This is the only difference
  public void processElement(ProcessContext c) { … }
}

Si DoFn accedió a ProcessContext#window(), entonces hay más cambios. En lugar de esto:

public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    … (MyWindowType) c.window() …
  }
}

escribirás esto:

public class MyDoFn extends DoFn<Foo, Baz> {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

o:

return new DoFn<Foo, Baz>() {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

El entorno de ejecución proporcionará la ventana a tu DoFn de forma automática.

Los DoFn se vuelven a usar en varios paquetes

Usuarios afectados: Todos | Impacto: Puede causar resultados inesperados de forma silenciosa | Problema de JIRA: BEAM-38

A fin de permitir mejoras de rendimiento, ahora se puede volver a usar el mismo DoFn para procesar varios paquetes de elementos, en lugar de garantizar una instancia nueva por paquete. Cualquier DoFn que mantenga el estado local (p. ej., variables de instancia) más allá del final de un paquete puede experimentar cambios de comportamiento, ya que el próximo paquete comenzará con ese estado en lugar de una copia nueva.

Para administrar el ciclo de vida, se agregaron métodos @Setup y @Teardown nuevos. El ciclo de vida completo es como se detalla a continuación (aunque una falla puede truncar el ciclo de vida en cualquier punto):

  • @Setup: Inicialización por instancia del DoFn, como la apertura de conexiones reutilizables.
  • Cualquier número de la siguiente secuencia:
    • @StartBundle: Inicialización por paquete, como restablecer el estado de DoFn.
    • @ProcessElement: El procesamiento de elementos habitual.
    • @FinishBundle: Pasos concluyentes por paquete, como limpiar los efectos secundarios.
  • @Teardown: Desmontaje de los recursos por instancia que mantiene DoFn, como cerrar conexiones reutilizables.

Nota: Se espera que este cambio tenga un impacto limitado en la práctica. Sin embargo, no genera un error de tiempo de compilación y tiene el potencial de causar resultados inesperados de forma silenciosa.

Se cambió el orden de los parámetros cuando se especifican entradas o salidas adicionales

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1422

Ahora, DoFn siempre debe especificarse primero cuando se aplica un ParDo. En lugar de esto:

foos.apply(ParDo
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag)
    .of(new MyDoFn()))

escribirás esto:

foos.apply(ParDo
    .of(new MyDoFn())
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag))

PTransforms

Se quitó .named()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-370

Quita los métodos .named() de PTransforms y subclases. En su lugar, usa PCollection.apply(“name”, PTransform).

Se cambió el nombre de PTransform.apply() a PTransform.expand()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-438

Se cambió el nombre de PTransform.apply() a PTransform.expand() para evitar confusiones con PCollection.apply(). Todas las transformaciones compuestas escritas por el usuario deberán cambiar el nombre del método apply() anulado a expand(). No hay cambios respecto a cómo se construyen las canalizaciones.

Cambios rotundos adicionales

La siguiente es una lista de cambios rotundos adicionales y cambios a futuro.

Cambios individuales de la API

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA:BEAM-725

Se quitaron los siguientes métodos GcpOptions: TokenServerUrl, CredentialDir, CredentialId, SecretsFile, ServiceAccountName, ServiceAccountKeyFile.

Usa GoogleCredentials.fromStream(InputStream for credential). La transmisión contiene un archivo de claves de la cuenta de servicio en el formato JSON de Google Developers Console o una credencial de usuario almacenada con el formato compatible con el SDK de Cloud.

Se cambió --enableProfilingAgent por --saveProfilesToGcs

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1122

Se movió --update a DataflowPipelineOptions

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-81

Mueve --update PipelineOption a DataflowPipelineOptions desde DataflowPipelineDebugOptions.

Se quitó BoundedSource.producesSortedKeys()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1201

Quita producesSortedKeys() de BoundedSource.

Se cambió la API de PubsubIO

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-974BEAM-1415

A partir de 2.0.0-beta2, con PubsubIO.Read y PubsubIO.Write, se debe crear una instancia mediante PubsubIO.<T>read() y PubsubIO.<T>write() en lugar de los métodos de fábrica estáticos, como PubsubIO.Read.topic(String).

Se cambiaron los nombres de los métodos para configurar PubsubIO, p. ej., se cambió el nombre de PubsubIO.read().topic(String) a PubsubIO.read().fromTopic(). Del mismo modo: subscription() a fromSubscription(), timestampLabel y idLabel a withTimestampAttribute y withIdAttribute respectivamente, y PubsubIO.write().topic() a PubsubIO.write().to().

En lugar de especificar Coder a fin de analizar la carga útil del mensaje, PubsubIO expone funciones para leer y escribir strings, mensajes de Avro y Protobuf, p. ej., PubsubIO.readStrings(), PubsubIO.writeAvros(). Para leer y escribir tipos personalizados, usa PubsubIO.read/writeMessages() (y PubsubIO.readMessagesWithAttributes si se deben incluir atributos de mensaje) y usa ParDo o MapElements para alternar entre tu tipo personalizado y PubsubMessage.

Se quitó la compatibilidad de DatastoreIO para la API de v1beta2 no compatible

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-354

Ahora, DatastoreIO está basado en la API de Cloud Datastore v1.

Se cambió DisplayData.Builder

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA:BEAM-745

DisplayData.Builder.include(..) requiere un nuevo parámetro de ruta para registrar datos de muestra de subcomponentes. Las API de compilador ahora muestran DisplayData.ItemSpec<>, en lugar de DisplayData.Item.

Se requiere FileBasedSink.getWriterResultCoder()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-887

Se transformó FileBasedSink.getWriterResultCoder en un método abstracto, que debe proporcionarse.

Se cambió el nombre de Filter.byPredicate() a Filter.by()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-342

Se quitó IntraBundleParallelization

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-414

Se cambió el nombre de RemoveDuplicates a Distinct

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-239

Se cambió TextIO para usar una sintaxis diferente y operar solo en strings

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1354

TextIO.Read.from() se cambia a TextIO.read().from(); del mismo modo, TextIO.Write.to() se cambia a TextIO.write().to().

TextIO.Read ahora siempre muestra PCollection<String> y no toma .withCoder() para analizar las strings. En su lugar, analiza las strings mediante la aplicación de ParDo o MapElements a la colección. Del mismo modo, TextIO.Write ahora lleva siempre una PCollection<String> y, para escribir algo más en TextIO, conviértelo en String mediante ParDo o MapElements.

Se cambió AvroIO para que use una sintaxis diferente

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1402

Para leer y escribir tipos generados de Avro, se cambió AvroIO.Read.from().withSchema(Foo.class) a AvroIO.read(Foo.class).from(), del mismo modo que AvroIO.Write.

Para leer y escribir registros genéricos de Avro mediante un esquema especificado, AvroIO.Read.from().withSchema(Schema or String) se cambia a AvroIO.readGenericRecords().from(), del mismo modo que AvroIO.Write.

Se cambió KafkaIO para especificar parámetros de tipo de forma explícita y usar los serializadores y deserializadores de Kafka

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1573BEAM-2221

Ahora, en KafkaIO, debes especificar los parámetros de tipo de clave y valor de forma explícita: p. ej., KafkaIO.<Foo, Bar>read() y KafkaIO.<Foo, Bar>write().

En lugar de usar Coder para interpretar bytes de clave y valor, usa las clases estándar Serializer y Deserializer de Kafka. P. ej., en lugar de KafkaIO.read().withKeyCoder(StringUtf8Coder.of()), usa KafkaIO.read().withKeyDeserializer(StringDeserializer.class), del mismo modo que para KafkaIO.write().

Se cambió la sintaxis de BigQueryIO

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1427

En lugar de BigQueryIO.Read.from() y BigQueryIO.Write.to(), usa BigQueryIO.read().from() y BigQueryIO.write().to().

Se cambió la sintaxis de KinesisIO.Read

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1428

En lugar de KinesisIO.Read.from().using(), usa KinesisIO.read().from().withClientProvider().

Se cambió la sintaxis de TFRecordIO

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1913

En lugar de TFRecordIO.Read.from() y TFRecordIO.Write.to(), usa TFRecordIO.read().from() y TFRecordIO.write().to().

XmlSource y XmlSink se consolidaron en XmlIO

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1914

En lugar de usar XmlSource y XmlSink directamente, usa XmlIO.

P. ej., en lugar de Read.from(XmlSource.from()), usa XmlIO.read().from(), y en lugar de Write.to(XmlSink.writeOf()), usa XmlIO.write().to().

Se cambió el nombre de CountingInput a GenerateSequence y se lo generalizó

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1414

En lugar de CountingInput.unbounded(), usa GenerateSequence.from(0). En lugar de CountingInput.upTo(n), usa GenerateSequence.from(0).to(n).

Se cambió Count, Latest y Sample

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1417BEAM-1421BEAM-1423

Las clases Count.PerElement, Count.PerKey y Count.Globally ahora son privadas, por lo que debe usar las funciones de fábrica, como Count.perElement() (mientras que antes podías usar new Count.PerElement()). Además, si deseas usar, por ejemplo, .withHotKeyFanout() en el resultado de la transformación, no se puede hacer eso directamente en un resultado de .apply(Count.perElement()). En su lugar, Count expone su función de combinación como Count.combineFn() y debes aplicar Combine.globally(Count.combineFn()) tú mismo.

Se aplican cambios similares a las transformaciones Latest y Sample.

Se cambió el orden de los parámetros para MapElements y FlatMapElements

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1418

Cuando se usa MapElements y FlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor), ahora el descriptor debe especificarse primero, p. ej., FlatMapElements.into(descriptor).via(fn).

Se cambió Window cuando se configuran parámetros adicionales

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1425

Cuando uses Window para configurar algo distinto a WindowFn (Window.into()), usa Window.configure(). P. ej., en lugar de Window.triggering(...), usa Window.configure().triggering(...).

Se cambió el nombre de Write.Bound a Write

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1416

La clase Write.Bound ahora es solo Write. Esto solo importa si extraías aplicaciones de Write.to(Sink) en una variable; su tipo solía ser Write.Bound<...> y ahora será Write<...>.

Se cambió el nombre de las clases de transformación Flatten

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1419

Se cambió el nombre de las clases Flatten.FlattenIterables y Flatten.FlattenPCollectionList respectivamente a Flatten.Iterables y Flatten.PCollections.

Se dividió GroupByKey.create(boolean) en dos métodos

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1420

GroupByKey.create(boolean fewKeys) ahora es solo GroupByKey.create() y GroupByKey.createWithFewKeys().

Se cambió SortValues

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1426

Los métodos definidores BufferedExternalSorter.Options se cambiaron de nombre de setSomeProperty a withSomeProperty.

Dependencia adicional de la API de Google

A partir de la versión 2.0.0 del SDK, también debes habilitar la API de Cloud Resource Manager.

Actualizaciones de dependencias

La versión 2.x actualiza las versiones fijadas de la mayoría de las dependencias, incluidas Avro, protobuf y gRPC. Algunas de estas dependencias pueden tener cambios rotundos propios, lo que puede causar problemas si el código también depende de la dependencia directamente. Las versiones que se usan en 2.0.0 se pueden encontrar en pom.xml o mediante mvn dependency:tree.

Refactorizaciones internas

Hubo cambios significativos en la estructura interna del SDK. Aquellos usuarios que se basaban en elementos más allá de la API pública (como las clases o métodos terminados en Internal o en paquetes util) quizás vean que varios elementos cambiaron de manera significativa.

Si usabas StateInternals o TimerInternals: Se quitaron estas API internas. Ahora puedes usar las API experimentales State y Timer para DoFn.