La aplicación de ejemplo de comercio electrónico muestra las prácticas recomendadas para usar Dataflow con el fin de implementar analíticas de datos de streaming e IA en tiempo real. El ejemplo contiene patrones de tareas que muestran la mejor forma de llevar a cabo tareas de programación en Java. Estas tareas suelen ser necesarias para crear aplicaciones de comercio electrónico.
La aplicación contiene los siguientes patrones de tareas de Java:
- Usar esquemas de Apache Beam para trabajar con datos estructurados
- Usar JsonToRow para convertir datos de JSON
- Usar el generador de código de
AutoValue
para crear objetos Java antiguos (POJO) - Poner en cola datos que no se pueden procesar para su análisis
- Aplicar transformaciones de validación de datos en serie
- Usar
DoFn.StartBundle
para realizar llamadas en microlotes a servicios externos - Utilizar un patrón de entrada secundaria adecuado
Usar esquemas de Apache Beam para trabajar con datos estructurados
Puedes usar esquemas de Apache Beam para facilitar el procesamiento de datos estructurados.
Convertir tus objetos en Rows te permite generar código Java muy limpio, lo que facilita la tarea de crear un gráfico acíclico dirigido (DAG). También puede hacer referencia a las propiedades de los objetos como campos en las instrucciones de analíticas que cree, en lugar de tener que llamar a métodos.
Ejemplo
Usar JsonToRow para convertir datos de JSON
Procesar cadenas JSON en Dataflow es una necesidad habitual. Por ejemplo, las cadenas JSON se procesan al transmitir información de clickstream capturada de aplicaciones web. Para procesar cadenas JSON, debe convertirlas en filas o en objetos Java antiguos (POJO) durante el procesamiento de la canalización.
Puedes usar la transformación integrada JsonToRow de Apache Beam para convertir cadenas JSON en filas. Sin embargo, si quieres una cola para procesar los mensajes que no se han podido procesar, debes crearla por separado. Consulta Poner en cola datos que no se pueden procesar para su análisis.
Si necesitas convertir una cadena JSON en un POJO mediante AutoValue, registra un esquema para el tipo con la anotación @DefaultSchema(AutoValueSchema.class)
y, a continuación, usa la clase de utilidad Convert. El código resultante es similar al siguiente:
PCollection<String> json = ...
PCollection<MyUserType> = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
.apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))
Para obtener más información, incluido el tipo de Java a partir del que puedes inferir esquemas, consulta el artículo Crear esquemas.
Si JsonToRow no funciona con tus datos, puedes usar Gson como alternativa. Gson es bastante flexible en el procesamiento predeterminado de datos, por lo que es posible que tengas que añadir más validación al proceso de conversión de datos.
Ejemplos
Usar el generador de código AutoValue
para generar POJOs
Los esquemas de Apache Beam suelen ser la mejor forma de representar objetos en un flujo de procesamiento, ya que te permiten trabajar con datos estructurados. Sin embargo, a veces se necesita un objeto Java antiguo (POJO), como cuando se trabaja con objetos de clave-valor o se gestiona el estado de un objeto.
Para crear POJOs manualmente, debes codificar las anulaciones de los métodos equals()
y hashcode()
, lo que puede llevar mucho tiempo y dar lugar a errores. Las anulaciones incorrectas
pueden provocar un comportamiento incoherente de la aplicación o pérdida de datos.
Para generar POJOs, usa el
AutoValue
constructor de clases. Esta opción asegura que se usen las anulaciones necesarias y te permite evitar posibles errores.
AutoValue
se usa mucho en la base de código de Apache Beam, por lo que es útil familiarizarse con este compilador de clases si quieres desarrollar flujos de procesamiento de Apache Beam en Dataflow con Java.
También puedes AutoValue
con esquemas de Apache Beam si añades una anotación @DefaultSchema(AutoValueSchema.class)
. Para obtener más información, consulta el artículo Crear esquemas.
Para obtener más información sobre AutoValue
, consulta Por qué AutoValue?
y los documentos de AutoValue
.
Ejemplo
Poner en cola datos que no se pueden procesar para su análisis
En los sistemas de producción, es importante gestionar los datos problemáticos. Si es posible, valida y corrige los datos en el flujo. Cuando no sea posible corregir el valor, regístralo en una cola de mensajes no procesados (a veces denominada cola de mensajes fallidos) para analizarlo más adelante. Los problemas suelen producirse al convertir datos de un formato a otro, por ejemplo, al convertir cadenas JSON en filas.
Para solucionar este problema, usa una transformación de varias salidas para transferir los elementos que contienen los datos sin procesar a otra PCollection para analizarlos más a fondo. Este procesamiento es una operación habitual que puede que quieras usar en muchos lugares de una canalización. Intenta que la transformación sea lo suficientemente genérica como para usarla en varios lugares. Primero, crea un objeto de error para envolver las propiedades comunes, incluidos los datos originales. A continuación, crea una transformación de receptor que tenga varias opciones de destino.
Ejemplos
Aplicar transformaciones de validación de datos en serie
Los datos recogidos de sistemas externos a menudo necesitan limpieza. Estructura tu flujo de procesamiento de datos de forma que pueda corregir los datos problemáticos en el flujo cuando sea posible. Enviar los datos a una cola para analizarlos más a fondo cuando sea necesario.
Como un mismo mensaje puede tener varios problemas que deben corregirse, planifica el gráfico acíclico dirigido (DAG) necesario. Si un elemento contiene datos con varios defectos, debe asegurarse de que el elemento pase por las transformaciones adecuadas.
Por ejemplo, imagina un elemento con los siguientes valores, ninguno de los cuales debe ser nulo:
{"itemA": null,"itemB": null}
Asegúrate de que el elemento se someta a transformaciones que corrijan ambos problemas potenciales:
badElements.apply(fixItemA).apply(fixItemB)
Es posible que tu canal tenga más pasos en serie, pero fusión te ayuda a minimizar la sobrecarga de procesamiento que se introduce.
Ejemplo
Usar DoFn.StartBundle
para realizar llamadas en microlotes a servicios externos
Es posible que tengas que invocar APIs externas como parte de tu canalización. Como una canalización distribuye el trabajo entre muchos recursos de computación, hacer una sola llamada por cada elemento que fluye por el sistema puede sobrecargar un endpoint de servicio externo. Este problema es especialmente habitual cuando no has aplicado ninguna función de reducción.
Para evitar este problema, agrupa las llamadas a sistemas externos.
Puedes agrupar llamadas mediante una transformación GroupByKey
o la API Timer de Apache Beam. Sin embargo, ambos enfoques requieren barajar, lo que introduce una sobrecarga de procesamiento y la necesidad de un número mágico para determinar el espacio de claves.
En su lugar, usa los elementos de ciclo de vida StartBundle
y FinishBundle
para agrupar tus datos. Con estas opciones, no es necesario mezclar.
Un pequeño inconveniente de esta opción es que los tamaños de los paquetes se determinan de forma dinámica mediante la implementación del ejecutor en función de lo que esté ocurriendo en el canal y sus elementos de trabajo. En el modo de transmisión, los paquetes suelen ser pequeños. El agrupamiento de Dataflow se ve influido por factores del backend, como el uso de particiones, la cantidad de datos disponibles para una clave concreta y el rendimiento del flujo de trabajo.
Ejemplo
EventItemCorrectionService.java
Utilizar un patrón de entrada secundaria adecuado para enriquecer los datos
En las aplicaciones de analíticas de streaming, los datos suelen enriquecerse con información adicional que puede ser útil para realizar análisis posteriores. Por ejemplo, si tiene el ID de tienda de una transacción, puede añadir información sobre la ubicación de la tienda. Esta información adicional se suele añadir tomando un elemento y obteniendo información de una tabla de consulta.
En el caso de las tablas de consulta que cambian lentamente y tienen un tamaño reducido, es recomendable incorporar la tabla a la canalización como una clase singleton que implemente la interfaz Map<K,V>
. Con esta opción, no es necesario que cada elemento haga una llamada a la API para buscarlo. Después de incluir una copia de una tabla en la canalización, debes actualizarla periódicamente para que esté al día.
Para gestionar las entradas secundarias que se actualizan lentamente, usa los patrones de entrada secundaria de Apache Beam.
Almacenamiento en caché
Las entradas secundarias se cargan en la memoria y, por lo tanto, se almacenan en caché automáticamente.
Puedes definir el tamaño de la caché con la opción --setWorkerCacheMb
.
Puedes compartir la caché entre instancias de DoFn
y usar activadores externos para actualizarla.
Ejemplo
SlowMovingStoreLocationDimension.java