Real-Time Data Analysis with Kubernetes, Cloud Pub/Sub, and BigQuery

In this tutorial, you'll perform real-time data analysis of Twitter data using a pipeline built on Google Compute Engine, Kubernetes, Google Cloud Pub/Sub, and BigQuery. This type of analysis has a number of useful applications, including:

  • Performing sentiment analysis
  • Identifying general patterns and trends in data
  • Monitoring the performance and reach of campaigns
  • Crafting responses in real time

The following diagram describes the architecture of the example application.

The architecture of the example application

The example application uses Cloud Pub/Sub to buffer incoming tweets. A replicated pod, defined by a Deployment, reads Twitter's public sample stream using the Twitter Streaming API and publishes incoming tweets to a Cloud Pub/Sub topic. Two additional replicated pods subscribe to the topic. When new tweet data is published to the topic, these pods add it to BigQuery using the BigQuery API.
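The following sketch models the same data flow in a single Python 3 process. A `queue.Queue` stands in for the Cloud Pub/Sub topic and its shared subscription. The main thread plays the Twitter-reading pod, and two worker threads play the BigQuery-writing pods, each taking whichever message arrives next. The names (`run_pipeline`, `table`) are hypothetical, and nothing here calls Google Cloud. It only illustrates how a buffer between producer and consumers lets each side work at its own pace.

```python
import queue
import threading

SENTINEL = None  # Tells a consumer that the producer has finished.


def run_pipeline(tweets, num_consumers=2):
    """Push tweets through a shared queue to several consumers; return what was stored."""
    topic = queue.Queue()          # Stand-in for the Pub/Sub topic + subscription.
    table = []                     # Stand-in for the BigQuery table.
    table_lock = threading.Lock()  # Consumers append concurrently.

    def consumer():
        while True:
            item = topic.get()
            if item is SENTINEL:
                break
            with table_lock:
                table.append(item)

    workers = [threading.Thread(target=consumer) for _ in range(num_consumers)]
    for w in workers:
        w.start()
    for tweet in tweets:           # The producer "publishes" each tweet.
        topic.put(tweet)
    for _ in workers:              # One sentinel per consumer so every worker exits.
        topic.put(SENTINEL)
    for w in workers:
        w.join()
    return table


if __name__ == "__main__":
    stored = run_pipeline([f"tweet {i}" for i in range(10)])
    print(len(stored), "tweets stored")
```

Because the sentinels are queued after every tweet, each message is stored exactly once, whichever worker happens to take it.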

Objectives

  • Download the source code.
  • Configure and start a Kubernetes cluster.
  • Create a Twitter application.
  • Create a Cloud Pub/Sub topic.
  • Start up the Kubernetes app.
  • Use BigQuery to query for Twitter data.

Costs

This tutorial uses several billable components of Google Cloud Platform, including:

  • 5 Compute Engine n1-standard-1 virtual machine instances.
  • 5 Compute Engine 10 GB persistent disks.
  • BigQuery.

The cost of running this tutorial varies depending on run time. Use the pricing calculator to generate a cost estimate based on your projected usage.

New Cloud Platform users might be eligible for a free trial.

Before you begin

Create and configure a new project

To work through this example, you must have a project with the required APIs enabled. In the Google Cloud Platform Console, create a new project, and then enable the following APIs:

  • Google Compute Engine API
  • Google Cloud Pub/Sub API

You will be prompted to enable billing if you have not previously done so.

This tutorial also uses the following APIs that were enabled by default when you created your new project:

  • BigQuery API
  • Google Cloud Storage JSON API

Set up the Google Cloud SDK

  1. Authenticate using your Google account:

    gcloud auth login
  2. Set the default project for the Cloud SDK to the project you created in the previous section of this tutorial. Replace [PROJECT_ID] with your project ID:

    gcloud config set project [PROJECT_ID]
  3. Update the components:

    gcloud components update
  4. Install the kubectl binary:

    gcloud components install kubectl
  5. Add the directory that contains kubectl to your PATH:

    export PATH=$PATH:/usr/local/share/google/google-cloud-sdk/bin/

Download the sample code

You can get the sample code in either of two ways:

  • Download the zip archive. Unzip the code into a directory named kube-pubsub-bq.

  • Clone the GitHub repository. In a terminal window, run the following command:

    git clone kube-pubsub-bq

Download Kubernetes

Kubernetes is an open source orchestration system for Docker containers. It schedules containers onto the instances in your Compute Engine cluster, manages workloads to ensure that their state matches your declared intentions, and organizes containers by label and by task type for easy management and discovery.

  1. Download the latest Kubernetes binary release.

  2. Unpack the file into the same parent directory where you downloaded the example code. The tar file unpacks into a directory named kubernetes, so you don't need to create a new directory. For example, enter:

    tar -zxvf kubernetes.tar.gz

Creating your BigQuery table

Create a BigQuery table to store your tweets. BigQuery groups tables into containers called datasets. Use the bq command-line tool, which is included with the Cloud SDK, to create a new BigQuery dataset named rtda.

  1. In a terminal window, enter the following command:

    bq mk rtda
  2. Now that you have a dataset in place, create a new table named tweets to contain the incoming tweets. Each BigQuery table must be defined by a schema. To save you time, this example provides a predefined schema, schema.json, that you can use to define your table. To create the table by using the predefined schema, enter the following command:

    bq mk -t rtda.tweets kube-pubsub-bq/bigquery-setup/schema.json
  3. To verify that your new dataset and table have been created, use the BigQuery web UI. You should see the dataset name in the left sidebar. If it isn't there, make sure you're looking at the correct project. When you click the arrow next to the dataset name, you should see your new table. Alternatively, you can view all datasets in a project or all tables in a given dataset by running the following commands:

    bq ls [PROJECT_ID]:
    bq ls [DATASET_ID]

    In this case, the dataset ID is rtda.

  4. Finally, edit kube-pubsub-bq/pubsub/bigquery-controller.yaml. Update the value for the following fields to reflect your BigQuery configuration:

    • PROJECT_ID: Your project ID.
    • BQ_DATASET: The BigQuery dataset containing your table (rtda).
    • BQ_TABLE: The BigQuery table you just created (tweets).
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: bigquery-controller
  labels:
    name: bigquery-controller
spec:
  replicas: 2
  template:
    metadata:
      labels:
        name: bigquery-controller
    spec:
      containers:
      - name: bigquery
        env:
        - name: PROCESSINGSCRIPT
          value: pubsub-to-bigquery
        # Change this to your pubsub topic
        - name: PUBSUB_TOPIC
          value: projects/your-project/topics/your-topic
        # Change this to your project ID.
        - name: PROJECT_ID
          value: xxxx
        # Change the following two settings to your dataset and table.
        - name: BQ_DATASET
          value: xxxx
        - name: BQ_TABLE
          value: xxxx
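You can also fill in these placeholders programmatically instead of by hand. The sketch below, assuming Python 3, mirrors the container's env list from the manifest as a Python dictionary and swaps in your values. The helper name `set_env` and the sample project ID are hypothetical. The dictionary is a trimmed illustration of the manifest, not a complete Deployment you could submit.

```python
import json

# A trimmed copy of the container spec from bigquery-controller.yaml.
container = {
    "name": "bigquery",
    "env": [
        {"name": "PROCESSINGSCRIPT", "value": "pubsub-to-bigquery"},
        {"name": "PUBSUB_TOPIC", "value": "projects/your-project/topics/your-topic"},
        {"name": "PROJECT_ID", "value": "xxxx"},
        {"name": "BQ_DATASET", "value": "xxxx"},
        {"name": "BQ_TABLE", "value": "xxxx"},
    ],
}


def set_env(container, **values):
    """Return a copy of `container` with the named env values replaced."""
    known = {e["name"] for e in container["env"]}
    unknown = set(values) - known
    if unknown:  # Catch typos such as BQ_TABEL instead of silently ignoring them.
        raise KeyError(f"not in manifest: {sorted(unknown)}")
    env = [
        {"name": e["name"], "value": values.get(e["name"], e["value"])}
        for e in container["env"]
    ]
    return {**container, "env": env}  # The original dictionary is left untouched.


if __name__ == "__main__":
    project = "my-project"  # Hypothetical project ID.
    updated = set_env(
        container,
        PUBSUB_TOPIC=f"projects/{project}/topics/new_tweets",
        PROJECT_ID=project,
        BQ_DATASET="rtda",
        BQ_TABLE="tweets",
    )
    print(json.dumps(updated, indent=2))
```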

Configuring and starting a Kubernetes cluster

Kubernetes is portable: it uses a set of shell scripts for starting up, shutting down, and managing configuration, so no installation is necessary.

  1. Edit kubernetes/cluster/gce/

  2. Add the bigquery and scopes to the file's NODE_SCOPES definition.

This setting allows your nodes to write to your BigQuery table. Save the configuration and close your file.

Start up your cluster

Now you are ready to start up your cluster.

  1. Enter the following command:


    Starting the cluster can take some time. During the startup process, you might be prompted to create a new SSH key for Compute Engine or, if you've already done so previously, to enter your SSH key passphrase.

  2. After your cluster has started, enter the following commands to see your running instances:

    kubectl get nodes
    kubectl cluster-info

You should see one Kubernetes master instance and four Kubernetes nodes.

Creating a Twitter application

To receive tweets from Twitter, you need to create a Twitter application and add its key/secret and token/secret values to the specification for your Twitter-to-PubSub Kubernetes pod. In all, you will copy four values. Follow these steps:

  1. Create a new Twitter application.
  2. In the Twitter Application Management page, navigate to the Keys and Access Tokens tab.
  3. Click the Create my access token button to create a new access token.
  4. Edit kube-pubsub-bq/pubsub/twitter-stream.yaml.
  5. Replace the following values with your consumer key and consumer secret:
  6. Replace the following values with your access token and access token secret:

The example application reads from a random sample of Twitter's public stream by default. To filter on a set of keywords instead:

  1. Edit kube-pubsub-bq/pubsub/twitter-stream.yaml and change the value of TWSTREAMMODE to filter.
  2. In the Twitter-to-Cloud-Pub/Sub script, replace the keywords defined in the track variable with the keywords you want to filter on.
  3. Rebuild the container image. For instructions about how to build the container image, see the appendix.
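The mode switch comes down to a branch on the TWSTREAMMODE environment variable. The sketch below, assuming Python 3, isolates that decision so you can check it without a Twitter connection. `choose_stream_call` and its keyword list are hypothetical. In the real script, the result corresponds to calling `stream.sample()` or `stream.filter(track=...)` on a tweepy `Stream`. Defaulting to sample mode when the variable is unset is this sketch's choice. The script itself reads `os.environ['TWSTREAMMODE']` directly.

```python
import os

# Hypothetical keyword list; the real one is the track variable in the script.
DEFAULT_KEYWORDS = ("kubernetes", "bigquery", "googlecloud")


def choose_stream_call(env=None, keywords=DEFAULT_KEYWORDS):
    """Return ("sample", None) or ("filter", [keywords]) based on TWSTREAMMODE.

    Mirrors the script's logic: any value other than "sample" means filter mode.
    """
    env = os.environ if env is None else env
    mode = env.get("TWSTREAMMODE", "sample")
    if mode == "sample":
        return ("sample", None)
    return ("filter", list(keywords))


if __name__ == "__main__":
    print(choose_stream_call({"TWSTREAMMODE": "sample"}))  # ('sample', None)
    print(choose_stream_call({"TWSTREAMMODE": "filter"}))
    # ('filter', ['kubernetes', 'bigquery', 'googlecloud'])
```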

Creating a Cloud Pub/Sub topic

Next, create a Cloud Pub/Sub topic to which your Kubernetes pods can publish and subscribe.

  1. Visit the Try It! section of the relevant Cloud Pub/Sub API page.

  2. Enable the Authorize requests using OAuth 2.0 switch and then click Authorize.

  3. Enter the following path in the name field. Replace [PROJECT_ID] with your project ID:

  4. Click Execute to create your new topic. You can verify that the topic has been created by checking the response code in the Response section. If the response code is 200 OK, your topic was created successfully.

  5. Edit the following two files:


  6. Update the PUBSUB_TOPIC values in the files with your new topic. Look for this line:

    value: projects/your-project/topics/your-topic

    Replace your-project with your project ID. Replace your-topic with new_tweets.
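Cloud Pub/Sub addresses topics by fully qualified names of the form projects/PROJECT/topics/TOPIC. The sketch below, assuming Python 3, does three things:

  • It builds that name.
  • It derives the subscription name the way the Cloud-Pub/Sub-to-BigQuery script does: tweets- plus the last path segment of the topic.
  • It prepares, without sending, the v1 REST request that creates a topic, which is an HTTP PUT on the topic's resource path.

The helper names are hypothetical. The access token is a placeholder, and an empty JSON body is assumed to be acceptable for the create call.

```python
import urllib.request

PUBSUB_API = "https://pubsub.googleapis.com/v1/"


def topic_path(project_id, topic):
    """Fully qualified topic name, e.g. projects/my-project/topics/new_tweets."""
    return f"projects/{project_id}/topics/{topic}"


def subscription_name_for(full_topic):
    """Subscription name used by the BigQuery pods: 'tweets-' + the topic's short name."""
    return "tweets-%s" % full_topic.split("/")[-1]


def create_topic_request(project_id, topic, access_token):
    """Build (but do not send) the REST request that creates the topic."""
    return urllib.request.Request(
        PUBSUB_API + topic_path(project_id, topic),
        data=b"{}",  # Assumed acceptable: the topic name comes from the URL.
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        method="PUT",
    )


if __name__ == "__main__":
    req = create_topic_request("my-project", "new_tweets", "ACCESS_TOKEN")
    print(req.get_method(), req.full_url)
    print(subscription_name_for(topic_path("my-project", "new_tweets")))
```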

Understanding the Docker image for your Kubernetes pods

This example uses two different Kubernetes pod templates to construct the analysis pipeline. The specification files point to a pre-built Docker image; you don't need to do anything special to use it. If you'd like to build the image yourself, follow the steps in the appendix.

The Docker image contains two main scripts that perform the work for this solution.

The code in streams incoming tweets from Twitter to Cloud Pub/Sub.

#!/usr/bin/env python
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This script uses the Twitter Streaming API, via the tweepy library,
to pull in tweets and publish them to a PubSub topic.
"""

import base64
import datetime
import os
from tweepy import OAuthHandler
from tweepy import Stream
from tweepy.streaming import StreamListener

import utils

# Get your twitter credentials from the environment variables.
# These are set in the 'twitter-stream.yaml' manifest file.
consumer_key = os.environ['CONSUMERKEY']
consumer_secret = os.environ['CONSUMERSECRET']
access_token = os.environ['ACCESSTOKEN']
access_token_secret = os.environ['ACCESSTOKENSEC']

# The topic to publish to (also set in the manifest) and the number of
# times to retry a failed API request.
PUBSUB_TOPIC = os.environ['PUBSUB_TOPIC']
NUM_RETRIES = 3


def publish(client, pubsub_topic, data_lines):
    """Publish to the given pubsub topic."""
    messages = []
    for line in data_lines:
        # The Pub/Sub JSON API expects message data to be base64-encoded.
        pub = base64.urlsafe_b64encode(line)
        messages.append({'data': pub})
    body = {'messages': messages}
    resp = client.projects().topics().publish(
            topic=pubsub_topic, body=body).execute(num_retries=NUM_RETRIES)
    return resp


class StdOutListener(StreamListener):
    """A listener handles tweets that are received from the stream.
    This listener dumps the tweets into a PubSub topic.
    """

    count = 0
    twstring = ''
    tweets = []
    batch_size = 50
    total_tweets = 10000000
    client = utils.create_pubsub_client(utils.get_credentials())

    def write_to_pubsub(self, tw):
        publish(self.client, PUBSUB_TOPIC, tw)

    def on_data(self, data):
        """What to do when tweet data is received."""
        # Buffer the tweet, and publish once a full batch has accumulated.
        self.tweets.append(data)
        if len(self.tweets) >= self.batch_size:
            self.write_to_pubsub(self.tweets)
            self.tweets = []
        self.count += 1
        # if we've grabbed more than total_tweets tweets, exit the script.
        # If this script is being run in the context of a kubernetes
        # Deployment, the pod will be restarted fresh when that happens.
        if self.count > self.total_tweets:
            return False
        if (self.count % 1000) == 0:
            print('count is: %s at %s' % (self.count, datetime.datetime.now()))
        return True

    def on_error(self, status):
        print(status)


if __name__ == '__main__':
    print('....')
    listener = StdOutListener()
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)

    print('stream mode is: %s' % os.environ['TWSTREAMMODE'])

    stream = Stream(auth, listener)
    # set up the streaming depending upon whether our mode is 'sample', which
    # will sample the twitter public stream. If not 'sample', instead track
    # the given set of keywords.
    # This environment var is set in the 'twitter-stream.yaml' file.
    if os.environ['TWSTREAMMODE'] == 'sample':
        stream.sample()
    else:
        stream.filter(
                track=['bigdata', 'kubernetes', 'bigquery', 'docker', 'google',
                       'googlecloud', 'golang', 'dataflow',
                       'containers', 'appengine', 'gcp', 'compute',
                       'scalability', 'gigaom', 'news', 'tech', 'apple',
                       'amazon', 'cluster', 'distributed', 'computing',
                       'cloud', 'android', 'mobile', 'ios', 'iphone',
                       'python', 'recode', 'techcrunch', 'timoreilly'])
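The listener's core idea is to buffer incoming tweets and publish them to Cloud Pub/Sub in batches of `batch_size`, base64-encoding each message. The sketch below, assuming Python 3, separates that batching logic from tweepy and the API client so you can exercise it on its own. `Batcher` and the `send` callback are hypothetical stand-ins for `StdOutListener` and `publish()`.

```python
import base64


class Batcher:
    """Collect messages and hand them to `send` in fixed-size batches."""

    def __init__(self, send, batch_size=50):
        self.send = send              # Called with a Pub/Sub-style publish body.
        self.batch_size = batch_size
        self.pending = []             # Per-instance buffer, not a shared class attribute.

    def add(self, data):
        self.pending.append(data)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def flush(self):
        """Publish whatever is buffered, if anything."""
        if not self.pending:
            return
        body = {
            "messages": [
                # Encode text to bytes, base64 it, and keep it as a JSON-safe string.
                {"data": base64.urlsafe_b64encode(d.encode("utf-8")).decode("ascii")}
                for d in self.pending
            ]
        }
        self.send(body)
        self.pending = []


if __name__ == "__main__":
    sent = []
    b = Batcher(sent.append, batch_size=2)
    for tweet in ['{"text": "a"}', '{"text": "b"}', '{"text": "c"}']:
        b.add(tweet)
    b.flush()  # Send the final partial batch.
    print([len(body["messages"]) for body in sent])  # [2, 1]
```

One design difference: this sketch's `flush()` can send a final partial batch. The listener above appears to publish only full batches, so tweets still buffered when the script exits are not published.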

The code in streams cached data from Cloud Pub/Sub to BigQuery.

#!/usr/bin/env python
# Copyright 2015 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This script grabs tweets from a PubSub topic, and stores them in BigQuery
using the BigQuery Streaming API.
"""

import base64
import datetime
import json
import os
import time

import utils

# Get the project ID and pubsub topic from the environment variables set in
# the 'bigquery-controller.yaml' manifest.
PROJECT_ID = os.environ['PROJECT_ID']
PUBSUB_TOPIC = os.environ['PUBSUB_TOPIC']
NUM_RETRIES = 3  # Number of times to retry a failed API request.


def fqrn(resource_type, project, resource):
    """Returns a fully qualified resource name for Cloud Pub/Sub."""
    return "projects/{}/{}/{}".format(project, resource_type, resource)


def create_subscription(client, project_name, sub_name):
    """Creates a new subscription to a given topic."""
    print("using pubsub topic: %s" % PUBSUB_TOPIC)
    name = get_full_subscription_name(project_name, sub_name)
    body = {'topic': PUBSUB_TOPIC}
    subscription = client.projects().subscriptions().create(
            name=name, body=body).execute(num_retries=NUM_RETRIES)
    print('Subscription {} was created.'.format(subscription['name']))
    return subscription


def get_full_subscription_name(project, subscription):
    """Returns a fully qualified subscription name."""
    return fqrn('subscriptions', project, subscription)


def pull_messages(client, project_name, sub_name):
    """Pulls messages from a given subscription."""
    BATCH_SIZE = 50
    tweets = []
    subscription = get_full_subscription_name(project_name, sub_name)
    body = {
            'returnImmediately': False,
            'maxMessages': BATCH_SIZE
    }
    try:
        resp = client.projects().subscriptions().pull(
                subscription=subscription, body=body).execute(
                        num_retries=NUM_RETRIES)
    except Exception as e:
        print("Exception: %s" % e)
        return tweets  # Nothing pulled; the caller sleeps and retries.
    receivedMessages = resp.get('receivedMessages')
    if receivedMessages is not None:
        ack_ids = []
        for receivedMessage in receivedMessages:
            message = receivedMessage.get('message')
            if message:
                # Message data arrives base64-encoded; decode it to JSON text.
                tweets.append(
                        base64.urlsafe_b64decode(str(message.get('data'))))
                ack_ids.append(receivedMessage.get('ackId'))
        if ack_ids:
            # Acknowledge the messages so Pub/Sub does not redeliver them.
            ack_body = {'ackIds': ack_ids}
            client.projects().subscriptions().acknowledge(
                    subscription=subscription, body=ack_body).execute(
                            num_retries=NUM_RETRIES)
    return tweets


def write_to_bq(pubsub, sub_name, bigquery):
    """Write the data to BigQuery in small chunks."""
    tweets = []
    CHUNK = 50  # The size of the BigQuery insertion batch.
    # If no data on the subscription, the time to sleep in seconds
    # before checking again.
    WAIT = 2
    tweet = None
    mtweet = None
    count = 0
    count_max = 50000
    while count < count_max:
        while len(tweets) < CHUNK:
            twmessages = pull_messages(pubsub, PROJECT_ID, sub_name)
            if twmessages:
                for res in twmessages:
                    try:
                        tweet = json.loads(res)
                    except Exception as bqe:
                        # Skip messages that are not valid JSON.
                        print(bqe)
                        continue
                    # First do some massaging of the raw data
                    mtweet = utils.cleanup(tweet)
                    # We only want to write tweets to BigQuery; we'll skip
                    # 'delete' and 'limit' information.
                    if 'delete' in mtweet:
                        continue
                    if 'limit' in mtweet:
                        continue
                    tweets.append(mtweet)
            else:
                # pause before checking again
                print('sleeping...')
                time.sleep(WAIT)
        response = utils.bq_data_insert(bigquery, PROJECT_ID,
                                        os.environ['BQ_DATASET'],
                                        os.environ['BQ_TABLE'], tweets)
        tweets = []
        count += 1
        if count % 25 == 0:
            print("processing count: %s of %s at %s: %s" %
                  (count, count_max, datetime.datetime.now(), response))


if __name__ == '__main__':
    topic_info = PUBSUB_TOPIC.split('/')
    topic_name = topic_info[-1]
    sub_name = "tweets-%s" % topic_name
    print("starting write to BigQuery....")
    credentials = utils.get_credentials()
    bigquery = utils.create_bigquery_client(credentials)
    pubsub = utils.create_pubsub_client(credentials)
    try:
        # TODO: check if subscription exists first
        subscription = create_subscription(pubsub, PROJECT_ID, sub_name)
    except Exception as e:
        # Creation fails if the subscription already exists, for example
        # when the second replica starts; keep going and use it anyway.
        print(e)
    write_to_bq(pubsub, sub_name, bigquery)
    print('exited write loop')
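On the consuming side, each pulled message is base64-decoded and parsed as JSON. It is dropped if it is a `delete` or `limit` notice rather than a tweet, and the remaining tweets are sent to BigQuery in chunks. The `utils.bq_data_insert` helper is not shown in this tutorial. The sketch below, assuming Python 3, therefore only builds request bodies in the shape used by BigQuery's `tabledata.insertAll` streaming method, where each row is wrapped as `{"json": ...}` and may carry an `insertId` for best-effort deduplication. The function names are hypothetical, and `utils.cleanup` is not reproduced. Using the tweet's `id_str` as the `insertId` is this sketch's assumption, not something the example application is known to do.

```python
import base64
import json


def decode_messages(received_messages):
    """Decode Pub/Sub pull results into tweet dicts, skipping non-tweets."""
    tweets = []
    for received in received_messages:
        data = (received.get("message") or {}).get("data")
        if data is None:
            continue
        try:
            tweet = json.loads(base64.urlsafe_b64decode(data))
        except ValueError:          # Covers bad base64, bad UTF-8, and bad JSON.
            continue
        if not isinstance(tweet, dict):
            continue
        if "delete" in tweet or "limit" in tweet:
            continue                # Stream control notices, not tweets.
        tweets.append(tweet)
    return tweets


def insert_all_bodies(tweets, chunk=50):
    """Split tweets into insertAll-style request bodies of at most `chunk` rows."""
    bodies = []
    for start in range(0, len(tweets), chunk):
        rows = []
        for t in tweets[start:start + chunk]:
            row = {"json": t}
            if "id_str" in t:
                row["insertId"] = t["id_str"]  # Assumption: tweet ID as dedup key.
            rows.append(row)
        bodies.append({"rows": rows})
    return bodies


if __name__ == "__main__":
    msg = {"message": {"data": base64.urlsafe_b64encode(b'{"id_str": "1", "text": "hi"}').decode()}}
    notice = {"message": {"data": base64.urlsafe_b64encode(b'{"delete": {}}').decode()}}
    print(insert_all_bodies(decode_messages([msg, notice])))
    # [{'rows': [{'json': {'id_str': '1', 'text': 'hi'}, 'insertId': '1'}]}]
```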

Starting up your Kubernetes app

After optionally building and pushing your Docker image, you can start your Kubernetes app.

In Kubernetes, pods, rather than individual application containers, are the smallest deployable units that can be created, scheduled, and managed.

A replica set ensures that a specified number of pod replicas are running at any one time. If there are too many, it shuts some down; if there are too few, it starts more. Unlike singleton pods or pods created in bulk, a replica set replaces pods that are deleted or terminated for any reason, such as node failure.

A Deployment provides declarative updates for pods and replica sets. You describe the desired state in a Deployment object, and the Deployment controller changes the actual state to the desired state at a controlled rate.

You will use Deployments to launch your Kubernetes app.
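The replica set behavior described above is a reconciliation loop. It compares the desired replica count with the pods that are actually running and starts or stops the difference. The sketch below, assuming Python 3, models one pass of that loop with plain data. It is a conceptual illustration, not Kubernetes controller code. `reconcile` and the pod naming scheme are hypothetical. Real replica sets also choose which pods to delete using criteria such as pod phase, which this sketch ignores by simply stopping the newest ones.

```python
import itertools

_ids = itertools.count()  # Source of unique suffixes for new pod names.


def reconcile(desired, running, name="bigquery-controller"):
    """Return (to_start, to_stop) so that len(running) converges on `desired`."""
    if desired < 0:
        raise ValueError("desired replica count must be non-negative")
    diff = desired - len(running)
    if diff > 0:   # Too few pods, for example after a node failure: start more.
        return [f"{name}-{next(_ids)}" for _ in range(diff)], []
    if diff < 0:   # Too many pods: stop the surplus (here, the last |diff| listed).
        return [], list(running[diff:])
    return [], []  # Already at the desired state.


if __name__ == "__main__":
    # Two replicas are desired, but one pod was lost with its node.
    print(reconcile(2, ["bigquery-controller-a"]))
```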

Start the Cloud-Pub/Sub-to-BigQuery Deployment

Begin by starting the Deployment for the pods that will subscribe to your Cloud Pub/Sub topic and stream tweets to your BigQuery table as they become available.

Before you start the Deployment, you might find it useful to take a closer look at kube-pubsub-bq/pubsub/bigquery-controller.yaml. This file defines the Deployment, and specifies a replica set with two pod replicas. The characteristics of these replicated pods are defined in the file by using a pod template.

  1. Run the following command to start up the replicated pods:

    kubectl create -f kube-pubsub-bq/pubsub/bigquery-controller.yaml
  2. To verify that the pods are running, run the following:

    kubectl get pods

    It can take about 30 seconds for your new pods to move from ContainerCreating to Running. In the list, you should see two Cloud-Pub/Sub-to-BigQuery pods labeled bigquery-controller.

  3. To see the system's defined Deployments and how many replicas each is specified to have, run:

    kubectl get deployments

Start the Twitter-to-Cloud-Pub/Sub Deployment

After your Cloud-Pub/Sub-to-BigQuery pipeline pods are up and running, start the Deployment for the pod that pulls in tweets and publishes them to your Cloud Pub/Sub topic. Like kube-pubsub-bq/pubsub/bigquery-controller.yaml, the Twitter-to-Cloud-Pub/Sub specification file, kube-pubsub-bq/pubsub/twitter-stream.yaml, defines a Deployment. However, this Deployment asks for only one replicated pod, because Twitter allows only one streaming API connection at a time.

  1. To start up the Deployment:

    kubectl create -f kube-pubsub-bq/pubsub/twitter-stream.yaml
  2. Verify that all of your pods are running by entering the following command. Again, it can take about 30 seconds for your new pod to move from ContainerCreating to Running.

    kubectl get pods

In addition to the pods you saw in the previous step, you should see a new pod labeled twitter-stream. Congratulations! Your analysis pipeline is now up and running.

  3. To see the system's defined Deployments and how many replicas each is specified to have, run the following command again:

    kubectl get deployments

    You should now see something like this:

    NAME                  DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
    bigquery-controller   2         2         2            2           2m
    twitter-stream        1         1         1            1           2m
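To script the check that every Deployment is fully available, you can parse this tabular output. The sketch below, assuming Python 3, reads columns by header name. It handles both the six-column layout NAME, DESIRED, CURRENT, UP-TO-DATE, AVAILABLE, AGE printed by older kubectl releases and the newer layout with a READY column shown as ready/desired. The function names are hypothetical. For anything beyond a quick check, `kubectl get deployments -o json` gives machine-readable output that is sturdier than parsing columns.

```python
def parse_table(text):
    """Parse whitespace-aligned kubectl table output into a list of dicts.

    Assumes no cell contains spaces, which holds for these columns.
    """
    lines = [line.split() for line in text.strip().splitlines() if line.strip()]
    if not lines:
        return []
    header, rows = lines[0], lines[1:]
    return [dict(zip(header, row)) for row in rows]


def all_available(text):
    """True if every Deployment reports as many available replicas as desired."""
    for d in parse_table(text):
        if "DESIRED" in d:
            # Older layout: compare AVAILABLE with DESIRED directly.
            if d["AVAILABLE"] != d["DESIRED"]:
                return False
        else:
            # Newer layout: READY is "ready/desired".
            ready, desired = d["READY"].split("/")
            if ready != desired:
                return False
    return True


SAMPLE = """
NAME                  DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
bigquery-controller   2         2         2            2           2m
twitter-stream        1         1         1            1           2m
"""

if __name__ == "__main__":
    print(all_available(SAMPLE))  # True
```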

Querying your BigQuery table

Open the BigQuery web UI and click Compose Query to begin writing a new query. You can verify that your pipeline is working by running a simple query, as follows:


Let your pipeline collect tweets for a while. A few hours should do, but the longer you let it run, the richer your data set will be. After you have some more data in your BigQuery table, you can try running some interesting sample queries.

This example query demonstrates how to find the most retweeted tweets in your table, filtering on a specific term (in this case, "android"):

SELECT
  text,
  MAX(retweeted_status.retweet_count) AS max_retweets
FROM
  [rtda.tweets]
WHERE
  text CONTAINS 'android'
GROUP BY
  text
ORDER BY
  max_retweets DESC

You might also find it interesting to filter your collected tweets by a set of terms. The following query filters by the words "Kubernetes," "BigQuery," "Cloud Pub/Sub," or "Twitter":

SELECT
  created_at,
  text
FROM
  [rtda.tweets]
WHERE
  text CONTAINS 'kubernetes'
  OR text CONTAINS 'BigQuery'
  OR text CONTAINS 'cloud pub/sub'
  OR text CONTAINS 'twitter'
ORDER BY
  created_at DESC

The following query looks for a correlation between the number of favorites and the number of retweets in your set of tweets:

SELECT
  CORR(retweeted_status.retweet_count, retweeted_status.favorite_count),
  COUNT(*) c
FROM [rtda.tweets]
HAVING c > 2000000
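BigQuery's CORR aggregate returns the Pearson correlation coefficient of its two arguments. A value of +1 means retweets and favorites rise together perfectly, 0 means no linear relationship, and -1 means a perfect inverse relationship. To see what the query computes, here is the same statistic on a few made-up (retweet, favorite) pairs, as a sketch assuming Python 3. The sample numbers are illustrative only.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient, the statistic computed by CORR(x, y)."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two equal-length sequences with at least 2 items")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Covariance and variances, each without the common 1/n factor (it cancels).
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for a constant column")
    return cov / math.sqrt(var_x * var_y)


if __name__ == "__main__":
    retweets = [1, 2, 3, 4]    # Made-up retweet counts.
    favorites = [2, 4, 6, 8]   # Made-up favorite counts.
    print(pearson(retweets, favorites))  # 1.0
```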

Going even deeper, you could also investigate whether the speakers of a specific language prefer favoriting to retweeting, or vice versa:

SELECT
  CORR(retweeted_status.retweet_count, retweeted_status.favorite_count),
  COUNT(*) c,
  AVG(retweeted_status.retweet_count) avg_rt,
  AVG(retweeted_status.favorite_count) avg_fv,
  AVG(retweeted_status.retweet_count)/AVG(retweeted_status.favorite_count) ratio_rt_fv
FROM [rtda.tweets]
WHERE retweeted_status.retweet_count > 1 AND retweeted_status.favorite_count > 1
HAVING c > 1000000
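The ratio_rt_fv column divides average retweets by average favorites. A value above 1 suggests retweeting is the more common reaction in that slice of data, and a value below 1 suggests favoriting is. The sketch below, assuming Python 3, computes the same ratio per language over a few made-up rows. It applies the same `> 1` filters as the WHERE clause and uses `min_count` in the role of the HAVING threshold. Grouping on a `lang` key and the flattened field names are assumptions of this sketch about how you would slice by language. Check your table's schema for the actual column names.

```python
from collections import defaultdict


def rt_fv_ratio_by_lang(rows, min_count=0):
    """AVG(retweets) / AVG(favorites) per language, mirroring the query's filters."""
    totals = defaultdict(lambda: [0, 0, 0])  # lang -> [retweet_sum, favorite_sum, n]
    for row in rows:
        rt, fv = row["retweet_count"], row["favorite_count"]
        if rt > 1 and fv > 1:                # Same filter as the WHERE clause.
            t = totals[row["lang"]]
            t[0] += rt
            t[1] += fv
            t[2] += 1
    # Both averages share the count n, so AVG(rt) / AVG(fv) == sum(rt) / sum(fv).
    # Keeping only groups with n > min_count plays the role of HAVING c > N.
    return {lang: rt / fv for lang, (rt, fv, n) in totals.items() if n > min_count}


# Made-up rows for illustration only.
ROWS = [
    {"lang": "en", "retweet_count": 10, "favorite_count": 5},
    {"lang": "en", "retweet_count": 2, "favorite_count": 7},
    {"lang": "ja", "retweet_count": 3, "favorite_count": 9},
    {"lang": "ja", "retweet_count": 1, "favorite_count": 4},  # Dropped by the filter.
]

if __name__ == "__main__":
    print(rt_fv_ratio_by_lang(ROWS))  # {'en': 1.0, 'ja': 0.3333333333333333}
```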

Cleaning up

Labels make it easy to select the resources you want to stop or delete. For example:

kubectl delete deployment -l "name in (twitter-stream, bigquery-controller)"
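The -l argument is a set-based label selector. It matches every object whose name label is one of the listed values. As a rough illustration of that matching rule (not kubectl's parser), the sketch below, assuming Python 3, handles only the single `key in (v1, v2, ...)` form used here. `matches_in_selector` is hypothetical.

```python
import re

# key in (value, value, ...) ; keys may contain a prefix such as example.com/name.
_IN_SELECTOR = re.compile(r"^\s*([\w./-]+)\s+in\s+\(([^)]*)\)\s*$")


def matches_in_selector(selector, labels):
    """Return True if the `labels` dict satisfies a selector like 'name in (a, b)'."""
    m = _IN_SELECTOR.match(selector)
    if not m:
        raise ValueError(f"unsupported selector: {selector!r}")
    key = m.group(1)
    values = {v.strip() for v in m.group(2).split(",") if v.strip()}
    return labels.get(key) in values  # A missing label never matches.


if __name__ == "__main__":
    selector = "name in (twitter-stream, bigquery-controller)"
    for labels in ({"name": "twitter-stream"}, {"name": "other-app"}, {}):
        print(labels, matches_in_selector(selector, labels))
```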

If you'd like to shut down your cluster instances altogether, run the following command:


This deletes all of the instances in your cluster.

Appendix: Building and pushing the Docker image

Your Kubernetes pods require a Docker image that includes the app scripts and their supporting libraries. This image is used to start up the Twitter-to-Cloud-Pub/Sub and Cloud-Pub/Sub-to-BigQuery pods that are part of your analysis pipeline. The Dockerfile located in the kube-pubsub-bq/pubsub/pubsub-pipe-image directory specifies this image as follows:

FROM google/python

RUN pip install --upgrade pip
RUN pip install tweepy
RUN pip install --upgrade google-api-python-client
RUN pip install python-dateutil


CMD python

This Dockerfile first instructs the Docker daemon to install some required Python libraries into the new image:

  • tweepy, a Python wrapper for the Twitter API.
  • The Google API Python libraries.
  • python-dateutil, an extension to Python's standard datetime module.

Then, four scripts in pubsub-pipe-image are added:

  • Provides a common execution point for the other two Python scripts in the pipeline image.
  • Contains some helper functions.
  • Streams cached data from Cloud Pub/Sub to BigQuery.
  • Streams incoming tweets from Twitter to Cloud Pub/Sub.

  • To build the pipeline image using the associated Dockerfile, run the following command. Replace [PROJECT_ID] with your project ID:

    sudo docker build -t[PROJECT_ID]/pubsub_pipeline kube-pubsub-bq/pubsub/pubsub-pipe-image
  • After you build your image, push it to the Google Container Registry so that Kubernetes can access it. Replace [PROJECT_ID] with your project ID:

    sudo gcloud docker -- push[PROJECT_ID]/pubsub_pipeline

Update the Kubernetes specification files to use your custom image

Next, update the image values in the specification files to use your new image. You need to update the following two files:

  1. Edit each file and look for this line:
  2. Replace google-samples with your project ID. Replace pubsub-bq-pipe:v3 with pubsub_pipeline.

  3. Save and close the files.

What's next

  • Try out other Google Cloud Platform features for yourself. Have a look at our tutorials.
