Writing and Responding to Pub/Sub Messages

Google Cloud Pub/Sub provides reliable, many-to-many, asynchronous messaging between applications. Publisher applications can send messages to a topic, and other applications can subscribe to that topic to receive the messages.

Prerequisites

Follow the instructions in "Hello, World!" for Go on App Engine to set up your environment and project, and to understand how App Engine Go apps are structured. Write down and save your project ID, because you will need it to run the sample application described in this document.

Clone the sample app

Copy the sample apps to your local machine, and cd to the pubsub directory:

go get -u -d github.com/GoogleCloudPlatform/golang-samples/appengine_flexible/pubsub
cd $GOPATH/src/github.com/GoogleCloudPlatform/golang-samples/appengine_flexible/pubsub

Create a topic and subscription

Create a topic and a push subscription. The subscription specifies the endpoint to which the Pub/Sub server sends requests:

gcloud alpha pubsub topics create [your-topic-name]
gcloud alpha pubsub subscriptions create [your-subscription-name] \
--topic [your-topic-name] \
--push-endpoint \
https://[your-app-id].appspot.com/pubsub/push?token=[your-token] \
--ack-deadline 30
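
The token query parameter in the push endpoint acts as a shared secret: the app can compare it with a value it knows and reject requests that do not carry it. The following is a minimal sketch of such a check, not code from the sample app. The names validToken and requireToken and the example token value are assumptions made for illustration.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// validToken reports whether got matches the expected token want.
// An empty expected token never matches, so an app with no token
// configured rejects every push request.
// (Hypothetical helper, not part of the sample app.)
func validToken(got, want string) bool {
	if want == "" {
		return false
	}
	// Constant-time comparison avoids leaking the token through timing.
	return subtle.ConstantTimeCompare([]byte(got), []byte(want)) == 1
}

// requireToken wraps a handler and responds with 403 Forbidden unless
// the request's token query parameter matches want.
// (Hypothetical helper, not part of the sample app.)
func requireToken(want string, next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if !validToken(r.URL.Query().Get("token"), want) {
			http.Error(w, "invalid token", http.StatusForbidden)
			return
		}
		next(w, r)
	}
}

func main() {
	// "s3cret" stands in for [your-token].
	h := requireToken("s3cret", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "ok")
	})
	for _, target := range []string{"/pubsub/push?token=s3cret", "/pubsub/push?token=wrong"} {
		rec := httptest.NewRecorder()
		h(rec, httptest.NewRequest(http.MethodPost, target, nil))
		fmt.Println(target, rec.Code)
	}
}
```

In a real deployment the expected token would come from configuration rather than a literal in the source.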

Edit app.yaml

Edit app.yaml to set the environment variables for your project ID and topic:

env_variables:
  GCLOUD_PROJECT: your-project-id
  PUBSUB_TOPIC: your-topic

Code review

The sample app uses the Google Cloud Client Libraries.

The sample app reads the environment variables you set in the app.yaml file:

os.Getenv("GCLOUD_PROJECT")
os.Getenv("PUBSUB_TOPIC")
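
Because os.Getenv returns an empty string when a variable is unset, it can help to check the configuration at startup and fail early. The following is a minimal sketch under that assumption. The helper lookupEnv is hypothetical and does not appear in the sample app.

```go
package main

import (
	"fmt"
	"os"
)

// lookupEnv returns the value of the named environment variable, or an
// error if the variable is unset or empty.
// (Hypothetical helper, not part of the sample app.)
func lookupEnv(name string) (string, error) {
	v := os.Getenv(name)
	if v == "" {
		return "", fmt.Errorf("%s environment variable must be set", name)
	}
	return v, nil
}

func main() {
	// Check both variables that the sample app reads.
	for _, name := range []string{"GCLOUD_PROJECT", "PUBSUB_TOPIC"} {
		v, err := lookupEnv(name)
		if err != nil {
			fmt.Fprintln(os.Stderr, err)
			os.Exit(1)
		}
		fmt.Printf("%s=%s\n", name, v)
	}
}
```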

The messages received by this instance are stored in a slice:

messages   []string
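
HTTP handlers run concurrently, so access to the slice has to be guarded by a mutex, and the slice is capped so that it does not grow without bound. The sketch below shows one way the supporting declarations might look. The value of maxMessages is inferred from the "Limit to ten" comment in pushHandler, and addMessage and snapshot are hypothetical helpers, not functions from the sample app.

```go
package main

import (
	"fmt"
	"sync"
)

// maxMessages is assumed to be 10, based on the handler's comment.
const maxMessages = 10

var (
	messagesMu sync.Mutex // guards messages
	messages   []string
)

// addMessage appends m and keeps only the most recent maxMessages entries.
// (Hypothetical helper, not part of the sample app.)
func addMessage(m string) {
	messagesMu.Lock()
	defer messagesMu.Unlock()
	messages = append(messages, m)
	if len(messages) > maxMessages {
		messages = messages[len(messages)-maxMessages:]
	}
}

// snapshot returns a copy of the stored messages, so callers can read
// them without holding the lock.
// (Hypothetical helper, not part of the sample app.)
func snapshot() []string {
	messagesMu.Lock()
	defer messagesMu.Unlock()
	return append([]string(nil), messages...)
}

func main() {
	for i := 1; i <= 12; i++ {
		addMessage(fmt.Sprintf("message %d", i))
	}
	got := snapshot()
	fmt.Println(len(got), got[0]) // prints: 10 message 3
}
```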

The pushHandler function receives pushed messages and adds them to the messages slice:

func pushHandler(w http.ResponseWriter, r *http.Request) {
	msg := &pushRequest{}
	if err := json.NewDecoder(r.Body).Decode(msg); err != nil {
		http.Error(w, fmt.Sprintf("Could not decode body: %v", err), http.StatusBadRequest)
		return
	}

	messagesMu.Lock()
	defer messagesMu.Unlock()
	// Append the new message, then keep only the most recent maxMessages.
	messages = append(messages, string(msg.Message.Data))
	if len(messages) > maxMessages {
		messages = messages[len(messages)-maxMessages:]
	}
}
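
The pushRequest type is not shown above. Pub/Sub push delivery wraps each message in a JSON envelope whose data field is base64-encoded, and encoding/json decodes a base64 string directly into a []byte field. The sketch below shows a struct that would fit the handler's use of msg.Message.Data. The exact definition in the sample app may differ, and decodePush is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"strings"
)

// pushRequest mirrors the JSON envelope of a push delivery.
// This definition is an assumption, not copied from the sample app.
type pushRequest struct {
	Message struct {
		Attributes map[string]string `json:"attributes"`
		Data       []byte            `json:"data"` // base64 in JSON
		ID         string            `json:"messageId"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// decodePush parses a push request body into a pushRequest.
// (Hypothetical helper, not part of the sample app.)
func decodePush(body string) (*pushRequest, error) {
	msg := &pushRequest{}
	if err := json.NewDecoder(strings.NewReader(body)).Decode(msg); err != nil {
		return nil, err
	}
	return msg, nil
}

func main() {
	// "aGVsbG8=" is the base64 encoding of "hello".
	body := `{"message":{"data":"aGVsbG8=","messageId":"1"},"subscription":"projects/p/subscriptions/s"}`
	msg, err := decodePush(body)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Println(string(msg.Message.Data)) // prints: hello
}
```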

The publishHandler function publishes the value of the form's payload field to the topic as a new message:

func publishHandler(w http.ResponseWriter, r *http.Request) {
	ctx := context.Background()

	msg := &pubsub.Message{
		Data: []byte(r.FormValue("payload")),
	}

	if _, err := topic.Publish(ctx, msg); err != nil {
		http.Error(w, fmt.Sprintf("Could not publish message: %v", err), http.StatusInternalServerError)
		return
	}

	fmt.Fprint(w, "Message published.")
}

Run the sample locally

When running locally, you can use the Google Cloud SDK to provide authentication to use Google Cloud APIs. Assuming you set up your environment as described in Prerequisites, you have already run the gcloud init command, which provides this authentication.

Then set environment variables before starting your application:

export GCLOUD_PROJECT=[your-project-id]
export PUBSUB_TOPIC=[your-topic]
go run pubsub.go

Run on App Engine

To deploy the demo app on App Engine using gcloud, run the following:

aedeploy gcloud app deploy

You can now access the application at https://[your-project-id].appspot.com. Use the form to submit messages. Each instance keeps the messages it receives in its own memory, and there is no guarantee which instance of your application will receive a given push notification, so a message may not appear on the first page load. Send several messages and refresh the page to see the received messages.
