En la aplicación de muestra de comercio electrónico, se demuestran las prácticas recomendadas para usar Dataflow a fin de implementar la estadísticas de datos de transmisión y la IA en tiempo real. El ejemplo contiene patrones de tareas que muestran la mejor manera de realizar tareas de programación de Java. Por lo general, estas tareas son necesarias para crear aplicaciones de comercio electrónico.
La aplicación contiene los siguientes patrones de tareas de Java:
- Usa esquemas de Apache Beam para trabajar con datos estructurados
- Usa JsonToRow para convertir datos JSON
- Usa el generador de código
AutoValue
para generar objetos antiguos y sin formato basados en Java (POJOs) - Pon en cola datos que no se pueden procesar para un análisis posterior
- Aplica transformaciones de validación de datos en serie
- Usa
DoFn.StartBundle
para llamadas por microlotes a servicios externos - Usa un patrón de entrada complementaria adecuado
Usa esquemas de Apache Beam para trabajar con datos estructurados
Puedes usar los esquemas de Apache Beam para facilitar el procesamiento de datos estructurados.
Convertir tus objetos en Filas te permite producir código Java muy limpio, lo que hace que el ejercicio de compilación del grafo acíclico dirigido (DAG) sea más fácil. También puedes hacer referencia a las propiedades del objeto como campos dentro de las declaraciones de estadísticas que creas, en lugar de tener que llamar a los métodos.
Ejemplo
Usa JsonToRow para convertir datos JSON
Procesar strings JSON en Dataflow es una necesidad común, Por ejemplo, las strings JSON se procesan cuando se transmite información de flujo de clics capturada desde aplicaciones web. Para procesar cadenas JSON, debes convertirlas en Filas o en objetos antiguos y sin formato basados en Java (POJO) durante el procesamiento de la canalización.
Puedes usar la transformación incorporada de Apache Beam JsonToRow para convertir cadenas JSON en filas. Sin embargo, si quieres una cola para procesar mensajes con errores, debes compilarla por separado. Consulta Pon en cola datos no procesables a fin de analizarlos luego.
Si necesitas convertir una string JSON en un POJO mediante AutoValue,
registra un esquema para el tipo mediante la
anotación @DefaultSchema(AutoValueSchema.class)
y, luego, usa el
Convert. El código resultante es similar al siguiente:
PCollection<String> json = ...
PCollection<MyUserType> = json
.apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
.apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))
Para obtener más información, incluidos los diferentes tipos de Java de los que puedes inferir esquemas, consulta Crea esquemas.
Si JsonToRow no funciona con tus datos, Gson es una alternativa. Gson tiene un procesamiento predeterminado de datos bastante tranquilo, lo que puede requerir que compiles más validación en el proceso de conversión de datos.
Ejemplos
Usa el generador de código AutoValue
para generar los POJOs
Los esquemas de Apache Beam
suelen ser la mejor manera de representar objetos en una canalización, debido a la forma en la que te permiten trabajar con datos estructurados. Sin embargo, a veces se necesita un
Objeto antiguo y sin formato basado en Java (POJO),
por ejemplo, cuando se manejan objetos par clave-valor o el control del estado del objeto.
El proceso de compilación manual de POJO requiere que codifiques las anulaciones para los métodos equals()
y hashcode()
, lo que puede llevar mucho tiempo y generar errores. Las anulaciones incorrectas
pueden dar como resultado un comportamiento incoherente de la aplicación o una pérdida de datos.
Para generar los POJO, usa el
compilador de clases
AutoValue
. Esta opción garantiza que se usen las anulaciones necesarias y te permite evitar posibles errores.
AutoValue
se usa mucho en la base de código de Apache Beam, por lo que es útil familiarizarte con este compilador de clases si deseas desarrollar canalizaciones de Apache Beam en Dataflow mediante Java.
También puedes AutoValue
con esquemas de Apache Beam si agregas una anotación @DefaultSchema(AutoValueSchema.class)
. Para obtener más información, consulta Creación de esquemas.
Para obtener más información sobre AutoValue
,
consulta Por qué AutoValue?
y los documentos de AutoValue
.
Ejemplo
Pon en cola datos que no se pueden procesar para un análisis posterior
En los sistemas de producción, es importante manejar los datos problemáticos. Si es posible, validas y corriges los datos in-stream. Cuando no es posible la corrección, registra el valor en una cola de mensajes no procesados, a veces llamada cola de mensajes no entregados, para analizarla más tarde. Por lo general, los problemas se producen cuando se convierten datos de un formato en otro, por ejemplo, cuando se convierten strings JSON en filas.
Para solucionar este problema, usa una transformación de salida múltiple a fin de transportar los elementos que contienen los datos no procesados a otra PCollection para su posterior análisis. Este procesamiento es una operación común que puedes usar en muchos lugares de una canalización. Intenta que la transformación sea lo bastante genérica para usarla en varios lugares. Primero, crea un objeto de error para unir las propiedades comunes, incluidos los datos originales. A continuación, crea una transformación de receptor que tenga varias opciones para el destino.
Ejemplos
Aplica transformaciones de validación de datos en serie
Los datos recopilados de sistemas externos suelen necesitar limpieza. Estructura tu canalización para que pueda corregir los datos problemáticos en la transmisión cuando sea posible. Envía los datos a una cola para un análisis más detallado cuando sea necesario.
Debido a que un solo mensaje puede experimentar varios problemas que necesitan corrección, planifica el grafo acíclico dirigido (DAG) necesario. Si un elemento contiene datos con varios defectos, debes asegurarte de que el elemento fluya a través de las transformaciones adecuadas.
Por ejemplo, imagina un elemento con los siguientes valores, ninguno de los cuales debe ser nulo:
{"itemA": null,"itemB": null}
Asegúrate de que el elemento fluya a través de transformaciones que corrijan ambos problemas potenciales:
badElements.apply(fixItemA).apply(fixItemB)
Tu canalización puede tener más pasos en serie, pero la fusión ayuda a minimizar la sobrecarga de procesamiento ingresada.
Ejemplo
Usa DoFn.StartBundle
para llamadas por microlotes a servicios externos
Es posible que debas invocar APIs externas como parte de la canalización. Debido a que una canalización distribuye trabajo en muchos recursos de procesamiento, realizar una sola llamada para cada elemento que fluye a través del sistema puede sobrecargar un extremo de servicio externo. Este problema es particularmente común cuando no aplicaste ninguna función de reducción.
Para evitar este problema, realiza llamadas por lotes a sistemas externos.
Puedes agrupar las llamadas por lotes con una transformación GroupByKey
o la API de cronómetro de Apache Beam. Sin embargo, estos enfoques requieren una redistribución, que introduce parte de la sobrecarga del procesamiento y la necesidad de un número mágico para determinar el espacio de claves.
En su lugar, usa los
elementos del ciclo de vida StartBundle
y
FinishBundle
para agrupar tus datos por lotes. Con estas opciones, no es necesario realizar cambios de redistribución.
Una desventaja menor de esta opción es que la implementación del ejecutor determina de forma dinámica los tamaños de los paquetes según lo que ocurre actualmente dentro de la canalización y sus trabajadores. En el modo de transmisión, los paquetes suelen ser pequeños. La creación de paquetes de Dataflow se ve afectada por factores de backend, como la fragmentación del uso, la cantidad de datos disponibles para una clave en particular y la capacidad de procesamiento de la canalización.
Ejemplo
EventItemCorrectionService.java
Usa un patrón de entrada complementaria adecuado para el enriquecimiento de datos
En las aplicaciones de estadísticas de transmisión, los datos suelen enriquecerse con información adicional que podría ser útil para realizar un análisis posterior. Por ejemplo, si tienes el storeId de una transacción, te recomendamos que agregues información sobre la ubicación del almacenamiento. Esta información adicional a menudo se agrega si tomas un elemento y obtienes información de una tabla de consulta.
En el caso de las tablas de consulta que cambian con lentitud y tienen un tamaño menor,
la tabla se agrega a la canalización como una clase singleton que implementa la interfaz Map<K,V>
. Esta opción te permite evitar que cada elemento realice una llamada a la API para su búsqueda. Una vez que incluyas una copia de una tabla en la canalización, debes actualizarla de forma periódica para mantenerla actualizada.
Para controlar las entradas complementarias de actualización lenta, usa los patrones de entradas complementarias de Apache Beam.
Almacenamiento en caché
Las entradas complementarias se cargan en la memoria y, por lo tanto, se almacenan en caché automáticamente.
Puedes configurar el tamaño de la caché con la opción --setWorkerCacheMb
.
Puedes compartir la caché en las instancias de DoFn
y usar activadores externos para actualizarla.
Ejemplo
SlowMovingStoreLocationDimension.java