Using Cloud Pub/Sub with Node.js

Many apps do background processing outside the context of a web request. In this sample, the Bookshelf app sends tasks for a separate worker service to run. The worker gathers information from the Google Books API and updates the book information in the database.

This sample demonstrates how to set up separate services in App Engine, how to use Cloud Pub/Sub to distribute messages, and how to deal with lifecycle events.

This page is part of a multipage tutorial. To start from the beginning and read the setup instructions, go to Node.js Bookshelf app.

Configuring settings

First, create a topic in Cloud Pub/Sub and a push subscription to that topic. Then add the topic name to the app's configuration file.

  1. In the Google Cloud Platform Console, go to the Cloud Pub/Sub Topics page.

  2. Click Create Topic.

  3. Type in a name for your Cloud Pub/Sub topic, such as sample-topic.

  4. Click the name of the topic that you just created.

  5. Click Create Subscription.

  6. Type in a name for your Cloud Pub/Sub subscription, such as sample-subscription.

  7. For Delivery type, choose Push into an endpoint url.

  8. In the Endpoint URL field, enter https://worker-dot-[YOUR-PROJECT-ID].appspot.com/endpoint. Replace [YOUR-PROJECT-ID] with your project ID.

  9. Copy your config.json file from the Authenticating users part of this tutorial to the nodejs-getting-started/6-pubsub directory.

  10. Add this line to the copied file:

    "TOPIC_NAME": "[YOUR_TOPIC_NAME]",
    

    Replace [YOUR_TOPIC_NAME] with the name of the topic that you just created.

Installing dependencies

Install dependencies in the nodejs-getting-started/6-pubsub directory by using npm:

npm install

Running the app on your local machine

  1. Start a local web server using npm:

    npm start
    
  2. In a separate terminal, set the SCRIPT and PORT environment variables, and start the worker by using npm:

    SCRIPT=worker.js PORT=8081 npm start
    
  3. In your web browser, enter the following address:

    http://localhost:8080

You can reach the worker instance at http://localhost:8081. The worker's web page displays status information about the number of books it has processed. Note that the worker starts receiving messages only after deployment, because the push subscription delivers them to the deployed endpoint URL rather than to your local machine.
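
To exercise the local worker before deployment, one option is to send it a hand-built request that has the same shape as a push delivery. The following is a minimal sketch, not part of the sample app: the helper names buildPushBody and postToLocalWorker are hypothetical, port 8081 is assumed from step 2, and the body contains only the message.data field that the worker's handler reads.

```javascript
'use strict';

const http = require('http');

// Build a request body shaped like a Cloud Pub/Sub push delivery:
// the JSON payload is base64-encoded under message.data.
function buildPushBody (bookId) {
  const payload = JSON.stringify({ action: 'processBook', bookId: bookId });
  return JSON.stringify({
    message: { data: Buffer.from(payload).toString('base64') }
  });
}

// POST the body to the locally running worker (port 8081 is an assumption).
function postToLocalWorker (body, cb) {
  let called = false;
  const finish = (err, status) => {
    if (!called) {
      called = true;
      cb(err, status);
    }
  };

  const req = http.request({
    host: 'localhost',
    port: 8081,
    path: '/endpoint',
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Content-Length': Buffer.byteLength(body)
    }
  }, (res) => {
    res.resume(); // discard the response body
    finish(null, res.statusCode);
  });

  // Give up if the worker does not answer within five seconds.
  req.setTimeout(5000, () => {
    req.destroy(new Error('No response from worker within 5 seconds'));
  });
  req.on('error', finish);
  req.end(body);
}

// Example call (not run automatically; 123 is a placeholder book ID):
// postToLocalWorker(buildPushBody(123), (err, status) => {
//   console.log(err || `Worker responded with ${status}`);
// });
```

The book ID must belong to a book that already exists in the database, otherwise the worker logs an error when it tries to read the book.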

Deploying the app to the App Engine standard environment

  1. Deploy the worker from the nodejs-getting-started/6-pubsub directory:

    gcloud app deploy worker.yaml
    
  2. Deploy the sample app from the nodejs-getting-started/6-pubsub directory:

    gcloud app deploy
    
  3. In your web browser, enter this address:

    https://[YOUR_PROJECT_ID].appspot.com
    

    Replace [YOUR_PROJECT_ID] with your project ID.

If you update your app, you deploy the updated version by entering the same command that you used to deploy the app. The new deployment creates a new version of your app and promotes it to the default version. The older versions of your app remain. By default, the App Engine standard environment scales to 0 instances when there is no incoming traffic to a version, so unused versions shouldn't cost anything. However, all of these app versions are still billable resources.

See the Cleaning up section in the final step of this tutorial for more information on cleaning up billable resources, including non-default app versions.

App structure

The following diagram shows the app's components and how they connect to each other.

Figure: Cloud Pub/Sub structure

The app publishes events to Cloud Pub/Sub whenever a book is updated in the database. The worker, running separately, detects these events. When the event is received, the worker requests information about the book from the Google Books API and updates the book's record in the database. You can then see the new information by refreshing the book's info page.

Understanding the code

This section walks you through the app's code and explains how it works.

Publishing events to Cloud Pub/Sub

Events are published to topics in Cloud Pub/Sub. The getTopic function creates the topic, or returns the existing topic if it was already created:

function getTopic (cb) {
  pubsub.createTopic(topicName, (err, topic) => {
    // topic already exists.
    if (err && err.code === 6) {
      cb(null, pubsub.topic(topicName));
      return;
    }
    cb(err, topic);
  });
}
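
The create-then-fall-back pattern in getTopic can be tried without a Cloud project. The following sketch assumes a hypothetical in-memory stand-in, makeFakePubsub, that mimics only the createTopic and topic calls used above; it is not the Cloud Pub/Sub client. The value 6 is the gRPC status code for ALREADY_EXISTS.

```javascript
'use strict';

const ALREADY_EXISTS = 6; // gRPC status code for "already exists"

// Hypothetical in-memory stand-in for the Pub/Sub client. It mimics only
// the two calls that getTopic uses.
function makeFakePubsub () {
  const topics = new Map();
  return {
    createTopic (name, cb) {
      if (topics.has(name)) {
        const err = new Error(`Topic ${name} already exists`);
        err.code = ALREADY_EXISTS;
        cb(err);
        return;
      }
      topics.set(name, { name: name });
      cb(null, topics.get(name));
    },
    topic (name) {
      return topics.get(name) || { name: name };
    }
  };
}

// Same logic as getTopic above, with the client and topic name passed in.
function getTopic (pubsub, topicName, cb) {
  pubsub.createTopic(topicName, (err, topic) => {
    // The topic already exists, so return a reference to it instead.
    if (err && err.code === ALREADY_EXISTS) {
      cb(null, pubsub.topic(topicName));
      return;
    }
    cb(err, topic);
  });
}

const pubsub = makeFakePubsub();
getTopic(pubsub, 'sample-topic', (err, topic) => {
  console.log('first call:', err, topic.name);
});
getTopic(pubsub, 'sample-topic', (err, topic) => {
  console.log('second call:', err, topic.name);
});
```

Both calls hand a usable topic to the callback: the first creates it, and the second recovers from the "already exists" error.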

The app sends an event to the topic containing the ID of the book that was updated. This process lets the worker know which book to process.

function queueBook (bookId) {
  getTopic((err, topic) => {
    if (err) {
      logging.error('Error occurred while getting pubsub topic', err);
      return;
    }

    const data = {
      action: 'processBook',
      bookId: bookId
    };

    const publisher = topic.publisher();
    publisher.publish(Buffer.from(JSON.stringify(data)), (err) => {
      if (err) {
        logging.error('Error occurred while queuing background task', err);
      } else {
        logging.info(`Book ${bookId} queued for background processing`);
      }
    });
  });
}

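The queueBook function is called from the model whenever a book is created or updated. In the update function, the queueBook parameter is a flag that controls whether the book is queued. The worker passes false when it saves its own results, so that its update does not queue the book again: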

function update (id, data, queueBook, cb) {
  let key;
  if (id) {
    key = ds.key([kind, parseInt(id, 10)]);
  } else {
    key = ds.key(kind);
  }

  const entity = {
    key: key,
    data: toDatastore(data, ['description'])
  };

  ds.save(
    entity,
    (err) => {
      if (err) {
        cb(err);
        return;
      }
      data.id = entity.key.id;
      if (queueBook) {
        background.queueBook(data.id);
      }
      cb(null, data);
    }
  );
}

The worker app

The worker is a separate app that listens for Cloud Pub/Sub events instead of serving a user-facing web app. Consequently, the app is split into two independent services that communicate through Cloud Pub/Sub instead of communicating directly with each other. Separating the services lets you configure and scale the number of frontend and worker instances separately.

Checking for events

The worker service accepts messages from Cloud Pub/Sub at /endpoint:

app.post('/endpoint', jsonParser, (req, res) => {
  if (!req.body || !req.body.message || !req.body.message.data) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  // Push deliveries carry the payload base64-encoded in message.data.
  const dataUtf8encoded = Buffer.from(req.body.message.data, 'base64')
    .toString('utf8');
  let content;
  try {
    content = JSON.parse(dataUtf8encoded);
  } catch (ex) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  if (content && content.action === 'processBook' && content.bookId) {
    logging.info(`Received request to process book ${content.bookId}`);
    processBook(content.bookId);
    // Respond with a success status so that Cloud Pub/Sub treats the
    // message as acknowledged and does not redeliver it.
    return res.sendStatus(200);
  } else {
    logging.warn('Bad request', content);
    return res.sendStatus(400);
  }
});
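
The decoding and validation steps in the handler can be isolated into a plain function, which makes them easy to check without running a server. This is a minimal sketch under that assumption; parsePushMessage is a hypothetical helper, not part of the sample app, and the sample envelope contains only the message.data field that the handler reads.

```javascript
'use strict';

// Decode a push request body into the task it carries. Returns null when
// the envelope or payload is malformed, which the handler maps to HTTP 400.
function parsePushMessage (body) {
  if (!body || !body.message || !body.message.data) {
    return null;
  }
  const json = Buffer.from(body.message.data, 'base64').toString('utf8');
  let content;
  try {
    content = JSON.parse(json);
  } catch (ex) {
    return null;
  }
  if (!content || content.action !== 'processBook' || !content.bookId) {
    return null;
  }
  return content;
}

// Example envelope with the payload base64-encoded, as in a push delivery.
const sample = {
  message: {
    data: Buffer.from(JSON.stringify({ action: 'processBook', bookId: 42 }))
      .toString('base64')
  }
};

console.log(parsePushMessage(sample));
// 'bm90IGpzb24=' is the base64 encoding of the text "not json".
console.log(parsePushMessage({ message: { data: 'bm90IGpzb24=' } }));
```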

App Engine distributes requests (Cloud Pub/Sub messages) among all the available instances in the worker service based on their current workload. By default, App Engine scales the service if the number of messages surges.

Processing books

To process the book, the worker retrieves the book by its ID, finds additional information, and then saves the updated information in the database:

function processBook (bookId, callback) {
  waterfall([
    // Load the current data
    (cb) => {
      model.read(bookId, cb);
    },
    // Find the information from Google
    findBookInfo,
    // Save the updated data
    (updated, cb) => {
      model.update(updated.id, updated, false, cb);
    }
  ], (err) => {
    if (err) {
      logging.error('Error occurred', err);
      if (callback) {
        callback(err);
      }
      return;
    }
    logging.info(`Updated book ${bookId}`);
    bookCount += 1;
    if (callback) {
      callback();
    }
  });
}
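
To make the control flow in processBook concrete, the following sketch uses a simplified stand-in for waterfall. It is an illustration only, not the implementation that the sample imports (presumably from the async library), and the three steps are hypothetical placeholders for model.read, findBookInfo, and model.update.

```javascript
'use strict';

// Minimal stand-in for waterfall: run tasks in order, pass each task's
// results to the next task, and stop at the first error.
function waterfall (tasks, done) {
  let index = 0;
  function next (err, ...results) {
    if (err || index === tasks.length) {
      done(err || null, ...results);
      return;
    }
    const task = tasks[index++];
    task(...results, next);
  }
  next(null);
}

// Hypothetical stand-ins for model.read, findBookInfo, and model.update.
const steps = [
  // Load the current data
  (cb) => cb(null, { id: 1, title: 'Moby Dick' }),
  // Add information to the book
  (book, cb) => cb(null, Object.assign({}, book, { author: 'Herman Melville' })),
  // Save the updated data
  (book, cb) => cb(null, book)
];

waterfall(steps, (err, book) => {
  console.log(err, book);
});
```

Each step receives the result of the previous one, which is why findBookInfo takes the book loaded by model.read as its first argument. If any step reports an error, the remaining steps are skipped and the final callback receives the error.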

The findBookInfo function processes the new information about the book:

function findBookInfo (book, cb) {
  queryBooksApi(book.title, (err, r) => {
    if (!err && !r.items) {
      err = 'Not found';
    }
    if (err) {
      cb(err);
      return;
    }
    const top = r.items[0];

    book.title = top.volumeInfo.title;
    book.author = (top.volumeInfo.authors || []).join(', ');
    book.publishedDate = top.volumeInfo.publishedDate;
    book.description = book.description || top.volumeInfo.description;

    // If there is already an image for the book or if there's no
    // thumbnails, go ahead and return.
    if (book.imageUrl || !top.volumeInfo.imageLinks) {
      cb(null, book);
      return;
    }

    // Otherwise, try to fetch them and upload to cloud storage.
    const imageUrl =
      top.volumeInfo.imageLinks.thumbnail ||
      top.volumeInfo.imageLinks.smallThumbnail;
    const imageName = `${book.id}.jpg`;

    images.downloadAndUploadImage(
      imageUrl, imageName, (err, publicUrl) => {
        if (!err) {
          book.imageUrl = publicUrl;
        }
        cb(null, book);
      });
  });
}
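
The merge rules in findBookInfo are easier to see on plain objects. The sketch below assumes a hypothetical helper, mergeVolumeInfo, that applies the same rules without calling the Books API or Cloud Storage. The sample data is invented for illustration and is not a real API response.

```javascript
'use strict';

// Apply the same merge rules as findBookInfo: title, author, and
// publishedDate are overwritten, and description is filled in only when
// the book does not already have one.
function mergeVolumeInfo (book, volumeInfo) {
  return Object.assign({}, book, {
    title: volumeInfo.title,
    author: (volumeInfo.authors || []).join(', '),
    publishedDate: volumeInfo.publishedDate,
    description: book.description || volumeInfo.description
  });
}

// Hypothetical sample data, not a real API response.
const book = { id: 7, title: 'example title', description: 'My own notes' };
const volumeInfo = {
  title: 'Example Title',
  authors: ['First Author', 'Second Author'],
  publishedDate: '2001',
  description: 'Publisher description'
};

console.log(mergeVolumeInfo(book, volumeInfo));
```

Unlike findBookInfo, which modifies the book in place, this sketch returns a copy so that the input stays unchanged. The description entered by the user is kept, while the title and author are replaced by the values from the top search result.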

The findBookInfo function calls queryBooksApi to retrieve more information from Google Books:

function queryBooksApi (query, cb) {
  request(
    `https://www.googleapis.com/books/v1/volumes?country=US&q=${encodeURIComponent(query)}`,
    (err, resp, body) => {
      if (err || resp.statusCode !== 200) {
        // resp is undefined when the request itself failed, so guard it.
        console.log(`Error: ${err}`);
        console.log(`Response status from Google Books: ${resp && resp.statusCode}`);
        console.log(`Response body from Google Books: ${body}`);
        cb(err || `Response returned ${resp.statusCode}`);
        return;
      }
      cb(null, JSON.parse(body));
    }
  );
}
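
Two details of queryBooksApi can be checked in isolation: the query is escaped with encodeURIComponent before it is placed in the URL, and JSON.parse throws if the body is not valid JSON. The following sketch assumes two hypothetical helpers, booksApiUrl and parseBooksResponse, that are not part of the sample app. The second one shows one way to turn a malformed body into a callback error instead of an uncaught exception.

```javascript
'use strict';

// Build the request URL the same way queryBooksApi does.
function booksApiUrl (query) {
  return 'https://www.googleapis.com/books/v1/volumes' +
    `?country=US&q=${encodeURIComponent(query)}`;
}

// Turn a status code and raw body into (err, result), guarding JSON.parse
// so that a malformed body becomes an error instead of a thrown exception.
function parseBooksResponse (statusCode, body, cb) {
  if (statusCode !== 200) {
    cb(new Error(`Response returned ${statusCode}`));
    return;
  }
  let parsed;
  try {
    parsed = JSON.parse(body);
  } catch (ex) {
    cb(ex);
    return;
  }
  cb(null, parsed);
}

console.log(booksApiUrl('Moby Dick & other stories'));
parseBooksResponse(200, '{"items": []}', (err, result) => {
  console.log(err, result);
});
```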

The findBookInfo function also downloads any available cover art and uploads it to Cloud Storage:

function downloadAndUploadImage (sourceUrl, destFileName, cb) {
  const file = bucket.file(destFileName);

  request
    .get(sourceUrl)
    .on('error', (err) => {
      logging.warn(`Could not fetch image ${sourceUrl}`, err);
      cb(err);
    })
    .pipe(file.createWriteStream({
      resumable: false
    }))
    .on('finish', () => {
      logging.info(`Uploaded image ${destFileName}`);
      file.makePublic(() => {
        cb(null, getPublicUrl(destFileName));
      });
    })
    .on('error', (err) => {
      logging.error('Could not upload image', err);
      cb(err);
    });
}

Running on Google Cloud Platform

Even though the worker doesn't serve any web requests to users, we recommend running health checks for all apps running on App Engine. Health checking sends a periodic HTTP GET request to /_ah/health to ensure that your app instance is healthy. By default, any status response other than HTTP 500 indicates a healthy instance, including HTTP 404. You didn't need to explicitly define a health check in the frontend app because it responded with a 404 status code for /_ah/health.

To handle health checks for the backend worker, you can create a simple web server:

const app = express();

const jsonParser = bodyParser.json();

app.use(logging.requestLogger);

app.get('/_ah/health', (req, res) => {
  res.status(200).send('ok');
});

// Keep count of how many books this worker has processed
let bookCount = 0;

app.get('/', (req, res) => {
  res.status(200).send(`This worker has processed ${bookCount} books.`);
});

app.post('/endpoint', jsonParser, (req, res) => {
  if (!req.body || !req.body.message || !req.body.message.data) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  // Push deliveries carry the payload base64-encoded in message.data.
  const dataUtf8encoded = Buffer.from(req.body.message.data, 'base64')
    .toString('utf8');
  let content;
  try {
    content = JSON.parse(dataUtf8encoded);
  } catch (ex) {
    logging.warn('Bad request');
    return res.sendStatus(400);
  }

  if (content && content.action === 'processBook' && content.bookId) {
    logging.info(`Received request to process book ${content.bookId}`);
    processBook(content.bookId);
    // Respond with a success status so that Cloud Pub/Sub treats the
    // message as acknowledged and does not redeliver it.
    return res.sendStatus(200);
  } else {
    logging.warn('Bad request', content);
    return res.sendStatus(400);
  }
});

app.use(logging.errorLogger);

if (module === require.main) {
  const server = app.listen(config.get('PORT'), () => {
    const port = server.address().port;
    console.log(`App listening on port ${port}`);
  });
}

The worker now responds to health check requests as well as to events from Cloud Pub/Sub.

The worker also needs its own service configuration, which is defined in the worker.yaml file:

service: worker
runtime: nodejs8

env_variables:
  SCRIPT: worker.js

This configuration is similar to the app.yaml file that is used for the frontend, but the key difference is the service: worker clause. App Engine apps can have multiple, independent services, which lets you independently deploy, configure, scale, and update pieces of your app.

Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

Delete the project

The easiest way to eliminate billing is to delete the project that you created for the tutorial.

To delete the project:

  1. In the GCP Console, go to the Projects page.

  2. In the project list, select the project you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete non-default versions of your app

If you don't want to delete your project, you can reduce costs by deleting the non-default versions of your app.

To delete an app version:

  1. In the GCP Console, go to the Versions page for App Engine.

  2. Select the checkbox for the non-default app version you want to delete.
  3. Click Delete to delete the app version.

Delete your Cloud Storage bucket

To delete a Cloud Storage bucket:

  1. In the GCP Console, go to the Cloud Storage Browser page.

  2. Click the checkbox for the bucket you want to delete.
  3. Click Delete to delete the bucket.
