
How to do data processing and analytics from Google App Engine with Google Cloud Dataflow

Learn how to launch Google Cloud Dataflow pipelines from Google App Engine to support MapReduce jobs and other data processing and analytics tasks.

Until recently, if you wanted to run MapReduce jobs from a Python app in Google App Engine, you would use the App Engine MapReduce library.

Recently, Apache Beam and Google Cloud Dataflow entered the picture. Beam is a unified model for building data processing pipelines that handle both streaming and batch data, along with a collection of SDKs for building those pipelines. Cloud Dataflow is a fully-managed Google Cloud Platform (GCP) service for executing parallelized stream/batch data processing pipelines written using Beam.

Cloud Dataflow supports a range of data processing patterns, including ETL, batch computation and continuous computation. The Beam model supports and subsumes MapReduce. So, you can map your MR jobs to equivalent Beam pipelines, and Beam's programming model makes it straightforward to extend and modify your existing MR logic.

The Beam Python SDK makes it easy to launch Cloud Dataflow pipeline jobs from a Python app on App Engine, and it includes a Cloud Datastore source and sink. As a result, you can write Dataflow pipelines that support the functionality of any existing MR jobs, as well as additional analytics.

In this blog post, we'll look at an example app that shows how to periodically launch a Python Dataflow pipeline from App Engine, to analyze data stored in Google Cloud Datastore (GCP’s highly-scalable NoSQL database service) — in this case, stored tweets from Twitter. The pipeline does several kinds of analysis on the data; for example, it identifies "interesting" word co-occurrences (bigrams) in the tweets, as in this snippet below.

[Screenshot: example "interesting" word co-occurrence (bigram) results from the tweet analysis]

The example is an App Engine app with two services (previously, "modules"): a service running on the App Engine flexible environment, which defines and periodically launches the Dataflow analysis pipeline, and a service running on the App Engine standard environment, which periodically fetches tweets and stores them in Cloud Datastore.

The Standard service — the one that gathers the tweets — is just for example purposes; in your own apps, you probably already have other means of collecting and storing data in Cloud Datastore.

Building a service to define and launch a pipeline from App Engine

We'll use a Flex custom runtime based on the gcr.io/google_appengine/python image for the service that launches the data processing pipeline, as we'll install the gcloud SDK in the instance container(s). So, the example includes a Dockerfile used to deploy the service. As the last command in the Dockerfile, we'll start up a Gunicorn server to serve a Flask app script (main_df.py).

The Python code for this service consists of the small Flask app script (main_df.py), which accesses a module (dfpipe) that does most of the heavy lifting in terms of defining and launching the example pipeline (in dfpipe/pipe.py).
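
As a rough illustration, a minimal main_df.py along these lines might look like the sketch below. The handler route (/launchpipeline) matches the cron configuration shown later in this post, but the dfpipe entry point name (process_datastore_tweets) is an assumption, not necessarily the app's actual function name.

  # A minimal sketch of a Flask app that launches the pipeline on request.
  from flask import Flask

  from dfpipe import pipe

  app = Flask(__name__)

  @app.route('/launchpipeline')
  def launch_pipeline():
    # Define and launch the Dataflow pipeline (see dfpipe/pipe.py);
    # the entry point name here is hypothetical.
    pipe.process_datastore_tweets()
    return 'Pipeline launched.', 200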

Setting the pipeline options

As part of the process of launching a Cloud Dataflow pipeline, various options may be set. To make the dfpipe module available to the Cloud Dataflow workers, the pipeline options include a setup_file flag.

  pipeline_options = {
      'project': PROJECT,
      'staging_location': 'gs://' + BUCKET + '/staging',
      'runner': 'DataflowRunner',
      'setup_file': './setup.py',
      'job_name': PROJECT + '-twcount',
      'max_num_workers': 10,
      'temp_location': 'gs://' + BUCKET + '/temp'
  }
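
Before constructing the pipeline, this dictionary needs to be converted into a PipelineOptions object. A minimal sketch of that step, assuming a recent Beam Python SDK (where PipelineOptions lives in apache_beam.options.pipeline_options):

  from apache_beam.options.pipeline_options import PipelineOptions

  # Convert the dict above into a PipelineOptions object; the pipeline
  # itself is then constructed with these options (as shown further below).
  pipeline_options = PipelineOptions.from_dictionary(pipeline_options)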

The setup_file flag points to a setup.py file that tells setuptools how to package the dfpipe module. If our pipeline also depended on third-party libraries, we could declare those in setup.py as well. The indicated code is gathered into a package that's built as a source distribution, staged in the staging location for the workflow, and installed on the workers when they start running.
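
For reference, a minimal setup.py along these lines might look like the following sketch; the package metadata here is illustrative, not the example app's actual file.

  import setuptools

  setuptools.setup(
      name='dfpipe',                        # the pipeline package to ship to the workers
      version='0.0.1',                      # illustrative version number
      packages=setuptools.find_packages(),  # picks up the dfpipe package
      install_requires=[],                  # third-party deps would be listed here
  )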

A look at the Cloud Dataflow pipeline

Next, let's take a quick look at dfpipe/pipe.py to see what the Cloud Dataflow pipeline does: it reads recent tweets (from the past N days) from Cloud Datastore, then essentially splits into three processing branches. It finds the top N most popular words in terms of the percentage of tweets they appeared in, calculates the top N most popular URLs in terms of their count, and derives relevant word co-occurrences (bigrams) using an approximation to a tf*idf ranking metric. It writes the results to three Google BigQuery tables.


Using Cloud Datastore as a pipeline source

This pipeline reads from Cloud Datastore, picking up the tweets that the other App Engine service (on the standard environment) is periodically fetching and writing to the store.

In main.py, the app script for the service, you can see the Tweet entity schema:

  from google.appengine.ext import ndb

  class Tweet(ndb.Model):
    user = ndb.StringProperty()
    text = ndb.StringProperty()
    created_at = ndb.DateTimeProperty()
    tid = ndb.IntegerProperty()
    urls = ndb.StringProperty(repeated=True)

In dfpipe/pipe.py, we can use the google.cloud.proto.datastore API to define a query for Tweet entities more recent than a given date — in this case, four days ago — by creating a property filter on the created_at field.

  import datetime

  from google.cloud.proto.datastore.v1 import query_pb2
  from googledatastore import helper as datastore_helper, PropertyFilter

  def make_query(kind):
    """Creates a Cloud Datastore query to retrieve all Tweet entities with a
    'created_at' date > N days ago.
    """
    days = 4
    now = datetime.datetime.now()
    earlier = now - datetime.timedelta(days=days)
    query = query_pb2.Query()
    query.kind.add().name = kind
    datastore_helper.set_property_filter(query.filter, 'created_at',
                                         PropertyFilter.GREATER_THAN,
                                         earlier)
    return query

Then, we use that query to define an input source for the pipeline:

  from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore

  p = beam.Pipeline(options=pipeline_options)
  # Create a query to read entities from datastore.
  query = make_query('Tweet')
  # Read entities from Cloud Datastore into a PCollection.
  lines = (p
      | 'read from datastore' >> ReadFromDatastore(project, query, None))
  ...

We can use properties.get() on an element of the resulting collection to extract the value of a given field of the entity, in this case the "text" field:

  class WordExtractingDoFn(beam.DoFn):
    """Parse each tweet text into words, removing some 'stopwords'."""

    def process(self, element):
      content_value = element.properties.get('text', None)
      text_line = ''
      if content_value:
        text_line = content_value.string_value
      words = set([x.lower() for x in re.findall(r'[A-Za-z\']+', text_line)])
      stopwords = [...]
      return list(words - set(stopwords))

Then, this snippet from the pipeline shows how WordExtractingDoFn can be used as part of the Cloud Datastore input processing:

  # Count the occurrences of each word.
  percents = (lines
      | 'split' >> (beam.ParDo(WordExtractingDoFn())
                    .with_output_types(unicode))
      | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
      | 'group' >> beam.GroupByKey()
      | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
      # ... further transforms (elided here) convert these counts into
      # percentages of the tweets each word appeared in.
      )
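
As noted earlier, each analysis branch ultimately writes its results to a BigQuery table. A hedged sketch of what one such write might look like using Beam's BigQuery sink; the table spec and schema here are illustrative, not the app's actual ones.

  import apache_beam as beam

  # Convert each result to a row dict and write it to an illustrative BigQuery table.
  (percents
      | 'to row dict' >> beam.Map(lambda (word, pct): {'word': word, 'percent': pct})
      | 'write to BigQuery' >> beam.io.WriteToBigQuery(
          'my-project:tweet_analysis.word_percents',
          schema='word:STRING, percent:FLOAT',
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))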

Launching the pipeline periodically using a cron job

In the example app, we want to launch a pipeline job every few hours, where each job analyzes the tweets from the past few days, providing a "moving window" of analysis. So, it makes sense to set this up as an App Engine cron job, defined in cron.yaml like this (backend is the name of the app service that handles this request, and the URL is the handler that launches the job):

  cron:
  - description: launch dataflow pipeline
    url: /launchpipeline
    schedule: every 5 hours
    target: backend

A pipeline job could of course be triggered by other means as well — e.g., as part of handling a client request to the app, or perhaps via a Task Queue task.
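
For instance, a handler running in the App Engine standard service could enqueue a push task that hits the same /launchpipeline URL. A minimal sketch, assuming the default push queue is used:

  from google.appengine.api import taskqueue

  # Enqueue a push task that invokes the pipeline-launch handler
  # on the 'backend' service.
  taskqueue.add(url='/launchpipeline', target='backend', method='GET')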

Example results in BigQuery

Once our example app is up and running, it periodically runs a Cloud Dataflow job that writes the results of its analysis to BigQuery. (It would be just as easy to write the results to Cloud Datastore if that makes more sense for your workflow, or to write to multiple sinks.)

With BigQuery, it's easy to run some fun queries on the data. For example, we can find recent word co-occurrences that are "interesting" by our metric:

[Screenshot: BigQuery results listing recent "interesting" word co-occurrences]

Or look for emerging word pairs that have become "interesting" in the last day or so (as of early April 2017):

[Screenshot: BigQuery results listing word pairs that became "interesting" in the last day]

We can contrast the "interesting" word pairs with the words that are simply the most popular within a given period (note that most of these words are common, but not particularly newsworthy):

[Screenshot: BigQuery results listing the most popular words in a given period]

Or, find the most often-tweeted URLs from the past few days (some URLs are truncated in the output):

[Screenshot: BigQuery results listing the most often-tweeted URLs from the past few days]

Next steps

In this post, you’ve learned how to programmatically launch Cloud Dataflow pipelines — that read from Cloud Datastore — directly from your App Engine app. See the example app's README for more detail on how to configure and run the app yourself.

Cloud Dataflow's expressive programming model makes it easy to build and support a range of scalable processing and analytic tasks. We hope you find the example app useful as a starting point toward defining new pipelines and running your own analytics from your App Engine apps. We look forward to hearing more about what you build!