
Building advanced Beam pipelines in Scala with SCIO

November 2, 2022
https://storage.googleapis.com/gweb-cloudblog-publish/images/da_2022.max-2500x2500.jpg
Prathap Kumar Parvathareddy

Staff Data Engineer

Apache Beam is an open source, unified programming model with a set of language-specific SDKs for defining and executing data processing workflows. Scio, pronounced shee-o, is a Scala API for Beam, developed by Spotify, for building both batch and streaming pipelines.

https://storage.googleapis.com/gweb-cloudblog-publish/images/1_Framework.max-1600x1600.jpg

In this post, we look at why Scio is useful and walk through a few reference patterns.

Why Scio

Scio provides a high-level abstraction for developers and is preferred for the following reasons:

  • It strikes a balance between conciseness and performance: pipelines written in Scala are more concise than their Java equivalents, with similar performance.

  • It eases migration for Scalding and Spark developers, because its semantics are closer to those frameworks than the Beam API is, which avoids a steep learning curve.

  • It gives access to a large ecosystem of Java infrastructure libraries (e.g. Hadoop, Avro, Parquet) and to high-level numerical processing libraries in Scala such as Algebird and Breeze.

  • It supports interactive exploration of data and code snippets using the Scio REPL.

Reference Patterns

Let us check out a few concepts along with examples:

1. Graph Composition

If you have a complex pipeline consisting of several transforms, a practical approach is to compose logically related transforms into blocks. This makes the graph rendered in the Dataflow UI easier to manage and debug. Let us consider an example using the popular WordCount pipeline.

https://storage.googleapis.com/gweb-cloudblog-publish/images/2_WordCount_NoComposition.max-1800x1800.jpg
https://storage.googleapis.com/gweb-cloudblog-publish/images/2a_WordCount_NoComposition.max-400x400.jpg

Fig:  Word Count Pipeline without Graph Composition

Let us modify the code to group the related transforms into blocks:

https://storage.googleapis.com/gweb-cloudblog-publish/images/3_WordCount_Composition.max-600x600.jpg
https://storage.googleapis.com/gweb-cloudblog-publish/images/3a_WordCount_Composition.max-1000x1000.jpg

Fig:  Word Count Pipeline with Graph Composition
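As a rough text rendering of the idea (a sketch, not the code shown in the figures), the snippet below groups the WordCount steps into two named blocks with `transform`, so each block shows up as a single composite node in the rendered graph. The object name, the block names `CountWords` and `FormatResults`, and the `input`/`output` argument names are assumptions made for illustration; it also assumes `scio-core` and a Beam runner are on the classpath.

```scala
import com.spotify.scio._
import com.spotify.scio.values.SCollection

object ComposedWordCount {
  // Plain helper, kept outside the pipeline so it can be tested on its own.
  def tokenize(line: String): Seq[String] =
    line.split("[^a-zA-Z']+").filter(_.nonEmpty).toSeq

  // Block 1: everything needed to turn lines into (word, count) pairs.
  def countWords(lines: SCollection[String]): SCollection[(String, Long)] =
    lines.transform("CountWords") {
      _.flatMap(line => tokenize(line)).countByValue
    }

  // Block 2: formatting of the counts for output.
  def formatResults(counts: SCollection[(String, Long)]): SCollection[String] =
    counts.transform("FormatResults") {
      _.map { case (word, count) => s"$word: $count" }
    }

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val lines = sc.textFile(args("input"))
    formatResults(countWords(lines)).saveAsTextFile(args("output"))
    sc.run().waitUntilFinish()
  }
}
```

Keeping each block as a function from `SCollection` to `SCollection` also makes the blocks reusable across pipelines.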

2. Distributed Cache

Distributed Cache lets you load data from a given URI onto the workers and use it across all tasks (DoFns) executing on a worker. Common use cases include loading a serialized machine learning model from an object store such as Google Cloud Storage to run predictions, or loading lookup data.

Let us check out an example that loads lookup data from a CSV file on each worker during initialization and uses it to count the number of matching lookups for each input element.

https://storage.googleapis.com/gweb-cloudblog-publish/images/4_DistributedCache.max-2200x2200.jpg

Fig:  Example demonstrating Distributed Cache
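A minimal sketch of this pattern, assuming the lookup file is a CSV whose first column holds the lookup key and that input elements are whitespace-separated text. The object name, the helper functions and the `lookup`/`input`/`output` argument names are hypothetical; `distCache` is the Scio call that fetches the URI and runs the initialization function on the worker.

```scala
import com.spotify.scio._
import scala.io.Source

object DistCacheLookupCount {
  // Hypothetical helper: take the first CSV column of each line as a lookup key.
  def parseLookup(csvLines: Iterator[String]): Set[String] =
    csvLines.map(_.takeWhile(_ != ',').trim).filter(_.nonEmpty).toSet

  // Hypothetical helper: count how many tokens of an element are lookup keys.
  def countMatches(element: String, lookup: Set[String]): Int =
    element.split("\\s+").count(lookup.contains)

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)

    // The init function receives the local copy of the file on the worker.
    val lookup = sc.distCache(args("lookup")) { file =>
      val source = Source.fromFile(file)
      try parseLookup(source.getLines()) finally source.close()
    }

    sc.textFile(args("input"))
      // lookup() returns the cached Set inside the task.
      .map(line => s"$line\t${countMatches(line, lookup())}")
      .saveAsTextFile(args("output"))

    sc.run().waitUntilFinish()
  }
}
```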

3. Scio Joins

Joins in Beam are expressed using CoGroupByKey, while Scio lets you express join types such as inner, left outer and full outer joins directly by flattening the CoGbkResult.

Hash joins (syntactic sugar over a Beam side input) can be used if one of the datasets is small enough to fit in memory (at most ~1 GB); the smaller dataset goes on the right-hand side. Side inputs are small, in-memory data structures that are replicated to all workers, which avoids a shuffle.

MultiJoin can be used to join up to 22 datasets. It is recommended to order the datasets by descending size, because non-shuffle joins require the largest dataset to be on the left of any chain of operators.

Sparse joins can be used when the left collection (LHS) is much larger than the right collection (RHS), the RHS cannot fit in memory, and only a sparse set of RHS keys intersects with the LHS. A sparse join is implemented by building a Bloom filter of the keys in the right collection and splitting the left collection into two partitions. Only the partition whose keys are in the filter goes through the join; the rest is either concatenated to the output (outer join) or discarded (inner join). Sparse joins are especially useful for joining historical aggregates with incremental updates.

Skewed joins are a better choice when the left collection (LHS) is much larger and contains hot keys. A skewed join uses a Count-Min Sketch, a probabilistic data structure, to estimate the frequency of keys in the LHS collection. The LHS is then partitioned into a hot and a chill partition. The hot partition is joined with the corresponding RHS keys using a hash join, the chill partition uses a regular join, and the two results are finally combined with a union.
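To make the sparse join mechanism concrete, here is an in-memory sketch in plain Scala. It is not Scio's implementation: the tiny Bloom filter, its sizing and all names are assumptions for illustration, and ordinary `Seq`s stand in for distributed collections.

```scala
import scala.collection.mutable
import scala.util.hashing.MurmurHash3

object SparseJoinSketch {
  // Minimal Bloom filter: a few seeded hashes over a fixed-size bit set.
  final class BloomFilter(numBits: Int, numHashes: Int) {
    private val bits = new mutable.BitSet(numBits)
    private def positions(key: String): Seq[Int] =
      (0 until numHashes).map { seed =>
        Math.floorMod(MurmurHash3.stringHash(key, seed), numBits)
      }
    def add(key: String): Unit = positions(key).foreach(p => bits += p)
    // True for every added key; occasionally true for an absent key.
    def mightContain(key: String): Boolean = positions(key).forall(bits.contains)
  }

  def sparseLeftOuterJoin[V, W](
      lhs: Seq[(String, V)],
      rhs: Seq[(String, W)]): Seq[(String, (V, Option[W]))] = {
    // 1. Build the filter from the RHS keys.
    val filter = new BloomFilter(numBits = 1 << 16, numHashes = 3)
    rhs.foreach { case (k, _) => filter.add(k) }

    // 2. Split the LHS: only `candidates` need to go through the join.
    val (candidates, misses) = lhs.partition { case (k, _) => filter.mightContain(k) }

    // 3. Join the candidates; a filter false positive simply finds no match.
    val rhsByKey: Map[String, Seq[W]] =
      rhs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val joined = candidates.flatMap { case (k, v) =>
      rhsByKey.get(k) match {
        case Some(ws) => ws.map(w => (k, (v, Some(w): Option[W])))
        case None     => Seq((k, (v, Option.empty[W])))
      }
    }

    // 4. Outer join: concatenate the untouched partition (an inner join drops it).
    joined ++ misses.map { case (k, v) => (k, (v, Option.empty[W])) }
  }
}
```

Because the filter never reports a present key as absent, skipping the `misses` partition cannot lose a match; false positives only cost some extra join work.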
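The hot/chill split can be sketched in memory as follows. This is an illustration in plain Scala rather than Scio's implementation: the minimal Count-Min Sketch, its dimensions, the `hotKeyThreshold` parameter and all names are assumptions. In memory the two joins look identical; in a distributed run the hot side would be a broadcast-style hash join and the chill side a shuffle join.

```scala
import scala.util.hashing.MurmurHash3

object SkewedJoinSketch {
  // Minimal Count-Min Sketch: `depth` rows of `width` counters, one hash per row.
  final class CountMinSketch(width: Int, depth: Int) {
    private val table = Array.ofDim[Long](depth, width)
    private def col(key: String, row: Int): Int =
      Math.floorMod(MurmurHash3.stringHash(key, row), width)
    def add(key: String): Unit =
      (0 until depth).foreach(r => table(r)(col(key, r)) += 1L)
    // Never below the true count; may overestimate when keys collide.
    def estimate(key: String): Long =
      (0 until depth).map(r => table(r)(col(key, r))).min
  }

  // Simple lookup join used for both partitions in this in-memory sketch.
  private def joinWith[V, W](
      left: Seq[(String, V)],
      right: Seq[(String, W)]): Seq[(String, (V, W))] = {
    val byKey = right.groupBy(_._1)
    for {
      (k, v) <- left
      (_, w) <- byKey.getOrElse(k, Seq.empty)
    } yield (k, (v, w))
  }

  def skewedJoin[V, W](
      lhs: Seq[(String, V)],
      rhs: Seq[(String, W)],
      hotKeyThreshold: Long): Seq[(String, (V, W))] = {
    // 1. Estimate key frequencies on the LHS.
    val cms = new CountMinSketch(width = 1024, depth = 4)
    lhs.foreach { case (k, _) => cms.add(k) }

    // 2. Partition the LHS into hot and chill keys.
    val (hot, chill) = lhs.partition { case (k, _) => cms.estimate(k) >= hotKeyThreshold }
    val hotKeys = hot.map(_._1).toSet

    // 3. Join each partition against the matching part of the RHS, then union.
    val hotJoined = joinWith(hot, rhs.filter(kv => hotKeys(kv._1)))
    val chillJoined = joinWith(chill, rhs.filterNot(kv => hotKeys(kv._1)))
    hotJoined ++ chillJoined
  }
}
```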

https://storage.googleapis.com/gweb-cloudblog-publish/images/5_Joins.max-2200x2200.jpg

Fig:  Example demonstrating Scio Joins
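For readers without access to the figure, the basic join types look roughly like this in code. This is a sketch, not the figure's code: the `orders`/`customers` collections, their element types and the `describe` helper are hypothetical, and only `join`, `leftOuterJoin`, `fullOuterJoin` and `hashJoin` are shown.

```scala
import com.spotify.scio._
import com.spotify.scio.values.SCollection

object JoinExamples {
  // Hypothetical inputs: (customerId, orderAmount) and (customerId, customerName).
  type Orders = SCollection[(String, Double)]
  type Customers = SCollection[(String, String)]

  // Inner join: only keys present on both sides.
  def inner(orders: Orders, customers: Customers): SCollection[(String, (Double, String))] =
    orders.join(customers)

  // Left outer join: every order, with the customer name if there is one.
  def leftOuter(orders: Orders, customers: Customers): SCollection[(String, (Double, Option[String]))] =
    orders.leftOuterJoin(customers)

  // Full outer join: either side may be missing.
  def fullOuter(orders: Orders, customers: Customers): SCollection[(String, (Option[Double], Option[String]))] =
    orders.fullOuterJoin(customers)

  // Hash join: the small collection goes on the right and is used as a side input.
  def hash(orders: Orders, customers: Customers): SCollection[(String, (Double, String))] =
    orders.hashJoin(customers)

  // Hypothetical helper: flatten a left outer join row, defaulting missing names.
  def describe(row: (String, (Double, Option[String]))): String = row match {
    case (id, (amount, name)) => s"$id,${name.getOrElse("unknown")},$amount"
  }
}
```

A typical use is `JoinExamples.leftOuter(orders, customers).map(JoinExamples.describe)` before writing the result out.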

Note that when using the Beam Java SDK, you can also take advantage of some similar join abstractions through the Join library extension.

4. Algebird Aggregators and Semigroup

Algebird is Twitter’s abstract algebra library, containing several reusable modules for parallel aggregation and approximation. An Algebird Aggregator or Semigroup can be used with the aggregate and sum transforms on SCollection[T], or with the aggregateByKey and sumByKey transforms on SCollection[(K, V)]. The example below computes a parallel aggregation over customer orders and composes the result into an OrderMetrics class.

https://storage.googleapis.com/gweb-cloudblog-publish/images/6_AlgebirdAggregators.max-2200x2200.jpg

Fig:  Example demonstrating Algebird Aggregators
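A sketch of what such an aggregation can look like, assuming hypothetical `Order` and `OrderMetrics` case classes with amounts stored in cents (the fields in the figure may differ). Three Algebird aggregators are combined with `MultiAggregator` and presented as one `OrderMetrics` value; the block only needs `algebird-core` on the classpath.

```scala
import com.twitter.algebird.{Aggregator, MultiAggregator}

object OrderAggregation {
  // Hypothetical shapes, chosen for illustration.
  case class Order(customer: String, amountCents: Long)
  case class OrderMetrics(count: Long, totalCents: Long, maxCents: Long)

  // Number of orders.
  val count: Aggregator[Order, Long, Long] = Aggregator.size

  // Sum of order amounts, using the Long monoid.
  val total: Aggregator[Order, Long, Long] =
    Aggregator.prepareMonoid[Order, Long](_.amountCents)

  // Largest single order amount.
  val maxAmount: Aggregator[Order, Long, Long] =
    Aggregator.max[Long].composePrepare[Order](_.amountCents)

  // All three run in a single pass; the tuple result is mapped to OrderMetrics.
  val orderMetrics: Aggregator[Order, (Long, Long, Long), OrderMetrics] =
    MultiAggregator((count, total, maxAmount)).andThenPresent {
      case (n, sum, max) => OrderMetrics(n, sum, max)
    }

  // In a Scio pipeline the same aggregator would be passed to
  //   orders.aggregate(orderMetrics)            on an SCollection[Order]
  //   keyedOrders.aggregateByKey(orderMetrics)  on an SCollection[(K, Order)]
}
```

The aggregator can be checked locally by applying it to an ordinary collection, for example `OrderAggregation.orderMetrics(Seq(Order("a", 500L), Order("b", 1500L)))`.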

The code snippet below expands on the previous example and demonstrates using a Semigroup to aggregate objects by combining their fields.

https://storage.googleapis.com/gweb-cloudblog-publish/images/7_AlgebirdSemiGroup.max-2200x2200.jpg

Fig:  Example demonstrating Algebird Semigroup
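A minimal sketch of the Semigroup approach, again with a hypothetical `OrderMetrics` class: the Semigroup describes how two partial results are merged field by field, which is all that `sum` and `sumByKey` need.

```scala
import com.twitter.algebird.Semigroup

object OrderMetricsSemigroup {
  // Hypothetical shape, chosen for illustration.
  case class OrderMetrics(count: Long, totalCents: Long, maxCents: Long)

  // Associative merge of two partial results: add counts and totals, keep the max.
  implicit val metricsSemigroup: Semigroup[OrderMetrics] =
    Semigroup.from[OrderMetrics] { (a, b) =>
      OrderMetrics(
        a.count + b.count,
        a.totalCents + b.totalCents,
        math.max(a.maxCents, b.maxCents))
    }

  // Local check of the merge logic; None for an empty input.
  def combineAll(metrics: Seq[OrderMetrics]): Option[OrderMetrics] =
    Semigroup.sumOption(metrics)

  // With the implicit in scope, a Scio pipeline could call
  //   metrics.sum              on an SCollection[OrderMetrics]
  //   keyedMetrics.sumByKey    on an SCollection[(K, OrderMetrics)]
}
```

The merge function must be associative, because partial results may be combined in any grouping across workers.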

5. GroupMap and GroupMapReduce

GroupMap can be used as a replacement for groupBy(key) + mapValues(_.map(func)), or for _.map(e => kv.of(keyfunc, valuefunc)) + groupBy(key).

Let us consider the example below, which calculates the lengths of words for each type. Instead of grouping by type and then applying the length function, GroupMap combines these operations by applying keyfunc and valuefunc.

https://storage.googleapis.com/gweb-cloudblog-publish/images/8_GroupMap.max-2200x2200.jpg

Fig:  Example demonstrating GroupMap

GroupMapReduce can be used to derive the key and apply an associative operation to the values associated with each key. The associative function is applied locally on each mapper, similar to a "combiner" in MapReduce (aka combiner lifting), before the results are sent to the reducer. This is equivalent to keyBy(keyfunc) + reduceByKey(reducefunc).
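In code, the GroupMap pattern can be sketched as below. The `Word` case class, its `kind` field (standing in for the "type" of a word) and the function names are hypothetical; the second function shows the two-step form that `groupMap` replaces.

```scala
import com.spotify.scio._
import com.spotify.scio.values.SCollection

object GroupMapExample {
  // Hypothetical element type: a word tagged with its kind, e.g. "noun".
  case class Word(kind: String, text: String)

  // keyfunc and valuefunc as plain functions.
  val keyFn: Word => String = _.kind
  val valueFn: Word => Int = _.text.length

  // One step: derive the key and map the value while grouping.
  def lengthsByKind(words: SCollection[Word]): SCollection[(String, Iterable[Int])] =
    words.groupMap(keyFn)(valueFn)

  // Equivalent two-step form: group first, then map every grouped value.
  def lengthsByKindTwoStep(words: SCollection[Word]): SCollection[(String, Iterable[Int])] =
    words.groupBy(keyFn).mapValues(_.map(valueFn))
}
```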

Let us consider the example below, which calculates the sum of the odd and of the even numbers in a given range. In this case, individual values are combined on each worker and the local results are then aggregated to calculate the final result.

https://storage.googleapis.com/gweb-cloudblog-publish/images/9_GroupMapReduce.max-1300x1300.jpg

Fig:  Example demonstrating GroupMapReduce
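A sketch of the odd/even sum with `groupMapReduce`, assuming the range 1 to 10 and hypothetical names. `parity` plays the role of keyfunc and `_ + _` is the associative reduce function.

```scala
import com.spotify.scio._
import com.spotify.scio.values.SCollection

object GroupMapReduceExample {
  // keyfunc: classify a number as "odd" or "even".
  def parity(n: Int): String = if (n % 2 == 0) "even" else "odd"

  // Derive the key, then reduce the values of each key with an associative sum.
  def sumsByParity(numbers: SCollection[Int]): SCollection[(String, Int)] =
    numbers.groupMapReduce(n => parity(n))(_ + _)

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)
    // debug() prints each element of the result, here one pair per parity.
    sumsByParity(sc.parallelize(1 to 10)).debug()
    sc.run().waitUntilFinish()
  }
}
```

For the range 1 to 10 the odd numbers sum to 25 and the even numbers to 30, which is what the two output pairs should carry.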

Conclusion

Thanks for reading; I hope you are now motivated to learn more about Scio. Beyond the patterns covered above, Scio offers several other interesting features, such as implicit coders for Scala case classes, chaining jobs using I/O Taps, distinct counts using HyperLogLog++, and writing sorted output to files. Several use-case-specific libraries, such as BigDiffy (comparison of large datasets) and FeaTran (ML feature engineering), have also been built on top of Scio.

For Beam lovers with a Scala background, Scio is the perfect recipe for building complex distributed data pipelines.
