Using Pub/Sub with Ruby

Many applications need to do background processing outside of the context of a web request. In this sample, the Bookshelf app sends tasks to a separate background worker for execution. The worker gathers information from the Google Books API and updates the book information in the database. This sample demonstrates how to set up separate services in Google App Engine, how to run a worker process in the App Engine flexible environment, and how to deal with lifecycle events.

This page is part of a multi-page tutorial. To start from the beginning and see instructions for setting up, go to Ruby Bookshelf App.

Installing dependencies

Go to the getting-started-ruby/6-task-queueing directory, and enter this command:

bundle install

Configuring settings

  1. Copy the example settings file:

    cp config/settings.example.yml config/settings.yml
    
  2. Open settings.yml for editing. Replace <your-project-id> with your project ID.

    default: &default
      project_id: <your-project-id>
    
  3. Set the other variables to the same values you used in the Authenticating Users part of this tutorial.

    For example, suppose your web application client ID is XYZCLIENTID and your client secret is XYZCLIENTSECRET. Also suppose your Cloud Storage bucket name is my-project, your access key ID is ABCKEY, and your secret access key is abcsecret. Then the default section of your settings.yml file would look like this:

    default: &default
      project_id: [YOUR_PROJECT_ID]
      oauth2:
        client_id: XYZCLIENTID
        client_secret: XYZCLIENTSECRET
      cloud_storage:
        bucket: my-project
        access_key_id: ABCKEY
        secret_access_key: abcsecret
    
  4. Copy the example database configuration file:

    cp config/database.example.yml config/database.yml
    
  5. Configure the sample app to use the same database that you set up during the Using Structured Data part of this tutorial:

    Cloud SQL

    Edit database.yml. Uncomment the lines in the Cloud SQL portion of the file. Replace the [PLACEHOLDERS] with the specific values for your Cloud SQL instance and database.

     mysql_settings: &mysql_settings
       adapter: mysql2
       encoding: utf8
       pool: 5
       timeout: 5000
       username: [MYSQL_USER]
       password: [MYSQL_PASS]
       database: [MYSQL_DATABASE]
       socket: /cloudsql/[YOUR_INSTANCE_CONNECTION_NAME]
    
    • Replace [MYSQL_USER] and [MYSQL_PASS] with your Cloud SQL instance username and password that you created previously.

    • Replace [MYSQL_DATABASE] with the name of the database that you created previously.

    • Replace [YOUR_INSTANCE_CONNECTION_NAME] with the Instance Connection Name of your Cloud SQL instance.

    Run migrations:

    rake db:migrate
    

    PostgreSQL

    Edit database.yml. Uncomment the lines in the PostgreSQL portion of the file. Replace the your-postgresql-* placeholders with the specific values for your PostgreSQL instance and database. For example, suppose your IPv4 address is 173.194.230.44, your username is postgres, and your password is pword123. Also suppose your database name is bookshelf. Then the PostgreSQL portion of your database.yml file would look like this:

    # PostgreSQL Sample Database Configuration
    # ----------------------------------------
      adapter: postgresql
      encoding: unicode
      pool: 5
      username: postgres
      password: pword123
      host: 173.194.230.44
      database: bookshelf
    

    Create the required database and tables:

    rake db:create
    rake db:migrate
    

    Cloud Datastore

    Edit database.yml. Uncomment the one line in the Cloud Datastore portion of the file. Replace your-project-id with your project ID. For example, suppose your project ID is my-project. Then the Cloud Datastore portion of your database.yml file would look like this:

    # Google Cloud Datastore Sample Database Configuration
    # ----------------------------------------------------
    dataset_id: my-project
    

    Run a rake task to copy the sample project files for Cloud Datastore:

    rake backend:datastore
    

Running the app on your local machine

  1. Start the local web server and two worker processes:

    bundle exec foreman start --formation web=1,worker=2
    
  2. In your web browser, enter this address:

    http://localhost:8080
    

Now add some well-known books to the bookshelf. You can watch the workers update the book information in the background.

The Foreman RubyGem starts the Rails web server and runs two worker processes.

The worker establishes a Pub/Sub subscription to listen for events. After the subscription exists, events published to the topic will be queued, even if there is no worker currently listening for events. When a worker comes online, Pub/Sub delivers any queued events.
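
The queueing behavior described above can be illustrated with a toy in-memory model. This is only a sketch: ToyTopic and its methods are hypothetical names and are not part of the Pub/Sub client library. It assumes, as the paragraph above implies, that messages are retained only for subscriptions that already exist when the message is published.

```ruby
# Toy in-memory model of a topic with named subscriptions.
# ToyTopic is a hypothetical class, NOT the Pub/Sub client API.
class ToyTopic
  def initialize
    @queues = {} # subscription name => array of pending messages
  end

  # Creating a subscription gives it its own queue of pending messages.
  def subscribe name
    @queues[name] ||= []
  end

  # Publishing appends the message to every existing subscription queue.
  # With no subscription, there is no queue and the message is not kept.
  def publish data
    @queues.each_value {|queue| queue << data }
  end

  # A worker coming online drains whatever was queued for its subscription.
  def pull name
    queue = @queues.fetch(name) { [] }
    queue.shift queue.size
  end
end

topic = ToyTopic.new
topic.publish "1"                      # no subscription yet
topic.subscribe "lookup_book_details"
topic.publish "2"                      # queued, no worker listening yet
topic.publish "3"
pending = topic.pull "lookup_book_details"
puts pending.inspect
```

In this model, the worker that comes online later receives the two messages published after the subscription was created.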

You'll learn more about how the system works in the upcoming sections.

When you're ready to move forward, press Ctrl+C to exit the local web server and worker processes.

Deploying the app to the App Engine flexible environment

  1. Compile JavaScript assets for production:

    RAILS_ENV=production rake assets:precompile
    
  2. Deploy the worker:

    gcloud app deploy worker.yaml
    
  3. Deploy the sample app:

    gcloud app deploy
    
  4. In your web browser, enter this address. Replace [YOUR_PROJECT_ID] with your project ID:

    https://[YOUR_PROJECT_ID].appspot.com
    

If you update your app, you can deploy the updated version by entering the same command you used to deploy the app the first time. The new deployment creates a new version of your app and promotes it to the default version. The older versions of your app remain, as do their associated VM instances. Be aware that all of these app versions and VM instances are billable resources.

You can reduce costs by deleting the non-default versions of your app.

To delete an app version:

  1. In the Cloud Platform Console, go to the App Engine Versions page.

    Go to the Versions page

  2. Click the checkbox next to the non-default app version you want to delete.
  3. Click the Delete button at the top of the page to delete the app version.

For complete information about cleaning up billable resources, see the Cleaning up section in the final step of this tutorial.

Application structure

This diagram shows the app's components and how they fit together.

Auth sample structure

Understanding the code

This section walks you through the application code and explains how it works.

Queueing tasks

To gather information from the Google Books API for books added to the Bookshelf, the Book class enqueues a task:

after_create :lookup_book_details

private

def lookup_book_details
  if [author, description, published_on, image_url].any? {|attr| attr.blank? }
    LookupBookDetailsJob.perform_later self
  end
end

The preceding code creates an Active Record callback and specifies lookup_book_details as the method to be called after a book is created and saved in the database. If the book is missing any information, a job is enqueued to look up the book's details.

LookupBookDetailsJob is an Active Job job.

The code passes the book to be updated, self, to LookupBookDetailsJob.perform_later. This enqueues a job to look up the book's details.
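
The condition that decides whether a job is enqueued can be tried outside Rails. The following is a minimal sketch, not the sample's code: BookRecord, blank_value?, and missing_details? are hypothetical stand-ins, and blank_value? only approximates the Rails blank? method for nil and strings.

```ruby
# Plain-Ruby stand-in for the check made in lookup_book_details.
# BookRecord, blank_value? and missing_details? are hypothetical names.
BookRecord = Struct.new(:title, :author, :description, :published_on, :image_url)

# Rough approximation of Rails' blank?: nil, empty, or whitespace-only.
def blank_value? value
  value.nil? ||
    (value.respond_to?(:empty?) && value.empty?) ||
    (value.is_a?(String) && value.strip.empty?)
end

# True when at least one detail is missing, which is when the sample
# would enqueue a lookup job.
def missing_details? book
  [book.author, book.description, book.published_on, book.image_url].any? {|attr|
    blank_value?(attr)
  }
end

incomplete = BookRecord.new "Moby Dick"
complete   = BookRecord.new "Walden", "Henry David Thoreau", "Life in the woods",
                            "1854-08-09", "http://example.com/walden.png"

puts missing_details?(incomplete)
puts missing_details?(complete)
```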

Pub/Sub Active Job backend

Active Job can be configured to use a custom backend, for example, Delayed Job or Resque, for enqueuing tasks. The Bookshelf sample app has its own custom backend, which is specified in the Application class:

config.active_job.queue_adapter = :pub_sub_queue

An Active Job backend, which is also called an adapter, must provide an enqueue method. When a job is enqueued using perform_later, the job is passed to the enqueue method of the configured Active Job backend.
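
The hand-off from perform_later to the adapter's enqueue method can be sketched without loading Rails. Everything in this example is hypothetical (ToyJob, ToyQueueAdapter); it mimics the shape of the interaction rather than reproducing Active Job, and it leaves out delayed jobs, which a real adapter is generally also expected to handle.

```ruby
# Hypothetical in-memory adapter. Like the sample's adapter, it exposes
# enqueue as a class method.
class ToyQueueAdapter
  def self.jobs
    @jobs ||= []
  end

  def self.enqueue job
    jobs << job
  end
end

# Hypothetical job base class that forwards perform_later to the adapter.
class ToyJob
  class << self
    attr_accessor :queue_adapter

    # Wrap the arguments in a job object and pass it to the adapter.
    def perform_later *arguments
      job = new arguments
      queue_adapter.enqueue job
      job
    end
  end

  attr_reader :arguments

  def initialize arguments
    @arguments = arguments
  end
end

ToyJob.queue_adapter = ToyQueueAdapter
ToyJob.perform_later 42
puts ToyQueueAdapter.jobs.size
```

The adapter receives the whole job object, which is why the sample's enqueue method can read the book from job.arguments.first.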

The sample app enqueues a job by creating a subscription to a Pub/Sub topic, and then publishing the ID of a book to the topic. Once the subscription exists, messages are queued even if there is no worker currently listening. When a worker comes online, Pub/Sub delivers any queued messages.

require "google/cloud"

module ActiveJob
  module QueueAdapters
    class PubSubQueueAdapter

      def self.pubsub
        project_id = Rails.application.config.x.settings["project_id"]
        gcloud     = Google::Cloud.new project_id

        gcloud.pubsub
      end

      def self.enqueue job
        Rails.logger.info "[PubSubQueueAdapter] enqueue job #{job.inspect}"

        book  = job.arguments.first
        topic = pubsub.topic "lookup_book_details_queue"

        topic.publish book.id.to_s
      end

The preceding code uses the google-cloud RubyGem to interact with Pub/Sub. The Google Cloud client library is an idiomatic Ruby client for interacting with Google Cloud Platform services.

gem "google-cloud"

To process books added to a queue, a Pub/Sub Subscription listens for messages published to the lookup_book_details_queue topic. This is covered in the worker section.

Books API

The sample app uses the Google API client RubyGem to look up book details from the Books API.

gem "google-api-client", "~> 0.9"

When a job runs, the LookupBookDetailsJob.perform method retrieves a list of books, based on a book title, from the Books API.

require "google/apis/books_v1"

BooksAPI = Google::Apis::BooksV1

class LookupBookDetailsJob < ActiveJob::Base
  queue_as :default

  def perform book
    # Note the trailing space so the ID is not fused with the word "book".
    Rails.logger.info "[BookService] Lookup details for book " +
                      "#{book.id} #{book.title.inspect}"

    # Create Book API Client
    book_service = BooksAPI::BooksService.new
    # Books API does not require authentication
    book_service.authorization = nil

    # Lookup a list of relevant books based on the provided book title.
    book_service.list_volumes book.title, order_by: "relevance" do |results, error|
      # An error occurred; treat it as a soft failure
      if error
        Rails.logger.error "[BookService] #{error.inspect}"
        break
      end

      # Book was not found
      if results.total_items.zero?
        Rails.logger.info "[BookService] #{book.title} was not found."
        break
      end

      # List of relevant books
      volumes = results.items

If a book volume result includes a title, author and book cover image, then it is selected as the best match. Otherwise the first result is used:

# To provide the best results, find the first returned book that
# includes title and author information as well as a book cover image.
best_match = volumes.find {|volume|
  info = volume.volume_info
  info.title && info.authors && info.image_links.try(:thumbnail)
}

volume = best_match || volumes.first
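
The selection rule can be exercised on its own with stand-in data. This is a sketch under assumptions: VolumeInfo, Volume, and pick_volume are hypothetical names, and the thumbnail is flattened into a single field instead of the nested image_links object that the sample reads.

```ruby
# Hypothetical stand-ins for the Books API result objects.
VolumeInfo = Struct.new(:title, :authors, :thumbnail)
Volume     = Struct.new(:volume_info)

# Prefer the first volume with a title, authors and a thumbnail;
# otherwise fall back to the first volume (nil for an empty list).
def pick_volume volumes
  best_match = volumes.find {|volume|
    info = volume.volume_info
    info.title && info.authors && info.thumbnail
  }
  best_match || volumes.first
end

partial  = Volume.new VolumeInfo.new("Moby Dick (excerpt)", nil, nil)
complete = Volume.new VolumeInfo.new("Moby Dick", ["Herman Melville"],
                                     "http://example.com/moby.png")

puts pick_volume([partial, complete]).volume_info.title
puts pick_volume([partial]).volume_info.title
```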

If any relevant volume is found, the book details are updated and saved in the database:

if volume
  info   = volume.volume_info
  images = info.image_links

  publication_date = info.published_date
  publication_date = "#{$1}-01-01" if publication_date =~ /^(\d{4})$/
  publication_date = Date.parse publication_date

  book.author       = info.authors.join(", ") unless book.author.present?
  book.published_on = publication_date unless book.published_on.present?
  book.description  = info.description unless book.description.present?
  book.image_url    = images.try(:thumbnail) unless book.image_url.present?
  book.save
end
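
The date handling above pads a year-only value before parsing it. A slightly more defensive variant is sketched here. It is not the sample's code: normalize_publication_date is a hypothetical helper, and the handling of a missing date and of year-month values such as 2004-05 rests on the assumption that the API can return them.

```ruby
require "date"

# Hypothetical helper: returns a Date, or nil when no usable date is given.
def normalize_publication_date raw
  text = raw.to_s.strip
  return nil if text.empty?

  padded = case text
           when /\A\d{4}\z/       then "#{text}-01-01" # year only
           when /\A\d{4}-\d{2}\z/ then "#{text}-01"    # year and month
           else text
           end

  Date.parse padded
rescue ArgumentError
  # Date.parse raises ArgumentError (or a subclass) for unparseable input.
  nil
end

puts normalize_publication_date("1999")
puts normalize_publication_date("2004-05-12")
```

Returning nil instead of raising lets the caller keep the existing published_on value when the API offers nothing usable.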

The worker

A worker process handles book lookup jobs. To run the worker, you can run this command, as specified in Procfile:

rake run_worker

The run_worker rake task calls PubSubQueueAdapter to start a worker.

desc "Run task queue worker"
task run_worker: :environment do
  ActiveJob::QueueAdapters::PubSubQueueAdapter.run_worker!
end

When the worker runs, it listens for messages on the lookup_book_details Pub/Sub subscription to the lookup_book_details_queue topic. When a message is received, the associated book is retrieved from the database and the LookupBookDetailsJob runs immediately to update the book.

def self.run_worker!
  Rails.logger.info "Running worker to lookup book details"

  topic        = pubsub.topic       "lookup_book_details_queue"
  subscription = topic.subscription "lookup_book_details"

  topic.subscribe "lookup_book_details" unless subscription.exists?

  subscription.listen autoack: true do |message|
    Rails.logger.info "Book lookup request (#{message.data})"

    book_id = message.data.to_i
    book    = Book.find_by_id book_id

    LookupBookDetailsJob.perform_now book if book
  end
end
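
The per-message logic inside the listen block can be sketched in isolation. In this example BOOKS stands in for the database and the block passed to handle_message stands in for the job; both names, and the :processed and :skipped return values, are hypothetical.

```ruby
# Hypothetical stand-in for the books table, keyed by ID.
BOOKS = { 1 => "Moby Dick", 2 => "Walden" }

# Convert the message payload to an ID, look the book up, and run the
# given block only when the book exists.
def handle_message data
  book_id = data.to_i       # non-numeric payloads become 0
  book    = BOOKS[book_id]
  return :skipped unless book

  yield book
  :processed
end

processed = []
puts handle_message("1") {|book| processed << book }
puts handle_message("99") {|book| processed << book }
```

As in the sample, a message that refers to a book that no longer exists is quietly ignored instead of raising an error.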

Running on Cloud Platform

The worker is deployed as a separate module within the same application. App Engine applications can have multiple, independent services. This means that you can easily and independently deploy, configure, scale and update pieces of your application. The front end is deployed to the default module, and the worker is deployed to the worker module.

Even though the worker does not serve any web requests to users, or even run a web application, we strongly recommend that you provide an HTTP health check when running in the App Engine flexible environment to ensure that the service is running and responsive. It is, however, possible to disable health checking.

To provide a health check, the worker starts two processes instead of one. The first process is worker and the second process is health_check, which runs a simple Rack application that responds to HTTP requests with a successful response for health checks:

# Respond to every HTTP request with a 200 OK status
run lambda {|env| [200, {"Content-Type" => "text/plain"}, ["ok"]] }
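
A Rack application is any object that responds to call and returns a status, headers, and a body, so the health check lambda can be tried without starting a server. The following sketch calls it directly; the env hash is a minimal hypothetical request, not a complete Rack environment.

```ruby
# The same lambda that health_check.ru passes to run.
health_check = lambda {|env| [200, {"Content-Type" => "text/plain"}, ["ok"]] }

# Hypothetical, minimal request environment for illustration only.
env = {"REQUEST_METHOD" => "GET", "PATH_INFO" => "/_ah/health"}

status, headers, body = health_check.call env

puts status
puts headers["Content-Type"]
puts body.join
```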

The app uses Foreman to manage multiple processes. The processes are configured in Procfile:

web: bundle exec rackup -p 8080
worker: bundle exec rake run_worker
health_check: bundle exec rackup -p 8080 health_check.ru

Foreman is used as the entrypoint for the Docker container. This is specified in app.yaml and worker.yaml.

entrypoint: bundle exec foreman start --formation "$FORMATION"

Notice that Procfile contains an entry for the web front end for running the Bookshelf Rails application as well. Because the default (front-end) and worker services share the same codebase, the FORMATION environment variable is used to control which processes are started. The following diagram contrasts the single module deployment on the left with the multi-module deployment on the right:

Pub/sub deployment

The environment variables are set by app.yaml and worker.yaml.

env_variables:
  FORMATION: web=1

The worker is a separate module, so it needs its own yaml configuration file.

env_variables:
  FORMATION: worker=5,health_check=1

This configuration is similar to the app.yaml file that is used for the front end; the key differences are module: worker and the FORMATION environment variable, which configures Foreman to run five workers and the health_check process instead of the Bookshelf web application.
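
To make the FORMATION values concrete, the following sketch turns a formation string into process counts. parse_formation is a hypothetical helper written for illustration; it is not how Foreman parses the option internally.

```ruby
# Hypothetical helper: "worker=5,health_check=1" => {"worker"=>5, ...}
def parse_formation formation
  formation.split(",").each_with_object({}) do |entry, counts|
    name, count = entry.split("=", 2)
    counts[name.strip] = Integer(count)
  end
end

front_end = parse_formation "web=1"
worker    = parse_formation "worker=5,health_check=1"

puts front_end.inspect
puts worker.inspect
```

Process types from Procfile that are not named in the formation, such as web in the worker service, are the ones the text describes as not being started.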

Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

Delete the project

The easiest way to eliminate billing is to delete the project you created for the tutorial.

To delete the project:

  1. In the Cloud Platform Console, go to the Projects page.

    Go to the Projects page

  2. In the project list, select the checkbox next to the name of the project you want to delete, and then click Delete project.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete non-default versions of your app

If you don't want to delete your project, you can reduce costs by deleting the non-default versions of your app.

To delete an app version:

  1. In the Cloud Platform Console, go to the App Engine Versions page.

    Go to the Versions page

  2. Click the checkbox next to the non-default app version you want to delete.
  3. Click the Delete button at the top of the page to delete the app version.

Delete your Cloud SQL instance

To delete a Cloud SQL instance:

  1. In the Cloud Platform Console, go to the SQL Instances page.

    Go to the SQL Instances page

  2. Click the name of the SQL instance you want to delete.
  3. Click the Delete button at the top of the page to delete the instance.

Delete your Cloud Storage bucket

To delete a Cloud Storage bucket:

  1. In the Cloud Platform Console, go to the Cloud Storage browser.

    Go to the Cloud Storage browser

  2. Click the checkbox next to the bucket you want to delete.
  3. Click the Delete button at the top of the page to delete the bucket.

What's next

Learn how to run the Ruby Bookshelf sample on Compute Engine.

Try out other Google Cloud Platform features for yourself. Have a look at our tutorials.
