Google Cloud Platform

Life of a Cloud Dataflow service-based shuffle

Users of Google Cloud Dataflow, Google Cloud Platform's (GCP) fully managed service for processing data in streaming or batch modes with equal reliability and expressiveness, recently got access to a new service-based Shuffle implementation (currently in beta) through the Cloud Dataflow SDK for Java version 2.0. In this post, I’ll explain and demonstrate the practical impact of the new Shuffle on data pipelines, using the Opinion Analysis project as an example. Along the way I’ll use the latest and greatest features of Apache Beam 2.0 (upon which Cloud Dataflow SDK 2.0 is based), including an improved Google BigQuery connector.

Background about the project

The Opinion Analysis project ingests data from sources such as databases, social media, text files, and news feeds, and looks for sentiment and opinions in text using the Plutchik sentiment analysis framework. We recently added a new data source to the project that is a perfect use case for the new Cloud Dataflow Shuffle.

This new data source has two sets of tables in BigQuery (Posts and Comments) partitioned by publication month. You can think of each Posts record as the beginning of a long discussion thread, and the Comments records as the links in that chain. Comments are connected to Posts via the “link_id” field, and both the Posts and the Comments tables contain free-text fields, which are prime material for opinion extraction.

Because the project uses some heavy-duty text-processing libraries written in Java, the text analysis cannot run directly in BigQuery. Instead, Cloud Dataflow is a natural choice: it lets you run text-processing code on the textual fields and provides join transforms for linking the two datasets together.

Running the pipeline

In the example code, the IndexerPipeline now reads the Posts and Comments tables via BigQueryIO. We then join the records with the CoGroupByKey transform, keying Posts by the “id” field and Comments by the “link_id” field. Internally, CoGroupByKey uses Shuffle to bring together records from the two PCollections that share the same key.
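To make the join concrete, here is a minimal sketch of what this looks like in the Beam Java SDK. The variable names postsById and commentsByLinkId are hypothetical stand-ins for the keyed PCollection<KV<String, TableRow>> inputs the pipeline prepares from its BigQueryIO reads; they are not the exact names used in IndexerPipeline.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

// Tags identify each input of the join.
final TupleTag<TableRow> postTag = new TupleTag<TableRow>() {};
final TupleTag<TableRow> commentTag = new TupleTag<TableRow>() {};

// postsById is keyed by the Posts "id" field; commentsByLinkId by the Comments "link_id" field.
PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(postTag, postsById)
        .and(commentTag, commentsByLinkId)
        .apply("JoinPostsAndComments", CoGroupByKey.<String>create());

joined.apply("AssembleThreads", ParDo.of(new DoFn<KV<String, CoGbkResult>, TableRow>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // The post (if any) and all comments that share this key.
    TableRow post = c.element().getValue().getOnly(postTag, null);
    Iterable<TableRow> comments = c.element().getValue().getAll(commentTag);
    // ... assemble the discussion thread and emit it for text indexing ...
  }
}));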

To see how the new Shuffle performs against the old implementation, we did several runs with both the worker-based and the service-based Shuffle, using Cloud Dataflow’s autoscaling feature, setting maxNumWorkers to 70 and using the n1-standard-2 worker type. (To enable the service-based Shuffle, all you need to do is add the --experiments=shuffle_mode=service parameter to your pipeline. You can also remove the diskSizeGb and workerDiskType parameters if you used them to optimize Shuffle performance in the past, because large local disks are no longer required with the service-based Shuffle. As you will see later, this saves significant costs.)
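For reference, here is a minimal sketch of the pipeline options used for runs like these, expressed with the Beam Java SDK. The project ID and staging bucket are placeholders, and in practice these flags arrive on the command line rather than being hard-coded:

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

String[] args = new String[] {
    "--runner=DataflowRunner",
    "--project=my-gcp-project",           // placeholder project ID
    "--tempLocation=gs://my-bucket/tmp",  // placeholder staging location
    "--autoscalingAlgorithm=THROUGHPUT_BASED",
    "--maxNumWorkers=70",
    "--workerMachineType=n1-standard-2",
    "--experiments=shuffle_mode=service"  // the only flag needed to enable the new Shuffle
    // note: no diskSizeGb or workerDiskType overrides are required anymore
};

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
Pipeline p = Pipeline.create(options);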

The resulting pipeline looks like this:

[Image: life-of-dataflow-shuffle-3kixv.PNG]

Posts and Comments are read by the BigQueryIO connector, joined in the CoGroupByKey, indexed in the ParDo(IndexDocument), and then written back into BigQuery into three tables: webresource, document, and sentiment.
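For completeness, here is a hedged sketch of what one of those write steps can look like with the Beam 2.0 BigQueryIO connector. The sentimentRows variable, the destination table reference, and the field types are illustrative placeholders rather than the project’s exact schema:

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.values.PCollection;

// sentimentRows: PCollection<TableRow> produced by the indexing step (placeholder name).
TableSchema sentimentSchema = new TableSchema().setFields(Arrays.asList(
    new TableFieldSchema().setName("MainWebResourceHash").setType("STRING"),
    new TableFieldSchema().setName("AnnotatedText").setType("STRING"),
    new TableFieldSchema().setName("SentimentTotalScore").setType("INTEGER")));

sentimentRows.apply("WriteSentiment",
    BigQueryIO.writeTableRows()
        .to("my-gcp-project:opinions.sentiment")  // placeholder table reference
        .withSchema(sentimentSchema)
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));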

Here are the results, averaged over two runs each for both worker- and service-based Shuffle jobs:

Metric                      | Worker-based Shuffle | Service-based Shuffle | Unit    | Change | Worker-based Shuffle Cost | Service-based Shuffle Cost Change
Elapsed time                | 3.1                  | 2.5                   | hours   | -19%   |                           |
Total vCPU time             | 400                  | 296                   | vCPU hr | -26%   | $22.4                     | -$5.8
Total memory time           | 1,500                | 1,111                 | GB hr   | -26%   | $5.3                      | -$1.4
Total PD time               | 50,009               | 4,073                 | GB hr   | -92%   | $2.7                      | -$2.5
Total service Shuffle usage | 0                    | 101                   | GB hr   | n/a    | n/a                       | +$2.2
Total                       |                      |                       |         |        | $30.4                     | -$7.5 (-25%)

The service-based Shuffle ran 19% faster than the worker-based Shuffle, and consumed 26% less CPU and memory and 92% less Persistent Disk. The Shuffle usage charges for the service-based run amounted to $2.20, and overall the pipeline with the service-based Shuffle cost $22.90 vs. $30.40 (a 25% reduction).

The most interesting thing here was that while Shuffle wasn’t the most resource-intensive part of the pipeline, we were still able to achieve significant savings in cost and time. Based on the step times in the pipeline DAG (see screenshot), the pipeline “spent” nine worker-days in the text-indexing step ParDo(IndexDocument) and just 3 hrs 50 min of worker time in the CoGroupByKey step, where the Shuffle happens.

It soon became clear why using the service-based Shuffle led to ~20-25% reductions in duration and cost: when we looked at the Stackdriver logs, we saw several hundred warnings of above-average processing time. (We had added this warning during a previous investigation into the sluggishness of our IndexerPipeline.) Overall, we counted 285 warnings of a text document taking more than 10,000 milliseconds to process.

Keep in mind that the pipeline indexed about 90 million posts and comments in just 2.5 hours using 70 workers and consuming 295 vCPU hours; dividing that CPU time by the number of indexed documents gives an average of just 12 milliseconds of a single vCPU per text index operation. The 285 documents that took abnormally long to process amount to an incidence rate of well under 0.001%. Still, these documents were the culprit, especially because many of the Stackdriver log warnings showed processing times of 15-30 minutes.

[Image: life-of-dataflow-shuffle-1m1q6.PNG]
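The above-average-processing-time warning is produced by our own instrumentation rather than by Dataflow itself. Here is a minimal, hypothetical sketch of that kind of per-element timing inside a DoFn (the class name, threshold constant, and indexDocument stand-in are illustrative, not the actual IndexerPipeline code):

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TimedIndexFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(TimedIndexFn.class);
  private static final long WARN_THRESHOLD_MS = 10_000L; // 10 seconds, illustrative threshold

  @ProcessElement
  public void processElement(ProcessContext c) {
    long start = System.currentTimeMillis();
    String indexed = indexDocument(c.element());
    long elapsed = System.currentTimeMillis() - start;
    if (elapsed > WARN_THRESHOLD_MS) {
      // Warnings logged via SLF4J on Dataflow workers surface in Stackdriver Logging.
      LOG.warn("Document took {} ms to index (above-average processing time)", elapsed);
    }
    c.output(indexed);
  }

  private String indexDocument(String text) {
    return text; // stand-in for the heavy text-indexing work
  }
}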

Both the service-based and worker-based Shuffle pipelines contained these warnings (they are a property of my text-processing code, not of the pipeline), but it became very clear that with the worker-based Shuffle these “hot-key” elements played a larger role toward the end of processing the input dataset.

With the legacy worker-based Shuffle, both user-code processing and data shuffle processing happen on the worker nodes. Each worker holds a portion of the shuffled data on its attached Persistent Disk, and while it is holding this data, the worker cannot be shut down even if it is not performing any computation. When our user code was running these long, 15-minute calculations, all of the workers holding the shuffled data cache sat waiting for the calculations to complete on a small fraction of other workers.

Hot Key warnings in worker-based Shuffle pipeline

[Image: life-of-dataflow-shuffle-29mvc.PNG]

Hot Key warnings in service-based Shuffle pipeline

[Image: life-of-dataflow-shuffle-48283.PNG]

This behavior applies not just to text processing. For example, you might have a transform that runs heavy statistical or financial computations on input parameters. Through user-induced bugs (or just because of some natural properties of the transform function), a small set of your keys/input parameters can result in very long processing times, consuming all or almost all of your worker CPU. When shuffling (a compute- and storage-intensive operation) is combined with pure compute, the resulting system cannot scale up or down as well as when shuffle and compute are separated but connected through a fast network, as is the case with Cloud Dataflow’s service-based Shuffle. The service-based Shuffle therefore leads to shorter pipeline execution times and fewer worker resources consumed by your jobs.

Putting it all together

Why does this all matter, you might ask? Going back to the project’s original goal of analyzing discussion threads and looking for opinions, imagine you are a budding entrepreneur in the FinTech industry who wants to understand your potential customers’ wants and desires. You could go to a major discussion site such as Reddit or Hacker News, find personal finance-related groups, and scan discussion threads looking for expressions of excitement and interest (as representations of what your prospects want) or disappointment and anger (as negative examples of what people don’t want). In a week you could review perhaps 10,000-20,000 posts and write up a report on customer needs. But what if you could use an automated opinion extraction tool that tells you what people are talking about, and what kind of feelings and opinions they have about those topics? The Opinion Analysis project, based on Cloud Dataflow, is exactly such a tool.

After getting the posts and comments indexed and deposited into our analytical BigQuery dataset, we ran a quick query to figure out what users were talking about on several personal finance-related groups.

SELECT
  t.Tag, COUNT(*) AS cnt
FROM opinions.webresource wr 
  INNER JOIN opinions.sentiment s ON 
    s.MainWebResourceHash = wr.WebResourceHash, UNNEST(s.Tags) AS t
WHERE 
  t.GoodAsTopic = true AND
  REGEXP_EXTRACT(wr.Url, "/r/([^\\s/]+)") IN (
    'personalfinance',
    'financialindependence',
    'PersonalFinanceCanada',
    'UKPersonalFinance',
    'FinancialPlanning'
  )
GROUP BY 1
HAVING COUNT(*) > 10
ORDER BY 2 DESC

It turns out that the top-20 topics mentioned in these comments included credit cards, interest rates, emergency funds, and, yes, “more money”.

Top 20 topics in Personal Finance-related groups in the month of March 2017

Tag               | cnt
credit card       | 5338
interest rate     | 3736
emergency fund    | 3285
credit score      | 3002
student loan      | 2884
saving account    | 1552
more money        | 1515
Roth              | 1464
credit report     | 1334
Vanguard          | 1237
IRS               | 1204
bank account      | 1190
most people       | 1138
IRA               | 1121
mutual fund       | 1012
index fund        | 922
IRAs              | 869
insurance company | 851
tax return        | 839
credit union      | 813

Knowing what people are talking about is great, but can we step this up a bit and see some verbatim expressions of emotions specifically related to the top topic, “credit cards”?

What are people curious about or interested in? What are they anticipating? Here is the query for this:

SELECT
  s.AnnotatedText, s.SentimentTotalScore
FROM opinions.webresource wr 
  INNER JOIN opinions.sentiment s ON s.MainWebResourceHash = wr.WebResourceHash, UNNEST(s.Tags) AS t
WHERE 
  t.Tag = 'credit card' AND
  (s.StInterest + s.StAnticipation) > 0 AND
  REGEXP_EXTRACT(wr.Url, "/r/([^\\s/]+)") IN (
    'personalfinance',
    'financialindependence',
    'PersonalFinanceCanada',
    'UKPersonalFinance',
    'FinancialPlanning'
  )
ORDER BY s.SentimentTotalScore DESC

BigQuery returns many interesting results in the AnnotatedText field (populated by the Opinion Analysis library inside the Cloud Dataflow IndexerPipeline job):

  [i]<<I>> [i]<<want>> [i]<<to>> [i]<<get>> [i]<<a>> [e]{{secured [e]{{credit card}} at TD but am [f]<<worried>> I wo[m]<<n't>> get [m]<<approved>>.
tl;dr: [i]<<Looking>> [i]<<for>> someone who can [g]<<help>> [g]<<me>> address [e]{{student loans}} from ITT Tech, a [e,n]<{bankruptcy>> filing}} that was never completed, and [e]{{credit cards}} that were put into [e]{{collections}}.
[i]<<Looking>> [i]<<for>> the [p]<<best>> [e]{{credit card}} that would [p]<<suit>> me, I seen a card with [e]{{24.99 %}} [e]{{APR}}, [e]{{39$ annual fee}}, and1.5 % cash back. And seen another [e]{{one same thing}} no annual fee but also no cash back.
[t]<<Curious>> to hear what everyone does/thinks about the [p]<<best>> way to [p]<<utilize>> [e]{{credit cards}} for couples.
Now that I have paid off my [e]{{credit card}} my [e]{{utilisation}} is fine. I'm [t]<<more>> [t]<<curious>> whether I'd get a [p]<<better>> [e,i]<{interest>> rate}} if I reapplied in a [e]{{few months}} over the [e]{{6.9 %}} I've been offered.

Participants on the discussion board we analyzed were looking for info on secured credit cards for which they could actually get approved, dealing with credit cards in collections, the best terms for credit cards, and credit cards for couples. That’s a good start for our hypothetical startup founder (or a star developer at a leading bank) who wants to understand customer needs (and perhaps connect with these users and learn more). By the way, the brackets in front of some of the words are labels for either entities (e.g. “[e]{{credit card}}”) or emotions (e.g. “[i]<<I>> [i]<<want>>”, where “i” stands for the “Interest” emotion). This is something the Opinion Analysis project produces. You can also get the plain text of the opinion (in the sentiment.Text field) or an HTML version of that text (in the sentiment.AnnotatedHtml field).

What about the “fear” emotion? What are consumers afraid of when it comes to credit cards? There is a query for that too:

SELECT
  s.AnnotatedText, s.SentimentTotalScore
FROM opinions.webresource wr 
  INNER JOIN opinions.sentiment s ON s.MainWebResourceHash = wr.WebResourceHash, UNNEST(s.Tags) AS t
WHERE 
  t.Tag = 'credit card' AND
  (s.StFear) > 0 AND
  REGEXP_EXTRACT(wr.Url, "/r/([^\\s/]+)") IN (
    'personalfinance',
    'financialindependence',
    'PersonalFinanceCanada',
    'UKPersonalFinance',
    'FinancialPlanning'
  )
ORDER BY s.SentimentTotalScore DESC

And here is what the query returns:

  I haven't had [e]{{overpayments}}, I've been overcharged. So I'm [f]<<worried>> that my 3 or [e]{{4 [n]<<complaints}> of [e]{{unauthorized charges}} to my [e]{{credit card}} would then become a [m]<<problem>> if someone creates a [n]<<fraudulent>> [e]{{credit card}} and buys stuff.
I'm [f]<<scared>> my [e]{{information}} has been [n]<<stolen>>, but my [e]{{bank accounts}} seem fine. What do I do about this [e]{{credit card}}? Please Help[g]<<!>>
Personally, I tried to avoid [e]{{credit cards}} for a while (I was [f]<<nervous>> I'd fall into the [e]{{same [n]<<bad>> habits}} as my mom) but I [j]<<love>> this card.
Some people [j]<<(like>> my dad) are [f]<<paranoid>> that using [e]{{credit cards}} creates a [g]<<paper>> [g]<<trail>> and then [e]{{"they"}} will have all this [e]{{information}} about you and your [e]{{buying habits}}.

Some interesting concerns with credit cards include the consequences of unauthorized charge reporting, implications of personal information theft, fear of falling into the same bad habits as one’s parents, and fear of a paper trail created by credit cards. This is all good information, and a gold mine for someone who is trying to build a new financial product that delivers on customer needs and addresses possible concerns.

And the “hot keys” in the IndexerPipeline code? After some debugging, we found that in one particular section of the Natural Language Processing code we employ (the part responsible for determining base forms of words), certain characters present in these “hot key” data elements would trigger excessive tokenization of text phrases. The resulting tokens were then applied against a MaxEnt model to determine the best base form for a word.

Normally, there would be one or at most two tokens to evaluate, but the presence of non-alphanumeric and other Unicode characters would result in dozens of such tokens. The base-form determination algorithm, unfortunately, had an exponential dependency on the number of tokens, hence the excessive CPU consumption and processing times. While we were able to add additional checks to prevent this from happening again, we were glad that the new service-based Cloud Dataflow Shuffle was more tolerant of such user-code transgressions.
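As an illustration of the kind of guard we describe (the class name, cutoff, and model call are hypothetical, not the actual NLP library API), the idea is simply to cap the number of tokens the base-form lookup will consider and fall back to the surface form for pathological inputs:

class SafeBaseFormResolver {
  // Illustrative cutoff: phrases that tokenize into more pieces than this skip
  // the expensive MaxEnt evaluation, whose cost grows exponentially with tokens.
  private static final int MAX_TOKENS = 8;

  String baseFormOf(String phrase) {
    String[] tokens = phrase.trim().split("\\s+");
    if (tokens.length > MAX_TOKENS) {
      // Pathological input (e.g. runs of unusual characters): return the surface form.
      return phrase;
    }
    return resolveWithMaxEntModel(tokens);
  }

  // Stand-in for the real base-form model call in the NLP library.
  private String resolveWithMaxEntModel(String[] tokens) {
    return String.join(" ", tokens);
  }
}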

Next steps

We hope you agree that this example has demonstrated the practical impact of Cloud Dataflow’s service-based Shuffle on data pipelines. Get more details about how to access the public beta here.