Using Cloud Pub/Sub with Python

Many apps need to do background processing outside of the context of a web request. This part of the Python Bookshelf app tutorial shows how the sample app sends tasks to a separate background worker. The worker gathers information from the Google Books API and updates the book information in the database. This sample shows how to set up separate services in App Engine, how to run a worker process in the App Engine flexible environment, and how to deal with lifecycle events.

This page is part of a multipage tutorial. To start from the beginning and read the setup instructions, go to Python Bookshelf app.

Configuring settings

This section uses code in the 6-pubsub directory. Edit the files and run commands in this directory.

  1. Open the config.py file for editing and replace the following values:
    • Set the value of [PROJECT_ID] to your project ID, which is visible in the GCP Console.
    • Set the value of [DATA_BACKEND] to the same value you used during the Using structured data tutorial.
    • If you are using Cloud SQL or MongoDB, set the values under the Cloud SQL or Mongo section to the same values you used during the Using structured data step.
    • Set the value of [CLOUD_STORAGE_BUCKET] to your Cloud Storage bucket name.
    • Under the OAuth2 configuration section, set the values of [GOOGLE_OAUTH2_CLIENT_ID] and [GOOGLE_OAUTH2_CLIENT_SECRET] to the application client ID and secret that you created previously.

  2. Save and close the config.py file.

If you are using Cloud SQL:

  1. Open the app.yaml file for editing.
  2. Set the value of cloudsql-instance to the same value used for [CLOUDSQL_CONNECTION_NAME] in the config.py file. Use the format project:region:cloudsql-instance. Uncomment this entire line.
  3. Save and close the app.yaml file.

Installing dependencies

To create a virtual environment and install dependencies, use the following commands:

Linux/macOS

virtualenv -p python3 env
source env/bin/activate
pip install -r requirements.txt

Windows

virtualenv -p python3 env
env\scripts\activate
pip install -r requirements.txt

Running the app on your local machine

  1. Run the worker:

    psqworker main.books_queue
    
  2. Open another terminal that is also in the 6-pubsub directory, and then start a local web server:

    python main.py
    
  3. In your browser, enter the following address:

    http://localhost:8080
    
Add some books to the bookshelf. If you have both the app and worker instances running locally, you can watch the worker update the book information in the background.

It's important that you run the worker at least once before you add books. The worker establishes a Cloud Pub/Sub subscription to listen for events. Without the subscription, events published to the topic are lost, and you won't see changes to the bookshelf data. After the subscription exists, the events are queued, even if there is no worker currently listening for events. When a worker comes online, Cloud Pub/Sub delivers any queued events.
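The ordering requirement above can be illustrated with a small in-memory model. This is only a sketch: ToyTopic and its methods are hypothetical names invented for the example and are not the Cloud Pub/Sub client API. It assumes nothing beyond the behavior described above, where a message is retained only for subscriptions that already exist when it is published.

```python
from collections import defaultdict, deque


class ToyTopic:
    """In-memory stand-in for a topic; not the Cloud Pub/Sub client API."""

    def __init__(self):
        # Each subscription name maps to its own backlog of messages.
        self._backlogs = defaultdict(deque)
        self._subscriptions = set()

    def subscribe(self, name):
        # Creating the subscription is what makes later messages retainable.
        self._subscriptions.add(name)

    def publish(self, message):
        # A message is copied into every existing subscription's backlog.
        # With no subscriptions, nothing retains it.
        for name in self._subscriptions:
            self._backlogs[name].append(message)
        return len(self._subscriptions)

    def pull(self, name):
        # Drain and return everything queued for this subscription.
        backlog = self._backlogs[name]
        messages = list(backlog)
        backlog.clear()
        return messages


topic = ToyTopic()
topic.publish("book-1")            # no subscription yet: nothing retains it
topic.subscribe("books-worker")    # the worker has now run once
topic.publish("book-2")            # queued although no worker is pulling
delivered = topic.pull("books-worker")
print(delivered)
```

In this model, only the message published after the subscription was created reaches the worker, which is why the worker should run once before you add books.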

Press Control+C to exit the worker and then the local web server.

Deploying the app to the App Engine flexible environment

  1. Deploy the worker:

    gcloud app deploy worker.yaml
    
  2. Deploy the sample app:

    gcloud app deploy
    
  3. In your browser, enter the following address. Replace [YOUR_PROJECT_ID] with your GCP project ID:

    https://[YOUR_PROJECT_ID].appspot.com
    

If you update your app, you deploy the updated version by entering the same command that you used to deploy the app. The deployment creates a new version of your app and promotes it to the default version. The earlier versions of your app remain, as do their associated virtual machine (VM) instances. All of these app versions and VM instances are billable resources. To reduce costs, delete the non-default versions of your app.

To delete an app version:

  1. In the GCP Console, go to the Versions page for App Engine.

  2. Select the checkbox for the non-default app version you want to delete.
  3. Click Delete to delete the app version.

For more information about cleaning up billable resources, see the Cleaning up section in the final step of this tutorial.

App structure

The Bookshelf app uses two independent services that communicate by using Cloud Pub/Sub. Using two services lets you configure and scale the number of frontend and worker instances separately. The following diagram shows the app components and how they connect to each other.

Figure: Cloud Pub/Sub sample structure

  1. The app publishes events to Cloud Pub/Sub whenever a book is updated in the database.

  2. This publishing is done by adding tasks to a particular queue.

  3. The worker, running separately, listens for those events.

  4. When an event is received, the worker makes a request to the Google Books API for information about the book and updates the book's record in the database.

  5. After the record is updated, you can refresh the book's info page to see the new information.
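The flow in the steps above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the sample's implementation: the in-memory books dictionary, the tasks queue, and fake_lookup are hypothetical stand-ins for the database, Cloud Pub/Sub, and the Google Books API request.

```python
import queue
import threading

# Hypothetical in-memory "database" and task queue, for illustration only.
books = {1: {"title": "Example Title"}}
tasks = queue.Queue()


def fake_lookup(title):
    # Stands in for the Google Books API request made by the real worker.
    return {"author": "unknown", "description": "details for " + title}


def worker():
    # Steps 3-4: wait for events, look up details, update the record.
    while True:
        book_id = tasks.get()
        if book_id is None:      # sentinel value: stop the worker
            tasks.task_done()
            break
        book = books[book_id]
        book.update(fake_lookup(book["title"]))
        tasks.task_done()


thread = threading.Thread(target=worker)
thread.start()

tasks.put(1)       # steps 1-2: the frontend publishes an event for book 1
tasks.put(None)
tasks.join()       # wait until the worker has handled every task
thread.join()
print(books[1])    # step 5: the record now holds the extra details
```

The frontend returns as soon as the task is queued; the lookup happens on the worker thread, which mirrors how the two services are decoupled.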

Understanding the code

The following sections describe how to create a queue, add tasks to the queue, and use the worker to process tasks in the App Engine flexible environment.

This sample uses psq, a Python package that implements a distributed task queue using Cloud Pub/Sub. psq is intentionally similar to the rq Python package.

Creating a queue

Tasks are added to a particular queue. The worker watches this queue for tasks to execute. The following example shows how to create a queue with psq.

from flask import current_app
from google.cloud import pubsub
import psq


publisher_client = pubsub.PublisherClient()
subscriber_client = pubsub.SubscriberClient()


def get_books_queue():
    project = current_app.config['PROJECT_ID']

    # Create a queue specifically for processing books and pass in the
    # Flask application context. This ensures that tasks will have access
    # to any extensions / configuration specified to the app, such as
    # models.
    return psq.Queue(
        publisher_client, subscriber_client, project,
        'books', extra_context=current_app.app_context)

If you are using rq or celery, the configuration is similar.

Queueing tasks

A task is a Python function that is executed by the worker instead of the app. The Bookshelf app enqueues the task to process a book whenever a book is created or updated.

q = tasks.get_books_queue()
q.enqueue(tasks.process_book, book['id'])

Other task queues, such as rq and celery, work similarly. For information about process_book, see the Processing books section later in this topic.

Running the worker

The worker is a separate app that listens to Cloud Pub/Sub events.

The psq package includes the standalone worker psqworker. This worker is a Python process that runs in the background to perform lengthy or blocking tasks that you don't want to perform in the main process of execution. This is similar to rq's rqworker.

Start psqworker with a subscription to the queue pointed to by main.books_queue.

psqworker main.books_queue

When an event is published to Cloud Pub/Sub by being enqueued as a task, psqworker automatically dequeues and runs the task.

Because psqworker needs to import the queue, you must make the queue available as a module-level import.

# Make the queue available at module level so that you can run
# `psqworker main.books_queue`. We have to use the app's context because
# it contains all the configuration for plugins.
# If you are using another task queue, such as celery or rq, you can use this
# section to configure your queues to work with Flask.
with app.app_context():
    books_queue = bookshelf.tasks.get_books_queue()
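The reason the queue must live at module level becomes clearer if you consider how a dotted path such as main.books_queue can be resolved. The following is a minimal sketch of that general technique using importlib; it is an assumption about the approach, not psqworker's actual code, and resolve_dotted_path is a hypothetical helper name.

```python
import importlib


def resolve_dotted_path(path):
    """Import `module.attribute` and return the attribute."""
    module_name, _, attribute = path.rpartition(".")
    if not module_name:
        raise ValueError("expected 'module.attribute', got {!r}".format(path))
    module = importlib.import_module(module_name)
    # getattr only finds names bound at module level, which is why a queue
    # created inside a function would not be reachable this way.
    return getattr(module, attribute)


# 'json.dumps' stands in for 'main.books_queue' in this example.
dumps = resolve_dotted_path("json.dumps")
print(dumps({"ok": True}))
```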

Processing books

To process a book, the task fetches the book by its ID, finds additional information, and then saves the updated information back to the database.

def process_book(book_id):
    """
    Handles an individual Bookshelf message by looking it up in the
    model, querying the Google Books API, and updating the book in the model
    with the info found in the Books API.
    """

    model = get_model()

    book = model.read(book_id)

    if not book:
        logging.warning("Could not find book with id {}".format(book_id))
        return

    if 'title' not in book:
        logging.warning("Can't process book id {} without a title."
                        .format(book_id))
        return

    logging.info("Looking up book with title {}".format(book['title']))

    new_book_data = query_books_api(book['title'])

    if not new_book_data:
        return

    book['title'] = new_book_data.get('title')
    book['author'] = ', '.join(new_book_data.get('authors', []))
    book['publishedDate'] = new_book_data.get('publishedDate')
    book['description'] = new_book_data.get('description')

    # If the new book data has thumbnail images and there isn't currently a
    # thumbnail for the book, then copy the image to cloud storage and update
    # the book data.
    if not book.get('imageUrl') and 'imageLinks' in new_book_data:
        new_img_src = new_book_data['imageLinks']['smallThumbnail']
        book['imageUrl'] = download_and_upload_image(
            new_img_src,
            "{}.jpg".format(book['title']))

    model.update(book, book_id)

The task only needs the book's ID. It's better to provide the ID of the book instead of the book instance because the data could be stale by the time the worker processes the task.

Additionally, the worker calls bookshelf.get_model() despite not running in a Flask app. When the queue is configured in get_books_queue, the app context is made available to all tasks.
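The point about passing the ID rather than the book instance can be shown with a short example. The store dictionary here is a hypothetical in-memory stand-in for the Bookshelf data model.

```python
# Hypothetical in-memory store standing in for the Bookshelf data model.
store = {7: {"title": "First draft"}}

# Option 1: enqueue a copy of the record as it looks right now.
snapshot_task = dict(store[7])
# Option 2: enqueue only the ID.
id_task = 7

# The user edits the book before the worker picks up either task.
store[7]["title"] = "Final title"

# The worker later processes both tasks.
title_from_snapshot = snapshot_task["title"]
title_from_id = store[id_task]["title"]
print(title_from_snapshot, "|", title_from_id)
```

The snapshot still carries the old title, while the lookup by ID sees the current record, so a worker that searched by the snapshot's title would query for the wrong book.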

The query_books_api function handles calling the Google Books API to get more information about the book.

def query_books_api(title):
    """
    Queries the Google Books API to find detailed information about the book
    with the given title.
    """
    r = requests.get('https://www.googleapis.com/books/v1/volumes', params={
        'q': title
    })

    try:
        data = r.json()['items'][0]['volumeInfo']
        return data

    except (KeyError, IndexError):
        logging.info("No book found for title {}".format(title))
        return None

    except ValueError:
        logging.info("Unexpected response from books API: {}".format(r))
        return None


def download_and_upload_image(src, dst_filename):
    """
    Downloads an image file and then uploads it to Google Cloud Storage,
    essentially re-hosting the image in GCS. Returns the public URL of the
    image in GCS
    """
    r = requests.get(src)

    if r.status_code != 200:
        return None

    return storage.upload_file(
        r.content,
        dst_filename,
        r.headers.get('content-type', 'image/jpeg'))
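The response handling in query_books_api can also be written without exceptions. The following sketch is one possible alternative, not the sample's code: first_volume_info is a hypothetical helper, and it assumes only the response shape that the code above already relies on (an items list whose entries contain volumeInfo).

```python
def first_volume_info(payload):
    """Return the first result's volumeInfo dict, or None if there is none."""
    items = payload.get("items") or []
    if not items:
        return None
    return items[0].get("volumeInfo")


found = first_volume_info(
    {"items": [{"volumeInfo": {"title": "Example", "authors": ["A. Writer"]}}]})
missing = first_volume_info({})
empty = first_volume_info({"items": []})
print(found, missing, empty)
```

Keeping the parsing separate from the HTTP request also makes it possible to test the handling of empty or missing results without calling the API.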

Running on Google Cloud Platform

App Engine apps can have multiple, independent services, so you can independently deploy, configure, scale, and update pieces of your app.

The worker is deployed as a separate service within the same app. The frontend is deployed to the default service, and the worker is deployed to the worker service.

Even though the worker doesn't serve any web requests to users, or even run a web app, it's a good idea to provide an HTTP health check to ensure that the service is running and responsive. It is, however, possible to disable health checking.

To provide a health check, the worker starts two processes instead of one. The first process is psqworker and the other process is a simple Flask app that ensures the psqworker process is running and responsive to health checks.

The app uses Honcho, a Python port of the Foreman process manager, to manage multiple processes. The processes are configured by using a procfile.

bookshelf: gunicorn -b 0.0.0.0:$PORT main:app
worker: psqworker --pid /tmp/psq.pid main.books_queue
monitor: python monitor.py /tmp/psq.pid

The entrypoint in app.yaml now uses honcho instead of running processes directly.

# Instead of using gunicorn directly, we'll use Honcho. Honcho is a python port
# of the Foreman process manager. For the default service, only the
# frontend process is needed.
entrypoint: honcho start -f /app/procfile bookshelf

Notice that the procfile also contains an entry for the frontend of the bookshelf app. Because the default (frontend) and worker services share the exact same codebase, the arguments to honcho in the entrypoint determine which processes are started. The following diagram contrasts the single-service deployment on the left with the multiple-service deployment on the right.

Figure: Cloud Pub/Sub deployment

You can create a service that runs both the bookshelf and worker processes, but you will lose the ability to manage and scale them separately.
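The way the entrypoint arguments choose which processes run can be pictured as a filter over the procfile entries. The sketch below is an illustration only and is not Honcho's implementation; parse_procfile and select are hypothetical helpers.

```python
PROCFILE = """\
bookshelf: gunicorn -b 0.0.0.0:$PORT main:app
worker: psqworker --pid /tmp/psq.pid main.books_queue
monitor: python monitor.py /tmp/psq.pid
"""


def parse_procfile(text):
    """Map each process name to its command line."""
    processes = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first colon only; commands may contain colons.
        name, _, command = line.partition(":")
        processes[name.strip()] = command.strip()
    return processes


def select(processes, names):
    """Keep only the named processes; with no names, keep them all."""
    if not names:
        return dict(processes)
    return {name: processes[name] for name in names}


all_processes = parse_procfile(PROCFILE)
frontend = select(all_processes, ["bookshelf"])           # default service
background = select(all_processes, ["worker", "monitor"])  # worker service
print(sorted(frontend), sorted(background))
```

Both services read the same procfile; only the list of names passed in the entrypoint differs.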

The worker is a separate service, so it needs its own separate and complete YAML configuration file.

service: worker
runtime: python
env: flex

# Instead of using gunicorn directly, we'll use Honcho. Honcho is a python port
# of the Foreman process manager. For the worker service, both the queue worker
# and the monitor process are needed.
entrypoint: honcho start -f /app/procfile worker monitor

runtime_config:
  python_version: 3

beta_settings:
    # If using Cloud SQL, uncomment and set this value to the Cloud SQL
    # connection name, e.g.
    #   "project:region:cloudsql-instance"
    # You must also update the values in config.py.
    #
    # cloud_sql_instances: "your-cloudsql-connection-name"

This configuration is similar to the app.yaml file that is used for the frontend; the key differences are service: worker and the arguments passed to honcho in entrypoint.

Using the monitor app

The monitor app watches the psqworker process and provides health checks. You can reuse this monitor app to run just about any stand-alone server in the flexible environment, such as an rqworker, a celery worker, or even a redis secondary process.

import os
import sys

from flask import Flask


# The app checks this file for the PID of the process to monitor.
PID_FILE = None


# Create app to handle health checks and monitor the queue worker. This will
# run alongside the worker, see procfile.
monitor_app = Flask(__name__)


# The health check reads the PID file created by psqworker and checks the proc
# filesystem to see if the worker is running. This same pattern can be used for
# rq and celery.
@monitor_app.route('/_ah/health')
def health():
    if not os.path.exists(PID_FILE):
        return 'Worker pid not found', 503

    with open(PID_FILE, 'r') as pidfile:
        pid = pidfile.read().strip()

    if not os.path.exists('/proc/{}'.format(pid)):
        return 'Worker not running', 503

    return 'healthy', 200


@monitor_app.route('/')
def index():
    return health()


if __name__ == '__main__':
    PID_FILE = sys.argv[1]
    monitor_app.run('0.0.0.0', 8080)
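The monitor above checks the proc filesystem, which is specific to Linux. If you reuse the pattern elsewhere, one alternative on POSIX systems is to send signal 0 to the process. The sketch below shows that different technique, not the sample's approach; pid_is_running and read_pid are hypothetical helper names, and the code is not suitable for Windows.

```python
import os


def pid_is_running(pid):
    """Return True if a process with this PID exists (POSIX only)."""
    try:
        os.kill(pid, 0)   # signal 0 checks for existence without signaling
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return True
    return True


def read_pid(path):
    """Read a PID file, tolerating surrounding whitespace."""
    with open(path) as pidfile:
        return int(pidfile.read().strip())


print(pid_is_running(os.getpid()))
```

A health check built this way would return 503 when read_pid fails or pid_is_running returns False, in the same way that the monitor app does.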

Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

Delete the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Projects page.

  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete non-default versions of your app

If you don't want to delete your project, you can reduce costs by deleting the non-default versions of your app.

To delete an app version:

  1. In the GCP Console, go to the Versions page for App Engine.

  2. Select the checkbox for the non-default app version you want to delete.
  3. Click Delete to delete the app version.

Delete your Cloud SQL instance

To delete a Cloud SQL instance:

  1. In the GCP Console, go to the SQL Instances page.

  2. Click the name of the SQL instance you want to delete.
  3. Click Delete to delete the instance.

Delete your Cloud Storage bucket

To delete a Cloud Storage bucket:

  1. In the GCP Console, go to the Cloud Storage Browser page.

  2. Click the checkbox for the bucket you want to delete.
  3. Click Delete to delete the bucket.

What's next

Learn how to run the Python Bookshelf app on Compute Engine.

Try out other Google Cloud Platform features for yourself. Have a look at our tutorials.
