Using Cloud Pub/Sub with Go

Many applications need to do background processing outside of the context of a web request. In this sample, the Bookshelf app sends tasks to a separate background worker for execution. The worker gathers information from the Google Books API and updates the book information in the database. This sample demonstrates how to set up separate services in Google App Engine, how to run a worker process in the App Engine flexible environment, and how to deal with lifecycle events.

This page is part of a multi-page tutorial. To start from the beginning and see instructions for setting up, go to Go Bookshelf App.

Configuring settings

  1. Go to the directory that contains the sample code:

    Linux/Mac OS X

    cd $GOPATH/src/github.com/GoogleCloudPlatform/golang-samples/getting-started/bookshelf
    

    Windows

    cd %GOPATH%\src\github.com\GoogleCloudPlatform\golang-samples\getting-started\bookshelf
    

  2. Open config.go for editing.

  3. Uncomment this line:

    // PubsubClient, err = configurePubsub("<your-project-id>")
    
  4. Replace "<your-project-id>" with your project ID.

  5. Save and close config.go.

Running the app on your local machine

  1. Run the app to start a local web server:

    cd app
    go run app.go auth.go template.go
    
  2. In your web browser, enter this address:

    http://localhost:8080

    You can see the app running, but don't add any books yet.

  3. Start the worker the same way as the frontend app, after setting the PORT environment variable so that the two servers don't both try to listen on port 8080:

    Linux/Mac OS X

    cd $GOPATH/src/github.com/GoogleCloudPlatform/golang-samples/getting-started/bookshelf
    cd pubsub_worker
    PORT=8081 go run worker.go
    

    Windows

    cd %GOPATH%\src\github.com\GoogleCloudPlatform\golang-samples\getting-started\bookshelf
    cd pubsub_worker
    set PORT=8081
    go run worker.go
    

You can browse to http://localhost:8081 to reach the worker instance. The worker's web page shows the number of books that it has processed.

Now add some well-known books to the bookshelf. If you have both the application and worker instance running locally, you can watch the worker update the book information in the background.

It's important that you run the worker at least once before you add books. The worker establishes a Pub/Sub subscription to listen for events. Without the subscription, events published to the topic will be lost and you won't see any change to the bookshelf data. After the subscription exists, the events are queued even if there is no worker currently listening for events. When a worker comes online, Pub/Sub will deliver any queued events.
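
The retention behavior described above can be illustrated with a toy, in-memory model. This is only a sketch: the topic type and its subscribe, publish, and pull methods are hypothetical and are not part of the Cloud Pub/Sub client library. It assumes nothing beyond the rule stated above, namely that a message is kept only for subscriptions that already exist when it is published.

```go
package main

import "fmt"

// topic is a toy stand-in for a Pub/Sub topic (hypothetical, not the real
// client API). It maps each subscription name to its queued messages.
type topic struct {
	subs map[string][]string
}

func newTopic() *topic { return &topic{subs: make(map[string][]string)} }

// subscribe creates the named subscription if it does not exist yet.
func (t *topic) subscribe(name string) {
	if _, ok := t.subs[name]; !ok {
		t.subs[name] = nil
	}
}

// publish queues msg on every existing subscription and reports how many
// subscriptions received it. With no subscriptions, the message is dropped.
func (t *topic) publish(msg string) int {
	for name := range t.subs {
		t.subs[name] = append(t.subs[name], msg)
	}
	return len(t.subs)
}

// pull drains and returns everything queued for the named subscription.
func (t *topic) pull(name string) []string {
	msgs, ok := t.subs[name]
	if ok {
		t.subs[name] = nil
	}
	return msgs
}

func main() {
	tp := newTopic()
	tp.publish("book 1")   // no subscription yet: dropped
	tp.subscribe("worker") // the worker has now run once
	tp.publish("book 2")   // queued even though no worker is pulling
	fmt.Println(tp.pull("worker")) // prints [book 2]
}
```

In this model, "book 1" is lost because it was published before the subscription existed, which is why the worker should run at least once before you add books.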

Press Control+C to exit the local web server.

Deploying the app to the App Engine flexible environment

  1. In the app directory, enter these commands to deploy the sample:

    Linux/Mac OS X

    cd $GOPATH/src/github.com/GoogleCloudPlatform/golang-samples/getting-started/bookshelf
    
    # Deploy the worker
    cd pubsub_worker
    aedeploy gcloud app deploy
    
    # Deploy the main app
    cd ../app
    aedeploy gcloud app deploy
    

    Windows

    cd %GOPATH%\src\github.com\GoogleCloudPlatform\golang-samples\getting-started\bookshelf
    
    rem Deploy the worker
    cd pubsub_worker
    aedeploy gcloud app deploy
    
    rem Deploy the main app
    cd ..\app
    aedeploy gcloud app deploy
    

    Those commands deploy both the frontend and worker services. This means that you don't have to start the worker separately like you did when you ran the app on your computer. You can read more about services in the App Engine documentation.

  2. In your web browser, enter this address. Replace [YOUR_PROJECT_ID] with your project ID:

    https://[YOUR_PROJECT_ID].appspot.com
    
  3. In your web browser, enter this address to reach the worker instance:

    https://worker-dot-[YOUR_PROJECT_ID].appspot.com
    

If you update your app, you can deploy the updated version by entering the same command you used to deploy the app the first time. The new deployment creates a new version of your app and promotes it to the default version. The older versions of your app remain, as do their associated VM instances. Be aware that all of these app versions and VM instances are billable resources.

You can reduce costs by deleting the non-default versions of your app.

To delete an app version:

  1. In the Cloud Platform Console, go to the App Engine Versions page.

  2. Click the checkbox next to the non-default app version you want to delete.
  3. Click the Delete button at the top of the page to delete the app version.

For complete information about cleaning up billable resources, see the Cleaning up section in the final step of this tutorial.

Application structure

This diagram shows the application components and how they connect to each other.

[Diagram: Cloud Pub/Sub sample structure]

The application publishes events to Cloud Pub/Sub whenever a book is updated in the database. The worker, running separately, listens for these events. When an event is received, the worker makes a request to the Books API for information about the book and updates the book's record in the database. After the record is updated, you can refresh the book's info page and see the new information.

Understanding the code

This section walks you through the application code and explains how it works.

Creating a topic and subscription

Before you can send messages to Cloud Pub/Sub or receive messages from it, you need to create a topic and a subscription. Cloud Pub/Sub allows you to publish messages on one topic to many subscribers. In this application, though, there is only one subscriber.

This code creates the topic and the subscription, using constant topic and subscription names. It's safe to run these two lines of code repeatedly. For example, the Bookshelf app's worker.go calls them in the main function:

// ignore returned errors, which will be "already exists". If they're fatal
// errors, then following calls (e.g. in the subscribe function) will also fail.
topic, _ := bookshelf.PubsubClient.CreateTopic(ctx, bookshelf.PubsubTopicID)
subscription, _ = bookshelf.PubsubClient.CreateSubscription(ctx, subName, topic, 0, nil)

Publishing events

The application sends an event, containing the ID of the book that was updated, to the topic. This allows the worker to know which book should be processed:

// publishUpdate notifies Pub/Sub subscribers that the book identified with
// the given ID has been added/modified.
func publishUpdate(bookID int64) {
	if bookshelf.PubsubClient == nil {
		return
	}

	ctx := context.Background()

	b, err := json.Marshal(bookID)
	if err != nil {
		return
	}
	topic := bookshelf.PubsubClient.Topic(bookshelf.PubsubTopicID)
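	// Get blocks until the publish completes and returns any error.
	if _, err = topic.Publish(ctx, &pubsub.Message{Data: b}).Get(ctx); err != nil {
		log.Printf("Could not publish update for Book ID %d: %v", bookID, err)
		return
	}
	log.Printf("Published update to Pub/Sub for Book ID %d", bookID)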
}

The application calls the publishUpdate function whenever a user creates or updates a book.
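
The message payload is simply the JSON encoding of the book's int64 ID. The sketch below shows that round trip in isolation, using only the standard library. The helper names encodeID and decodeID are hypothetical and do not appear in the sample.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// encodeID produces the message payload for a book ID, as publishUpdate does.
func encodeID(id int64) ([]byte, error) {
	return json.Marshal(id)
}

// decodeID parses a payload produced by encodeID, as the worker does.
func decodeID(data []byte) (int64, error) {
	var id int64
	err := json.Unmarshal(data, &id)
	return id, err
}

func main() {
	payload, err := encodeID(42)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Printf("payload: %s\n", payload) // prints payload: 42

	id, err := decodeID(payload)
	fmt.Println(id, err) // prints 42 <nil>

	// A payload that is not a JSON number is rejected with an error.
	_, err = decodeID([]byte("not json"))
	fmt.Println(err != nil) // prints true
}
```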

The worker application

The worker is a separate application that listens for Pub/Sub events instead of serving a user-facing web application. This splits the application into two independent services that communicate through Pub/Sub instead of directly with each other. Separating the services allows you to configure and scale the number of front-end and worker instances separately.

Listening for events

To receive messages, a worker must create or use a subscription to a topic. The front end publishes events to a specific topic, and the worker subscribes to that same topic.

The worker uses the subscribe function to listen to events and trigger processing:

func subscribe() {
	ctx := context.Background()
	err := subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		var id int64
		if err := json.Unmarshal(msg.Data, &id); err != nil {
			log.Printf("could not decode message data: %#v", msg)
			msg.Ack()
			return
		}

		log.Printf("[ID %d] Processing.", id)
		if err := update(id); err != nil {
			log.Printf("[ID %d] could not update: %v", id, err)
			msg.Nack()
			return
		}

		countMu.Lock()
		count++
		countMu.Unlock()

		msg.Ack()
		log.Printf("[ID %d] ACK", id)
	})
	if err != nil {
		log.Fatal(err)
	}
}

Processing books

To process the book, the worker fetches the book by its ID, finds additional information, and then saves the updated information back to the database:

// update retrieves the book with the given ID, finds metadata from the Books
// server and updates the database with the book's details.
func update(bookID int64) error {
	book, err := bookshelf.DB.GetBook(bookID)
	if err != nil {
		return err
	}

	vols, err := booksClient.Volumes.List(book.Title).Do()
	if err != nil {
		return err
	}

	if len(vols.Items) == 0 {
		return nil
	}

	info := vols.Items[0].VolumeInfo
	book.Title = info.Title
	book.Author = strings.Join(info.Authors, ", ")
	book.PublishedDate = info.PublishedDate
	if book.Description == "" {
		book.Description = info.Description
	}
	if book.ImageURL == "" && info.ImageLinks != nil {
		url := info.ImageLinks.Thumbnail
		// Replace http with https to prevent Content Security errors on the page.
		book.ImageURL = strings.Replace(url, "http://", "https://", 1)
	}

	return bookshelf.DB.UpdateBook(book)
}
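
The merge rules in update are easy to miss: the title and author are always overwritten, while the description and image are filled in only when the user left them empty. The sketch below shows those rules on simplified types. The book and volumeInfo structs and the merge function are hypothetical stand-ins, not the sample's types or the Books API types, and the sketch checks for an empty thumbnail string rather than a nil ImageLinks pointer.

```go
package main

import (
	"fmt"
	"strings"
)

// book is a simplified, hypothetical version of the Bookshelf record.
type book struct {
	Title, Author, Description, ImageURL string
}

// volumeInfo is a simplified, hypothetical version of the Books API result.
type volumeInfo struct {
	Title       string
	Authors     []string
	Description string
	Thumbnail   string
}

// merge always overwrites title and author, but fills in the description
// and image only when the existing values are empty.
func merge(b book, info volumeInfo) book {
	b.Title = info.Title
	b.Author = strings.Join(info.Authors, ", ")
	if b.Description == "" {
		b.Description = info.Description
	}
	if b.ImageURL == "" && info.Thumbnail != "" {
		// Use https so the page does not load insecure content.
		b.ImageURL = strings.Replace(info.Thumbnail, "http://", "https://", 1)
	}
	return b
}

func main() {
	updated := merge(
		book{Title: "moby dick", Description: "My own notes"},
		volumeInfo{
			Title:       "Moby Dick",
			Authors:     []string{"Herman Melville"},
			Description: "A novel about a whale.",
			Thumbnail:   "http://example.com/cover.jpg",
		},
	)
	fmt.Printf("%+v\n", updated)
}
```

Here the user's description survives, the title and author come from the lookup, and the image URL is rewritten to https.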

Running on Cloud Platform

By default, an application running in the App Engine flexible environment must respond to HTTP requests to indicate a healthy status for the health checker. To satisfy this requirement, the Bookshelf Pub/Sub worker listens for HTTP requests and responds with the number of books processed:

// Publish a count of processed requests to the server homepage.
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
	countMu.Lock()
	defer countMu.Unlock()
	fmt.Fprintf(w, "This worker has processed %d books.", count)
})

port := "8080"
if p := os.Getenv("PORT"); p != "" {
	port = p
}
log.Fatal(http.ListenAndServe(":"+port, nil))
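
One way to check a status handler like this without starting a server is to call it through net/http/httptest. The sketch below is an assumption about how you might test it, not part of the sample: the counter type and the probe helper are hypothetical, although the response text matches the handler above.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// counter guards the processed-book count, like countMu and count above.
type counter struct {
	mu sync.Mutex
	n  int
}

// inc records one processed book.
func (c *counter) inc() {
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
}

// ServeHTTP reports the current count, which also serves as the response
// to health-check requests.
func (c *counter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	c.mu.Lock()
	defer c.mu.Unlock()
	fmt.Fprintf(w, "This worker has processed %d books.", c.n)
}

// probe sends one in-memory GET request to h and returns the status code
// and response body.
func probe(h http.Handler) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Code, rec.Body.String()
}

func main() {
	c := &counter{}
	c.inc()
	c.inc()
	code, body := probe(c)
	fmt.Println(code, body) // prints 200 This worker has processed 2 books.
}
```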

The worker needs its own service configuration. This configuration is similar to the app.yaml file that is used for the front end, but the key difference is the service: worker clause. App Engine applications can have multiple, independent services. This means that you can independently deploy, configure, scale, and update pieces of your application easily.

runtime: go
env: flex
api_version: 1
service: worker

resources:
  cpu: .5
  memory_gb: 1.3
  disk_size_gb: 10

automatic_scaling:
  min_num_instances: 1
  max_num_instances: 2
  cool_down_period_sec: 60
  cpu_utilization:
    target_utilization: 0.75

Cleaning up

To avoid incurring charges to your Google Cloud Platform account for the resources used in this tutorial:

Delete the project

The easiest way to eliminate billing is to delete the project you created for the tutorial.

To delete the project:

  1. In the Cloud Platform Console, go to the Projects page.

  2. In the project list, select the checkbox next to the project you want to delete, and then click Delete project.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete non-default versions of your app

If you don't want to delete your project, you can reduce costs by deleting the non-default versions of your app.

To delete an app version:

  1. In the Cloud Platform Console, go to the App Engine Versions page.

  2. Click the checkbox next to the non-default app version you want to delete.
  3. Click the Delete button at the top of the page to delete the app version.

Delete your Cloud SQL instance

To delete a Cloud SQL instance:

  1. In the Cloud Platform Console, go to the SQL Instances page.

  2. Click the name of the SQL instance you want to delete.
  3. Click the Delete button at the top of the page to delete the instance.

Delete your Cloud Storage bucket

To delete a Cloud Storage bucket:

  1. In the Cloud Platform Console, go to the Cloud Storage browser.

  2. Click the checkbox next to the bucket you want to delete.
  3. Click the Delete button at the top of the page to delete the bucket.

What's next

Learn how to run the Go Bookshelf sample on Compute Engine.

Try out other Google Cloud Platform features for yourself. Have a look at our tutorials.
