En este documento, se resaltan los cambios principales entre las versiones 1.x y las versiones 2.x del SDK de Dataflow para Java.
Aviso de baja del SDK de Dataflow: El SDK de Dataflow 2.5.0 es la última versión del SDK de Dataflow que es independiente de las versiones del SDK de Apache Beam. El servicio de Dataflow es totalmente compatible con las actualizaciones oficiales del SDK de Apache Beam. El servicio de Dataflow también es compatible con los SDK de Apache Beam de actualizaciones anteriores a partir de la versión 2.0.0 y posteriores. Consulta la página de asistencia de Dataflow para obtener el estado de compatibilidad de varios SDK. La página de descargas de Apache Beam contiene notas de las versiones de los SDK de Apache Beam.
Migración de 1.x a 2.x
A fin de instalar y usar el SDK de Apache Beam 2.x para Java, consulta la guía de instalación del SDK de Apache Beam.
Cambios principales de 1.x a 2.x
NOTA: Todos los usuarios deben conocer estos cambios si quieren actualizarse a las versiones 2.x.
Renombramiento y reestructuramiento de paquetes
Como parte del proceso de generalización de Apache Beam para que trabaje bien con entornos más allá Google Cloud Platform, el código del SDK se volvió a estructurar y a cambiar de nombre.
Se cambió el nombre de com.google.cloud.dataflow
a org.apache.beam
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-78
El SDK ahora se declara en el paquete org.apache.beam
en lugar de com.google.cloud.dataflow
. Necesitas actualizar todas tus declaraciones de importación con este cambio.
Subpaquetes nuevos: runners.dataflow
, runners.direct
y io.gcp
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-77
Los corredores se reorganizaron en sus propios paquetes, por lo que muchos elementos de com.google.cloud.dataflow.sdk.runners
se movieron a org.apache.beam.runners.direct
o org.apache.beam.runners.dataflow
.
Las opciones de canalización específicas de las ejecuciones en el servicio de Dataflow se movieron de com.google.cloud.dataflow.sdk.options
a org.apache.beam.runners.dataflow.options
.
La mayoría de los conectores de E/S a servicios de Google Cloud Platform se movieron a subpaquetes. Por ejemplo, BigQueryIO se movió de com.google.cloud.dataflow.sdk.io
a org.apache.beam.sdk.io.gcp.bigquery
.
La mayoría de los IDE podrán ayudarte a identificar las ubicaciones nuevas. Para verificar la ubicación nueva de los archivos específicos, puedes usar t
a fin de buscar el código en GitHub. Las versiones del SDK de Dataflow 1.x para Java se crean a partir del repositorio GoogleCloudPlatform/DataflowJavaSDK (rama master-1.x
). Las versiones del SDK de Dataflow 2.x para Java corresponden al código del repositorio apache/beam.
Ejecutores
Se quitó Pipeline
de los nombres de ejecutores
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1185
Se quitó Pipeline
de los nombres de todos los ejecutores para acortarlos. Por ejemplo, DirectPipelineRunner
ahora es DirectRunner
y DataflowPipelineRunner
ahora es DataflowRunner
.
Se requiere establecer --tempLocation
en una ruta de acceso de Google Cloud Storage
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-430
En lugar de que debas especificar solo --stagingLocation
o --tempLocation
y, luego, Dataflow infiera el otro, el servicio de Dataflow ahora requiere que --gcpTempLocation
se establezca en una ruta de acceso de Google Cloud Storage del --tempLocation
más general.
A no ser que se anule, también se usará para el --stagingLocation
.
Se quitó InProcessPipelineRunner
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-243
DirectRunner
todavía se ejecuta en la máquina local de un usuario, pero ahora también es compatible con ejecuciones multiproceso, PCollections no delimitadas y activadores para resultados especulativos y tardíos. Se alinea de manera más próxima al modelo de Beam documentado y puede causar (de forma correcta) fallas de prueba de unidades adicionales.
Como esta funcionalidad ahora está en DirectRunner
, se quitó InProcessPipelineRunner
(el SDK de Dataflow 1.6+ para Java).
Se reemplazó BlockingDataflowPipelineRunner
por PipelineResult.waitToFinish()
Usuarios afectados: Todos | Impacto: Error de compilación
Ahora se quitó BlockingDataflowPipelineRunner
. Si tu código pretende ejecutar una canalización de manera programática y esperar hasta que finalice, debería usar DataflowRunner
y llamar de forma explícita a pipeline.run().waitToFinish()
.
Si usaste --runner BlockingDataflowPipelineRunner
en la línea de comandos a fin de inducir de forma interactiva que tu programa principal se bloquee hasta que finalice la canalización, esto es una preocupación del programa principal; debería proporcionar una opción como --blockOnRun
que la inducirá para llamar a waitToFinish()
.
Se reemplazó TemplatingDataflowPipelineRunner
por --templateLocation
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-551
La funcionalidad en TemplatingDataflowPipelineRunner
(el SDK de Dataflow 1.9+ para Java) se reemplazó mediante --templateLocation
con DataflowRunner
.
ParDo y DoFn
Los DoFn
usan anotaciones en lugar de anulaciones de métodos
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-37
A fin de permitir una mayor flexibilidad y personalización, DoFn
ahora usa anotaciones de método para personalizar el procesamiento en lugar de requerir que los usuarios anulen métodos específicos.
Las diferencias entre el DoFn
nuevo y el anterior se demuestran en el siguiente código de ejemplo. Antes (con el SDK de Dataflow 1.x para Java), tu código se vería de la siguiente manera:
new DoFn<Foo, Baz>() {
@Override
public void processElement(ProcessContext c) { … }
}
Ahora (con el SDK de Apache Beam 2.x para Java), tu código se verá de la siguiente manera:
new DoFn<Foo, Baz>() {
@ProcessElement // <-- This is the only difference
public void processElement(ProcessContext c) { … }
}
Si DoFn
accedió a ProcessContext#window()
, entonces hay más cambios. En lugar de esto:
public class MyDoFn extends DoFn<Foo, Baz> implements RequiresWindowAccess {
@Override
public void processElement(ProcessContext c) {
… (MyWindowType) c.window() …
}
}
escribirás esto:
public class MyDoFn extends DoFn<Foo, Baz> {
@ProcessElement
public void processElement(ProcessContext c, MyWindowType window) {
… window …
}
}
o:
return new DoFn<Foo, Baz>() {
@ProcessElement
public void processElement(ProcessContext c, MyWindowType window) {
… window …
}
}
El entorno de ejecución proporcionará la ventana a tu DoFn
de forma automática.
Los DoFn
se vuelven a usar en varios paquetes
Usuarios afectados: Todos | Impacto: Puede causar resultados inesperados de forma silenciosa | Problema de JIRA: BEAM-38
A fin de permitir mejoras de rendimiento, ahora se puede volver a usar el mismo DoFn
para procesar varios paquetes de elementos, en lugar de garantizar una instancia nueva por paquete. Cualquier DoFn
que mantenga el estado local (p. ej., variables de instancia) más allá del final de un paquete puede experimentar cambios de comportamiento, ya que el próximo paquete comenzará con ese estado en lugar de una copia nueva.
Para administrar el ciclo de vida, se agregaron métodos @Setup
y @Teardown
nuevos.
El ciclo de vida completo es como se detalla a continuación (aunque una falla puede truncar el ciclo de vida en cualquier punto):
@Setup
: Inicialización por instancia delDoFn
, como la apertura de conexiones reutilizables.- Cualquier número de la siguiente secuencia:
@StartBundle
: Inicialización por paquete, como restablecer el estado deDoFn
.@ProcessElement
: El procesamiento de elementos habitual.@FinishBundle
: Pasos concluyentes por paquete, como limpiar los efectos secundarios.
@Teardown
: Desmontaje de los recursos por instancia que mantieneDoFn
, como cerrar conexiones reutilizables.
Nota: Se espera que este cambio tenga un impacto limitado en la práctica. Sin embargo, no genera un error de tiempo de compilación y tiene el potencial de causar resultados inesperados de forma silenciosa.
Se cambió el orden de los parámetros cuando se especifican entradas o salidas adicionales
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1422
Ahora, DoFn
siempre debe especificarse primero cuando se aplica un ParDo. En lugar de esto:
foos.apply(ParDo
.withSideInputs(sideA, sideB)
.withOutputTags(mainTag, sideTag)
.of(new MyDoFn()))
escribirás esto:
foos.apply(ParDo
.of(new MyDoFn())
.withSideInputs(sideA, sideB)
.withOutputTags(mainTag, sideTag))
PTransforms
Se quitó .named()
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-370
Quita los métodos .named()
de PTransforms y subclases. En su lugar, usa PCollection.apply(“name”, PTransform)
.
Se cambió el nombre de PTransform.apply()
a PTransform.expand()
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-438
Se cambió el nombre de PTransform.apply()
a PTransform.expand()
para evitar confusiones con PCollection.apply()
. Todas las transformaciones compuestas escritas por el usuario deberán cambiar el nombre del método apply()
anulado a expand()
. No hay cambios respecto a cómo se construyen las canalizaciones.
Cambios rotundos adicionales
La siguiente es una lista de cambios rotundos adicionales y cambios a futuro.
Cambios individuales de la API
Se quitaron --credentialDir
y --tokenServerUrl
, junto con las opciones relacionadas
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA:BEAM-725
Se quitaron los siguientes métodos GcpOptions
: TokenServerUrl
, CredentialDir
, CredentialId
, SecretsFile
, ServiceAccountName
, ServiceAccountKeyFile
.
Usa GoogleCredentials.fromStream(InputStream for credential)
. La transmisión contiene un archivo de claves de la cuenta de servicio en el formato JSON de Google Developers Console o una credencial de usuario almacenada con el formato compatible con el SDK de Cloud.
Se cambió --enableProfilingAgent
por --saveProfilesToGcs
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1122
Se movió --update
a DataflowPipelineOptions
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-81
Mueve --update PipelineOption
a DataflowPipelineOptions
desde DataflowPipelineDebugOptions
.
Se quitó BoundedSource.producesSortedKeys()
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1201
Quita producesSortedKeys()
de BoundedSource
.
Se cambió la API de PubsubIO
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-974, BEAM-1415
A partir de 2.0.0-beta2, con PubsubIO.Read
y PubsubIO.Write
, se debe crear una instancia mediante PubsubIO.<T>read()
y PubsubIO.<T>write()
en lugar de los métodos de fábrica estáticos, como PubsubIO.Read.topic(String)
.
Se cambiaron los nombres de los métodos para configurar PubsubIO
, p. ej., se cambió el nombre de PubsubIO.read().topic(String)
a PubsubIO.read().fromTopic()
. Del mismo modo: subscription()
a fromSubscription()
, timestampLabel
y idLabel
a withTimestampAttribute
y withIdAttribute
respectivamente, y PubsubIO.write().topic()
a PubsubIO.write().to()
.
En lugar de especificar Coder
a fin de analizar la carga útil del mensaje, PubsubIO
expone funciones para leer y escribir strings, mensajes de Avro y Protobuf, p. ej., PubsubIO.readStrings()
, PubsubIO.writeAvros()
. Para leer y escribir tipos personalizados, usa PubsubIO.read/writeMessages()
(y PubsubIO.readMessagesWithAttributes
si se deben incluir atributos de mensaje) y usa ParDo
o MapElements
para alternar entre tu tipo personalizado y PubsubMessage
.
Se quitó la compatibilidad de DatastoreIO para la API de v1beta2 no compatible
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-354
Ahora, DatastoreIO está basado en la API de Cloud Datastore v1.
Se cambió DisplayData.Builder
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA:BEAM-745
DisplayData.Builder.include(..)
requiere un nuevo parámetro de ruta para registrar datos de muestra de subcomponentes. Las API de compilador ahora muestran DisplayData.ItemSpec<>
, en lugar de DisplayData.Item
.
Se requiere FileBasedSink.getWriterResultCoder()
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-887
Se transformó FileBasedSink.getWriterResultCoder
en un método abstracto, que debe proporcionarse.
Se cambió el nombre de Filter.byPredicate()
a Filter.by()
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-342
Se quitó IntraBundleParallelization
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-414
Se cambió el nombre de RemoveDuplicates
a Distinct
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-239
Se cambió TextIO
para usar una sintaxis diferente y operar solo en strings
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1354
TextIO.Read.from()
se cambia a TextIO.read().from()
; del mismo modo, TextIO.Write.to()
se cambia a TextIO.write().to()
.
TextIO.Read
ahora siempre muestra PCollection<String>
y no toma .withCoder()
para analizar las strings. En su lugar, analiza las strings mediante la aplicación de ParDo
o MapElements
a la colección.
Del mismo modo, TextIO.Write
ahora lleva siempre una PCollection<String>
y, para escribir algo más en TextIO
, conviértelo en String
mediante ParDo
o MapElements
.
Se cambió AvroIO
para que use una sintaxis diferente
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1402
Para leer y escribir tipos generados de Avro, se cambió AvroIO.Read.from().withSchema(Foo.class)
a AvroIO.read(Foo.class).from()
, del mismo modo que AvroIO.Write
.
Para leer y escribir registros genéricos de Avro mediante un esquema especificado, AvroIO.Read.from().withSchema(Schema or String)
se cambia a AvroIO.readGenericRecords().from()
, del mismo modo que AvroIO.Write
.
Se cambió KafkaIO
para especificar parámetros de tipo de forma explícita y usar los serializadores y deserializadores de Kafka
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1573, BEAM-2221
Ahora, en KafkaIO, debes especificar los parámetros de tipo de clave y valor de forma explícita: p. ej., KafkaIO.<Foo, Bar>read()
y KafkaIO.<Foo, Bar>write()
.
En lugar de usar Coder
para interpretar bytes de clave y valor, usa las clases estándar Serializer
y Deserializer
de Kafka. P. ej., en lugar de KafkaIO.read().withKeyCoder(StringUtf8Coder.of())
, usa KafkaIO.read().withKeyDeserializer(StringDeserializer.class)
, del mismo modo que para KafkaIO.write()
.
Se cambió la sintaxis de BigQueryIO
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1427
En lugar de BigQueryIO.Read.from()
y BigQueryIO.Write.to()
, usa BigQueryIO.read().from()
y BigQueryIO.write().to()
.
Se cambió la sintaxis de KinesisIO.Read
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1428
En lugar de KinesisIO.Read.from().using()
, usa KinesisIO.read().from().withClientProvider()
.
Se cambió la sintaxis de TFRecordIO
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1913
En lugar de TFRecordIO.Read.from()
y TFRecordIO.Write.to()
, usa TFRecordIO.read().from()
y TFRecordIO.write().to()
.
XmlSource
y XmlSink
se consolidaron en XmlIO
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1914
En lugar de usar XmlSource
y XmlSink
directamente, usa XmlIO
.
P. ej., en lugar de Read.from(XmlSource.from())
, usa XmlIO.read().from()
, y en lugar de Write.to(XmlSink.writeOf())
, usa XmlIO.write().to()
.
Se cambió el nombre de CountingInput
a GenerateSequence
y se lo generalizó
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1414
En lugar de CountingInput.unbounded()
, usa GenerateSequence.from(0)
. En lugar de CountingInput.upTo(n)
, usa GenerateSequence.from(0).to(n)
.
Se cambió Count
, Latest
y Sample
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1417, BEAM-1421, BEAM-1423
Las clases Count.PerElement
, Count.PerKey
y Count.Globally
ahora son privadas, por lo que debe usar las funciones de fábrica, como Count.perElement()
(mientras que antes podías usar new Count.PerElement()
). Además, si deseas usar, por ejemplo, .withHotKeyFanout()
en el resultado de la transformación, no se puede hacer eso directamente en un resultado de .apply(Count.perElement())
. En su lugar, Count
expone su función de combinación como Count.combineFn()
y debes aplicar Combine.globally(Count.combineFn())
tú mismo.
Se aplican cambios similares a las transformaciones Latest
y Sample
.
Se cambió el orden de los parámetros para MapElements
y FlatMapElements
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1418
Cuando se usa MapElements
y FlatMapElements.via(SerializableFunction).withOutputType(TypeDescriptor)
, ahora el descriptor debe especificarse primero, p. ej., FlatMapElements.into(descriptor).via(fn)
.
Se cambió Window
cuando se configuran parámetros adicionales
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1425
Cuando uses Window
para configurar algo distinto a WindowFn
(Window.into()
), usa Window.configure()
. P. ej., en lugar de Window.triggering(...)
, usa Window.configure().triggering(...)
.
Se cambió el nombre de Write.Bound
a Write
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1416
La clase Write.Bound
ahora es solo Write
. Esto solo importa si extraías aplicaciones de Write.to(Sink)
en una variable; su tipo solía ser Write.Bound<...>
y ahora será Write<...>
.
Se cambió el nombre de las clases de transformación Flatten
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1419
Se cambió el nombre de las clases Flatten.FlattenIterables
y Flatten.FlattenPCollectionList
respectivamente a Flatten.Iterables
y Flatten.PCollections
.
Se dividió GroupByKey.create(boolean)
en dos métodos
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1420
GroupByKey.create(boolean fewKeys)
ahora es solo GroupByKey.create()
y GroupByKey.createWithFewKeys()
.
Se cambió SortValues
Usuarios afectados: Todos | Impacto: Error de compilación | Problema de JIRA: BEAM-1426
Los métodos definidores BufferedExternalSorter.Options
se cambiaron de nombre de setSomeProperty
a withSomeProperty
.
Dependencia adicional de la API de Google
A partir de la versión 2.0.0 del SDK, también debes habilitar la API de Cloud Resource Manager.
Actualizaciones de dependencias
La versión 2.x actualiza las versiones fijadas de la mayoría de las dependencias, incluidas Avro, protobuf y gRPC. Algunas de estas dependencias pueden tener cambios rotundos propios, lo que puede causar problemas si el código también depende de la dependencia directamente. Las versiones que se usan en 2.0.0 se pueden encontrar en pom.xml
o mediante mvn dependency:tree
.
Refactorizaciones internas
Hubo cambios significativos en la estructura interna del SDK. Aquellos usuarios que se basaban en elementos más allá de la API pública (como las clases o métodos terminados en Internal
o en paquetes util
) quizás vean que varios elementos cambiaron de manera significativa.
Si usabas StateInternals
o TimerInternals
: Se quitaron estas API internas. Ahora puedes usar las API experimentales State
y Timer
para DoFn
.