Application Development

Running large-scale C++ workloads with Cloud Pub/Sub and GKE

#appdev

For the past couple of years, we've been working to make Google Cloud an excellent platform for running C++ workloads. To demonstrate some of the progress we've made so far, we'll show how you can use C++ with both Cloud Pub/Sub and Cloud Storage to build a highly scalable job queue running on Google Kubernetes Engine (GKE).

Such applications often need to distribute work to many compute nodes to achieve good performance. Part of the appeal of public cloud providers is the ability to schedule these kinds of parallel computations on demand, growing the size of the cluster that runs the computation as needed, and shrinking it when it’s no longer running. In this post we will explore how to realize this potential for C++ applications, using Pub/Sub and GKE.

A common pattern for running large-scale computations is a job queue, where work is represented by messages in the queue, and a number of worker applications pull items from the queue for processing. The recently released Cloud Pub/Sub C++ client library makes it easy to implement this pattern. And with GKE autoscaling, the cluster running such a workload can grow and shrink on demand, saving C++ developers from the tedium of managing the cluster, and leaving them with more time to improve their applications.

Sample application

For our example, we will create millions of Cloud Storage objects; this models a parallel application that performs some computation (e.g., analyze a fraction of some large data set) and saves the results in separate Cloud Storage objects. We believe this workload is easier to understand than some exotic simulation, but it's not purely artificial: from time to time our team needs to create large synthetic data sets for load testing.

Overview

The basic idea is to break the work into many smaller work items, such as "create 1,000 objects with this prefix". We use a command-line tool to publish these work items to a Pub/Sub topic, which reliably delivers them to any number of worker nodes that execute the work items. We use GKE to run the worker nodes, as GKE automatically scales the cluster based on demand, and restarts the worker nodes if needed after a failure.

Because Pub/Sub offers at-least-once delivery, and because the worker nodes may be restarted by GKE, it’s important to make these work items idempotent, that is, executing the work item multiple times produces the same objects in Cloud Storage as executing the work item a single time.

The code for this example is available in this GitHub repository.

Posting the work items

A simple C++ struct represents the work item:

  struct work_item {
    std::string bucket;
    std::string prefix;
    std::int64_t object_count;
    bool use_hash_prefix;
  };

Converting this struct to a Pub/Sub message takes only a few lines of code:

  pubsub::Message format_work_item(work_item wi) {
    return pubsub::MessageBuilder()
        .SetAttributes({
            {"bucket", std::move(wi.bucket)},
            {"prefix", std::move(wi.prefix)},
            {"object_count", std::to_string(wi.object_count)},
            {"use_hash_prefix", wi.use_hash_prefix ? "true" : "false"},
        })
        .Build();
  }

Since the messages are posted using a Publisher, there's no need to batch messages or retry them; the library takes care of these details:

  void schedule(...) {
    // … more details in the GitHub repository …
    auto const topic = pubsub::Topic(project_id, topic_id);
    auto publisher =
        pubsub::Publisher(pubsub::MakePublisherConnection(topic, {}));

    std::vector<google::cloud::future<google::cloud::Status>> pending_publish;
    for (long offset = 0; offset < object_count; offset += task_size) {
      auto prefix = make_prefix(offset);
      auto const task_objects_count =
          (std::min)(task_size, object_count - offset);
      pending_publish.push_back(
          publisher
              .Publish(format_work_item(work_item{
                  bucket, prefix, task_objects_count, use_hash_prefix}))
              .then([](auto f) { return f.get().status(); }));
    }
    // … wait until all events are published …
    for (auto& f : pending_publish) f.get();
    // … more details in the GitHub repository …
  }

Reading the work items

To read the work items, create a Subscriber and associate a callback with it. We configure the subscription to only read a few messages at a time, as we prefer to keep the messages on the Pub/Sub service until the application is ready to act on them.

  void worker(boost::program_options::variables_map const& vm) {
    // … more details in the GitHub repository …
    auto const subscription = pubsub::Subscription(project_id, subscription_id);
    auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
        subscription,
        pubsub::SubscriberOptions{}.set_max_outstanding_messages(concurrency)));

    std::atomic<std::int64_t> counter{0};
    auto handler = [&counter, cl = gcs::Client::CreateDefaultClient().value()](
                       pubsub::Message const& m, pubsub::AckHandler h) {
      process_one_item(cl, m);
      ++counter;
      std::move(h).ack();
    };
    // … more details in the GitHub repository …
  }

If the application running this function crashes, or is rescheduled by GKE, the Pub/Sub service re-delivers the messages to a new instance. The results are unchanged, because the process_one_item() function produces the same output even when called multiple times:

  void process_one_item(gcs::Client client, pubsub::Message const& m) {
    auto wi = parse_message(m);
    for (std::int64_t i = 0; i != wi.object_count; ++i) {
      auto object_name = wi.prefix + "/object-" + std::to_string(i);
      auto hashed = hashed_name(wi.use_hash_prefix, std::move(object_name));
      client.InsertObject(wi.bucket, hashed, create_contents(wi, i)).value();
    }
  }

Compiling the application

The GitHub repository includes the necessary CMake and Docker scripts to compile this code into a Docker image. We use Cloud Build to run the build, freeing our workstations to do useful work (and definitely not for playing video games):

  gcloud builds submit \
    "--project=${GOOGLE_CLOUD_PROJECT}" \
    "--substitutions=SHORT_SHA=$(git rev-parse --short HEAD)" \
    "--config=cloudbuild.yaml"

The first time you run this command it may take a while, as it builds all the dependencies from source. The intermediate results are cached and reused to save time in subsequent runs.

Deploying to GKE

Once the Docker image is created, you can deploy the application to a previously created GKE cluster. We use a script to generate the YAML file:

  ./deployment.py | kubectl apply -f -

And then you instruct GKE to autoscale as needed:

  kubectl autoscale deployment $DEPLOYMENT_NAME --max 200 --min 1 --cpu-percent 50

This starts at least one replica of the application in the GKE cluster, and configures the cluster to create additional replicas (up to 200) when their average CPU utilization exceeds 50%.

Summary

Using Pub/Sub as a work queue can simplify the implementation of parallel C++ applications. Pub/Sub distributes the work items across worker applications, re-delivers them when a worker node terminates unexpectedly, and scales with the number of worker nodes. Furthermore, you can deploy the worker nodes to GKE, which automatically takes care of finding or creating free virtual machines to run your worker application, scheduling your worker application on these virtual machines, and increasing or reducing the number of compute nodes as needed. If your C++ application has a lot of small work items and these can be made idempotent, consider using Pub/Sub and GKE for task scheduling.

To try these techniques in your own environment, download the example from GitHub. Or just browse the code and apply the same techniques in your own applications!