Patrones de tareas de Java

En la aplicación de muestra de comercio electrónico, se demuestran las prácticas recomendadas para usar Dataflow a fin de implementar la estadísticas de datos de transmisión y la IA en tiempo real. El ejemplo contiene patrones de tareas que muestran la mejor manera de realizar tareas de programación de Java. Por lo general, estas tareas son necesarias para crear aplicaciones de comercio electrónico.

La aplicación contiene los siguientes patrones de tareas de Java:

Usa esquemas de Apache Beam para trabajar con datos estructurados

Puedes usar los esquemas de Apache Beam para facilitar el procesamiento de datos estructurados.

Convertir tus objetos en Filas te permite producir código Java muy limpio, lo que hace que el ejercicio de compilación del grafo acíclico dirigido (DAG) sea más fácil. También puedes hacer referencia a las propiedades del objeto como campos dentro de las declaraciones de estadísticas que creas, en lugar de tener que llamar a los métodos.

Ejemplo

CountViewsPerProduct.java

Usa JsonToRow para convertir datos JSON

Procesar strings JSON en Dataflow es una necesidad común, Por ejemplo, las strings JSON se procesan cuando se transmite información de flujo de clics capturada desde aplicaciones web. Para procesar cadenas JSON, debes convertirlas en Filas o en objetos antiguos y sin formato basados en Java (POJO) durante el procesamiento de la canalización.

Puedes usar la transformación incorporada de Apache Beam JsonToRow para convertir cadenas JSON en filas. Sin embargo, si quieres una cola para procesar mensajes con errores, debes compilarla por separado. Consulta Pon en cola datos no procesables a fin de analizarlos luego.

Si necesitas convertir una string JSON en un POJO mediante AutoValue, registra un esquema para el tipo mediante la anotación @DefaultSchema(AutoValueSchema.class) y, luego, usa el Convert. El código resultante es similar al siguiente:

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

Para obtener más información, incluidos los diferentes tipos de Java de los que puedes inferir esquemas, consulta Crea esquemas.

Si JsonToRow no funciona con tus datos, Gson es una alternativa. Gson tiene un procesamiento predeterminado de datos bastante tranquilo, lo que puede requerir que compiles más validación en el proceso de conversión de datos.

Ejemplos

Usa el generador de código AutoValue para generar los POJOs

Los esquemas de Apache Beam suelen ser la mejor manera de representar objetos en una canalización, debido a la forma en la que te permiten trabajar con datos estructurados. Sin embargo, a veces se necesita un Objeto antiguo y sin formato basado en Java (POJO), por ejemplo, cuando se manejan objetos par clave-valor o el control del estado del objeto. El proceso de compilación manual de POJO requiere que codifiques las anulaciones para los métodos equals() y hashcode(), lo que puede llevar mucho tiempo y generar errores. Las anulaciones incorrectas pueden dar como resultado un comportamiento incoherente de la aplicación o una pérdida de datos.

Para generar los POJO, usa el compilador de clases AutoValue. Esta opción garantiza que se usen las anulaciones necesarias y te permite evitar posibles errores. AutoValue se usa mucho en la base de código de Apache Beam, por lo que es útil familiarizarte con este compilador de clases si deseas desarrollar canalizaciones de Apache Beam en Dataflow mediante Java.

También puedes AutoValue con esquemas de Apache Beam si agregas una anotación @DefaultSchema(AutoValueSchema.class). Para obtener más información, consulta Creación de esquemas.

Para obtener más información sobre AutoValue, consulta Por qué AutoValue? y los documentos de AutoValue.

Ejemplo

Clickstream.java

Pon en cola datos que no se pueden procesar para un análisis posterior

En los sistemas de producción, es importante manejar los datos problemáticos. Si es posible, validas y corriges los datos in-stream. Cuando no es posible la corrección, registra el valor en una cola de mensajes no procesados, a veces llamada cola de mensajes no entregados, para analizarla más tarde. Por lo general, los problemas se producen cuando se convierten datos de un formato en otro, por ejemplo, cuando se convierten strings JSON en filas.

Para solucionar este problema, usa una transformación de salida múltiple a fin de transportar los elementos que contienen los datos no procesados a otra PCollection para su posterior análisis. Este procesamiento es una operación común que puedes usar en muchos lugares de una canalización. Intenta que la transformación sea lo bastante genérica para usarla en varios lugares. Primero, crea un objeto de error para unir las propiedades comunes, incluidos los datos originales. A continuación, crea una transformación de receptor que tenga varias opciones para el destino.

Ejemplos

Aplica transformaciones de validación de datos en serie

Los datos recopilados de sistemas externos suelen necesitar limpieza. Estructura tu canalización para que pueda corregir los datos problemáticos en la transmisión cuando sea posible. Envía los datos a una cola para un análisis más detallado cuando sea necesario.

Debido a que un solo mensaje puede experimentar varios problemas que necesitan corrección, planifica el grafo acíclico dirigido (DAG) necesario. Si un elemento contiene datos con varios defectos, debes asegurarte de que el elemento fluya a través de las transformaciones adecuadas.

Por ejemplo, imagina un elemento con los siguientes valores, ninguno de los cuales debe ser nulo:

{"itemA": null,"itemB": null}

Asegúrate de que el elemento fluya a través de transformaciones que corrijan ambos problemas potenciales:

badElements.apply(fixItemA).apply(fixItemB)

Tu canalización puede tener más pasos en serie, pero la fusión ayuda a minimizar la sobrecarga de procesamiento ingresada.

Ejemplo

ValidateAndCorrectCSEvt.java

Usa DoFn.StartBundle para llamadas por microlotes a servicios externos

Es posible que debas invocar APIs externas como parte de la canalización. Debido a que una canalización distribuye trabajo en muchos recursos de procesamiento, realizar una sola llamada para cada elemento que fluye a través del sistema puede sobrecargar un extremo de servicio externo. Este problema es particularmente común cuando no aplicaste ninguna función de reducción.

Para evitar este problema, realiza llamadas por lotes a sistemas externos.

Puedes agrupar las llamadas por lotes con una transformación GroupByKey o la API de cronómetro de Apache Beam. Sin embargo, estos enfoques requieren una redistribución, que introduce parte de la sobrecarga del procesamiento y la necesidad de un número mágico para determinar el espacio de claves.

En su lugar, usa los elementos del ciclo de vida StartBundle y FinishBundle para agrupar tus datos por lotes. Con estas opciones, no es necesario realizar cambios de redistribución.

Una desventaja menor de esta opción es que la implementación del ejecutor determina de forma dinámica los tamaños de los paquetes según lo que ocurre actualmente dentro de la canalización y sus trabajadores. En el modo de transmisión, los paquetes suelen ser pequeños. La creación de paquetes de Dataflow se ve afectada por factores de backend, como la fragmentación del uso, la cantidad de datos disponibles para una clave en particular y la capacidad de procesamiento de la canalización.

Ejemplo

EventItemCorrectionService.java

Usa un patrón de entrada complementaria adecuado para el enriquecimiento de datos

En las aplicaciones de estadísticas de transmisión, los datos suelen enriquecerse con información adicional que podría ser útil para realizar un análisis posterior. Por ejemplo, si tienes el storeId de una transacción, te recomendamos que agregues información sobre la ubicación del almacenamiento. Esta información adicional a menudo se agrega si tomas un elemento y obtienes información de una tabla de consulta.

En el caso de las tablas de consulta que cambian con lentitud y tienen un tamaño menor, la tabla se agrega a la canalización como una clase singleton que implementa la interfaz Map<K,V>. Esta opción te permite evitar que cada elemento realice una llamada a la API para su búsqueda. Una vez que incluyas una copia de una tabla en la canalización, debes actualizarla de forma periódica para mantenerla actualizada.

Para controlar las entradas complementarias de actualización lenta, usa los patrones de entradas complementarias de Apache Beam.

Almacenamiento en caché

Las entradas complementarias se cargan en la memoria y, por lo tanto, se almacenan en caché automáticamente.

Puedes configurar el tamaño de la caché con la opción --setWorkerCacheMb.

Puedes compartir la caché en las instancias de DoFn y usar activadores externos para actualizarla.

Ejemplo

SlowMovingStoreLocationDimension.java