Using Cloud Pub/Sub with Node.js

Many applications need to do background processing outside of the context of a web request. In this sample, the Bookshelf app sends tasks to a separate worker service for execution. The worker gathers information from the Google Books API and updates the book information in the database. This sample demonstrates how to set up separate services in Google App Engine, how to use Cloud Pub/Sub to distribute messages, and how to deal with lifecycle events.

This page is part of a multi-page tutorial. To start from the beginning and see instructions for setting up, go to Node.js Bookshelf App.

Configuring settings

First, create a new topic in Cloud Pub/Sub:

  1. Go to the Pub/Sub Topics page of Cloud Console.
  2. Click Create Topic.
  3. Type in a name for your Pub/Sub topic, such as sample-topic.

Next, create a new subscription to the topic you just created:

  1. In the Pub/Sub Topics page of Cloud Console, click the name of the topic you just created.
  2. Click Create Subscription.
  3. Type in a name for your Pub/Sub subscription, such as sample-subscription.
  4. For the delivery type, choose Push into an endpoint url, and enter the following endpoint URL:

    https://worker-dot-[YOUR-PROJECT-ID].appspot.com/endpoint

    Replace [YOUR-PROJECT-ID] with your project ID.
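
The push endpoint uses App Engine's SERVICE-dot-PROJECT.appspot.com hostname pattern to address the worker service directly. As a sketch, assuming your app uses the default appspot.com domain, the URL can be derived from the project ID as follows. workerPushEndpoint is a hypothetical helper, not part of the sample. The check covers standard project IDs (6 to 30 characters of lowercase letters, digits, and hyphens, starting with a letter and not ending with a hyphen); domain-scoped project IDs are not handled.

```javascript
// Hypothetical helper: builds the push endpoint URL for the worker service,
// assuming the default `<service>-dot-<project>.appspot.com` hostname.
function workerPushEndpoint(projectId, service = 'worker') {
  // Standard project ID format: 6-30 chars, starts with a letter,
  // lowercase letters/digits/hyphens, does not end with a hyphen.
  if (!/^[a-z][a-z0-9-]{4,28}[a-z0-9]$/.test(projectId)) {
    throw new Error(`Invalid project ID: ${projectId}`);
  }
  return `https://${service}-dot-${projectId}.appspot.com/endpoint`;
}

console.log(workerPushEndpoint('my-bookshelf-123'));
// prints https://worker-dot-my-bookshelf-123.appspot.com/endpoint
```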

Finally, copy your config.json file from the Authenticating Users part of this tutorial to the nodejs-getting-started/6-pubsub directory, and add this line to the copied file:

"TOPIC_NAME": "[YOUR_TOPIC_NAME]",

Replace [YOUR_TOPIC_NAME] with the name of the topic you just created.
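
The sample reads TOPIC_NAME through its own config module. The sketch below illustrates one way to read such a required setting, under two assumptions that are not taken from the sample: an environment variable of the same name overrides the config.json value, and a value still in [PLACEHOLDER] form counts as missing. loadSetting is a hypothetical helper.

```javascript
// Hypothetical helper: reads a required setting, letting an environment
// variable override the value from a parsed config.json object.
function loadSetting(key, fileConfig, env = process.env) {
  const value = env[key] !== undefined ? env[key] : fileConfig[key];
  // Treat empty values and unreplaced placeholders such as
  // "[YOUR_TOPIC_NAME]" as missing.
  if (value === undefined || value === '' || /^\[.*\]$/.test(value)) {
    throw new Error(`Missing required setting: ${key}`);
  }
  return value;
}

// Example: config.json contents after adding the TOPIC_NAME line.
const fileConfig = JSON.parse('{"TOPIC_NAME": "sample-topic"}');
console.log(loadSetting('TOPIC_NAME', fileConfig, {})); // prints sample-topic
```

Failing fast on an unreplaced placeholder surfaces a misconfigured topic at startup rather than at the first publish.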

Installing dependencies

Install dependencies in the nodejs-getting-started/6-pubsub directory:

  • Using npm:

    npm install
    
  • Using yarn:

    yarn install
    

Running the app on your local machine

  1. Start a local web server using npm or yarn:

    • Using npm:

      npm start
      
    • Using yarn:

      yarn start
      
  2. Set the SCRIPT and PORT environment variables, and start the worker using npm or yarn:

    • Using npm:

      SCRIPT=worker.js PORT=8081 npm start
      
    • Using yarn:

      SCRIPT=worker.js PORT=8081 yarn start
      
  3. In your web browser, enter the following address:

    http://localhost:8080

You can reach the worker instance at http://localhost:8081. The worker's web page displays the number of books it has processed. Note that the worker only starts processing messages after deployment, because the push subscription delivers messages to the deployed worker's /endpoint URL, not to your local machine.
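
Because the push subscription targets the deployed worker, you might want to exercise the local worker's /endpoint by hand. The sketch below builds a request body shaped like a Cloud Pub/Sub push request (a JSON envelope whose message.data field holds the base64-encoded payload) and posts it with the global fetch available in Node.js 18 and later. buildPushEnvelope and sendLocalPush are hypothetical helpers, and the book ID and subscription name are placeholders.

```javascript
// Hypothetical helper: wraps a JSON payload in the envelope format that
// Cloud Pub/Sub push subscriptions POST to an endpoint.
function buildPushEnvelope(payload, subscription) {
  return {
    message: {
      data: Buffer.from(JSON.stringify(payload)).toString('base64'),
      messageId: String(Date.now()),
      attributes: {}
    },
    subscription
  };
}

// Posts a fake push message to the local worker (requires Node.js 18+ for fetch).
async function sendLocalPush(bookId) {
  const envelope = buildPushEnvelope(
    { action: 'processBook', bookId },
    'projects/[YOUR-PROJECT-ID]/subscriptions/sample-subscription'
  );
  const res = await fetch('http://localhost:8081/endpoint', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(envelope)
  });
  return res.status;
}

// Usage, with the worker running locally:
// sendLocalPush(1234).then((status) => console.log(`Worker responded ${status}`));
```

The local worker then processes the book against whatever database your local config.json points to.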

Deploying the app to the App Engine standard environment

  1. Deploy the worker:

    gcloud app deploy worker.yaml
    
  2. Deploy the sample app:

    gcloud app deploy
    

  3. In your web browser, enter this address. Replace [YOUR_PROJECT_ID] with your project ID:

    https://[YOUR_PROJECT_ID].appspot.com
    

If you update your app, you can deploy the updated version by entering the same command you used to deploy the app the first time. Each new deployment creates a new version of your app and promotes it to the default version, while the older versions of your app remain. By default, the App Engine standard environment scales a version to zero instances when it receives no incoming traffic, so unused versions should not incur instance costs. However, all of these app versions are still billable resources.

See the Cleaning up section in the final step of this tutorial for more information on cleaning up billable resources, including non-default app versions.

Application structure

The following diagram shows the application components and how they connect to each other.

Figure: Cloud Pub/Sub sample structure

The application publishes an event to Cloud Pub/Sub whenever a book is created or updated in the database. Cloud Pub/Sub pushes each event to the worker, which runs as a separate service. When the worker receives an event, it requests information about the book from the Google Books API and updates the book's record in the database. After the record is updated, you can refresh the book's info page to see the new information.

Understanding the code

This section walks you through the application code and explains how it works.

Publishing events to Cloud Pub/Sub

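Events are published to topics in Cloud Pub/Sub. The getTopic function tries to create the topic and, if Cloud Pub/Sub reports that the topic already exists (gRPC error code 6, ALREADY_EXISTS), uses the existing topic instead: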

function getTopic (cb) {
  pubsub.createTopic(topicName, (err, topic) => {
    // Error code 6 (gRPC ALREADY_EXISTS): the topic exists, so reuse it.
    if (err && err.code === 6) {
      cb(null, pubsub.topic(topicName));
      return;
    }
    cb(err, topic);
  });
}
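
As written, getTopic calls createTopic on every publish, which adds an API round trip each time. One alternative, sketched here under assumptions, caches the topic promise after the first lookup. It assumes a client whose createTopic(name) returns a promise resolving to [topic, ...] and whose topic(name) returns a topic handle, which is the promise form offered by recent versions of @google-cloud/pubsub; check this against the version you install. makeTopicGetter is a hypothetical name.

```javascript
// gRPC status code returned when the topic already exists.
const ALREADY_EXISTS = 6;

// Hypothetical helper: returns a function that resolves to the topic,
// creating it on first use and reusing the cached promise afterwards.
function makeTopicGetter(client, topicName) {
  let cached = null;
  return function getTopic() {
    if (!cached) {
      cached = client.createTopic(topicName)
        .then(([topic]) => topic)
        .catch((err) => {
          if (err && err.code === ALREADY_EXISTS) {
            return client.topic(topicName);
          }
          cached = null; // other errors: allow a retry on the next call
          throw err;
        });
    }
    return cached;
  };
}

// Usage (assuming the promise-based client):
// const { PubSub } = require('@google-cloud/pubsub');
// const getTopic = makeTopicGetter(new PubSub(), config.get('TOPIC_NAME'));
```

Caching the promise rather than the resolved topic also means concurrent publishes during startup share a single createTopic call.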

The application sends an event to the topic containing the ID of the book that was updated, so the worker knows which book to process.

function queueBook (bookId) {
  getTopic((err, topic) => {
    if (err) {
      logging.error('Error occurred while getting pubsub topic', err);
      return;
    }

    const data = {
      action: 'processBook',
      bookId: bookId
    };

    const publisher = topic.publisher();
    publisher.publish(Buffer.from(JSON.stringify(data)), (err) => {
      if (err) {
        logging.error('Error occurred while queuing background task', err);
      } else {
        logging.info(`Book ${bookId} queued for background processing`);
      }
    });
  });
}
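
queueBook and the worker's /endpoint handler must agree on the message format: a JSON object whose action is 'processBook' and which carries a bookId. One way to keep both services in sync, sketched here as an assumption rather than the sample's actual structure, is to put a matching encoder and validating decoder in a module that both services require. Both function names are hypothetical.

```javascript
// Hypothetical shared helpers for the processBook message contract.
function encodeProcessBook(bookId) {
  // Same payload that queueBook publishes.
  return Buffer.from(JSON.stringify({ action: 'processBook', bookId }));
}

// Returns the book ID, or null if the payload is not a valid processBook message.
function decodeProcessBook(buffer) {
  let content;
  try {
    content = JSON.parse(buffer.toString('utf8'));
  } catch (err) {
    return null;
  }
  if (content && content.action === 'processBook' && content.bookId) {
    return content.bookId;
  }
  return null;
}

console.log(decodeProcessBook(encodeProcessBook(42))); // prints 42
```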

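The queueBook function is called from the model whenever a book is created or updated, unless the caller passes false for the model's queueBook argument (as the worker does when it saves its results). The implementation depends on which database backend you chose: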

Datastore

function update (id, data, queueBook, cb) {
  let key;
  if (id) {
    key = ds.key([kind, parseInt(id, 10)]);
  } else {
    key = ds.key(kind);
  }

  const entity = {
    key: key,
    data: toDatastore(data, ['description'])
  };

  ds.save(
    entity,
    (err) => {
      if (err) {
        cb(err);
        return;
      }
      data.id = entity.key.id;
      if (queueBook) {
        background.queueBook(data.id);
      }
      cb(null, data);
    }
  );
}

Cloud SQL

function create (data, queueBook, cb) {
  connection.query('INSERT INTO `books` SET ?', data, (err, res) => {
    if (err) {
      cb(err);
      return;
    }
    if (queueBook) {
      background.queueBook(res.insertId);
    }
    read(res.insertId, cb);
  });
}

function update (id, data, queueBook, cb) {
  connection.query(
    'UPDATE `books` SET ? WHERE `id` = ?', [data, id], (err) => {
      if (err) {
        cb(err);
        return;
      }
      if (queueBook) {
        background.queueBook(id);
      }
      read(id, cb);
    });
}
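
The queueBook argument matters because the worker itself calls model.update after enriching a book. If that save published another event, each update would trigger another update indefinitely. The in-memory sketch below stands in for the Datastore or Cloud SQL model and for Cloud Pub/Sub (all names are hypothetical, and the boolean is renamed shouldQueue to avoid clashing with the queueBook function); it shows that passing false on the worker's save breaks the loop.

```javascript
// In-memory stand-ins for the database and the Pub/Sub topic.
const books = new Map();
const published = [];

function queueBook(bookId) {
  published.push(bookId); // stands in for publishing to the topic
}

// Same shape as the sample's model.update: (id, data, queueBook, cb).
function update(id, data, shouldQueue, cb) {
  const bookId = id || books.size + 1;
  const saved = Object.assign({}, data, { id: bookId });
  books.set(bookId, saved);
  if (shouldQueue) {
    queueBook(bookId);
  }
  cb(null, saved);
}

// Frontend: creating a book queues it for background processing.
update(null, { title: 'Dune' }, true, (err, book) => {
  // Worker: saves the enriched book with shouldQueue = false, so nothing is re-queued.
  update(book.id, Object.assign({}, book, { author: 'Frank Herbert' }), false, () => {});
});

console.log(published); // prints [ 1 ]
```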

The worker application

The worker is a separate application that receives Cloud Pub/Sub events instead of serving a user-facing web application. This splits the application into two independent services that communicate through Cloud Pub/Sub instead of directly with each other. Separating the services lets you configure and scale the number of frontend and worker instances independently.

Listening for events

The worker service accepts messages from Cloud Pub/Sub at /endpoint:

app.post('/endpoint', jsonParser, (req, res) => {
  if (!req.body || !req.body.message || !req.body.message.data) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  const dataUtf8encoded = Buffer.from(req.body.message.data, 'base64')
    .toString('utf8');
  let content;
  try {
    content = JSON.parse(dataUtf8encoded);
  } catch (ex) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  if (content && content.action === 'processBook' && content.bookId) {
    logging.info(`Received request to process book ${content.bookId}`);
    processBook(content.bookId);
  } else {
    logging.warn('Bad request', content);
    return res.sendStatus(400);
  }

  // Acknowledge the message: a success status tells Cloud Pub/Sub not to
  // redeliver it. Without a response, the push request times out and
  // Pub/Sub retries the message.
  res.sendStatus(200);
});
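
Cloud Pub/Sub treats a success response from a push endpoint (for example 200 or 204) as an acknowledgment, and retries the message after an error response or a timeout. Pulling the handler's checks into a pure function, as sketched below, makes those outcomes testable without Express. handlePushBody is a hypothetical name, not part of the sample.

```javascript
// Hypothetical pure version of the /endpoint checks. Returns the HTTP status
// to send and, for valid messages, the book ID to process.
function handlePushBody(body) {
  if (!body || !body.message || !body.message.data) {
    return { status: 400, bookId: null };
  }
  let content;
  try {
    const json = Buffer.from(body.message.data, 'base64').toString('utf8');
    content = JSON.parse(json);
  } catch (err) {
    return { status: 400, bookId: null };
  }
  if (content && content.action === 'processBook' && content.bookId) {
    // 200 acknowledges the message so Pub/Sub does not redeliver it.
    return { status: 200, bookId: content.bookId };
  }
  return { status: 400, bookId: null };
}

// Usage inside the Express route:
// app.post('/endpoint', jsonParser, (req, res) => {
//   const { status, bookId } = handlePushBody(req.body);
//   if (bookId) processBook(bookId);
//   res.sendStatus(status);
// });
```

Because any non-success status causes redelivery, a message that can never be parsed keeps being retried. Acknowledging such messages after logging them, or attaching a dead-letter topic to the subscription, avoids that.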

App Engine distributes requests (Pub/Sub messages) among all available instances in the worker service based on their current load and, by default, scales the service up when the number of messages surges.

Processing books

To process the book, the worker fetches the book by its ID, finds additional information, and then saves the updated information back to the database:

function processBook (bookId, callback) {
  waterfall([
    // Load the current data
    (cb) => {
      model.read(bookId, cb);
    },
    // Find the information from Google
    findBookInfo,
    // Save the updated data
    (updated, cb) => {
      model.update(updated.id, updated, false, cb);
    }
  ], (err) => {
    if (err) {
      logging.error('Error occurred', err);
      if (callback) {
        callback(err);
      }
      return;
    }
    logging.info(`Updated book ${bookId}`);
    bookCount += 1;
    if (callback) {
      callback();
    }
  });
}
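
processBook chains three asynchronous steps with the async library's waterfall: each step passes its result to the next, and the first error skips straight to the final callback. For comparison, the sketch below writes the same flow with async/await, assuming the callback-style model.read, findBookInfo, and model.update are wrapped with Node's util.promisify (bind them first if they rely on this). processBookAsync and the deps object are hypothetical.

```javascript
const { promisify } = require('util');

// Hypothetical async/await version of processBook. `deps` supplies
// callback-style read, findBookInfo, and update functions.
async function processBookAsync(bookId, deps) {
  const read = promisify(deps.read);
  const findBookInfo = promisify(deps.findBookInfo);
  const update = promisify(deps.update);

  // Load the current data
  const book = await read(bookId);
  // Find the information from Google
  const updated = await findBookInfo(book);
  // Save the updated data without queueing another event
  return update(updated.id, updated, false);
}
```

Any rejected step rejects the returned promise, which matches waterfall's short-circuit on the first error.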

The findBookInfo function searches the Google Books API for the book's title and merges the top result into the book record:

function findBookInfo (book, cb) {
  queryBooksApi(book.title, (err, r) => {
    if (!err && !r.items) {
      err = 'Not found';
    }
    if (err) {
      cb(err);
      return;
    }
    const top = r.items[0];

    book.title = top.volumeInfo.title;
    book.author = (top.volumeInfo.authors || []).join(', ');
    book.publishedDate = top.volumeInfo.publishedDate;
    book.description = book.description || top.volumeInfo.description;

    // If there is already an image for the book or if there's no
    // thumbnails, go ahead and return.
    if (book.imageUrl || !top.volumeInfo.imageLinks) {
      cb(null, book);
      return;
    }

    // Otherwise, try to fetch them and upload to cloud storage.
    const imageUrl =
      top.volumeInfo.imageLinks.thumbnail ||
      top.volumeInfo.imageLinks.smallThumbnail;
    const imageName = `${book.id}.jpg`;

    images.downloadAndUploadImage(
      imageUrl, imageName, (err, publicUrl) => {
        if (!err) {
          book.imageUrl = publicUrl;
        }
        cb(null, book);
      });
  });
}
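
The merge rules in findBookInfo are: title, author, and published date always come from the top Books API result; an existing description takes precedence over the API's; and a cover image is fetched only when the book has none and the result offers a thumbnail. The pure sketch below captures those rules and returns the thumbnail URL to download instead of downloading it. mergeVolumeInfo is a hypothetical name.

```javascript
// Hypothetical pure version of findBookInfo's merge rules.
// Returns the merged book and the image URL to fetch (or null).
function mergeVolumeInfo(book, volumeInfo) {
  const merged = Object.assign({}, book, {
    title: volumeInfo.title,
    author: (volumeInfo.authors || []).join(', '),
    publishedDate: volumeInfo.publishedDate,
    description: book.description || volumeInfo.description
  });

  let imageToFetch = null;
  if (!book.imageUrl && volumeInfo.imageLinks) {
    imageToFetch = volumeInfo.imageLinks.thumbnail ||
      volumeInfo.imageLinks.smallThumbnail || null;
  }
  return { merged, imageToFetch };
}
```

Separating the decision from the network calls lets you unit-test the merge with canned API responses.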

The findBookInfo function calls queryBooksApi to fetch more information from Google Books:

function queryBooksApi (query, cb) {
  request(
    `https://www.googleapis.com/books/v1/volumes?country=US&q=${encodeURIComponent(query)}`,
    (err, resp, body) => {
      if (err || resp.statusCode !== 200) {
        console.log(`Error: ${err}`);
        console.log(`Response from Google Books: ${resp}`);
        console.log(`Response body from Google Books: ${body}`);
        cb(err || `Response returned ${resp.statusCode}`);
        return;
      }
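      let parsed;
      try {
        parsed = JSON.parse(body);
      } catch (parseErr) {
        // A malformed response body should fail the lookup, not crash the worker.
        cb(parseErr);
        return;
      }
      cb(null, parsed);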
    }
  );
}
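
queryBooksApi builds its request URL by hand with encodeURIComponent. The same URL can be built with Node's WHATWG URL class, which encodes the query parameters for you. booksApiUrl is a hypothetical helper; note that URLSearchParams encodes spaces as '+', which is equivalent to '%20' inside a query string.

```javascript
// Hypothetical helper: builds the Google Books volumes search URL used by
// queryBooksApi, letting URLSearchParams handle the encoding.
function booksApiUrl(query) {
  const url = new URL('https://www.googleapis.com/books/v1/volumes');
  url.searchParams.set('country', 'US');
  url.searchParams.set('q', query);
  return url.toString();
}

console.log(booksApiUrl('The Hobbit'));
// prints https://www.googleapis.com/books/v1/volumes?country=US&q=The+Hobbit
```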

The findBookInfo function also attempts to download any available cover photo and then upload it to Cloud Storage:

function downloadAndUploadImage (sourceUrl, destFileName, cb) {
  const file = bucket.file(destFileName);

  request
    .get(sourceUrl)
    .on('error', (err) => {
      logging.warn(`Could not fetch image ${sourceUrl}`, err);
      cb(err);
    })
    .pipe(file.createWriteStream({
      resumable: false
    }))
    .on('finish', () => {
      logging.info(`Uploaded image ${destFileName}`);
      file.makePublic(() => {
        cb(null, getPublicUrl(destFileName));
      });
    })
    .on('error', (err) => {
      logging.error('Could not upload image', err);
      cb(err);
    });
}
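
downloadAndUploadImage calls getPublicUrl, which this page does not show. Objects made public with makePublic are readable at https://storage.googleapis.com/BUCKET/OBJECT, so a plausible sketch follows. It takes the bucket name as a parameter for illustration; the sample's own helper may read the bucket from its config instead, and the bucket name below is a placeholder.

```javascript
// Hypothetical getPublicUrl: returns the public HTTPS URL for an object in a
// Cloud Storage bucket. Each path segment is encoded separately so that '/'
// separators in the object name are preserved.
function getPublicUrl(bucketName, fileName) {
  const encodedName = fileName.split('/').map(encodeURIComponent).join('/');
  return `https://storage.googleapis.com/${bucketName}/${encodedName}`;
}

console.log(getPublicUrl('my-bookshelf-bucket', '1234.jpg'));
// prints https://storage.googleapis.com/my-bookshelf-bucket/1234.jpg
```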

Running on Google Cloud Platform

Even though the worker does not serve any web requests to users, providing health checks is recommended for all applications running on Google App Engine. App Engine periodically sends an HTTP GET request to /_ah/health to check that your application instance is healthy. By default, any response status other than HTTP 500 indicates a healthy instance, including HTTP 404. You didn't need to define a health check explicitly in the frontend application because it responds to /_ah/health with a 404 status code.

To handle health checks for the backend worker, you can create a simple web server:

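// config, logging, and processBook are defined or required elsewhere in worker.js.
const express = require('express');
const bodyParser = require('body-parser');

const app = express();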

const jsonParser = bodyParser.json();

app.use(logging.requestLogger);

app.get('/_ah/health', (req, res) => {
  res.status(200).send('ok');
});

// Keep count of how many books this worker has processed
let bookCount = 0;

app.get('/', (req, res) => {
  res.status(200).send(`This worker has processed ${bookCount} books.`);
});

app.post('/endpoint', jsonParser, (req, res) => {
  if (!req.body || !req.body.message || !req.body.message.data) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  const dataUtf8encoded = Buffer.from(req.body.message.data, 'base64')
    .toString('utf8');
  let content;
  try {
    content = JSON.parse(dataUtf8encoded);
  } catch (ex) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  if (content && content.action === 'processBook' && content.bookId) {
    logging.info(`Received request to process book ${content.bookId}`);
    processBook(content.bookId);
  } else {
    logging.warn('Bad request', content);
    return res.sendStatus(400);
  }

  // Acknowledge the message: a success status tells Cloud Pub/Sub not to
  // redeliver it. Without a response, the push request times out and
  // Pub/Sub retries the message.
  res.sendStatus(200);
});

app.use(logging.errorLogger);

if (module === require.main) {
  const server = app.listen(config.get('PORT'), () => {
    const port = server.address().port;
    console.log(`App listening on port ${port}`);
  });
}
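
The health check and status routes do not depend on Express. As an illustration only, here are the same two GET routes written against Node's built-in http module, with the Pub/Sub /endpoint route omitted. createStatusHandler, createWorkerServer, and the state object (which plays the role of the bookCount variable) are hypothetical.

```javascript
const http = require('http');

// Hypothetical request handler for the worker's GET routes.
function createStatusHandler(state) {
  return function handle(req, res) {
    let status = 404;
    let body = 'Not found';
    if (req.method === 'GET' && req.url === '/_ah/health') {
      status = 200;
      body = 'ok';
    } else if (req.method === 'GET' && req.url === '/') {
      status = 200;
      body = `This worker has processed ${state.bookCount} books.`;
    }
    res.statusCode = status;
    res.setHeader('Content-Type', 'text/plain');
    res.end(body);
  };
}

// Creates (but does not start) the server.
function createWorkerServer(state) {
  return http.createServer(createStatusHandler(state));
}

// Usage: createWorkerServer({ bookCount: 0 }).listen(process.env.PORT || 8081);
```

Passing the counter in through a state object, rather than closing over a module-level variable, lets you exercise the handler with fake request and response objects.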

The worker now listens for both health check requests and events from Cloud Pub/Sub.

The worker also needs its own service configuration file, worker.yaml:

service: worker
runtime: nodejs8

env_variables:
  SCRIPT: worker.js

This configuration is very similar to the app.yaml file used for the frontend; the key difference is the service: worker clause. App Engine applications can have multiple, independent services, so you can deploy, configure, scale, and update each part of your application independently.

Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

Delete the project

The easiest way to eliminate billing is to delete the project you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Projects page.

  2. In the project list, select the checkbox next to the project you want to delete, and then click Delete project.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete non-default versions of your app

If you don't want to delete your project, you can reduce costs by deleting the non-default versions of your app.

To delete an app version:

  1. In the GCP Console, go to the App Engine Versions page.

  2. Click the checkbox next to the non-default app version you want to delete.
  3. Click the Delete button at the top of the page to delete the app version.

Delete your Cloud SQL instance

To delete a Cloud SQL instance:

  1. In the GCP Console, go to the SQL Instances page.

  2. Click the name of the SQL instance you want to delete.
  3. Click the Delete button at the top of the page to delete the instance.

Delete your Cloud Storage bucket

To delete a Cloud Storage bucket:

  1. In the GCP Console, go to the Cloud Storage browser.

  2. Click the checkbox next to the bucket you want to delete.
  3. Click the Delete button at the top of the page to delete the bucket.

What's next

Learn how to run the Node.js Bookshelf sample on Compute Engine.

Try out other Google Cloud Platform features for yourself. Have a look at our tutorials.
