Running C++ Binaries using Apache Beam for Grid Computing Workloads

This article describes how to run C++ code (and other "non-native" code) in Dataflow to handle high-volume grid-computing workloads.

Overview

Embarrassingly parallel workloads are common in banking, media, and life-sciences enterprises. For workloads like these, enterprises typically deploy a cluster of compute nodes that can each carry out independent processing tasks, in a configuration referred to as grid computing. To process the data for these workloads, you can use Apache Beam.

Beam currently has native SDKs for Java and Python. However, many embarrassingly parallel workloads make use of code written in C++. This article explains how to run C++ binaries (libraries) as external (or "non-native") code using Apache Beam, the Dataflow runner, and other Google Cloud Platform (GCP) services. Using C++ binaries allows you to unlock these types of workloads using fully managed services and gives you the ability to build end-to-end pipelines using a sophisticated directed acyclic graph (DAG) in both batch and streaming mode.

The approach outlined in this article focuses on running C++ binaries. However, it's also relevant for code written in any other language from which you can compile a standalone binary.

Components used in this scenario

Grid computing applications require data to be distributed to functions running on many cores. This pattern typically requires high-concurrency reads, often followed by a large fan-out of data absorbed by downstream systems.

The architecture described in this article uses the following GCP resources:

  • The Dataflow Runner for Apache Beam. This runner distributes work to the grid nodes with a processing flow derived from a directed acyclic graph (DAG). A single Beam DAG lets you define complex multi-stage pipelines in which parallel pipelined stages can be brought back together using side-inputs or joins.

  • Cloud Storage. This service provides a location to stage the C++ binaries. When large files need to be stored, as in many media use cases, those files also reside in Cloud Storage.

The scenario described in this article also uses Cloud Bigtable and BigQuery. These services are used as both sources and sinks. The following diagram outlines the high-level architecture.

Architecture of a grid computing solution

Other storage systems can also be used. For details, see the list of storage systems and streaming sources in the pipeline I/O page in the Apache Beam documentation.

The Dataflow runner for Apache Beam

Dataflow is a fully managed GCP service that lets you transform and enrich data in streaming (real-time) and batch (historical) modes with equal reliability and expressiveness. Dataflow is based on Apache Beam.

Cloud Storage

Cloud Storage is a unified object storage that encompasses live data serving, data analytics, machine learning (ML), and data archiving. For the scenario described in this article, Cloud Storage provides access to the C++ binaries. In some use cases, Cloud Storage also provides the location for data needed by the processing phase.

For the high-burst loads required by grid computing, you need to understand Cloud Storage performance characteristics. For more information about Cloud Storage data-serving performance, see Request Rate and Access Distribution Guidelines in the Cloud Storage documentation.

Cloud Bigtable

Cloud Bigtable is a high-performance NoSQL database service optimized for large analytical and operational workloads. Cloud Bigtable is complementary to Dataflow: its primary characteristics, low-latency reads and writes (6 ms at the 90th percentile), allow it to handle many thousands of concurrent clients and heavy-burst workloads. These features make Cloud Bigtable ideal as a sink as well as a data source within the DoFn function in the processing phase of Dataflow, as explained later.

BigQuery

BigQuery is a fast, economical, and fully managed enterprise data warehouse for large-scale data analytics. Grid results are often used for analysis, and BigQuery enables you to run large-scale aggregations against the grid's data output.

The Dataflow DAG

The Beam SDK enables you to build expressive DAGs, which in turn allows you to easily create stream or batch multi-stage pipelines. Data movement is handled by the runner, with data expressed as PCollection objects, which are immutable parallel element collections.

The following diagram illustrates this flow.

Flow using DAG

The Apache Beam SDK allows you to define a DAG. In the DAG you can include user-defined code as functions. Normally, the same programming language (Java or Python) is used for both the declaration of the DAG and for the user-defined code. However, for the scenario described here, the user-defined code is in C++.

For more information on creating a DAG, refer to the Dataflow documentation.

General approach for running external code

This section describes the general approach used for executing external (C++) code.

You can use the code to conduct quick experiments without requiring a full build. For a production system, you typically create your own binaries, which gives you the freedom to tune the process to your needs.

The following diagram illustrates the two usages of pipeline data:

  • Data is used to drive the process.
  • Data is acquired during processing and joined to the driver data.

Two stages of pipeline data

In this article, primary data (from the source) is referred to as driving data, and secondary data (from the processing phase) is referred to as joining data.

In a finance use case, the driving data might be a few hundred thousand trades. Each trade needs to be processed in conjunction with market data. In that case, the market data is the joining data. In a media use case, the driving data might be image files that require processing but don't need other data sources, and therefore don't use joining data.

Size considerations for driving data

If the size of the driving data element is in the low-megabyte range, you should treat it with the normal Beam paradigm of creating a PCollection object from the source and sending the object to the Beam transforms for processing.

If the size of the driving data element is in the high megabytes or in the gigabytes, as is typical for media, you can put the driving data into Cloud Storage. Then in the starting PCollection object, you reference the storage URI, so that only a URI reference to the data is passed through the pipeline. This approach is discussed more fully in Running the external code later in this article.
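For example, a media pipeline might start from a PCollection that holds only Cloud Storage URIs, as in the following sketch (the bucket and object names, and the pipeline variable, are hypothetical):

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
...
    // The pipeline carries only URI references; the large files themselves
    // stay in Cloud Storage until a worker needs them.
    PCollection<String> drivingData = pipeline.apply(
        "CreateUriList",
        Create.of(
            "gs://example-media-bucket/scene-001.mp4",
            "gs://example-media-bucket/scene-002.mp4"));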

Size considerations for joining data

If the joining data is a few hundred megabytes or less, use a side input to get this data to the Beam transforms. The side input sends the data packet to every worker that needs it.
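For example, the following sketch shows joining data made available through a map-shaped side input. The trades and marketData collections, their element types, and the keying by instrument ID are hypothetical:

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
...
    // Materialize the joining data (market data keyed by instrument ID) as a
    // side input that's sent to every worker that needs it.
    PCollectionView<Map<String, Double>> marketDataView =
        marketData.apply("MarketDataAsMap", View.asMap());

    PCollection<String> results = trades.apply("ProcessTrades",
        ParDo.of(new DoFn<KV<String, String>, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Look up the joining data for this driving element.
            Double price = c.sideInput(marketDataView).get(c.element().getKey());
            c.output(c.element().getKey() + "," + price);
          }
        }).withSideInputs(marketDataView));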

If the joining data is in the gigabyte or terabyte range, use either Cloud Bigtable or Cloud Storage to merge the joining data with the driving data, depending on the nature of the data. Cloud Bigtable is ideal for finance scenarios where market data is often accessed as key-value lookups from Cloud Bigtable. For more information about designing your Cloud Bigtable schema, including recommendations for working with time-series data, see the schema design guides in the Cloud Bigtable documentation.
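When the joining data is too large for a side input, one option in the finance case is a per-key lookup from Cloud Bigtable inside the DoFn, for example through the Cloud Bigtable HBase client. The following is only a rough sketch with hypothetical project, instance, table, column, and variable names; a production DoFn would open the connection once (for example, in the @Setup method) and reuse it:

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
...
    // Connect to Cloud Bigtable and open the table that holds the market data.
    Connection connection =
        BigtableConfiguration.connect("example-project", "example-instance");
    Table table = connection.getTable(TableName.valueOf("market-data"));

    // Key-value lookup of the joining data for a single driving element.
    Result row = table.get(new Get(Bytes.toBytes(instrumentId)));
    byte[] price = row.getValue(Bytes.toBytes("md"), Bytes.toBytes("price"));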

Running the external code

You can run external code in Beam in many ways. Options include creating a microservice that's called from a DoFn object inside a Dataflow transform, or using JNI with the Java SDK. The approach described in this article relies on creating a subprocess directly from the DoFn object. Although it's not the most efficient approach, it's robust and simple to implement.

As you review this article, consider the complete end-to-end pipeline. Any inefficiencies in the way the process is executed are offset by the fact that data movement from the source all the way to the sink is accomplished with a single pipeline. If you compare this approach to others, be sure to look at the end-to-end times of the pipeline as well as end-to-end costs.

Pulling the binaries into the hosts

When you use a native Apache Beam language (Java or Python), the Beam SDK automatically moves all required code to the workers. However, when you make a call to external code, you need to move the code manually.

Binary files stored in buckets

To move the code, you do the following (see the sketch after this list):

  1. Store the compiled external code, along with versioning information, in Cloud Storage.
  2. In the @Setup method, create a synchronized block to check whether the code file is available on the local resource. Rather than implementing a physical check, you can confirm availability using a static variable that the first thread sets when it finishes the download.
  3. If the file isn't available, use the Cloud Storage client library to pull the file from the Cloud Storage bucket to the local worker. A recommended approach is to use the Beam FileSystems class for this task.
  4. After the file is moved, confirm that the execute bit is set on the code file.
  5. In a production system, check the hash of the binaries to ensure that the file has been copied correctly.
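The following sketch illustrates steps 2 through 4 inside a Java DoFn, using the Beam FileSystems class to pull the binary to the worker. The bucket name, binary name, local path, and DoFn class name are hypothetical, and a production version would also verify the binary's hash as described in step 5:

import java.io.File;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import org.apache.beam.sdk.io.FileSystems;
...
    // Set by the first thread that finishes the download.
    private static volatile boolean binaryAvailable = false;

    @Setup
    public void setUp() throws Exception {
      synchronized (ExternalCodeDoFn.class) {
        if (!binaryAvailable) {
          // Pull the binary from the Cloud Storage bucket to the local worker.
          Files.copy(
              Channels.newInputStream(
                  FileSystems.open(
                      FileSystems.matchSingleFileSpec(
                          "gs://example-binaries-bucket/process-data-v1.2").resourceId())),
              Paths.get("/tmp/process-data"),
              StandardCopyOption.REPLACE_EXISTING);
          // Confirm that the execute bit is set on the code file.
          new File("/tmp/process-data").setExecutable(true);
          binaryAvailable = true;
        }
      }
    }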

Running the external binaries

Before you can run external code, you need to build a wrapper for it. You write this wrapper in the same language as the external code (for example, C++) or as a shell script. The wrapper lets you pass file handles and implement optimizations as described later in Designing processing for small CPU cycles. Your wrapper does not need to be sophisticated. The following snippet shows an outline of a wrapper in C++.

#include <fstream>
#include <iostream>
#include <string>

int main(int argc, char* argv[])
{
    if (argc < 3) {
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    // First argument: path of the file to write the result to.
    std::string returnFile = argv[1];
    // Second argument: the data to process (echoed back in this example).
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open(returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

This code reads two parameters from the argument list. The first parameter is the location of the return file where the data will be pushed. The second parameter is the data that the code will echo to the user. (In real-world implementations, this code would do more than echo "Hello, world"!)

After you've written the wrapper code, you run the external code by doing the following:

  1. Transmit the data to the external code binaries.
  2. Execute the binaries, catch any errors, and log errors and results.
  3. Handle the logging information.
  4. Capture data from the completed processing.

Transmitting the data to the binaries

To start the process of running the library, you transmit data to the C++ code. This is where you can take advantage of Dataflow integration with other GCP tools. A tool like Cloud Bigtable can deal with very large datasets and handle low-latency and high-concurrency access. This allows thousands of cores to simultaneously access the dataset. In addition, Cloud Bigtable can preprocess data, allowing data shaping, enrichment, and filtering. All of this work can be done in Beam transforms before you run the external code.

For a production system, the recommended path is to use a protocol buffer to encapsulate the input data. The input data can be converted to bytes and base64-encoded before being passed to the external library. There are two ways to pass this data to the external library:

  • Small input data. For small data that doesn't exceed the system's maximum length for a command argument, pass the argument in position 2 of the process being built with java.lang.ProcessBuilder.
  • Large input data. For larger data sizes, create a file whose name includes a UUID to hold the data required by the process. (Both options are shown in the sketch after this list.)
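The following sketch shows both options from within a Java DoFn, assuming the input has already been serialized and base64-encoded into the string encodedData. The binary path, size threshold, and file locations are hypothetical, and in the large-data case the external binary must know to read its input from the file path it receives:

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;
...
    String returnFile = "/tmp/" + UUID.randomUUID() + ".ret";
    ProcessBuilder processBuilder;
    if (encodedData.length() < 100_000) {
      // Small input data: pass the payload directly as the second argument.
      processBuilder =
          new ProcessBuilder("/tmp/process-data", returnFile, encodedData);
    } else {
      // Large input data: write the payload to a UUID-named file and pass
      // that file's path to the process instead.
      Path inputFile = Paths.get("/tmp/" + UUID.randomUUID() + ".in");
      Files.write(inputFile, encodedData.getBytes(StandardCharsets.UTF_8));
      processBuilder =
          new ProcessBuilder("/tmp/process-data", returnFile, inputFile.toString());
    }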

Executing the C++ code, catching errors, and logging

Capturing and handling error information is a critical part of this solution. The resources used by the Dataflow runner are ephemeral and it's often difficult to inspect worker log files. You must make sure that you capture and push all useful information to Dataflow runner logging, and that you store the logging data in one or more Cloud Storage buckets.

The recommended approach is to redirect stdout and stderr to files, which allows you to avoid any out-of-memory considerations. For example, in the Dataflow runner that calls the C++ code, you could include lines like the following:

import java.lang.ProcessBuilder.Redirect;
...
    // Append the subprocess's stderr and stdout to local files instead of
    // buffering them in memory.
    processBuilder.redirectError(Redirect.appendTo(errFile));
    processBuilder.redirectOutput(Redirect.appendTo(outFile));

Handling logging information

Many use cases involve processing millions of elements. Successful processing generates logs with little or no value, so you must make a business decision about retaining the log data. For example, consider these alternatives to simply retaining all log data:

  • If information contained in logs from successful element processing isn't valuable, don't keep it.
  • Create logic that samples the log data, such as retaining only every 10,000th log entry. If the processing is homogeneous (that is, many iterations of the code generate essentially identical log data), this approach provides an effective balance between retaining log data and optimizing processing.

For failure conditions, the amount of data dumped to logs can be large. An effective strategy for handling large quantities of error log data is to read the first few lines of the log entry and push just those lines to Cloud Logging. The rest of the log file can be loaded into Cloud Storage buckets. This approach lets you look at the first lines of the error logs later and then, if needed, dig into Cloud Storage for the whole file.

It's always worth checking the size of the log file. If the file size is zero, you can safely ignore it or record a simple log message that the file had no data.
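The following sketch outlines this strategy for the stderr file after the subprocess exits. The number of lines kept, the destination bucket, and the helper method are hypothetical, and the upload uses the Beam FileSystems class:

import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;
import java.util.logging.Logger;
import org.apache.beam.sdk.io.FileSystems;
...
    private static final Logger LOG = Logger.getLogger("ExternalCodeRunner");

    void handleErrorLog(Path errFile) throws Exception {
      if (Files.size(errFile) == 0) {
        // The file has no data; record a simple message and move on.
        LOG.info("Subprocess wrote no error output.");
        return;
      }
      // Push only the first few lines to the Dataflow runner's logs.
      List<String> lines = Files.readAllLines(errFile);
      LOG.severe("Subprocess failed: "
          + String.join("\n", lines.subList(0, Math.min(5, lines.size()))));
      // Load the full log file into a Cloud Storage bucket for later inspection.
      try (WritableByteChannel out = FileSystems.create(
          FileSystems.matchNewResource(
              "gs://example-logs-bucket/errors/" + UUID.randomUUID() + ".log",
              false /* isDirectory */),
          "text/plain")) {
        out.write(ByteBuffer.wrap(Files.readAllBytes(errFile)));
      }
    }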

Capturing data from completed processing

It's not recommended that you use stdout to pass the result of the computation back to the DoFn function. Other code that your C++ code calls, and even your own code, might also send messages to stdout, polluting the stdout stream that otherwise contains logging data. Instead, it's a better practice to change the C++ wrapper code so that it accepts a parameter indicating where to create a .ret file that stores the value. Ideally, this file should be stored in a language-neutral way using protocol buffers, which allows the C++ code to pass an object back to the Java code. The DoFn object can read the result directly from the .ret file and pass the result information on to its own output call.
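For example, after the subprocess completes, the DoFn can read the result from the .ret file and emit it, as in the following sketch (the processBuilder, returnFile, and c variables are assumed to come from the surrounding processElement code shown in the earlier snippets):

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
...
    // Start the subprocess and wait for it to finish.
    Process process = processBuilder.start();
    int exitCode = process.waitFor();
    if (exitCode == 0) {
      // Read the value the wrapper wrote to the .ret file and pass it on to
      // the DoFn's own output call.
      Path retFile = Paths.get(returnFile);
      c.output(new String(Files.readAllBytes(retFile), StandardCharsets.UTF_8));
      Files.deleteIfExists(retFile);
    }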

Additional considerations

This section outlines additional considerations for running C++ code and other non-native binaries in Dataflow.

Designing processing for small CPU cycles

Calling a subprocess has some overhead. Depending on your workload, you might need to do some extra work to reduce the ratio between work being done and the administrative overhead of starting and shutting down the process.

In the media use case, processing for each data element can take many minutes. In that case, the cost of calling the subprocess is insignificant compared to the overall processing time. The best approach in this situation is to have a single element start its own process.

However, in other use cases (for example, finance), processing requires very small units of CPU time (tens of milliseconds). In that case, the overhead of calling the subprocess is disproportionately large. A simple solution to this issue is to make use of Beam's GroupByKey transform to create batches of between 50 and 100 elements to be fed into the process. For example, you can do this:

  • In a DoFn function, create a key-value pair. If you are processing financial trades, you can use the trade number as the key. Or if you don't have a unique number to use as the key, you can generate a checksum from the data and use a modulo function to create partitions of 50 elements.
  • Apply a GroupByKey.create transform, which returns a collection of KV<key, Iterable<data>> elements; each element contains the batch of roughly 50 elements that you can then send to the process (see the sketch after this list).
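Here's a minimal sketch of that batching step, assuming the driving data arrives as a PCollection of strings; the partition count is a hypothetical value that you would choose so that each group holds roughly 50 to 100 elements:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
...
    PCollection<KV<Integer, Iterable<String>>> batches = drivingData
        // Assign each element to a partition by hashing it and taking a modulo.
        .apply("AssignBatchKey", ParDo.of(new DoFn<String, KV<Integer, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            int key = Math.abs(c.element().hashCode()) % 1000;
            c.output(KV.of(key, c.element()));
          }
        }))
        // Group the elements so that each batch is fed to the process in one call.
        .apply("BatchElements", GroupByKey.create());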

Limiting worker parallelism

When you work with a language like Java or Python that's natively supported in the Dataflow runner, you never need to think about what's happening to the worker. Dataflow has many processes that oversee flow control and threads in batch or stream mode.

However, if you're using an external language like C++, you should be aware that you're doing something a little out of the ordinary by starting subprocesses. In batch mode, the Dataflow runner uses a smaller ratio of worker threads to CPUs than it does in streaming mode. It's recommended, especially in streaming mode, that you create a semaphore within your class to more directly control an individual worker's parallelism.

For example, with media processing, you might not want hundreds of transcoding elements to be processed in parallel by a single worker. In cases like those, you can create a utility class that provides permits to the DoFn function for the work being carried out. Using this class allows you to take direct control of the worker threads within your pipeline.
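The following sketch shows one way to write such a utility class around a static Semaphore; the class name and permit count are hypothetical values that you would tune for your workload and machine type:

import java.util.concurrent.Semaphore;
...
/** Limits how many subprocesses a single worker runs at the same time. */
public class WorkPermits {
  // Hypothetical limit; tune it for your workload and machine type.
  private static final Semaphore PERMITS = new Semaphore(4);

  public static void acquire() throws InterruptedException {
    PERMITS.acquire();
  }

  public static void release() {
    PERMITS.release();
  }
}

// In the DoFn, wrap each subprocess call:
//   WorkPermits.acquire();
//   try { /* start the subprocess and wait for it */ }
//   finally { WorkPermits.release(); }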

Using high-capacity data sinks in GCP

After the data has been processed, it's sent to a data sink. The sink needs to be able to handle the volume of results that are created by your grid processing solution.

The following diagram shows some of the sinks available in GCP when Dataflow is running a grid workload.

Sinks available in GCP

Cloud Bigtable, BigQuery, and Cloud Pub/Sub can all deal with very large streams of data. For example, each Cloud Bigtable node can handle 10,000 inserts per second of up to 1K in size with easy horizontal scalability. This allows a 100-node Cloud Bigtable cluster to absorb 1,000,000 messages per second that are being generated by the Dataflow grid.

What's next

To get example code that demonstrates how to use external libraries with Beam, follow the Add examples of running external libraries on workers Jira request on the Apache site.

To explore some standard patterns for working with DAGs, see the Dataflow blog posts on common pipeline patterns.

You can also try out other GCP features. Have a look at our tutorials.