Using Cloud Pub/Sub with .NET

Many applications need to do background processing outside the context of a web request. In this sample, the Bookshelf app sends tasks to a separate background worker for execution. The worker gathers information from the Google Books API and updates the book information in the database. This sample demonstrates how to set up separate services in Google App Engine, how to run a worker process in the App Engine flexible environment, and how to deal with lifecycle events.

This page is part of a multi-page tutorial. To start from the beginning and see instructions for setting up, go to .NET Bookshelf App.

Configuring settings

  1. In the getting-started-dotnet\aspnet\5-pubsub directory, double-click 5-pubsub to open the sample app in Visual Studio.

  2. In the Solution Explorer pane, click bookshelf > Web.config.

  3. In bookshelf\Web.config:

    • Set GoogleCloudSamples:ProjectId to your project ID.

    • Set the value of GoogleCloudSamples:BookStore to the same value you used during the Using Structured Data part of this tutorial.

    • If you used Cloud SQL or SQL Server during the structured data step, find the <connectionStrings> XML element and set the connectionString to the same value you used during that step.

    • Set GoogleCloudSamples:BucketName to the name of the Cloud Storage bucket you created previously.

  4. Save and close bookshelf\Web.config.

  5. In the Solution Explorer pane, click worker > Web.config.

  6. In worker\Web.config:

    • Set GoogleCloudSamples:ProjectId to your project ID.

    • Set the value of GoogleCloudSamples:BookStore to the same value you used during the Using Structured Data step of this tutorial.

    • If you used Cloud SQL or SQL Server during the structured data step, find the <connectionStrings> XML element and set the connectionString to the same value you used during that step.

    • Set GoogleCloudSamples:BucketName to the name of your Cloud Storage bucket.

  7. Save and close worker\Web.config.
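
Both projects need the same three GoogleCloudSamples keys, so a quick check for missing values can catch a half-edited Web.config before the app starts. The following is a minimal sketch, assuming the settings have already been copied into a dictionary. SettingsCheck and Missing are hypothetical names that are not part of the sample.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the Bookshelf sample.
public static class SettingsCheck
{
    // The keys named in the configuration steps above.
    public static readonly string[] RequiredKeys =
    {
        "GoogleCloudSamples:ProjectId",
        "GoogleCloudSamples:BookStore",
        "GoogleCloudSamples:BucketName",
    };

    // Returns the required keys that are absent or blank, in declaration order.
    public static List<string> Missing(IDictionary<string, string> settings)
    {
        var missing = new List<string>();
        foreach (var key in RequiredKeys)
        {
            if (!settings.TryGetValue(key, out var value)
                || string.IsNullOrWhiteSpace(value))
            {
                missing.Add(key);
            }
        }
        return missing;
    }
}
```

An empty result means all three values are present. The check does not verify that the project ID or bucket actually exists.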

Running the app on your local machine

In Visual Studio, in the Solution Explorer pane, right-click Solution, and choose Set StartUp Projects.

Select Multiple startup projects, and set the Action for bookshelf and worker to Start. Click OK.

In Visual Studio, press F5 to run the projects.

Now add some well-known books to the bookshelf. If you have both the application and worker instance running locally, you can watch the worker update the book information in the background.

Deploying the Bookshelf app to Compute Engine

  1. In Visual Studio, in the Solution Explorer pane, right-click bookshelf, and choose Publish.

  2. Create a new custom profile as you did in the Using Structured Data part of this tutorial.

  3. Click Publish.

Deploying the worker to Compute Engine

  1. In Visual Studio, in the Solution Explorer pane, right-click worker, and choose Publish.

  2. Create a new custom profile as you did in the Using Structured Data part of this tutorial.

  3. Click Publish.

Running the app on Compute Engine

In your web browser, enter the address of your first Compute Engine instance.

Application structure

This diagram shows the app's components and how they fit together.

[Diagram: application structure]

Understanding the code

This section walks you through how to create a queue, add tasks to the queue, and use the worker to process tasks.

Creating a queue

A Pub/Sub topic and subscription together form a queue.

A QueueMessage contains the ID of a book to look up in the Google Books API:

private class QueueMessage
{
    public long BookId;
};

QueueMessages are added to a topic named book-process-queue. A subscription named shared-worker-subscription subscribes to this topic. The worker watches this subscription for tasks to execute.

The full topic and subscription paths include the project name:

_topicName = new TopicName(projectId, options.TopicId);
_subscriptionName = new SubscriptionName(projectId, options.SubscriptionId);
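
To make the shape of those full paths concrete, here is a small sketch that formats them as plain strings. PubSubPaths is a hypothetical helper written for illustration. The sample itself relies on the TopicName and SubscriptionName types shown above.

```csharp
using System;

// Hypothetical helper, for illustration only: it formats the resource
// paths that Pub/Sub uses for topics and subscriptions.
public static class PubSubPaths
{
    public static string Topic(string projectId, string topicId)
    {
        return $"projects/{projectId}/topics/{topicId}";
    }

    public static string Subscription(string projectId, string subscriptionId)
    {
        return $"projects/{projectId}/subscriptions/{subscriptionId}";
    }
}
```

With a placeholder project ID of my-project, PubSubPaths.Topic("my-project", "book-process-queue") yields projects/my-project/topics/book-process-queue.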

CreateTopicAndSubscription() creates the topic and then the subscription. If either one already exists, it logs the fact and continues:

public void CreateTopicAndSubscription()
{
    try
    {
        _pub.CreateTopic(_topicName);
        _logger.LogVerbose("Created topic " + _topicName);
    }
    catch (Grpc.Core.RpcException e)
    when (e.Status.StatusCode == Grpc.Core.StatusCode.AlreadyExists)
    {
        // The topic already exists.  Ok.
        _logger.LogError(_topicName + " already exists", e);
    }
    try
    {
        _sub.CreateSubscription(_subscriptionName, _topicName, null, 0);
        _logger.LogVerbose("Created subscription " + _subscriptionName);
    }
    catch (Grpc.Core.RpcException e)
    when (e.Status.StatusCode == Grpc.Core.StatusCode.AlreadyExists)
    {
        // The subscription already exists.  Ok.
        _logger.LogError(_subscriptionName + " already exists", e);
    }
}
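
The two try/catch blocks above follow the same create-if-missing pattern, which can be factored into one helper. The sketch below shows one way to do that using only the standard library. Idempotent.TryCreate and its alreadyExists predicate are hypothetical names, and the predicate stands in for the RpcException status check used in the sample.

```csharp
using System;

// Hypothetical helper, not part of the Bookshelf sample.
public static class Idempotent
{
    // Runs create(). Returns true if it completed, or false if it threw an
    // exception that the alreadyExists predicate recognizes. Any other
    // exception propagates to the caller unchanged.
    public static bool TryCreate(Action create, Func<Exception, bool> alreadyExists)
    {
        try
        {
            create();
            return true;
        }
        catch (Exception e) when (alreadyExists(e))
        {
            // The resource is already there, which is the desired end state.
            return false;
        }
    }
}
```

The exception filter (the when clause) keeps unrelated failures, such as permission errors, from being swallowed along with the benign already-exists case.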

Queueing tasks

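The QueueMessage is JSON-encoded, and the resulting JSON is stored as UTF-8 bytes in the Data field of a PubsubMessage. The client library handles any further encoding that the Pub/Sub API requires. While this is overkill for encoding a simple long, JSON keeps the message easy to extend and compatible with the Pub/Sub API: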

public void EnqueueBook(long bookId)
{
    var message = new QueueMessage() { BookId = bookId };
    var json = JsonConvert.SerializeObject(message);
    _pub.Publish(_topicName, new[] { new PubsubMessage()
    {
        Data = Google.Protobuf.ByteString.CopyFromUtf8(json)
    } });
}
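
The payload round trip (object to JSON to UTF-8 bytes, and back again in the worker) can be sketched without the Pub/Sub client. This version makes a few assumptions: it uses System.Text.Json instead of the sample's JsonConvert, a plain byte[] in place of ByteString, and a hypothetical BookMessage class with a property, because System.Text.Json skips public fields by default.

```csharp
using System;
using System.Text;
using System.Text.Json;

// Hypothetical stand-in for the sample's QueueMessage.
public class BookMessage
{
    public long BookId { get; set; }
}

// Hypothetical helper, not part of the Bookshelf sample.
public static class MessageCodec
{
    // Serializes the book ID to JSON and returns the UTF-8 bytes.
    public static byte[] Encode(long bookId)
    {
        string json = JsonSerializer.Serialize(new BookMessage { BookId = bookId });
        return Encoding.UTF8.GetBytes(json);
    }

    // Reverses Encode: UTF-8 bytes to JSON to book ID.
    public static long Decode(byte[] data)
    {
        string json = Encoding.UTF8.GetString(data);
        return JsonSerializer.Deserialize<BookMessage>(json).BookId;
    }
}
```

MessageCodec.Decode(MessageCodec.Encode(42)) returns 42, which mirrors what EnqueueBook and the worker do on either side of the queue.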

The worker

The worker is a separate application that listens to Pub/Sub events. This splits the application into two independent processes that communicate by using Pub/Sub, instead of directly with each other.

Processing books

To process a book, the task fetches the book by its ID, finds additional information, and then saves the updated information back to the database:

public void ProcessBook(IBookStore bookStore, long bookId)
{
    var book = bookStore.Read(bookId);
    _logger.LogVerbose($"Found {book.Title}.  Updating.");
    // Search Google Books by title; escape the title so it is a valid query value.
    var query = "https://www.googleapis.com/books/v1/volumes?q="
        + Uri.EscapeDataString(book.Title);
    // Dispose the response and reader so the connection is released promptly.
    using (var response = WebRequest.Create(query).GetResponse())
    using (var reader = new StreamReader(response.GetResponseStream()))
    {
        var json = reader.ReadToEnd();
        UpdateBookFromJson(json, book);
    }
    bookStore.Update(book);
}
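
Book titles often contain spaces and punctuation, which is why ProcessBook escapes the title before appending it to the query string. The sketch below isolates that step. BooksQuery.ForTitle is a hypothetical helper, and the base URL is the one used in ProcessBook.

```csharp
using System;

// Hypothetical helper, not part of the Bookshelf sample.
public static class BooksQuery
{
    private const string BaseUrl =
        "https://www.googleapis.com/books/v1/volumes?q=";

    // Builds the search URL for a title, percent-encoding characters such
    // as spaces and ampersands that would otherwise break the query string.
    public static string ForTitle(string title)
    {
        return BaseUrl + Uri.EscapeDataString(title);
    }
}
```

For example, BooksQuery.ForTitle("The Hobbit") ends in ?q=The%20Hobbit.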

The function PullOnce reads messages from the subscription, and calls ProcessBook for every message:

private void PullOnce(Action<long> callback, CancellationToken cancellationToken)
{
    _logger.LogVerbose($"Pulling messages from {_subscriptionName}...");
    // Pull up to 3 messages from the subscription, waiting at most 90 seconds.
    var response = _sub.Pull(_subscriptionName, false, 3,
        CallSettings.FromCallTiming(
            CallTiming.FromExpiration(
                Expiration.FromTimeout(
                    TimeSpan.FromSeconds(90)))));
    if (response.ReceivedMessages == null || response.ReceivedMessages.Count == 0)
    {
        // The request returned without messages because the queue was empty.  Ok.
        _logger.LogVerbose("Pulled no messages.");
        return;
    }
    _logger.LogVerbose($"Pulled {response.ReceivedMessages.Count} messages.");
    foreach (var message in response.ReceivedMessages)
    {
        try
        {
            // Unpack the message.
            byte[] json = message.Message.Data.ToByteArray();
            var qmessage = JsonConvert.DeserializeObject<QueueMessage>(
                Encoding.UTF8.GetString(json));
            // Invoke ProcessBook().
            callback(qmessage.BookId);
        }
        catch (Exception e)
        {
            _logger.LogError("Error processing book.", e);
        }
    }
    // Acknowledge every pulled message, including any that failed
    // processing, so we don't see them again.
    var ackIds = new string[response.ReceivedMessages.Count];
    for (int i = 0; i < response.ReceivedMessages.Count; ++i)
        ackIds[i] = response.ReceivedMessages[i].AckId;
    _sub.Acknowledge(_subscriptionName, ackIds);
}
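
As its name suggests, PullOnce handles a single batch, so the worker needs something to call it repeatedly until shutdown. One possible loop is sketched below. WorkerLoop.Run is a hypothetical helper, and how the sample actually drives PullOnce is not shown in this section.

```csharp
using System;
using System.Threading;

// Hypothetical helper, not part of the Bookshelf sample.
public static class WorkerLoop
{
    // Calls pullOnce repeatedly until cancellation is requested.
    // Returns the number of pull attempts that ran to completion or failed.
    public static int Run(Action<CancellationToken> pullOnce, CancellationToken token)
    {
        int attempts = 0;
        while (!token.IsCancellationRequested)
        {
            try
            {
                pullOnce(token);
            }
            catch (OperationCanceledException)
            {
                // Shutdown was requested in the middle of a pull.
                break;
            }
            catch (Exception e)
            {
                // Keep the worker alive; the next iteration retries.
                Console.Error.WriteLine("Pull failed, retrying: " + e.Message);
            }
            attempts++;
        }
        return attempts;
    }
}
```

A caller could pass a lambda such as t => PullOnce(processOne, t), where processOne is whatever callback invokes ProcessBook. This sketch retries immediately after a failure, so a real worker would probably add a delay between failed attempts.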
