Using Cloud Pub/Sub with Python

Many applications need to do background processing outside of the context of a web request. In this sample, the Bookshelf app sends tasks to a separate background worker for execution. The worker gathers information from the Google Books API and updates the book information in the database. This sample demonstrates how to set up separate services in Google App Engine, how to run a worker process in the App Engine flexible environment, and how to deal with lifecycle events.

This page is part of a multi-page tutorial. To start from the beginning and read the setup instructions, go to Python Bookshelf app.

Configuring settings

This section uses code in the 6-pubsub directory. Edit the files and run commands in this directory.

  1. Open config.py for editing.
  2. Set the value of PROJECT_ID to your project ID, which is visible in the GCP Console.
  3. Set DATA_BACKEND to the same value you used during the Using Structured Data tutorial.
  4. If you are using Cloud SQL or MongoDB, set the values under the Cloud SQL or Mongo section to the same values you used during the Using Structured Data step.
  5. Set the value of CLOUD_STORAGE_BUCKET to your Cloud Storage bucket name.

  6. Under the OAuth2 configuration section, set the values of GOOGLE_OAUTH2_CLIENT_ID and GOOGLE_OAUTH2_CLIENT_SECRET to the application client ID and secret that you created previously.

  7. Save and close config.py. A sketch of the edited settings follows these steps.
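
The sketch below shows what the edited settings in config.py might look like. Every value is a placeholder; use the IDs, names, and secrets from your own project:

PROJECT_ID = 'your-project-id'
DATA_BACKEND = 'datastore'  # or 'cloudsql' / 'mongodb', matching the earlier step

CLOUD_STORAGE_BUCKET = 'your-bucket-name'

GOOGLE_OAUTH2_CLIENT_ID = 'your-client-id.apps.googleusercontent.com'
GOOGLE_OAUTH2_CLIENT_SECRET = 'your-client-secret'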

If you are using Cloud SQL:

  1. Open app.yaml for editing.
  2. Set the value of cloud_sql_instances to the same value you used for CLOUDSQL_CONNECTION_NAME in config.py. It should be in the format project:region:cloudsql-instance. Uncomment this entire line, as shown in the example after these steps.
  3. Save and close app.yaml.
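
For example, an uncommented cloud_sql_instances entry in app.yaml might look like this; the connection name below is a placeholder:

beta_settings:
    cloud_sql_instances: "your-project-id:us-central1:your-cloudsql-instance"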

Installing dependencies

Enter these commands to create a virtual environment and install dependencies:

Linux/macOS

virtualenv -p python3 env
source env/bin/activate
pip install -r requirements.txt

Windows

virtualenv -p python3 env
env\scripts\activate
pip install -r requirements.txt

Running the app on your local machine

  1. Run the worker:

    psqworker main.books_queue
    
  2. Open another terminal in the 6-pubsub directory and start a local web server:

    source env/bin/activate
    python main.py
    
  3. In your web browser, enter this address:

    http://localhost:8080

Now add some well-known books to the bookshelf. If you have both the application and worker instance running locally, you can watch the worker update the book information in the background.

It's important that you run the worker at least once before you add books. The worker will establish a Cloud Pub/Sub subscription to listen for events. Without the subscription, events published to the topic will simply be lost and you will not see any change to the bookshelf data. Once the subscription exists, the events will be queued even if there is no worker currently listening for events. When a worker comes online, Cloud Pub/Sub will deliver any queued events.
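
If you want to confirm that the worker created its subscription, you can list the subscriptions in your project with the Cloud SDK. The subscription name is chosen by psq and may vary:

gcloud pubsub subscriptions list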

Press Control+C to exit the worker and then the local web server.

Deploying the app to the App Engine flexible environment

  1. Deploy the worker:

    gcloud app deploy worker.yaml
    
  2. Deploy the sample app:

    gcloud app deploy
    
  3. In your web browser, enter this address. Replace [YOUR_PROJECT_ID] with your project ID:

    https://[YOUR_PROJECT_ID].appspot.com
    

If you update your app, you can deploy the updated version by entering the same command you used to deploy the app the first time. The new deployment creates a new version of your app and promotes it to the default version. The older versions of your app remain, as do their associated VM instances. Be aware that all of these app versions and VM instances are billable resources.

You can reduce costs by deleting the non-default versions of your app.

To delete an app version:

  1. In the GCP Console, go to the App Engine Versions page.

    Go to the Versions page

  2. Click the checkbox next to the non-default app version you want to delete.
  3. Click Delete at the top of the page to delete the app version.
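
If you prefer the command line, you can also list and delete versions with the Cloud SDK. Replace [VERSION_ID] with the version you want to remove, and add --service=worker for versions of the worker service:

gcloud app versions list
gcloud app versions delete [VERSION_ID]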

For complete information about cleaning up billable resources, see the Cleaning up section in the final step of this tutorial.

Application structure

The following diagram shows the application components and how they connect to each other.

Cloud Pub/Sub sample structure

The application publishes events to Cloud Pub/Sub whenever a book is updated in the database. This publishing is done by enqueueing tasks into a particular queue. The worker, running separately, listens for those events. When an event is received, the worker makes a request to the Google Books API for information about the book and updates the book's record in the database. After the record is updated, you can refresh the book's info page and see the new information.

Understanding the code

Here we walk you through how to create a queue, add tasks to the queue, and use the worker to process tasks. Finally, we demonstrate how to run the worker process in the App Engine flexible environment.

This sample uses psq, a Python package that implements a simple distributed task queue using Google Cloud Pub/Sub. psq is intentionally similar to the rq Python package.

Creating a queue

Tasks are added to a particular queue. The worker watches this queue for tasks to execute. To create a queue with psq:

from flask import current_app
from google.cloud import pubsub
import psq


publisher_client = pubsub.PublisherClient()
subscriber_client = pubsub.SubscriberClient()


def get_books_queue():
    project = current_app.config['PROJECT_ID']

    # Create a queue specifically for processing books and pass in the
    # Flask application context. This ensures that tasks will have access
    # to any extensions / configuration specified to the app, such as
    # models.
    return psq.Queue(
        publisher_client, subscriber_client, project,
        'books', extra_context=current_app.app_context)

If you are using celery or rq, the configuration is similar.
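
For comparison, a minimal rq equivalent might look like the following sketch. It assumes a running Redis instance and is not part of the bookshelf sample:

from redis import Redis
from rq import Queue


def get_books_queue():
    # rq uses Redis as its transport instead of Cloud Pub/Sub.
    return Queue('books', connection=Redis())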

Queueing tasks

A task is just a Python function that is executed by the worker instead of the front-end application. The bookshelf application enqueues the task to process a book whenever a book is created or updated:

q = tasks.get_books_queue()
q.enqueue(tasks.process_book, book['id'])

Other task queues, such as rq and celery, have similar queueing mechanisms. We discuss the implementation of process_book in the Processing books section later in this topic.
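
For example, a simplified create view might save the submitted form data and then enqueue the task before redirecting. The blueprint and helper names below follow the bookshelf sample's conventions, but this is only an illustrative sketch, not the sample's actual view code:

from flask import Blueprint, redirect, request, url_for

from bookshelf import get_model, tasks

crud = Blueprint('crud', __name__)


@crud.route('/add', methods=['POST'])
def add():
    data = request.form.to_dict(flat=True)
    book = get_model().create(data)

    # Hand only the book ID to the worker; it re-reads the record itself.
    q = tasks.get_books_queue()
    q.enqueue(tasks.process_book, book['id'])

    return redirect(url_for('.view', id=book['id']))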

The worker

The worker is a separate application that listens to pub/sub events. This splits the application into two independent services that communicate by using pub/sub, instead of directly with each other. Separating the services allows you to configure and scale the number of front-end and worker instances separately.

Running the worker

The psq package includes the standalone worker psqworker. This worker is a Python process that runs in the background to perform lengthy or blocking tasks that you don't want to perform in the main process of execution. This is similar to rq's rqworker.

psqworker main.books_queue

Starting psqworker like this automatically creates a subscription to the queue pointed to by main.books_queue. When an event is published to pub/sub by being enqueued as a task, psqworker automatically dequeues and executes the task.

Because psqworker needs to be able to import the queue, you must make the queue available as a module-level variable.

# Make the queue available at the top-level, this allows you to run
# `psqworker main.books_queue`. We have to use the app's context because
# it contains all the configuration for plugins.
# If you were using another task queue, such as celery or rq, you can use this
# section to configure your queues to work with Flask.
with app.app_context():
    books_queue = bookshelf.tasks.get_books_queue()

Processing books

To process a book, the task fetches the book by its ID, finds additional information, and then saves the updated information back to the database:

def process_book(book_id):
    """
    Handles an individual Bookshelf message by looking it up in the
    model, querying the Google Books API, and updating the book in the model
    with the info found in the Books API.
    """

    model = get_model()

    book = model.read(book_id)

    if not book:
        logging.warning("Could not find book with id {}".format(book_id))
        return

    if 'title' not in book:
        logging.warning(
            "Can't process book id {} without a title.".format(book_id))
        return

    logging.info("Looking up book with title {}".format(book['title']))

    new_book_data = query_books_api(book['title'])

    if not new_book_data:
        return

    book['title'] = new_book_data.get('title')
    book['author'] = ', '.join(new_book_data.get('authors', []))
    book['publishedDate'] = new_book_data.get('publishedDate')
    book['description'] = new_book_data.get('description')

    # If the new book data has thumbnail images and there isn't currently a
    # thumbnail for the book, then copy the image to cloud storage and update
    # the book data.
    if not book.get('imageUrl') and 'imageLinks' in new_book_data:
        new_img_src = new_book_data['imageLinks']['smallThumbnail']
        book['imageUrl'] = download_and_upload_image(
            new_img_src,
            "{}.jpg".format(book['title']))

    model.update(book, book_id)

Notice that the task only needs the book's ID. It's better to provide the ID of the book instead of the book instance, as the data could be stale by the time the worker processes the task.

Additionally, note that the worker is able to call bookshelf.get_model() despite not running inside of a Flask application. When the queue is configured in get_books_queue, the application context is made available to all tasks.
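
For example, because the queue was created with extra_context=current_app.app_context, code like the following works inside a task even though psqworker never handles a web request. This is only an illustration, not part of the sample:

import logging

from flask import current_app


def example_task():
    # psq pushes the Flask application context before running the task,
    # so configuration and extensions behave as they do during a request.
    project_id = current_app.config['PROJECT_ID']
    logging.info("Task running against project {}".format(project_id))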

The query_books_api function handles calling the Google Books API to fetch more information about the book.

def query_books_api(title):
    """
    Queries the Google Books API to find detailed information about the book
    with the given title.
    """
    r = requests.get('https://www.googleapis.com/books/v1/volumes', params={
        'q': title
    })

    try:
        data = r.json()['items'][0]['volumeInfo']
        return data

    except KeyError:
        logging.info("No book found for title {}".format(title))
        return None

    except ValueError:
        logging.info("Unexpected response from books API: {}".format(r))
        return None


def download_and_upload_image(src, dst_filename):
    """
    Downloads an image file and then uploads it to Google Cloud Storage,
    essentially re-hosting the image in GCS. Returns the public URL of the
    image in GCS
    """
    r = requests.get(src)

    if r.status_code != 200:
        return

    return storage.upload_file(
        r.content,
        dst_filename,
        r.headers.get('content-type', 'image/jpeg'))

Running on Google Cloud Platform

The worker is deployed as a separate service within the same application. App Engine applications can have multiple, independent services. This means that you can easily and independently deploy, configure, scale, and update pieces of your application. The front end is deployed to the default service, and the worker is deployed to the worker service.

Even though the worker does not serve any web requests to users, or even run a web application, we strongly recommend that you provide an HTTP health check when running in the flexible environment to ensure that the service is running and responsive. It is, however, possible to disable health checking.

To provide a health check, the worker service starts two processes instead of one. The first process is psqworker; the second is a simple Flask application that responds to health checks by verifying that the psqworker process is still running.

The application uses Honcho, a Python port of the Foreman process manager, to manage multiple processes. The processes are configured with a procfile:

bookshelf: gunicorn -b 0.0.0.0:$PORT main:app
worker: psqworker --pid /tmp/psq.pid main.books_queue
monitor: python monitor.py /tmp/psq.pid

The entrypoint in app.yaml now uses honcho instead of running processes directly:

# Instead of using gunicorn directly, we'll use Honcho. Honcho is a python port
# of the Foreman process manager. For the default service, only the
# frontend process is needed.
entrypoint: honcho start -f /app/procfile bookshelf

Notice that the procfile contains an entry for the front-end bookshelf application as well. Because the default (front-end) and worker services share the exact same codebase, the arguments to honcho in the entrypoint control which processes are started. The following diagram contrasts the single service deployment on the left with the multi-service deployment on the right:

Cloud Pub/Sub deployment

You could create a single service that runs both the bookshelf and worker processes, but you would lose the ability to manage and scale them separately.

The worker is a separate service, so it needs its own separate and complete YAML configuration file.

service: worker
runtime: python
env: flex

# Instead of using gunicorn directly, we'll use Honcho. Honcho is a python port
# of the Foreman process manager. For the worker service, both the queue worker
# and the monitor process are needed.
entrypoint: honcho start -f /app/procfile worker monitor

runtime_config:
  python_version: 3

beta_settings:
    # If using Cloud SQL, uncomment and set this value to the Cloud SQL
    # connection name, e.g.
    #   "project:region:cloudsql-instance"
    # You must also update the values in config.py.
    #
    # cloud_sql_instances: "your-cloudsql-connection-name"

This configuration is similar to the app.yaml file that is used for the front end; the key differences are service: worker and the arguments passed to honcho in entrypoint.

The monitor application

The monitor application watches the psqworker process and provides health checks. This monitor application can be easily re-used to run just about any standalone server in the flexible environment, such as an rqworker, a celery worker, or even a redis slave.

import os
import sys

from flask import Flask


# The app checks this file for the PID of the process to monitor.
PID_FILE = None


# Create app to handle health checks and monitor the queue worker. This will
# run alongside the worker, see procfile.
monitor_app = Flask(__name__)


# The health check reads the PID file created by psqworker and checks the proc
# filesystem to see if the worker is running. This same pattern can be used for
# rq and celery.
@monitor_app.route('/_ah/health')
def health():
    if not os.path.exists(PID_FILE):
        return 'Worker pid not found', 503

    with open(PID_FILE, 'r') as pidfile:
        pid = pidfile.read()

    if not os.path.exists('/proc/{}'.format(pid)):
        return 'Worker not running', 503

    return 'healthy', 200


@monitor_app.route('/')
def index():
    return health()


if __name__ == '__main__':
    PID_FILE = sys.argv[1]
    monitor_app.run('0.0.0.0', 8080)
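
To try the monitor locally, run psqworker with the --pid flag shown in the procfile, then start the monitor in a second terminal and request its health endpoint from a third (assuming curl is available):

psqworker --pid /tmp/psq.pid main.books_queue
python monitor.py /tmp/psq.pid
curl http://localhost:8080/_ah/health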

Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

Delete the project

The easiest way to eliminate billing is to delete the project you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Projects page.

    Go to the Projects page

  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete non-default versions of your app

If you don't want to delete your project, you can reduce costs by deleting the non-default versions of your app.

To delete an app version:

  1. In the GCP Console, go to the App Engine Versions page.

    Go to the Versions page

  2. Click the checkbox next to the non-default app version you want to delete.
  3. Click Delete at the top of the page to delete the app version.

Delete your Cloud SQL instance

To delete a Cloud SQL instance:

  1. In the GCP Console, go to the SQL Instances page.

    Go to the SQL Instances page

  2. Click the name of the SQL instance you want to delete.
  3. Click Delete at the top of the page to delete the instance.

Delete your Cloud Storage bucket

To delete a Cloud Storage bucket:

  1. In the GCP Console, go to the Cloud Storage Browser page.

    Go to the Cloud Storage Browser page

  2. Click the checkbox next to the bucket you want to delete.
  3. Click Delete at the top of the page to delete the bucket.

What's next

Learn how to run the Python Bookshelf sample on Compute Engine.

Try out other Google Cloud Platform features for yourself. Have a look at our tutorials.
