This article describes how to run C++ code (and other "non-native" code) in Dataflow to handle high-volume grid-computing workloads.
Overview
Embarrassingly parallel workloads are common in banking, media, and life-sciences enterprises. To handle workloads like these, enterprises typically deploy a cluster of compute nodes that each carry out independent processing tasks, in a configuration referred to as grid computing. To process data for these parallel workloads, you can use Apache Beam.
Beam currently has native SDKs for Java and Python. However, many embarrassingly parallel workloads make use of code written in C++. This article explains how to run C++ binaries (libraries) as external (or "non-native") code using Apache Beam, the Dataflow runner, and other Google Cloud Platform (GCP) services. Using C++ binaries allows you to unlock these types of workloads using fully managed services and gives you the ability to build end-to-end pipelines using a sophisticated directed acyclic graph (DAG) in both batch and streaming mode.
The approach outlined by this article focuses on running C++ binaries. However, the approach is also relevant for code written in other languages where you can compile a standalone binary.
Components used in this scenario
Grid computing applications require data to be distributed to functions running on many cores. This pattern often requires high-concurrency reads and is often followed by a large fan-out of data absorbed by downstream systems.
The architecture described in this article uses the following GCP resources:
The Dataflow runner for Apache Beam. This runner distributes work to the grid nodes with a processing flow derived from a directed acyclic graph (DAG). A single Beam DAG lets you define complex multi-stage pipelines in which parallel pipelined stages can be brought back together using side inputs or joins.
Cloud Storage. This service provides a location to stage the C++ binaries. When large files need to be stored, as in many media use cases, those files also reside in Cloud Storage.
The scenario described in this article also uses Cloud Bigtable and BigQuery. These services are used as both sources and sinks. The following diagram outlines the high-level architecture.
Other storage systems can also be used. For details, see the list of storage systems and streaming sources in the pipeline I/O page in the Apache Beam documentation.
The Dataflow runner for Apache Beam
Dataflow is a fully managed GCP service that allows you to transform and enrich data in streaming (real time) and batch (historical) modes with equal reliability and expressiveness. Dataflow is based on Apache Beam.
Cloud Storage
Cloud Storage is a unified object storage that encompasses live data serving, data analytics, machine learning (ML), and data archiving. For the scenario described in this article, Cloud Storage provides access to the C++ binaries. In some use cases, Cloud Storage also provides the location for data needed by the processing phase.
For the high-burst loads required by grid computing, you need to understand Cloud Storage performance characteristics. For more information about Cloud Storage data-serving performance, see Request Rate and Access Distribution Guidelines in the Cloud Storage documentation.
Cloud Bigtable
Cloud Bigtable is a high-performance NoSQL database service optimized for large analytical and operational workloads. Cloud Bigtable is complementary to Dataflow, because Cloud Bigtable's primary characteristics (low-latency reads and writes, 6 ms at the 90th percentile) allow it to handle many thousands of concurrent clients and heavy-burst workloads. These features make Cloud Bigtable ideal as a sink, and also as a data source within the DoFn function in the processing phase of Dataflow, as explained later.
BigQuery
BigQuery is a fast, economical, and fully managed enterprise data warehouse for large-scale data analytics. Grid results are often used for analysis, and BigQuery enables you to run large-scale aggregations against the grid's data output.
The Dataflow DAG
The Beam SDK enables you to build expressive DAGs, which in turn let you easily create multi-stage streaming or batch pipelines. Data movement is handled by the runner, with data expressed as PCollection objects, which are immutable collections of elements that are processed in parallel.
The following diagram illustrates this flow.
The Apache Beam SDK allows you to define a DAG. In the DAG you can include user-defined code as functions. Normally, the same programming language (Java or Python) is used for both the declaration of the DAG and for the user-defined code. However, for the scenario described here, the user-defined code is in C++.
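As a minimal sketch, the following Java pipeline shows a three-stage DAG: read the driving data, call the external binary from a DoFn, and write the results. The bucket paths and the ExternalCallerFn class (a hypothetical DoFn that wraps the C++ binary, as described later in this article) are assumptions for illustration, not a definitive implementation.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;

public class GridPipeline {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // Read the driving data from Cloud Storage (assumed bucket).
        .apply("ReadDrivingData", TextIO.read().from("gs://my-bucket/input/*.csv"))
        // Call the external C++ binary from a DoFn (hypothetical class).
        .apply("CallExternalBinary", ParDo.of(new ExternalCallerFn()))
        // Write the results back to Cloud Storage.
        .apply("WriteResults", TextIO.write().to("gs://my-bucket/output/results"));

    pipeline.run();
  }
}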
For more information on creating a DAG, refer to the Dataflow documentation.
General approach for running external code
This section describes the general approach used for executing external (C++) code.
You can use the code to conduct quick experiments without requiring a full build. For a production system, you typically create your own binaries, which gives you the freedom to tune the process to your needs.
The following diagram illustrates the two usages of pipeline data:
- Data is used to drive the process.
- Data is acquired during processing and joined to the driver data.
In this article, primary data (from the source) is referred to as driving data, and secondary data (acquired during processing) is referred to as joining data.
In a finance use case, the driving data might be a few hundred thousand trades. Each trade needs to be processed in conjunction with market data. In that case, the market data is the joining data. In a media use case, the driving data might be image files that require processing but don't need other data sources, and therefore don't use joining data.
Size considerations for driving data
If the size of the driving data element is in the low-megabyte range, you should
treat it with the normal Beam paradigm of creating a PCollection
object from
the source and sending the object to the Beam transforms for processing.
If the size of the driving data element is in the high megabytes or in the gigabytes, as is typical for media, you can put the driving data into Cloud Storage. Then, in the starting PCollection object, you reference the storage URI, and only a URI reference to that data is used. This approach is discussed more fully in Running the external code later in this article.
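For example, the following Java sketch (with assumed bucket paths) creates the starting PCollection from Cloud Storage URIs rather than from the file contents themselves.

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;

// Each element is only a URI reference; the object itself is read from
// Cloud Storage only when the element is processed.
PCollection<String> mediaUris = pipeline.apply("DrivingUris",
    Create.of("gs://my-bucket/videos/a.mp4", "gs://my-bucket/videos/b.mp4"));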
Size considerations for joining data
If the joining data is a few hundred megabytes or less, use a side input to get this data to the Beam transforms. The side input sends the data packet to every worker that needs it.
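The following Java sketch shows this pattern. The names are assumptions: marketData is a PCollection<KV<String, Double>> of joining data, and trades is the PCollection<String> of driving data.

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

// Materialize the joining data as a map side input that's sent to every worker.
PCollectionView<Map<String, Double>> marketDataView =
    marketData.apply("AsSideInput", View.asMap());

PCollection<String> enriched = trades.apply("JoinWithMarketData",
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Map<String, Double> marketDataMap = c.sideInput(marketDataView);
        // Combine the driving element with its joining data before calling the binary.
        c.output(c.element() + "," + marketDataMap.get(c.element()));
      }
    }).withSideInputs(marketDataView));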
If the joining data is in the gigabyte or terabyte range, use either Cloud Bigtable or Cloud Storage to merge the joining data with the driving data, depending on the nature of the data. Cloud Bigtable is ideal for finance scenarios where market data is often accessed as key-value lookups. For more information about designing your Cloud Bigtable schema, including recommendations for working with time-series data, see the Cloud Bigtable schema design documentation.
Running the external code
You can run external code in Beam in many ways. Options include creating a microservice that's called from a DoFn object inside a Dataflow transform, or using JNI with the Java SDK. The approach described in this article relies on creating a subprocess directly from the DoFn object. Although it's not the most efficient approach, it's robust and simple to implement.
As you review this article, consider the complete end-to-end pipeline. Any inefficiencies in the way the process is executed are offset by the fact that the data movement from the source all the way to the sink is accomplished with a single pipeline. If you compare this approach to others, be sure to look at the end-to-end times of the pipeline as well as end-to-end costs.
Pulling the binaries into the hosts
When you use a native Apache Beam language (Java or Python), the Beam SDK automatically moves all required code to the workers. However, when you make a call to external code, you need to move the code manually.
To move the code, you do the following:
- Store the compiled external code, along with versioning information, in Cloud Storage.
- In the @Setup method, create a synchronized block to check whether the code file is available on the local resource. Rather than implementing a physical check, you can confirm availability using a static variable when the first thread finishes.
- If the file isn't available, use the Cloud Storage client library to pull the file from the Cloud Storage bucket to the local worker. A recommended approach is to use the Beam FileSystems class for this task (see the sketch after this list).
- After the file is moved, confirm that the execute bit is set on the code file.
- In a production system, check the hash of the binaries to ensure that the file has been copied correctly.
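The following Java sketch shows one way to implement these steps inside a DoFn. The bucket path, the local path, and the DoFn class name (ExternalCallerFn) are assumptions, and the hash check is left as a placeholder comment.

import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import org.apache.beam.sdk.io.FileSystems;

private static final String BINARY_URI = "gs://my-bucket/binaries/my-binary-v1.0";  // assumed
private static final Path LOCAL_BINARY = Paths.get("/tmp/my-binary");               // assumed
private static volatile boolean binaryStaged = false;

@Setup
public void setup() throws Exception {
  synchronized (ExternalCallerFn.class) {  // hypothetical DoFn class name
    if (!binaryStaged) {
      // Pull the binary from Cloud Storage to the local worker using the Beam FileSystems class.
      try (InputStream in = Channels.newInputStream(
          FileSystems.open(FileSystems.matchNewResource(BINARY_URI, false)))) {
        Files.copy(in, LOCAL_BINARY, StandardCopyOption.REPLACE_EXISTING);
      }
      // Make sure the execute bit is set on the code file.
      LOCAL_BINARY.toFile().setExecutable(true);
      // In a production system, verify the binary's hash here before marking it as staged.
      binaryStaged = true;
    }
  }
}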
Running the external binaries
Before you can run external code, you need to build a wrapper for it. You write this wrapper in the same language as the external code (for example, C++) or as a shell script. The wrapper lets you pass file handles and implement optimizations as described later in Designing processing for small CPU cycles. Your wrapper does not need to be sophisticated. The following snippet shows an outline of a wrapper in C++.
#include <fstream>
#include <iostream>
#include <string>

int main(int argc, char* argv[])
{
    // Expect two arguments: the path of the return file and the data to process.
    if (argc < 3) {
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }
    std::string returnFile = argv[1];
    std::string word = argv[2];

    // Write the processed result (here, just an echo of the input) to the return file.
    std::ofstream myfile;
    myfile.open(returnFile);
    myfile << word;
    myfile.close();
    return 0;
}
This code reads two parameters from the argument list. The first parameter is the location of the return file where the data will be pushed. The second parameter is the data that the code will echo to the user. (In real-world implementations, this code would do more than echo "Hello, world"!)
After you've written the wrapper code, you run the external code by doing the following:
- Transmit the data to the external code binaries.
- Execute the binaries, catch any errors, and log errors and results.
- Handle the logging information.
- Capture data from the completed processing.
Transmitting the data to the binaries
To start the process of running the library, you transmit data to the C++ code. This is where you can take advantage of Dataflow integration with other GCP tools. A tool like Cloud Bigtable can deal with very large datasets and handle low-latency and high-concurrency access. This allows thousands of cores to simultaneously access the dataset. In addition, Cloud Bigtable can preprocess data, allowing data shaping, enrichment, and filtering. All of this work can be done in Beam transforms before you run the external code.
For a production system, the recommended path is to use a protocol buffer to encapsulate the input data. The input data can be converted to bytes and can be base64 encoded before being passed to the external library. There are two ways to pass this data to the external library:
- Small input data. For small data that doesn't exceed the system's maximum length for a command argument, pass the data in argument position 2 of the process being built with java.lang.ProcessBuilder.
- Large input data. For larger data sizes, create a file whose name includes a UUID to contain the data required by the process (both options are shown in the sketch after this list).
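A Java sketch of both options follows. The MAX_ARG_LENGTH threshold, the binary path, and the protocol buffer message (inputProto) are assumptions for illustration.

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.UUID;

byte[] payload = inputProto.toByteArray();                 // hypothetical protocol buffer
String encoded = Base64.getEncoder().encodeToString(payload);
String returnFile = "/tmp/" + UUID.randomUUID() + ".ret";  // where the wrapper writes its result

List<String> command;
if (encoded.length() < MAX_ARG_LENGTH) {                   // assumed threshold constant
  // Small input data: pass the encoded data in argument position 2.
  command = Arrays.asList("/tmp/my-binary", returnFile, encoded);
} else {
  // Large input data: write the data to a UUID-named file and pass its path instead.
  Path dataFile = Paths.get("/tmp", UUID.randomUUID() + ".in");
  Files.write(dataFile, payload);
  command = Arrays.asList("/tmp/my-binary", returnFile, dataFile.toString());
}
ProcessBuilder processBuilder = new ProcessBuilder(command);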
Executing the C++ code, catching errors, and logging
Capturing and handling error information is a critical part of this solution. The resources used by the Dataflow runner are ephemeral and it's often difficult to inspect worker log files. You must make sure that you capture and push all useful information to Dataflow runner logging, and that you store the logging data in one or more Cloud Storage buckets.
The recommended approach is to redirect stdout
and stderr
to files, which
allows you to avoid any out-of-memory considerations. For example, in the
Dataflow runner that calls the C++ code, you could include lines
like the following:
import java.lang.ProcessBuilder.Redirect;
...
// Append stdout and stderr from the external process to files on the worker.
processBuilder.redirectError(Redirect.appendTo(errFile));
processBuilder.redirectOutput(Redirect.appendTo(outFile));
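Building on that snippet, the following sketch starts the process, waits for it to finish, and surfaces failures. The five-minute timeout and the readFirstLines helper are assumptions, and LOG is assumed to be an SLF4J logger.

import java.util.concurrent.TimeUnit;

Process process = processBuilder.start();
// Don't wait indefinitely; the timeout value is an assumption to tune for your workload.
boolean finished = process.waitFor(5, TimeUnit.MINUTES);
if (!finished) {
  process.destroyForcibly();
  throw new RuntimeException("External binary timed out");
}
if (process.exitValue() != 0) {
  // Surface the start of the error log in the worker log, which Dataflow forwards
  // to Cloud Logging; readFirstLines is a hypothetical helper.
  LOG.error("External binary failed: {}", readFirstLines(errFile, 10));
  throw new RuntimeException("External binary returned exit code " + process.exitValue());
}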
Handling logging information
Many use cases involve processing millions of elements. Successful processing generates logs with little or no value, so you must make a business decision about retaining the log data. For example, consider these alternatives to simply retaining all log data:
- If information contained in logs from successful element processing isn't valuable, don't keep it.
- Create logic that samples the log data, such as sampling only every 10,000 log entries. If the processing is homogenous (that is, many iterations of the code generate essentially identical log data), this approach provides an effective balance between retaining log data and optimizing processing.
For failure conditions, the amount of data dumped to logs can be large. An effective strategy for handling large quantities of error log data is to read the first few lines of the log entry and push just those lines to Cloud Logging. The rest of the log file can be loaded into Cloud Storage buckets. This allows you to look at the first lines of the error logs later and then if needed, dig in to Cloud Storage for the whole file.
It's always worth checking the size of the log file. If the file size is zero, you can safely ignore it or record a simple log message that the file had no data.
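The following sketch shows this pattern: skip empty log files, push only the first lines to the worker log, and copy the full file to Cloud Storage. The bucket name and the SLF4J LOG field are assumptions.

import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;

Path errPath = errFile.toPath();
if (Files.size(errPath) == 0) {
  // Nothing useful to keep; record a simple message (or skip logging entirely).
  LOG.info("External binary produced no error output");
} else {
  // Push only the first few lines to the worker log, which Dataflow forwards to Cloud Logging.
  List<String> firstLines;
  try (Stream<String> lines = Files.lines(errPath)) {
    firstLines = lines.limit(5).collect(Collectors.toList());
  }
  LOG.error("External binary error output starts with: {}", String.join(" | ", firstLines));

  // Copy the complete log file to a Cloud Storage bucket (assumed name) for later inspection.
  ResourceId dest = FileSystems.matchNewResource(
      "gs://my-log-bucket/errors/" + UUID.randomUUID() + ".log", false);
  try (WritableByteChannel out = FileSystems.create(dest, "text/plain")) {
    Files.copy(errPath, Channels.newOutputStream(out));
  }
}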
Capturing data from completed processing
It's not recommended that you use stdout to pass the result of the computation back to the DoFn function. Other code that your C++ code calls, and even your own code, might send messages to stdout as well, polluting the stdout stream that otherwise contains logging data. Instead, it's a better practice to make a change to the C++ wrapper code to allow the code to accept a parameter indicating where to create a .ret file that stores the value. Ideally, this file should be stored in a language-neutral way using protocol buffers, which allows the C++ code to pass an object back to the Java code. The DoFn object can read the result directly from the .ret file and pass the result information on to its own output call.
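For example, the processing step of the DoFn might end like the following sketch, where MyResult is a hypothetical protocol buffer message generated for the result and returnFile is the path passed to the binary as argv[1].

import java.nio.file.Files;
import java.nio.file.Paths;

// After the external binary exits successfully, read the .ret file it wrote
// and emit the parsed result from the DoFn.
byte[] resultBytes = Files.readAllBytes(Paths.get(returnFile));
MyResult result = MyResult.parseFrom(resultBytes);  // hypothetical generated protobuf class
c.output(result);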
Additional considerations
This section outlines additional considerations for running C++ code and other non-native binaries in Dataflow.
Designing processing for small CPU cycles
Calling a subprocess has some overhead. Depending on your workload, you might need to do some extra work to reduce the ratio between work being done and the administrative overhead of starting and shutting down the process.
In the media use case, processing for each data element can take many minutes. In that case, the cost of calling the subprocess is insignificant compared to the overall processing time. The best approach in this situation is to have a single element start its own process.
However, in other use cases (for example, finance), processing requires very
small units of CPU time (tens of milliseconds). In that case, the overhead of
calling the subprocess is disproportionately large. A simple solution to this
issue is to make use of Beam's
GroupByKey
transform to create batches of between 50 and 100 elements to be fed into the
process. For example, you can do this:
- In a DoFn function, create a key-value pair. If you are processing financial trades, you can use the trade number as the key. Or, if you don't have a unique number to use as the key, you can generate a checksum from the data and use a modulo function to create partitions of 50 elements.
- Send the key to a GroupByKey.create function, which returns a KV<key, Iterable<data>> collection that contains the 50 elements that you can then send to the process (see the sketch after this list).
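A Java sketch of this batching step follows. The element type (String), the partition count, and the use of hashCode as a stand-in for a checksum are assumptions, and trades is an assumed PCollection<String> of driving data.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

int numPartitions = 1000;  // assumed: roughly the total element count divided by 50

PCollection<KV<Integer, Iterable<String>>> batches = trades
    .apply("KeyIntoPartitions", ParDo.of(new DoFn<String, KV<Integer, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // A checksum of the element, modulo the number of partitions, works as the key.
        int key = Math.floorMod(c.element().hashCode(), numPartitions);
        c.output(KV.of(key, c.element()));
      }
    }))
    .apply("BatchElements", GroupByKey.<Integer, String>create());
// Each KV<Integer, Iterable<String>> element now carries a batch of elements
// that you can send to the external process in a single call.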
Limiting worker parallelism
When you work with a language like Java or Python that's natively supported in the Dataflow runner, you never need to think about what's happening on the worker. Dataflow has many processes that oversee flow control and threads in batch or stream mode.
However, if you're using an external language like C++, you should be aware that you're doing something a little out of the ordinary by starting subprocesses. In batch mode, the Dataflow runner uses a small ratio of working threads to CPUs compared to using stream mode. It's recommended, especially in stream mode, that you create a semaphore within your class to more directly control an individual worker's parallelism.
For example, with media processing, you might not want hundreds of transcoding
elements to be processed in parallel by a single worker. In cases like those,
you can create a utility class that provides permits to the DoFn
function for
the work being carried out. Using this class allows you to take direct control
of the worker threads within your pipeline.
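A minimal Java sketch of such a utility follows; the permit count of four concurrent subprocesses per worker is an assumption to tune for your workload, and ExternalCallerFn is the hypothetical DoFn used throughout this article.

import java.util.concurrent.Semaphore;
import org.apache.beam.sdk.transforms.DoFn;

public class ExternalCallerFn extends DoFn<String, String> {
  // Shared across all instances of this DoFn in the worker JVM, so it caps the
  // number of subprocesses the worker runs at the same time.
  private static final Semaphore PROCESS_PERMITS = new Semaphore(4);

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    PROCESS_PERMITS.acquire();
    try {
      // Launch the external binary for c.element() as described earlier in this article.
    } finally {
      PROCESS_PERMITS.release();
    }
  }
}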
Using high-capacity data sinks in GCP
After the data has been processed, it's sent to a data sink. The sink needs to be able to handle the volume of results that are created by your grid processing solution.
The following diagram shows some of the sinks available in GCP when Dataflow is running a grid workload.
Cloud Bigtable, BigQuery, and Cloud Pub/Sub can all deal with very large streams of data. For example, each Cloud Bigtable node can handle 10,000 inserts per second for rows of up to 1 KB, with easy horizontal scalability. This allows a 100-node Cloud Bigtable cluster to absorb 1,000,000 messages per second generated by the Dataflow grid.
What's next
To get example code that demonstrates how to use external libraries with Beam, follow the "Add examples of running external libraries on workers" Jira issue on the Apache site.
To explore some standard patterns for working with DAGs, see these blog posts:
- Guide to common Dataflow use-case patterns, Part 1
- Guide to common Dataflow use-case patterns, Part 2
You can also try out other GCP features. Have a look at our tutorials.