Using Cloud Pub/Sub with Node.js

Many applications need to do background processing outside of the context of a web request. In this sample, the Bookshelf app sends tasks to a separate background worker for execution. The worker gathers information from the Google Books API and updates the book information in the database. This sample demonstrates how to set up separate services in Google App Engine, how to run a worker process in the App Engine flexible environment, and how to deal with lifecycle events.

This page is part of a multi-page tutorial. To start from the beginning and see instructions for setting up, go to Node.js Bookshelf App.

Configuring settings

Copy your config.json file from the Authenticating Users part of this tutorial to the nodejs-getting-started/6-pubsub directory. Add these lines to the copied file:

"TOPIC_NAME": "[YOUR_TOPIC_NAME]",
"SUBSCRIPTION_NAME": "[YOUR_SUBSCRIPTION_NAME]"

Replace [YOUR_TOPIC_NAME] and [YOUR_SUBSCRIPTION_NAME] with appropriate values, for example my-topic and my-subscription. See the Cloud Pub/Sub documentation on resource names for more information about which topic and subscription names are allowed.
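For example, the copied config.json might end up looking like the following. The values here are illustrative, and the keys other than TOPIC_NAME and SUBSCRIPTION_NAME stand in for whatever settings your copy already contains from the earlier tutorial steps:

```json
{
  "GCLOUD_PROJECT": "your-project-id",
  "DATA_BACKEND": "datastore",
  "TOPIC_NAME": "my-topic",
  "SUBSCRIPTION_NAME": "my-subscription"
}
```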

Installing dependencies

Install dependencies in the nodejs-getting-started/6-pubsub directory:

  • Using npm:

    npm install
    
  • Using yarn:

    yarn install
    

Running the app on your local machine

  1. Start a local web server using npm or yarn:

    • Using npm:

      npm start
      
    • Using yarn:

      yarn start
      
  2. In a separate terminal, set the SCRIPT and PORT environment variables and start the worker using npm or yarn:

    • Using npm:

      SCRIPT=worker.js PORT=8081 npm start
      
    • Using yarn:

      SCRIPT=worker.js PORT=8081 yarn start
      
  3. In your web browser, enter this address:

    http://localhost:8080
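The SCRIPT variable selects which file the start script runs, and PORT selects the listening port. As a sketch, assuming the sample's start script defers to SCRIPT with a shell default (the exact script in your copy of package.json may differ):

```json
{
  "scripts": {
    "start": "node ${SCRIPT:-app.js}"
  }
}
```

With this shape, `npm start` runs app.js by default, while `SCRIPT=worker.js npm start` runs the worker instead.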

Now add some well-known books to the bookshelf. If you have both the application and worker instance running locally, you can watch the worker update the book information in the background.

It's important to run the worker at least once before you add books. Otherwise, the worker's subscription does not exist yet, so the events are lost and you will not see any changes to the bookshelf data. After the worker has run at least once, the subscription persists and events are queued even while the worker is not running.
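The reason for running the worker first can be sketched with a toy model (this is illustrative JavaScript, not the real Pub/Sub API): a topic retains a published message only for subscriptions that already exist at publish time.

```javascript
// Toy model of topic/subscription behavior; not the real Pub/Sub API.
const topic = { subscriptions: new Map() };

function publish (message) {
  // The message is queued for every existing subscription. If there are
  // no subscriptions yet, the message is simply dropped.
  for (const queue of topic.subscriptions.values()) {
    queue.push(message);
  }
}

function createSubscription (name) {
  if (!topic.subscriptions.has(name)) {
    topic.subscriptions.set(name, []);
  }
  return topic.subscriptions.get(name);
}

publish('book-1'); // dropped: no subscription exists yet
const queue = createSubscription('shared-worker-sub');
publish('book-2'); // queued, even though no worker is pulling
console.log(queue); // [ 'book-2' ]
```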

You can reach the worker instance at http://localhost:8081. The worker's web page displays status about the number of books it has processed.

Deploying the app to the App Engine flexible environment

  1. Deploy the worker:

    gcloud app deploy worker.yaml
    
  2. Deploy the sample app:

    gcloud app deploy
    
  3. In your web browser, enter this address. Replace [YOUR_PROJECT_ID] with your project ID:

    https://[YOUR_PROJECT_ID].appspot.com
    

If you update your app, you can deploy the updated version by entering the same command you used to deploy the app the first time. The new deployment creates a new version of your app and promotes it to the default version. The older versions of your app remain, as do their associated VM instances. Be aware that all of these app versions and VM instances are billable resources.

You can reduce costs by deleting the non-default versions of your app.

To delete an app version:

  1. In the Cloud Platform Console, go to the App Engine Versions page.

  2. Click the checkbox next to the non-default app version you want to delete.
  3. Click the Delete button at the top of the page to delete the app version.

For complete information about cleaning up billable resources, see the Cleaning up section in the final step of this tutorial.

Application structure

The following diagram shows the application components and how they connect to each other.

Cloud Pub/Sub sample structure

The application publishes events to Cloud Pub/Sub whenever a book is updated in the database. The worker, running separately, listens for these events. When an event is received, the worker makes a request to the Google Books API for information about the book and updates the book's record in the database. After the record is updated, you can refresh the book's information page and see the new information.

Understanding the code

This section walks you through the application code and explains how it works.

Publishing events to Cloud Pub/Sub

Events are published to topics in Cloud Pub/Sub.

function getTopic (cb) {
  pubsub.createTopic(topicName, (err, topic) => {
    // topic already exists.
    if (err && err.code === 409) {
      cb(null, pubsub.topic(topicName));
      return;
    }
    cb(err, topic);
  });
}

The application will send an event to the topic containing the ID of the book that was updated. This allows the worker to know which book should be processed.

function queueBook (bookId) {
  getTopic((err, topic) => {
    if (err) {
      logging.error('Error occurred while getting pubsub topic', err);
      return;
    }

    topic.publish({
      data: {
        action: 'processBook',
        bookId: bookId
      }
    }, (err) => {
      if (err) {
        logging.error('Error occurred while queuing background task', err);
      } else {
        logging.info(`Book ${bookId} queued for background processing`);
      }
    });
  });
}

The queueBook function is called from the model whenever a book is created or updated. The implementation depends on which database backend you chose:

Datastore

function update (id, data, queueBook, cb) {
  let key;
  if (id) {
    key = ds.key([kind, parseInt(id, 10)]);
  } else {
    key = ds.key(kind);
  }

  const entity = {
    key: key,
    data: toDatastore(data, ['description'])
  };

  ds.save(
    entity,
    (err) => {
      if (err) {
        cb(err);
        return;
      }
      data.id = entity.key.id;
      if (queueBook) {
        background.queueBook(data.id);
      }
      cb(null, data);
    }
  );
}

Cloud SQL

function create (data, queueBook, cb) {
  connection.query('INSERT INTO `books` SET ?', data, (err, res) => {
    if (err) {
      cb(err);
      return;
    }
    if (queueBook) {
      background.queueBook(res.insertId);
    }
    read(res.insertId, cb);
  });
}
function update (id, data, queueBook, cb) {
  connection.query(
    'UPDATE `books` SET ? WHERE `id` = ?', [data, id], (err) => {
      if (err) {
        cb(err);
        return;
      }
      if (queueBook) {
        background.queueBook(id);
      }
      read(id, cb);
    });
}

MongoDB

function create (data, queueBook, cb) {
  getCollection((err, collection) => {
    if (err) {
      cb(err);
      return;
    }
    collection.insert(data, {w: 1}, (err, result) => {
      if (err) {
        cb(err);
        return;
      }
      const item = fromMongo(result.ops);
      if (queueBook) {
        background.queueBook(item.id);
      }
      cb(null, item);
    });
  });
}
function update (id, data, queueBook, cb) {
  getCollection((err, collection) => {
    if (err) {
      cb(err);
      return;
    }
    collection.update(
      { _id: new ObjectID(id) },
      { '$set': toMongo(data) },
      { w: 1 },
      (err) => {
        if (err) {
          cb(err);
          return;
        }
        if (queueBook) {
          background.queueBook(id);
        }
        read(id, cb);
      }
    );
  });
}

The worker application

The worker is a separate application that listens for Pub/Sub events instead of serving a user-facing web application. This splits the application into two independent services that communicate through Pub/Sub instead of directly with each other. Separating the services lets you configure and scale the number of frontend and worker instances independently.

Listening for events

To receive messages, a worker must create or use a subscription to a topic. The frontend publishes events to a specific topic, and the worker subscribes to that same topic.

function subscribe (cb) {
  let subscription;

  // Event handlers
  function handleMessage (message) {
    cb(null, message.data);
  }
  function handleError (err) {
    logging.error(err);
  }

  getTopic((err, topic) => {
    if (err) {
      cb(err);
      return;
    }

    topic.subscribe(subscriptionName, {
      autoAck: true
    }, (err, sub) => {
      if (err) {
        cb(err);
        return;
      }

      subscription = sub;

      // Listen to and handle message and error events
      subscription.on('message', handleMessage);
      subscription.on('error', handleError);

      logging.info(`Listening to ${topicName} with subscription ${subscriptionName}`);
    });
  });

  // Subscription cancellation function
  return () => {
    if (subscription) {
      // Remove event listeners
      subscription.removeListener('message', handleMessage);
      subscription.removeListener('error', handleError);
      subscription = undefined;
    }
  };
}

If you want Cloud Pub/Sub to distribute events evenly across workers, use the same subscriptionName value for all of them. If each worker instead had a unique subscription, every worker would receive a copy of every event.
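This delivery behavior can be sketched with a toy model (illustrative JavaScript, not the real Pub/Sub API): every subscription receives a copy of each message, and workers attached to the same subscription split the messages between them.

```javascript
// Toy model of delivery semantics; not the real Pub/Sub API.
// Every subscription gets a copy of each message; workers attached to
// the same subscription split the messages between them.
function deliver (message, subscriptions) {
  for (const sub of subscriptions) {
    const worker = sub.workers[sub.next % sub.workers.length];
    sub.next += 1;
    worker.received.push(message);
  }
}

const workerA = { received: [] };
const workerB = { received: [] };

// Both workers share one subscription, so messages are distributed.
const shared = { workers: [workerA, workerB], next: 0 };

['book-1', 'book-2', 'book-3', 'book-4'].forEach((m) => deliver(m, [shared]));

console.log(workerA.received); // [ 'book-1', 'book-3' ]
console.log(workerB.received); // [ 'book-2', 'book-4' ]
```

If each worker had its own subscription instead, deliver would be called with both subscriptions and each worker would receive all four messages.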

The worker uses the subscribe function to listen to events and trigger processing, as follows:

const unsubscribeFn = background.subscribe((err, message) => {
  // Any errors received are considered fatal.
  if (err) {
    throw err;
  }
  if (message.data.action === 'processBook') {
    logging.info(`Received request to process book ${message.data.bookId}`);
    processBook(message.data.bookId);
  } else {
    logging.warn('Unknown request', message);
  }
});

Processing books

To process the book, the worker fetches the book by its ID, finds additional information, and then saves the updated information back to the database:

function processBook (bookId, callback) {
  waterfall([
    // Load the current data
    (cb) => {
      model.read(bookId, cb);
    },
    // Find the information from Google
    findBookInfo,
    // Save the updated data
    (updated, cb) => {
      model.update(updated.id, updated, false, cb);
    }
  ], (err) => {
    if (err) {
      logging.error('Error occurred', err);
      if (callback) {
        callback(err);
      }
      return;
    }
    logging.info(`Updated book ${bookId}`);
    bookCount += 1;
    if (callback) {
      callback();
    }
  });
}

The findBookInfo function handles processing the new information about the book:

function findBookInfo (book, cb) {
  queryBooksApi(book.title, (err, r) => {
    if (!err && !r.items) {
      err = 'Not found';
    }
    if (err) {
      cb(err);
      return;
    }
    const top = r.items[0];

    book.title = top.volumeInfo.title;
    book.author = (top.volumeInfo.authors || []).join(', ');
    book.publishedDate = top.volumeInfo.publishedDate;
    book.description = book.description || top.volumeInfo.description;

    // If there is already an image for the book, or if there are no
    // thumbnails, return early.
    if (book.imageUrl || !top.volumeInfo.imageLinks) {
      cb(null, book);
      return;
    }

    // Otherwise, try to fetch them and upload to cloud storage.
    const imageUrl =
      top.volumeInfo.imageLinks.thumbnail ||
      top.volumeInfo.imageLinks.smallThumbnail;
    const imageName = `${book.id}.jpg`;

    images.downloadAndUploadImage(
      imageUrl, imageName, (err, publicUrl) => {
        if (!err) {
          book.imageUrl = publicUrl;
        }
        cb(null, book);
      });
  });
}
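The volumeInfo fields read above come from the Books API response. An abridged, illustrative response shape follows; the field names match what findBookInfo reads, but the values are made up:

```javascript
// Abridged, illustrative Books API response body; values are made up.
const body = JSON.stringify({
  items: [{
    volumeInfo: {
      title: 'Moby Dick',
      authors: ['Herman Melville'],
      publishedDate: '1851',
      description: 'A whaling voyage.',
      imageLinks: { thumbnail: 'https://example.com/cover.jpg' }
    }
  }]
});

// The worker parses the body and reads only the first item.
const r = JSON.parse(body);
const top = r.items[0];
console.log(top.volumeInfo.title);                      // Moby Dick
console.log((top.volumeInfo.authors || []).join(', ')); // Herman Melville
```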

The findBookInfo function calls queryBooksApi to fetch more information from Google Books:

function queryBooksApi (query, cb) {
  request(
    `https://www.googleapis.com/books/v1/volumes?q=${encodeURIComponent(query)}`,
    (err, resp, body) => {
      if (err || resp.statusCode !== 200) {
        cb(err || `Response returned ${resp.statusCode}`);
        return;
      }
      cb(null, JSON.parse(body));
    }
  );
}

The findBookInfo function also attempts to download any available cover photo and then upload it to Cloud Storage:

function downloadAndUploadImage (sourceUrl, destFileName, cb) {
  const file = bucket.file(destFileName);

  request
    .get(sourceUrl)
    .on('error', (err) => {
      logging.warn(`Could not fetch image ${sourceUrl}`, err);
      cb(err);
    })
    .pipe(file.createWriteStream())
    .on('finish', () => {
      logging.info(`Uploaded image ${destFileName}`);
      file.makePublic(() => {
        cb(null, getPublicUrl(destFileName));
      });
    })
    .on('error', (err) => {
      logging.error('Could not upload image', err);
      cb(err);
    });
}

Running on Google Cloud Platform

Even though the worker does not serve any web requests to users, providing health checks is recommended for all applications running on Google App Engine. Health checking sends a periodic HTTP GET request to /_ah/health to ensure that your application instance is healthy. By default, any status response other than HTTP 500 indicates a healthy instance, including HTTP 404. You didn't need to explicitly define a health check in the frontend application because it responded with a 404 status code for /_ah/health.

To handle health checks for the backend worker, you can create a simple web server:

const app = express();

app.use(logging.requestLogger);

app.get('/_ah/health', (req, res) => {
  res.status(200).send('ok');
});

// Keep count of how many books this worker has processed
let bookCount = 0;

app.get('/', (req, res) => {
  res.send(`This worker has processed ${bookCount} books.`);
});

app.use(logging.errorLogger);

if (module === require.main) {
  const server = app.listen(config.get('PORT'), () => {
    const port = server.address().port;
    console.log(`App listening on port ${port}`);
  });
}

The worker now listens for both health check requests and Cloud Pub/Sub events.

The worker also needs its own service configuration, defined in worker.yaml.

service: worker
runtime: nodejs
env: flex

env_variables:
  SCRIPT: worker.js

This configuration is very similar to the app.yaml file used for the frontend; the key difference is the service: worker line. App Engine applications can have multiple independent services, which means you can deploy, configure, scale, and update each piece of your application separately.

Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

Delete the project

The easiest way to eliminate billing is to delete the project you created for the tutorial.

To delete the project:

  1. In the Cloud Platform Console, go to the Projects page.

  2. In the project list, select the checkbox next to the project you want to delete, and then click Delete project.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete non-default versions of your app

If you don't want to delete your project, you can reduce costs by deleting the non-default versions of your app.

To delete an app version:

  1. In the Cloud Platform Console, go to the App Engine Versions page.

  2. Click the checkbox next to the non-default app version you want to delete.
  3. Click the Delete button at the top of the page to delete the app version.

Delete your Cloud SQL instance

To delete a Cloud SQL instance:

  1. In the Cloud Platform Console, go to the SQL Instances page.

  2. Click the name of the SQL instance you want to delete.
  3. Click the Delete button at the top of the page to delete the instance.

Delete your Cloud Storage bucket

To delete a Cloud Storage bucket:

  1. In the Cloud Platform Console, go to the Cloud Storage browser.

  2. Click the checkbox next to the bucket you want to delete.
  3. Click the Delete button at the top of the page to delete the bucket.

What's next

Learn how to run the Node.js Bookshelf sample on Compute Engine.

Try out other Google Cloud Platform features for yourself. Have a look at our tutorials.
