Migra del SDK de Cloud Dataflow 1.x para Java

En este documento, se resaltan los cambios principales entre las versiones 1.x y las versiones 2.x del SDK de Cloud Dataflow para Java.

Aviso de baja del SDK de Cloud Dataflow: La versión 2.5.0 es la última actualización del SDK de Cloud Dataflow independiente de las actualizaciones del SDK de Apache Beam. El servicio de Cloud Dataflow es compatible en su totalidad con las versiones oficiales del SDK de Apache Beam. El servicio de Cloud Dataflow también es compatible con los SDK de Apache Beam de actualizaciones anteriores a partir de la versión 2.0.0. Consulta la página de compatibilidad de Cloud Dataflow para obtener el estado de compatibilidad de varios SDK. La página Descargas de Apache Beam contiene notas de las versiones de los SDK de Apache Beam.

Migración de 1.x a 2.x

A fin de instalar y usar el SDK de Apache Beam 2.x para Java, consulta la guía de instalación del SDK de Apache Beam.

Cambios principales de 1.x a 2.x

NOTA: Todos los usuarios deben conocer estos cambios si quieren actualizarse a las versiones 2.x.

Renombramiento y reestructuramiento de paquetes

Como parte del proceso de generalización de Apache Beam para que trabaje bien con entornos más allá Google Cloud Platform, el código del SDK se volvió a nombrar y a estructurar.

com.google.cloud.dataflow se renombró como org.apache.beam

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-78

Ahora, el SDK está declarado en el paquete org.apache.beam en lugar de com.google.cloud.dataflow. Necesitas actualizar todas tus declaraciones de importación con este cambio.

Subpaquetes nuevos: runners.dataflow, io.gcp y runners.direct

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-77

Los ejecutores se reorganizaron en sus propios paquetes, por lo que muchos elementos de com.google.cloud.dataflow.sdk.runners se movieron a org.apache.beam.runners.direct o, en su defecto, a org.apache.beam.runners.dataflow.

Las opciones de canalización especificas de las ejecuciones en el servicio de Cloud Dataflow se movieron de com.google.cloud.dataflow.sdk.options a org.apache.beam.runners.dataflow.options.

La mayoría de los conectores de E/S a servicios de Google Cloud Platform se movieron a subpaquetes. Por ejemplo, BigQueryIO se movió de com.google.cloud.dataflow.sdk.io a org.apache.beam.sdk.io.gcp.bigquery.

La mayoría de los IDE podrán ayudarte a identificar las ubicaciones nuevas. A fin de verificar la ubicación nueva de archivos específicos, puedes usar t para buscar el código en GitHub. Las versiones del SDK de Cloud Dataflow 1.x para Java se compilan desde el repositorio GoogleCloudPlatform/DataflowJavaSDK (rama master-1.x). Las versiones del SDK de Cloud Dataflow 2.x para Java corresponden al código del repositorio apache/beam.

Ejecutores

Se quitó Pipeline de los nombres de ejecutores

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1185

Se quitó Pipeline de los nombres de todos los ejecutores para acortarlos. Por ejemplo, DirectPipelineRunner ahora es DirectRunner y DataflowPipelineRunner es DataflowRunner.

Requiere establecer --tempLocation en una ruta de Google Cloud Storage

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-430

En lugar de permitirte especificar solo una --stagingLocation o --tempLocation y que luego Cloud Dataflow infiera las otras, el servicio de Cloud Dataflow ahora requiere que se establezca --tempLocation en una ruta de Google Cloud Storage, pero puede inferirse de la --gcpTempLocation más general. A no ser que se anule, también se usará para --stagingLocation.

Se quitó InProcessPipelineRunner

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-243

DirectRunner todavía se ejecuta en la máquina local de un usuario, pero ahora también es compatible con ejecuciones multiproceso, PCollection no delimitadas y activadores para resultados especulativos y tardíos. Se alinea de manera más próxima al modelo de Beam documentado y puede causar (de forma correcta) fallas de prueba de unidades adicionales.

Como esta función ahora se encuentra en DirectRunner, se quitó el InProcessPipelineRunner(SDK de Cloud Dataflow 1.6+ para Java).

BlockingDataflowPipelineRunner se reemplazó por PipelineResult.waitToFinish()

Usuarios afectados: Todos | Impacto: Error de compilación

Ahora, se quitó BlockingDataflowPipelineRunner. Si tu código pretende ejecutar una canalización de manera programática y esperar hasta que finalice, debería usar DataflowRunner y llamar de forma explícita a pipeline.run().waitToFinish().

Si usaste --runner BlockingDataflowPipelineRunner en la línea de comandos para inducir de forma interactiva a tu programa principal a bloquearse hasta que finalice la canalización, entonces este es un asunto del programa principal: debería proporcionar una opción como --blockOnRun que lo induzca a llamar a waitToFinish().

TemplatingDataflowPipelineRunner se reemplazó por --templateLocation

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-551

Se reemplazó la funcionalidad en TemplatingDataflowPipelineRunner (el SDK de Cloud Dataflow 1.9+ para Java) mediante --templateLocation con DataflowRunner.

ParDo y DoFn

DoFn usa anotaciones en lugar de anulaciones de método

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-37

A fin de permitir más flexibilidad y personalización, ahora DoFn usa anotaciones de método para personalizar el procesamiento en lugar de necesitar que los usuarios anulen métodos específicos.

La diferencia entre el DoFn nuevo y el anterior se demuestra en el siguiente código de muestra. Antes (con el SDK de Cloud Dataflow 1.x para Java), tu código se veía de la siguiente manera:

new DoFn<Foo, Baz>() {
  @Override
  public void processElement(ProcessContext c) { … }
}

Ahora (con el SDK de Apache Beam 2.x para Java), tu código se verá de la siguiente manera:

new DoFn<Foo, Baz>() {
  @ProcessElement   // <-- This is the only difference
  public void processElement(ProcessContext c) { … }
}

Si tu DoFn accedió a ProcessContext#window(), entonces hay más cambios. En lugar de esto:

public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    … (MyWindowType) c.window() …
  }
}

escribirás esto:

public class MyDoFn extends DoFn<Foo, Baz> {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

o esto:

return new DoFn<Foo, Baz>() {
  @ProcessElement
  public void processElement(ProcessContext c, MyWindowType window) {
    … window …
  }
}

El entorno de ejecución proporcionará, de manera automática, la ventana a tu DoFn.

Los DoFn se vuelven a usar en múltiples paquetes

Usuarios afectados: Todos | Impacto: Puede causar resultados inesperados de forma silenciosa | Problema de JIRA: BEAM-38

A fin de permitir mejoras en el rendimiento, el mismo DoFn puede volver a usarse para procesar múltiples paquetes de elementos, en lugar de asegurar una instancia nueva por paquete. Cualquier DoFn que mantenga un estado local (p. ej., variables de instancia) más allá del fin de un paquete puede encontrar cambios de comportamiento, ya que el próximo paquete comenzará con ese estado en lugar de con una copia nueva.

A fin de administrar el ciclo de vida, se agregaron métodos de @Setup y @Teardown nuevos. El ciclo de vida completo es como se detalla a continuación (si bien una falla puede truncar el ciclo de vida en cualquier punto):

  • @Setup: inicialización de DoFn por instancia, como abrir conexiones reusables.
  • Cualquier número de la siguiente secuencia:
    • @StartBundle: inicialización por paquete, como restablecer el estado de DoFn.
    • @ProcessElement: el procesamiento de elementos usual.
    • @FinishBundle: pasos concluyentes por paquete, como limpiar los efectos secundarios.
  • @Teardown: desmontaje de los recursos de DoFn por instancia, como cerrar conexiones reusables.

Nota: Se espera que este cambio tenga un impacto limitado en la práctica. Sin embargo, no genera un error de tiempo de compilación y tiene el potencial de causar resultados inesperados de forma silenciosa.

Se cambió el orden de los parámetros cuando se especifican entradas o salidas adicionales

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1422

Ahora, DoFn siempre debe especificarse primero cuando se aplica un `ParDo. En lugar de esto:

foos.apply(ParDo
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag)
    .of(new MyDoFn()))

escribirás esto:

foos.apply(ParDo
    .of(new MyDoFn())
    .withSideInputs(sideA, sideB)
    .withOutputTags(mainTag, sideTag))

PTransforms

Se quitó .named()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-370

Quita los métodos .named() de PTransforms y subclases. En su lugar, usa PCollection.apply(“name”, PTransform).

PTransform.apply() se renombró como PTransform.expand()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-438

PTransform.apply() se renombró como PTransform.expand() para evitar confusión con PCollection.apply(). Todas las transformaciones compuestas escritas por el usuario deberán renombrar el método apply() como expand(). No hay cambios respecto a cómo se construyen las canalizaciones.

Cambios rotundos adicionales

La siguiente es una lista de cambios rotundos adicionales y cambios a futuro.

Cambios individuales de la API

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-725

Se quitó lo siguiente GcpOptions: TokenServerUrl, CredentialDir, CredentialId, SecretsFile, ServiceAccountName, ServiceAccountKeyFile.

Usa GoogleCredentials.fromStream(InputStream for credential). La transmisión contiene un archivo de claves de la cuenta de servicio en el formato JSON de Google Developers Console o una credencial de usuario almacenada con el formato compatible con el SDK de Cloud.

--enableProfilingAgent se cambió a --saveProfilesToGcs

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1122

--update se movió a DataflowPipelineOptions

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-81

Mueve --update PipelineOption a DataflowPipelineOptions desde DataflowPipelineDebugOptions.

Se quitó BoundedSource.producesSortedKeys()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1201

Quita producesSortedKeys() de BoundedSource.

Se cambió la API de PubsubIO

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-974, BEAM-1415

A partir de 2.0.0-beta2, PubsubIO.Read y PubsubIO.Write deben instanciarse con PubsubIO.<T>read() y PubsubIO.<T>write() en lugar de usar los métodos de fábrica estáticos, como PubsubIO.Read.topic(String).

Se renombraron los métodos para configurar PubsubIO, p. ej., se renombró PubsubIO.read().topic(String) como PubsubIO.read().fromTopic(). Del mismo modo, subscription() como fromSubscription(), idLabel y timestampLabel, respectivamente, como withIdAttribute y withTimestampAttribute, y PubsubIO.write().topic() como PubsubIO.write().to().

En lugar de especificar Coder a fin de analizar el mensaje de carga útil, PubsubIO expone funciones para leer y escribir strings, mensajes de Avro y de Protobuf, p. ej., PubsubIO.readStrings(), PubsubIO.writeAvros(). Para leer y escribir tipos personalizados, usa PubsubIO.read/writeMessages() (y PubsubIO.readMessagesWithAttributes si deben incluirse los atributos de mensaje) y usa ParDo o MapElements para convertir entre tu tipo personalizado y PubsubMessage.

Se quitó la compatibilidad de DatastoreIO para la API de v1beta2 no compatible

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-354

Ahora, DatastoreIO está basado en la API de Cloud Datastore v1.

Se cambió DisplayData.Builder

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-745

DisplayData.Builder.include(..) requiere un parámetro de ruta nuevo para registrar datos de muestra de subcomponentes. Las API de compilador ahora muestran un DisplayData.ItemSpec<>, en lugar de DisplayData.Item.

Se requiere FileBasedSink.getWriterResultCoder()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-887

FileBasedSink.getWriterResultCoder se transformó en un método abstracto, que debe proporcionarse.

Filter.byPredicate() se renombró como Filter.by()

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-342

Se quitó IntraBundleParallelization

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-414

RemoveDuplicates se renombró como Distinct

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-239

Se cambió TextIO para que use una sintaxis diferente y opere solo en strings

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1354

TextIO.Read.from() cambió a TextIO.read().from(). Del mismo modo, TextIO.Write.to() cambió a TextIO.write().to().

Ahora, TextIO.Read siempre muestra una PCollection<String> y no toma .withCoder() para analizar strings. En cambio, aplica ParDo o MapElements a la colección para analiza las strings. Del mismo modo, TextIO.Write ahora lleva siempre una PCollection<String>. A fin de escribir algo más en TextIO, conviértelo en String con ParDo o MapElements.

Se cambió AvroIO para que use una sintaxis diferente

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1402

Para leer y escribir tipos generados de Avro, AvroIO.Read.from().withSchema(Foo.class) cambió a AvroIO.read(Foo.class).from(), del mismo modo AvroIO.Write.

Para leer y escribir registros genéricos de Avro con un esquema especificado, AvroIO.Read.from().withSchema(Schema or String) cambió a AvroIO.readGenericRecords().from(), del mismo modo AvroIO.Write.

Se cambió KafkaIO para especificar parámetros de tipo de manera explícita y usar serializadores y deserializadores de Kafka

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1573, BEAM-2221

Ahora, en KafkaIO, debes especificar los parámetros de tipo de clave y valor de forma explícita: p. ej., KafkaIO.<Foo, Bar>read() y KafkaIO.<Foo, Bar>write().

En lugar de usar Coder para interpretar bytes de clave y valor, usa las clases estándar Serializer y Deserializer de Kafka. P. ej.: en lugar de KafkaIO.read().withKeyCoder(StringUtf8Coder.of()), usa KafkaIO.read().withKeyDeserializer(StringDeserializer.class), del mismo modo para KafkaIO.write().

Cambio de sintaxis de BigQueryIO

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1427

En lugar de BigQueryIO.Read.from() y BigQueryIO.Write.to(), usa BigQueryIO.read().from() y BigQueryIO.write().to().

Cambio de sintaxis de KinesisIO.Read

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1428

En lugar de KinesisIO.Read.from().using(), usa KinesisIO.read().from().withClientProvider().

Cambio de sintaxis de TFRecordIO

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1913

En lugar de TFRecordIO.Read.from() y TFRecordIO.Write.to(), usa TFRecordIO.read().from() y TFRecordIO.write().to().

Se consolidó XmlSource y XmlSink como XmlIO

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1914

En lugar de usar XmlSource y XmlSink de forma directa, usa XmlIO.

P. ej.: en lugar de Read.from(XmlSource.from()), usa XmlIO.read().from(); en lugar de Write.to(XmlSink.writeOf()), usa XmlIO.write().to().

CountingInput se renombró como GenerateSequence y se generalizó

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1414

En lugar de CountingInput.unbounded(), usa GenerateSequence.from(0). En lugar de CountingInput.upTo(n), usa GenerateSequence.from(0).to(n).

Se cambió Count, Latest, Sample

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1417, BEAM-1421, BEAM-1423

Las clases Count.PerElement, Count.PerKey y Count.Globally ahora son privadas, por lo que debes usar las funciones de fábrica, como Count.perElement() (antes podías usar new Count.PerElement()). Además, si deseas usar, p. ej., .withHotKeyFanout() en el resultado de la transformación, ya no puedes hacerlo de forma directa en el resultado de .apply(Count.perElement()). En su lugar, Count expone su función de combinar como Count.combineFn() y deberías aplicar Combine.globally(Count.combineFn()) tú mismo.

Se aplican cambios similares a las transformaciones Latest y Sample.

Se cambió el orden de los parámetros para MapElements y FlatMapElements

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1418

Ahora, cuando usas MapElements and y FlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor), el descriptor debe especificarse primero: p. ej., FlatMapElements.into(descriptor).via(fn).

Se cambió Window cuando se configuran parámetros adicionales

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1425

Cuando usas Window para configurar algo distinto a WindowFn (Window.into()), usa Window.configure(). P. ej.: en lugar de Window.triggering(...), usa Window.configure().triggering(...).

Se renombró Write.Bound como Write

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1416

La clase Write.Bound ahora es solo Write. Esto importa solo si extraías aplicaciones de Write.to(Sink) a una variable; su tipo solía ser Write.Bound<...>, ahora será Write<...>.

Se renombraron las clases de transformaciones Flatten

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1419

Las clases Flatten.FlattenIterables y Flatten.FlattenPCollectionList se renombraron respectivamente como Flatten.Iterables y Flatten.PCollections.

Se dividió GroupByKey.create(boolean) en dos métodos

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1420

GroupByKey.create(boolean fewKeys) ahora es solo GroupByKey.create() y GroupByKey.createWithFewKeys().

Se cambió SortValues

Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1426

Los métodos definidores BufferedExternalSorter.Options (setSomeProperty) se renombraron como withSomeProperty.

Dependencia adicional de la API de Google

A partir de la versión 2.0.0 del SDK, también debes habilitar la API de Cloud Resource Manager.

Actualizaciones de dependencias

La versión 2.x actualiza las versiones fijadas de la mayoría de las dependencias, incluidas Avro, Protobuf y gRPC. Algunas de estas dependencias pueden tener cambios rotundos propios, lo que puede causar problemas si tu código también depende de forma directa de la dependencia. Las versiones usadas en 2.0.0 pueden encontrarse en el pom.xml o con mvn dependency:tree.

Refactorizaciones internas

Hubo cambios significativos en la estructura interna del SDK. Aquellos usuarios que se basaban en elementos más allá de la API pública (como las clases o métodos terminados en Internal o en paquetes util) quizás vean que varios elementos cambiaron de manera significativa.

Si usabas StateInternals o TimerInternals: se quitaron estas API internas. Puedes usar las API experimentales de State y Timer para DoFn.

¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.