Java task patterns


The e-commerce sample application demonstrates best practices for using Dataflow to implement streaming data analytics and real-time AI. The example contains a number of task patterns that show the best way to accomplish Java programming tasks that are commonly needed to create this type of application.

The application contains the following Java task patterns:

Use Apache Beam schemas to work with structured data

You can use Apache Beam schemas to make processing structured data easier.

Converting your objects to Rows lets you write cleaner Java code, which makes it easier to build your directed acyclic graph (DAG). You can also reference object properties as fields within the analytics statements that you create, instead of having to call methods.

Example

CountViewsPerProduct.java

Use JsonToRow to convert JSON data

Processing JSON strings in Dataflow is a common need. For example, JSON strings are processed when streaming clickstream information captured from web applications. To process JSON strings, you need to convert them into either Rows or plain old Java objects (POJOs) for the duration of the pipeline processing.

You can use the Apache Beam built-in transform JsonToRow to convert JSON strings to Rows. However, if you want a queue for processing unsuccessful messages, you need to build that separately. See Queue unprocessable data for further analysis.

If you need to convert a JSON string to a POJO using AutoValue, register a schema for the type by using the @DefaultSchema(AutoValueSchema.class) annotation, then use the Convert utility class. The resulting code is similar to the following:

PCollection<String> json = ...

PCollection<MyUserType> userTypes = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class));

For more information, including what different Java types you can infer schemas from, see Creating Schemas.

If JsonToRow does not work with your data, Gson is an alternative. Gson is fairly relaxed in its default processing of data, which might require you to build more validation into the data conversion process.

Examples

Use the AutoValue code generator to generate POJOs

Apache Beam schemas are often the best way to represent objects in a pipeline, because of the way they allow you to work with structured data. Nevertheless, at times a plain old Java object (POJO) is needed, such as when dealing with key-value objects or handling object state. Hand building POJOs requires you to code overrides for the equals() and hashCode() methods, which can be time-consuming and error-prone. Incorrect overrides might result in inconsistent application behavior or data loss.
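The effect of missing overrides can be seen in a small plain-Java sketch that does not depend on Apache Beam. The class and method names here are hypothetical and are not part of the sample application. The sketch uses an ordinary HashMap to count views per key, once with a key type that forgets the overrides and once with a key type that has them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

final class KeyEqualityDemo {

  /** Hypothetical key type that forgets to override equals() and hashCode(). */
  static final class BrokenKey {
    final String productId;

    BrokenKey(String productId) {
      this.productId = productId;
    }
  }

  /** Same field, with hand-written overrides of the kind a generator would produce. */
  static final class CorrectKey {
    final String productId;

    CorrectKey(String productId) {
      this.productId = productId;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof CorrectKey && ((CorrectKey) o).productId.equals(productId);
    }

    @Override
    public int hashCode() {
      return Objects.hash(productId);
    }
  }

  /** Counts views per BrokenKey and returns how many distinct keys the map holds. */
  static int distinctBrokenKeys(String... productIds) {
    Map<BrokenKey, Integer> counts = new HashMap<>();
    for (String id : productIds) {
      counts.merge(new BrokenKey(id), 1, Integer::sum);
    }
    return counts.size();
  }

  /** Counts views per CorrectKey and returns how many distinct keys the map holds. */
  static int distinctCorrectKeys(String... productIds) {
    Map<CorrectKey, Integer> counts = new HashMap<>();
    for (String id : productIds) {
      counts.merge(new CorrectKey(id), 1, Integer::sum);
    }
    return counts.size();
  }

  public static void main(String[] args) {
    // Identity-based equality treats every new object as a new key.
    System.out.println(distinctBrokenKeys("p1", "p1", "p2"));  // prints 3
    // Value-based equality groups the two "p1" views together.
    System.out.println(distinctCorrectKeys("p1", "p1", "p2")); // prints 2
  }
}
```

With the broken key, the two views of p1 are never combined, which is the kind of silent miscount that generated overrides help you avoid.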

To generate POJOs, use the AutoValue class builder. This option ensures that the necessary overrides are used and lets you avoid potential errors.

AutoValue is heavily used within the Apache Beam code base, so familiarity with it is useful if you want to develop Apache Beam pipelines on Dataflow using Java.

You can also use AutoValue with Apache Beam schemas if you add an @DefaultSchema(AutoValueSchema.class) annotation. For more information, see Creating Schemas.

For more information about AutoValue, see Why AutoValue? and the AutoValue docs.

Example

Clickstream.java

Queue unprocessable data for further analysis

In production systems, it is important to handle problematic data. If possible, validate and correct data in-stream. When correction isn't possible, log the value to an unprocessed messages queue, sometimes called a dead-letter queue, for later analysis. Issues commonly occur when converting data from one format to another, for example when converting JSON strings to Rows.

To address this issue, use a multi-output transform to shuttle the elements containing the unprocessed data into another PCollection for further analysis. Because this processing is a common operation that you might want to use in many places in a pipeline, make the transform generic enough to use in multiple places. First, create an error object to wrap common properties, including the original data. Next, create a sink transform that has multiple options for the destination.
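The shape of this pattern can be sketched in plain Java without the Beam API. Everything in the sketch is an assumption made for illustration: the ErrorMsg wrapper, the Outputs holder, and the use of Integer.parseInt as a stand-in for a real format conversion such as JSON to Row. In a pipeline, the two lists would correspond to the two outputs of a multi-output transform.

```java
import java.util.ArrayList;
import java.util.List;

final class DeadLetterDemo {

  /** Hypothetical error wrapper: keeps the original data and why it failed. */
  static final class ErrorMsg {
    final String originalData;
    final String reason;
    final String transformName;

    ErrorMsg(String originalData, String reason, String transformName) {
      this.originalData = originalData;
      this.reason = reason;
      this.transformName = transformName;
    }
  }

  /** Holds the two outputs of the conversion step. */
  static final class Outputs {
    final List<Integer> parsed = new ArrayList<>();
    final List<ErrorMsg> deadLetters = new ArrayList<>();
  }

  /** Converts each input; failures are routed to deadLetters instead of being thrown. */
  static Outputs convert(List<String> inputs) {
    Outputs out = new Outputs();
    for (String input : inputs) {
      try {
        out.parsed.add(Integer.parseInt(input.trim()));
      } catch (NumberFormatException e) {
        // Preserve the raw value so it can be analyzed or replayed later.
        out.deadLetters.add(new ErrorMsg(input, e.getMessage(), "ParseViews"));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Outputs out = convert(List.of("12", "oops", " 7 "));
    System.out.println(out.parsed);                          // prints [12, 7]
    System.out.println(out.deadLetters.get(0).originalData); // prints oops
  }
}
```

Keeping the original data in the wrapper is what makes later analysis possible, because the failed element can be inspected or reprocessed after the underlying problem is understood.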

Examples

Apply data validation transforms serially

Data collected from external systems often needs cleaning. Structure your pipeline in a way that allows it to correct problematic data in-stream when possible and to send the data to a queue for further analysis when needed.

Because a single message might suffer from multiple issues that need correction, plan out the needed directed acyclic graph (DAG). If an element contains data with multiple defects, you must ensure that the element flows through all of the appropriate transforms.

For example, imagine an element with the following values, neither of which should be null:

{"itemA": null,"itemB": null}

Make sure the element flows through transforms that correct both potential issues:

badElements.apply(fixItemA).apply(fixItemB)

Your pipeline might have more serial steps, but fusion helps to minimize the processing overhead introduced.
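As a rough illustration of the chaining, the following plain-Java sketch models each correction as a function and composes the functions so that an element with both defects passes through both fixes. It does not use Beam. The map-based element, the default values, and the names FIX_ITEM_A and FIX_ITEM_B are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class SerialFixDemo {

  /** Returns a copy of the element in which a null field is replaced by a fallback. */
  static Map<String, String> withDefault(
      Map<String, String> element, String field, String fallback) {
    Map<String, String> copy = new HashMap<>(element);
    if (copy.get(field) == null) {
      copy.put(field, fallback);
    }
    return copy;
  }

  // Each correction handles exactly one defect and passes everything else through.
  static final Function<Map<String, String>, Map<String, String>> FIX_ITEM_A =
      element -> withDefault(element, "itemA", "unknownA");

  static final Function<Map<String, String>, Map<String, String>> FIX_ITEM_B =
      element -> withDefault(element, "itemB", "unknownB");

  /** Applies the corrections one after the other, like apply(fixItemA).apply(fixItemB). */
  static Map<String, String> fixAll(Map<String, String> element) {
    return FIX_ITEM_A.andThen(FIX_ITEM_B).apply(element);
  }

  public static void main(String[] args) {
    Map<String, String> bad = new HashMap<>();
    bad.put("itemA", null);
    bad.put("itemB", null);

    Map<String, String> fixed = fixAll(bad);
    System.out.println(fixed.get("itemA")); // prints unknownA
    System.out.println(fixed.get("itemB")); // prints unknownB
  }
}
```

If the two corrections were placed on separate branches instead of in series, an element with both defects would leave each branch with one defect still present.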

Example

ValidateAndCorrectCSEvt.java

Use DoFn.StartBundle to micro-batch calls to external services

You might need to invoke external APIs as part of your pipeline. Because a pipeline distributes work across many compute resources, you can overwhelm an external service endpoint if you make a single call for each element flowing through the system, especially if you haven't applied any reducing functions.

To avoid this issue, batch calls to external systems.

You can batch calls using a GroupByKey transform or using the Apache Beam Timer API. However, these approaches both require shuffling, which introduces some processing overhead as well as the need for a magic number to determine the key space.

Instead, use the StartBundle and FinishBundle lifecycle methods to batch your data. With these options, no shuffling is needed.

One minor downside to this option is that bundle sizes are dynamically determined by the implementation of the runner based on what's currently happening inside the pipeline and its workers. In streaming mode, bundles are often small. Dataflow bundling is influenced by backend factors like sharding usage, how much data is available for a particular key, and the throughput of the pipeline.
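The buffering logic behind this approach can be sketched in plain Java. The MicroBatcher class below is hypothetical and is not a Beam DoFn. Its three methods mirror what the methods annotated with @StartBundle, @ProcessElement, and @FinishBundle would do, and the Consumer stands in for the call to the external service. The maximum batch size is an assumed tuning parameter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class MicroBatcher<T> {
  private final int maxBatchSize;
  private final Consumer<List<T>> externalCall;
  private List<T> buffer;

  MicroBatcher(int maxBatchSize, Consumer<List<T>> externalCall) {
    this.maxBatchSize = maxBatchSize;
    this.externalCall = externalCall;
  }

  /** Start of a bundle: begin with an empty buffer. */
  void startBundle() {
    buffer = new ArrayList<>();
  }

  /** Per element: buffer it, and flush early if the batch is full. */
  void processElement(T element) {
    buffer.add(element);
    if (buffer.size() >= maxBatchSize) {
      flush();
    }
  }

  /** End of a bundle: send whatever is left so that no element is dropped. */
  void finishBundle() {
    flush();
  }

  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    // One call to the external service for the whole batch.
    externalCall.accept(new ArrayList<>(buffer));
    buffer.clear();
  }

  public static void main(String[] args) {
    List<List<String>> calls = new ArrayList<>();
    MicroBatcher<String> batcher = new MicroBatcher<>(2, calls::add);

    batcher.startBundle();
    batcher.processElement("a");
    batcher.processElement("b");
    batcher.processElement("c");
    batcher.finishBundle();

    System.out.println(calls); // prints [[a, b], [c]]
  }
}
```

In this run, three elements produce two external calls instead of three. Because the runner decides when a bundle ends, the final batch of each bundle can be smaller than the maximum, which is the downside described above.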

Example

EventItemCorrectionService.java

Use an appropriate side-input pattern for data enrichment

In streaming analytics applications, data is often enriched with additional information that might be useful for further analysis. For example, if you have the store ID for a transaction, you might want to add information about the store location. This additional information is often added by taking an element and bringing in information from a lookup table.

For lookup tables that are both slowly changing and smaller in size, bringing the table into the pipeline as a singleton class that implements the Map<K,V> interface works well. This option lets you avoid having each element do an API call for its lookup. After you include a copy of a table in the pipeline, you need to update it periodically to keep it fresh.

To handle slowly updating side inputs, use the Apache Beam Side input patterns.
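The idea of enriching elements from an in-memory copy of a table that is reloaded periodically can be sketched in plain Java. This sketch is not the Beam side-input API and does not replace the documented side input patterns. The RefreshingLookup class, its loader, and the injected clock are all hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

final class RefreshingLookup<K, V> {
  private final Supplier<Map<K, V>> loader;
  private final LongSupplier clockMillis;
  private final long refreshIntervalMillis;
  private Map<K, V> table;
  private long loadedAtMillis;

  RefreshingLookup(
      Supplier<Map<K, V>> loader, LongSupplier clockMillis, long refreshIntervalMillis) {
    this.loader = loader;
    this.clockMillis = clockMillis;
    this.refreshIntervalMillis = refreshIntervalMillis;
    // Load the table once up front so that the first lookup is served from memory.
    this.table = loader.get();
    this.loadedAtMillis = clockMillis.getAsLong();
  }

  /** Looks up a key in memory, reloading the table first if the copy is stale. */
  synchronized V lookup(K key) {
    long now = clockMillis.getAsLong();
    if (now - loadedAtMillis >= refreshIntervalMillis) {
      table = loader.get();
      loadedAtMillis = now;
    }
    return table.get(key);
  }

  public static void main(String[] args) {
    // Assumed loader: a real one would read the store table from an external source.
    RefreshingLookup<String, String> stores =
        new RefreshingLookup<>(
            () -> Map.of("store-1", "Berlin"), System::currentTimeMillis, 60_000L);

    // Enrich a transaction that carries only the store ID.
    System.out.println("store-1 is in " + stores.lookup("store-1")); // prints store-1 is in Berlin
  }
}
```

Each lookup is served from memory, so elements do not make an API call of their own. The table is reloaded at most once per refresh interval.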

Caching

Side inputs are loaded in memory and are therefore cached automatically.

You can set the size of the cache by using the --workerCacheMb pipeline option.

You can share the cache across DoFn instances and use external triggers to refresh the cache.

Example

SlowMovingStoreLocationDimension.java