This section provides a walkthrough of a series of example Dataflow pipelines that demonstrate more complex functionality than the basic WordCount example. The pipelines in this section process data from a hypothetical game that users play on their mobile phones. The pipelines demonstrate processing at increasing levels of complexity; the first pipeline, for example, shows how to run a batch analysis job to obtain relatively simple score data, while the later pipelines use Dataflow's windowing and triggers features to provide low-latency data analysis and more complex intelligence about users' play patterns.
Every time a user plays an instance of our hypothetical mobile game, they generate a data event. Each data event consists of the following information:
- The unique ID of the user playing the game.
- The team ID for the team to which the user belongs.
- A score value for that particular instance of play.
- A timestamp that records when the particular instance of play happened; this is the event time for each game data event.
When the user completes an instance of the game, their phone sends the data event to a game server, where the data is logged and stored in a file. Generally, the data is sent to the game server immediately upon completion. However, users can play the game "offline", when their phones are out of contact with the server (such as on an airplane, or outside a network coverage area). When the user's phone comes back into contact with the game server, the phone sends all accumulated game data.
This means that some data events might be received by the game server significantly later than users generate them. This time difference can have processing implications for pipelines that make calculations that consider when each score was generated. Such pipelines might track scores generated during each hour of a day, for example, or calculate the length of time that users are continuously playing the game—both of which depend on each data record's event time.
The Mobile Game example pipelines vary in complexity, from simple batch analysis to more complex pipelines that can perform real-time analysis and abuse detection. This section walks you through each example and demonstrates how to use Dataflow features like windowing and triggers to expand your pipeline's capabilities.
UserScore: Basic Score Processing in Batch
The `UserScore` pipeline is the simplest example for processing mobile game data. `UserScore` determines the total score per user over a finite data set (for example, one day's worth of scores stored on the game server). Pipelines like `UserScore` are best run periodically after all relevant data has been gathered. For example, `UserScore` could run as a nightly job over data gathered during that day.
What Does UserScore Do?
In a day's worth of scoring data, each user ID may have multiple records (if the user plays more than one instance of the game during the analysis window), each with its own score value and timestamp. If we want to determine the total score over all the instances a user plays during the day, our pipeline will need to group all the records together per individual user.
As the pipeline processes each event, the event score gets added to the sum total for that particular user.
`UserScore` parses out only the data that it needs from each record, specifically the user ID and the score value. The pipeline doesn't consider the event time for any record; it simply processes all data present in the input files that you specify when you run the pipeline.
`UserScore`'s basic pipeline flow does the following:
- Read the day's score data from a file stored in Google Cloud Storage.
- Sum the score values for each unique user by grouping each game event by user ID and combining the score values to get the total score for that particular user.
- Write the result data to a BigQuery table.
The following diagram shows score data for several users over the pipeline analysis period. In the diagram, each data point is an event that results in one user/score pair:
This example uses batch processing, and the diagram's Y-axis represents processing time: the pipeline processes events lower on the Y-axis first, and events higher up the axis later. The diagram's X-axis represents the event time for each game event, as denoted by that event's timestamp. Note that the individual events in the diagram are not processed by the pipeline in the same order as they occurred (according to their timestamps).
After reading the score events from the input file, the pipeline groups all of those user/score pairs together and sums the score values into one total value per unique user. `UserScore` encapsulates the core logic for that step as the user-defined composite transform `ExtractAndSumScore`:
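The following is a condensed sketch of that transform in the Dataflow Java SDK 1.x style; the `GameActionInfo` class (one parsed game event, with a `getKey(field)` accessor) comes from the example's supporting code.

```java
// A composite transform that extracts (key, score) pairs from game events
// and sums the scores per key. The grouping field ("user" or "team") is
// passed in at construction time.
public static class ExtractAndSumScore
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

  private final String field;

  ExtractAndSumScore(String field) {
    this.field = field;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> gameInfo) {
    return gameInfo
        // Map each event to a KV of (user or team ID, score).
        .apply(MapElements
            .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
            .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
        // Sum the score values for each unique key.
        .apply(Sum.<String>integersPerKey());
  }
}
```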
`ExtractAndSumScore` is written to be more general, in that you can pass in the field by which you want to group the data (in the case of our game, by unique user or unique team). This means we can re-use `ExtractAndSumScore` in other pipelines that group score data by team, for example.
Here's the main method of `UserScore`, showing how we apply all three steps of the pipeline:
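A condensed sketch; `ParseEventFn` (which parses a raw log line into a `GameActionInfo`) and `WriteToBigQuery` are helper classes from the example's supporting code, and the options accessors are assumptions based on the example's pipeline options.

```java
public static void main(String[] args) throws Exception {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  // 1. Read the day's score data from a file in Google Cloud Storage.
  pipeline.apply(TextIO.Read.from(options.getInput()))
      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
      // 2. Extract and sum (user, score) pairs from the event data.
      .apply("ExtractUserScore", new ExtractAndSumScore("user"))
      // 3. Write the per-user totals to a BigQuery table.
      .apply("WriteUserScoreSums",
          new WriteToBigQuery<KV<String, Integer>>(options.getTableName()));

  pipeline.run();
}
```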
Working with the Results
`UserScore` writes the data to a BigQuery table (called `user_score` by default). With the data in the BigQuery table, we might perform a further interactive analysis, such as querying for a list of the N top-scoring users for a given day.
Let's suppose we want to interactively determine the top 10 highest-scoring users for a given day. In the BigQuery user interface, we can run the following query:
```sql
SELECT * FROM [MyGameProject:MyGameDataset.user_score] ORDER BY total_score DESC LIMIT 10
```
Limitations
As written in the example, the `UserScore` pipeline has a few limitations:
- Because some score data may be generated by offline players and sent after the daily cutoff for game data, the result data generated by the `UserScore` pipeline may be incomplete. `UserScore` only processes the fixed input set present in the input file(s) when the pipeline runs.
- `UserScore` processes all data events present in the input file at processing time and does not examine or otherwise error-check events based on event time. Therefore, the results may include some values whose event times fall outside the relevant analysis period, such as late records from the previous day.
- Because `UserScore` runs only after all the data has been collected, it has high latency between when users generate data events (the event time) and when results are computed (the processing time). `UserScore` also only reports the total results for the entire day and doesn't provide any finer-grained information about how the data accumulated during the day.
Starting with the next pipeline example, we'll discuss how you can use Dataflow's features to address these limitations.
HourlyTeamScore: Advanced Processing in Batch with Windowing
The `HourlyTeamScore` pipeline expands on the basic batch analysis principles used in the `UserScore` pipeline and improves upon some of its limitations. `HourlyTeamScore` performs finer-grained analysis, both by using additional features in the Dataflow SDKs and by taking into account more aspects of the game data. For example, `HourlyTeamScore` can filter out data that isn't part of the relevant analysis period.
Like `UserScore`, `HourlyTeamScore` is best thought of as a job to be run periodically after all the relevant data has been gathered (such as once per day). The pipeline reads a fixed data set from a file and writes the results to a BigQuery table, just like `UserScore`.
What Does HourlyTeamScore Do?
`HourlyTeamScore` calculates the total score per team, per hour, in a fixed data set (such as one day's worth of data).
- Rather than operating on the entire data set at once, `HourlyTeamScore` divides the input data into logical windows and performs calculations on those windows. This allows `HourlyTeamScore` to provide information on scoring data per window, where each window represents the game score progress at fixed intervals in time (like once every hour).
- `HourlyTeamScore` filters data events based on whether their event time (as indicated by the embedded timestamp) falls within the relevant analysis period. Basically, the pipeline checks each game event's timestamp and ensures that it falls within the range we want to analyze (in this case, the day in question). Data events from previous days are discarded and not included in the score totals. This makes `HourlyTeamScore` more robust and less prone to erroneous result data than `UserScore`. It also allows the pipeline to account for late-arriving data that has a timestamp within the relevant analysis period.
Below, we'll look at each of these enhancements in `HourlyTeamScore` in detail:
Fixed-Time Windowing
Using fixed-time windowing lets the pipeline provide better information on how events accumulated in the data set over the course of the analysis period. In our case, it tells us when in the day each team was active and how much the team scored at those times.
The following diagram shows how the pipeline processes a day's worth of a single team's scoring data after applying fixed-time windowing:
Notice that as processing time advances, the sums are now per window; each window represents an hour of event time during the day in which the scores occurred.
Dataflow's windowing feature uses the intrinsic timestamp information attached to each element of a `PCollection`. Because we want our pipeline to window based on event time, we must first extract the timestamp that's embedded in each data record and apply it to the corresponding element in the `PCollection` of score data. Then, the pipeline can apply the windowing function to divide the `PCollection` into logical windows.
Here's the code, which shows how `HourlyTeamScore` uses the `WithTimestamps` and `Window` transforms to perform these operations:
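A minimal sketch of those two steps; `GameActionInfo` carries the embedded timestamp, and the one-hour window duration is written inline here for clarity.

```java
// Assign an event-time timestamp to each element, taken from the
// timestamp embedded in the parsed game event.
.apply("AddEventTimestamps",
    WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))

// Divide the PCollection into fixed one-hour windows based on
// those event-time timestamps.
.apply(Window.named("FixedWindowsTeam")
    .<GameActionInfo>into(FixedWindows.of(Duration.standardMinutes(60))))
```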
Notice that the transforms the pipeline uses to specify the windowing are distinct from the actual data processing transforms (such as `ExtractAndSumScore`). This functionality provides you some flexibility in designing your Dataflow pipeline, in that you can run existing transforms over data sets with different windowing characteristics.
Filtering Based On Event Time
`HourlyTeamScore` uses filtering to remove any events from our data set whose timestamps don't fall within the relevant analysis period (i.e., they weren't generated during the day that we're interested in). This keeps the pipeline from erroneously including any data that was, for example, generated offline during the previous day but sent to the game server during the current day.
It also lets the pipeline include relevant late data—data events with valid timestamps, but that arrived after our analysis period ended. If our pipeline cutoff time is 12:00 am, for example, we might run the pipeline at 2:00 am, but filter out any events whose timestamps indicate that they occurred after the 12:00 am cutoff. Data events that were delayed and arrived between 12:01 am and 2:00 am, but whose timestamps indicate that they occurred before the 12:00 am cutoff, would be included in the pipeline processing.
`HourlyTeamScore` uses the `Filter` transform to perform this operation. When you apply `Filter`, you specify a predicate against which each data record is compared. Data records that pass the comparison are included, while events that fail the comparison are excluded. In our case, the predicate is the cutoff time we specify, and we compare just one part of the data—the timestamp field.
The following code shows how `HourlyTeamScore` uses the `Filter` transform to filter out events that occur either before or after the relevant analysis period:
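A sketch, assuming `startMinTimestamp` and `stopMinTimestamp` are Joda-Time `Instant` values marking the boundaries of the analysis day.

```java
// Keep only events that occurred after the start of the analysis day...
.apply("FilterStartTime", Filter.byPredicate(
    (GameActionInfo gInfo) -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
// ...and before the end of the analysis day.
.apply("FilterEndTime", Filter.byPredicate(
    (GameActionInfo gInfo) -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
```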
Calculating Score Per Team, Per Window
`HourlyTeamScore` uses the same `ExtractAndSumScore` transform as the `UserScore` pipeline, but passes a different key (team, as opposed to user). Also, because the pipeline applies `ExtractAndSumScore` after applying fixed-time 1-hour windowing to the input data, the data gets grouped by both team and window. You can see the full sequence of transforms in `HourlyTeamScore`'s main method:
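A condensed sketch that strings the steps above together; `ParseEventFn` and `WriteWindowedToBigQuery` are helper classes from the example's supporting code, and the date boundaries are shown as hard-coded placeholders where the real example derives them from pipeline options.

```java
public static void main(String[] args) throws Exception {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  // Boundaries of the analysis day (placeholders; the example derives
  // these from pipeline options).
  final Instant startMinTimestamp = Instant.parse("2015-11-16T00:00:00Z");
  final Instant stopMinTimestamp = Instant.parse("2015-11-17T00:00:00Z");

  pipeline.apply(TextIO.Read.from(options.getInput()))
      .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
      // Filter out events whose timestamps fall outside the analysis day.
      .apply("FilterStartTime", Filter.byPredicate(
          (GameActionInfo gInfo) -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
      .apply("FilterEndTime", Filter.byPredicate(
          (GameActionInfo gInfo) -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))
      // Assign event-time timestamps and window into fixed hourly windows.
      .apply("AddEventTimestamps",
          WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
      .apply(Window.named("FixedWindowsTeam")
          .<GameActionInfo>into(FixedWindows.of(Duration.standardMinutes(60))))
      // Sum scores per team, per window, and write the windowed results.
      .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
      .apply("WriteTeamScoreSums",
          new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName()));

  pipeline.run();
}
```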
Limitations
As written, `HourlyTeamScore` still has one limitation: it has high latency between when data events occur (the event time) and when results are generated (the processing time), because, as a batch pipeline, it needs to wait to begin processing until all data events are present.
LeaderBoard: Streaming Processing with Real-Time Game Data
One way we can help address the latency issue present in the `UserScore` and `HourlyTeamScore` pipelines is by reading the score data from a streaming source. The `LeaderBoard` pipeline introduces streaming processing by reading the game score data from Google Cloud Pub/Sub, rather than from a file on the game server.
The `LeaderBoard` pipeline also demonstrates how to process game score data with respect to both processing time and event time. `LeaderBoard` outputs data about both individual user scores and team scores, each with respect to a different time frame.
Because the `LeaderBoard` pipeline reads the game data from a streaming source as that data is generated, you can think of the pipeline as an ongoing job running concurrently with the game process. `LeaderBoard` can thus provide low-latency insights into how users are playing the game at any given moment—useful if, for example, we want to provide a live web-based scoreboard so that users can track their progress against other users as they play.
What Does LeaderBoard Do?
The `LeaderBoard` pipeline reads game data published to Pub/Sub in near real-time and uses that data to perform two separate processing tasks:
- `LeaderBoard` calculates the total score for every unique user and publishes speculative results for every ten minutes of processing time. That is, every ten minutes, the pipeline outputs the total score per user that the pipeline has processed to date. This calculation provides a running "leader board" in close to real time, regardless of when the actual game events were generated.
- `LeaderBoard` calculates the team scores for each hour that the pipeline runs. This is useful if we want to, for example, reward the top-scoring team for each hour of play. The team score calculation uses fixed-time windowing to divide the input data into hour-long finite windows based on the event time (indicated by the timestamp) as data arrives in the pipeline.
In addition, the team score calculation uses Dataflow's trigger mechanisms to provide speculative results for each hour (which update every five minutes until the hour is up), and to also capture any late data and add it to the specific hour-long window to which it belongs.
Below, we'll look at both of these tasks in detail.
Calculating User Score based on Processing Time
We want our pipeline to output a running total score for each user for every ten minutes that the pipeline runs. This calculation doesn't consider when the actual score was generated by the user's play instance; it simply outputs the sum of all the scores for that user that have arrived in the pipeline to date. Late data gets included in the calculation whenever it happens to arrive in the pipeline as it's running.
Because we want all the data that has arrived in the pipeline every time we update our calculation, we have the pipeline consider all of the user score data in a single global window. The single global window is unbounded, but we can specify a kind of temporary cut-off point for each ten-minute calculation by using a processing time trigger.
When we specify a ten-minute processing time trigger for the single global window, the effect is that the pipeline effectively takes a "snapshot" of the contents of the window every time the trigger fires (which it does at ten-minute intervals). Since we're using a single global window, each snapshot contains all the data collected to that point in time. The following diagram shows the effects of using a processing time trigger on the single global window:
As processing time advances and more scores are processed, the trigger outputs the updated sum for each user.
The following code example shows how `LeaderBoard` sets the processing time trigger to output the data for user scores:
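A sketch; `gameEvents` is the `PCollection` of parsed game events read from Pub/Sub, and `TEN_MINUTES` and `ALLOWED_LATENESS` are `Duration` constants assumed from the example.

```java
gameEvents
    // Consider all user score data in a single, unbounded global window.
    .apply(Window.named("LeaderboardUserGlobalWindow")
        .<GameActionInfo>into(new GlobalWindows())
        // Take a "snapshot" of the window every ten minutes of
        // processing time.
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_MINUTES)))
        // Accumulate panes so each firing emits a running sum, not a delta.
        .accumulatingFiredPanes()
        .withAllowedLateness(ALLOWED_LATENESS))
    // Extract and sum (user, score) pairs from the event data.
    .apply("ExtractUserScore", new ExtractAndSumScore("user"));
```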
Note that `LeaderBoard` uses an accumulating trigger for the user score calculation (by invoking `.accumulatingFiredPanes` when setting the trigger). Using an accumulating trigger causes the pipeline to accumulate the previously emitted data together with any new data that's arrived since the last trigger fire. This ensures that `LeaderBoard` produces a running sum for the user scores, rather than a collection of individual sums.
Calculating Team Score based on Event Time
We want our pipeline to also output the total score for each team during each hour of play. Unlike the user score calculation, for team scores, we care about when in event time each score actually occurred, because we want to consider each hour of play individually. We also want to provide speculative updates as each individual hour progresses, and to allow any instances of late data—data that arrives after a given hour's data is considered complete—to be included in our calculation.
Because we consider each hour individually, we can apply fixed-time windowing to our input data, just like in `HourlyTeamScore`. To provide the speculative updates and updates on late data, we'll specify additional trigger parameters. The trigger will cause each window to calculate and emit results at an interval we specify (in this case, every five minutes), and also to keep triggering after the window is considered "complete" to account for late data. Just like the user score calculation, we'll set the trigger to accumulating mode to ensure that we get a running sum for each hour-long window.
The triggers for speculative updates and late data help with the problem of time skew. Events in the pipeline aren't necessarily processed in the order in which they actually occurred according to their timestamps; they may arrive in the pipeline out of order, or late (in our case, because they were generated while the user's phone was out of contact with a network). Dataflow needs a way to determine when it can reasonably assume that it has "all" of the data in a given window: this is called the watermark.
In an ideal world, all data would be processed immediately when it occurs, so the processing time would be equal to (or at least have a linear relationship to) the event time. However, because distributed systems contain some inherent inaccuracy (like our late-reporting phones), Dataflow often uses a heuristic watermark.
The following diagram shows the relationship between ongoing processing time and each score's event time for two teams:
The dotted line in the diagram is the "ideal" watermark: Dataflow's notion of when all data in a given window can reasonably be considered to have arrived. The irregular solid line represents the actual watermark, as determined by the data source (in our case, Pub/Sub).
Data arriving above the solid watermark line is late data—this is a score event that was delayed (perhaps generated offline) and arrived after the window to which it belongs had closed. Our pipeline's late-firing trigger ensures that this late data is still included in the sum.
The following code example shows how `LeaderBoard` applies fixed-time windowing with the appropriate triggers to have our pipeline perform the calculations we want:
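A sketch under the same assumptions as above, with `FIVE_MINUTES` and `TEN_MINUTES` as assumed `Duration` constants.

```java
gameEvents
    // Window the data into fixed one-hour windows based on event time.
    .apply(Window.named("LeaderboardTeamFixedWindows")
        .<GameActionInfo>into(FixedWindows.of(Duration.standardMinutes(60)))
        // Emit speculative results every five minutes until the watermark
        // passes the end of the window, then keep firing for late data.
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(FIVE_MINUTES))
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TEN_MINUTES)))
        .withAllowedLateness(ALLOWED_LATENESS)
        // Accumulate panes so each firing emits a running sum per window.
        .accumulatingFiredPanes())
    // Extract and sum (team, score) pairs from the event data.
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
```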
Taken together, these processing strategies let us address the latency and completeness issues present in the `UserScore` and `HourlyTeamScore` pipelines, while still using the same basic transforms to process the data—as a matter of fact, both calculations still use the same `ExtractAndSumScore` transform that we used in both the `UserScore` and `HourlyTeamScore` pipelines.
GameStats: Abuse Detection and Usage Analysis
While `LeaderBoard` demonstrates how to use basic windowing and triggers to perform low-latency and flexible data analysis, we can use more advanced windowing techniques to perform more comprehensive analysis. This might include some calculations designed to detect system abuse (like spam) or to gain insight into user behavior. The `GameStats` pipeline builds on the low-latency functionality in `LeaderBoard` to demonstrate how you can use Dataflow to perform this kind of advanced analysis.
Like `LeaderBoard`, `GameStats` reads data from a streaming source, in this case Pub/Sub. It is best thought of as an ongoing job that provides insight into the game as users play.
What Does GameStats Do?
Like `LeaderBoard`, `GameStats` calculates the total score per team, per hour. However, the pipeline also performs two kinds of more complex analysis:
- `GameStats` includes an abuse detection system that performs some simple statistical analysis on the score data to determine which users, if any, might be spammers or bots. It then uses the list of suspected spam/bot users to filter the bots out of the hourly team score calculation.
- `GameStats` analyzes usage patterns by grouping together game data that share similar event times using session windowing. This lets us gain some intelligence on how long users tend to play, and how game length changes over time.
Below, we'll look at these features in more detail.
Abuse Detection
Let's suppose scoring in our game depends on the speed at which a user can "click" on their phone. `GameStats`'s abuse detection analyzes each user's score data to detect if a user has an abnormally high "click rate" and thus an abnormally high score. This might indicate that the game is being played by a bot that operates significantly faster than a human could play.
To determine whether or not a score is "abnormally" high, `GameStats` calculates the average of every score in that fixed-time window, and then checks each individual score against the average score multiplied by an arbitrary weight factor (in our case, 2.5). Thus, any score more than 2.5 times the average is deemed to be the product of spam. The `GameStats` pipeline tracks a list of "spam" users and filters those users out of the team score calculations for the team leader board.
Since the average depends on the pipeline data, we need to calculate it, and then use that calculated data in a subsequent `ParDo` transform that filters out scores that exceed the weighted value. To do this, we can pass the calculated average as a side input to the filtering `ParDo`.
The following code example shows the composite transform that handles abuse detection. The transform uses the `Sum.integersPerKey` transform to sum all scores per user, and then the `Mean.globally` transform to determine the average score for all users. Once that's been calculated (as a `PCollectionView` singleton), we can pass it to the filtering `ParDo` using `.withSideInputs`:
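A condensed sketch of that transform; the input is a `PCollection` of (user, score) pairs, and `SCORE_WEIGHT` is the weight factor described above.

```java
public static class CalculateSpammyUsers
    extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {

  // Scores more than SCORE_WEIGHT times the mean are considered spam.
  private static final double SCORE_WEIGHT = 2.5;

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {
    // Sum the scores for each user.
    PCollection<KV<String, Integer>> sumScores =
        userScores.apply("UserSum", Sum.<String>integersPerKey());

    // Compute the global mean of the per-user sums, as a singleton view.
    final PCollectionView<Double> globalMeanScore =
        sumScores.apply(Values.<Integer>create())
                 .apply(Mean.<Integer>globally().asSingletonView());

    // Keep only users whose total exceeds the weighted mean; these are the
    // suspected spammers. The mean is supplied to the ParDo as a side input.
    return sumScores.apply(ParDo.named("ProcessAndFilter")
        .withSideInputs(globalMeanScore)
        .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
          @Override
          public void processElement(ProcessContext c) {
            Integer score = c.element().getValue();
            Double mean = c.sideInput(globalMeanScore);
            if (score > mean * SCORE_WEIGHT) {
              c.output(c.element());
            }
          }
        }));
  }
}
```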
The abuse-detection transform generates a view of users suspected to be spambots. Later in the pipeline, we use that view to filter out any such users when we calculate the team score per hour, again by using the side input mechanism. The following code example shows where we insert the spam filter, between windowing the scores into fixed windows and extracting the team scores:
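A sketch; `rawEvents` is the `PCollection` of parsed game events, and `spammersView` is a `PCollectionView<Map<String, Integer>>` built from the transform above.

```java
rawEvents
    // Window the raw game events into fixed one-hour windows.
    .apply(Window.named("WindowIntoFixedWindows")
        .<GameActionInfo>into(FixedWindows.of(Duration.standardMinutes(60))))
    // Filter out events from suspected spammers, using the side input.
    .apply(ParDo.named("FilterOutSpammers")
        .withSideInputs(spammersView)
        .of(new DoFn<GameActionInfo, GameActionInfo>() {
          @Override
          public void processElement(ProcessContext c) {
            // Output the event only if the user is not in the spammers map.
            if (c.sideInput(spammersView).get(c.element().getUser()) == null) {
              c.output(c.element());
            }
          }
        }))
    // Extract and sum (team, score) pairs from the remaining events.
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
```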
Analyzing Usage Patterns
We can gain some insight on when users are playing our game, and for how long, by examining the event times for each game score and grouping scores with similar event times into sessions. `GameStats` uses Dataflow's built-in session windowing function to group user scores into sessions based on the time they occurred.
When you set session windowing, you specify a minimum gap duration between events. All events whose event times are closer together than the minimum gap duration are grouped into the same window; events whose event times differ by more than the gap duration are grouped into separate windows. Depending on how we set our minimum gap duration, we can safely assume that scores in the same session window are part of the same (relatively) uninterrupted stretch of play. Scores in a different window indicate that the user stopped playing the game for at least the minimum gap time before returning to it later.
The following diagram shows how data might look when grouped into session windows. Unlike fixed windows, session windows are different for each user, depending on each individual user's play pattern:
We can use the session-windowed data to determine the average length of uninterrupted play time for all of our users, as well as the total score they achieve during each session. We can do this in the code by first applying session windows, summing the score per user and session, and then using a transform to calculate the length of each individual session:
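A sketch; `userEvents` is a `PCollection` of (user, score) pairs with event-time timestamps, `getSessionGap` is an assumed pipeline option, and `UserSessionInfoFn` (shown below) emits the length of the session window that each element belongs to.

```java
userEvents
    // Group each user's scores into session windows; a new session starts
    // after the specified gap of inactivity.
    .apply(Window.named("WindowIntoSessions")
        .<KV<String, Integer>>into(
            Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
    // Sum the score per user, per session.
    .apply(Sum.<String>integersPerKey())
    // Emit the duration, in minutes, of the session window for each element.
    .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()));

// Outputs the length (in minutes) of the session window containing
// each element.
private static class UserSessionInfoFn extends DoFn<KV<String, Integer>, Integer>
    implements DoFn.RequiresWindowAccess {
  @Override
  public void processElement(ProcessContext c) {
    IntervalWindow w = (IntervalWindow) c.window();
    int duration = new Duration(w.start(), w.end())
        .toPeriod().toStandardMinutes().getMinutes();
    c.output(duration);
  }
}
```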
This gives us a set of user sessions, each with an attached duration. We can then calculate the average session length by re-windowing the data into fixed time windows, and then calculating the average for all sessions that end in each hour:
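Continuing the sketch: `sessionDurations` is the `PCollection<Integer>` produced above, and the fixed-window length is written inline as a placeholder.

```java
sessionDurations
    // Re-window the session lengths into fixed windows according to when
    // each session ended.
    .apply(Window.named("WindowToExtractSessionMean")
        .<Integer>into(FixedWindows.of(Duration.standardMinutes(60))))
    // Compute the mean session length within each fixed window.
    .apply(Mean.<Integer>globally().withoutDefaults());
```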
We can use the resulting information to find, for example, what times of day our users are playing the longest, or which stretches of the day are more likely to see shorter play sessions.
Further Information
You can find out more about how these examples work by running the example pipelines, which are included in the Dataflow SDK Examples (`master-1.x` branch) on GitHub. See the `README.md` in the examples directory for full instructions on running the included example pipelines.
The following blog posts and videos provide further context and information on Dataflow's unified data model for batch and streaming processing:
- Dataflow & Spark blog post — a blog post authored by Google engineers that uses examples in the mobile gaming domain to illustrate how the Dataflow model compares to that of Apache Spark.
- Dataflow talk @Scale — a video presentation taken from the @Scale conference, by Google engineer Frances Perry, that explains the Dataflow model using an earlier version of the mobile gaming examples described above.
- "The World Beyond Batch: Streaming 101 and Streaming 102" — a two-part series of posts on the O'Reilly blog, by Google engineer Tyler Akidau, that examines the future of Big Data processing.