Processing Logs at Scale Using Cloud Dataflow

Google Cloud Platform provides the scalable infrastructure you need to handle large and diverse log-analysis operations. In this solution, you will learn how to use Cloud Platform to build analytical pipelines that process log entries from multiple sources. You will combine the log data in ways that help you extract meaningful information and persist insights derived from the data, which can be used for analysis, review, and reporting.

Overview

As your application grows more complex, gleaning insights from the data captured in your logs becomes more challenging. Logs come from an increasing number of sources, so they can be hard to collate and query for useful information. Building, operating, and maintaining your own infrastructure to analyze log data at scale can require extensive expertise in running distributed systems and storage. This kind of dedicated infrastructure often represents a one-time capital expense, resulting in fixed capacity, which makes it hard to scale beyond the initial investment. These limitations can impact your business because they lead to slowdowns in generating meaningful, actionable insights from your data.

This solution shows you how to move past these limitations by using Cloud Platform. In this solution, a set of sample microservices run on Google Container Engine to implement a website. Stackdriver Logging collects logs from these services and then saves them to Google Cloud Storage buckets. Google Cloud Dataflow then processes the logs by extracting metadata and computing basic aggregations. The Cloud Dataflow pipeline is designed to process the log elements daily to generate aggregate metrics for server response times, based on the logs for each day. Finally, the output from Cloud Dataflow is loaded into Google BigQuery tables, where it can be analyzed to provide business intelligence. This solution also explains how you can change the pipeline to run in streaming mode, for low-latency, asynchronous log processing.

The solution uses several Cloud Platform components

The included tutorial provides a sample Cloud Dataflow pipeline, a sample web application, configuration information, and steps to run the sample.

About the app

The sample deployment models a shopping app. In this sample, users can visit the homepage of a retail site, browse for individual products, and then try to locate the products in nearby brick-and-mortar stores. The app consists of three microservices: HomeService, BrowseService, and LocateService. Each service is available from an API endpoint in a shared namespace. Users access the services by appending /home, /browse, and /locate to the base URL.

The application is configured to log incoming HTTP requests to stdout.

Using Container Engine with Stackdriver Logging

In this example, the microservices run in a Container Engine cluster, which is a group of Google Compute Engine instances, or nodes, that run Kubernetes. By default, Container Engine configures each node to provide a number of services, including monitoring, health checking, and centralized logging. This solution uses this built-in support for Stackdriver Logging to send logs from each microservice to Cloud Storage. As an alternative for applications that log information to files, not covered by this solution, you can configure cluster-level logging with Kubernetes.

Each microservice runs on an individual pod in the cluster. Each pod runs on a node and is exposed as a single HTTP endpoint by using Container Engine services.

Microservices run on individual nodes

Each node in the cluster runs a Stackdriver Logging agent that captures the log messages. After the logs become available in Stackdriver Logging, a script automatically exports the logs to a Cloud Storage bucket by using the Stackdriver Logging support available from the Cloud SDK. In logging.sh, the sample runs commands similar to the following examples:

# Create a Cloud Storage Bucket
gsutil -q mb gs://BUCKET_NAME

# Allow Stackdriver Logging access to the bucket
gsutil -q acl ch -g cloud-logs@google.com:O gs://BUCKET_NAME

# For each microservice, set up Stackdriver Logging exports
gcloud beta logging sinks create SINK_NAME \
  storage.googleapis.com/BUCKET_NAME \
  --log=”kubernetes.home_service…” --project=PROJECT_ID

Note that you can also configure logs to be exported to Cloud Storage by using the Logs Viewer. This solution uses the SDK because it's required when exporting multiple logs.

When using Cloud Storage as a logs export destination, log entries of type LogEntry are saved in hourly batches in individual JSON files. These structured Cloud Logging entries include additional metadata that specifies when each log message was created, which resource or instance generated it, what its severity level is, and so on. In the following example of a Stackdriver Logging entry, in the structPayload.log element you can see the original log message that the microservice generated:

{
  "metadata": {
    "projectId": "...",
    "serviceName": "compute.googleapis.com",
    "zone": "us-central1-f",
    "labels": {
      "compute.googleapis.com/resource_id": "4154944251817867710",
      "compute.googleapis.com/resource_type": "instance"
    },
    "timestamp": "2015-09-22T20:01:13Z"
  },
  "insertId": "2015-09-22|13:01:17.636360-07|10.106.196.103|1124257302",
  "log": "kubernetes.browse-service-iaus6_default_sample-browse-service",
  "structPayload": {
    "stream": "stdout",
    "log": "2015/09/22 - 20:01:13 | 404 | 176ns | 10.160.0.1:34790 | GET /browse/46"
  }
}

Creating the Cloud Dataflow pipeline

Cloud Dataflow is a simple, yet powerful system that can be used for many kinds of data processing tasks. The Dataflow SDK provides a unified data model that can represent a data set of any size, including an unbounded or infinite data set from a continuously updating data source; it's well-suited for working with the log data in this solution. The Dataflow managed service can run both batch and streaming jobs. This means that you can use a single codebase for asynchronous or synchronous, real-time, event-driven data processing.

The Dataflow SDK provides simple data representations through a specialized collection class named PCollection. The SDK provides built-in and custom data transforms through the PTransform class. In Cloud Dataflow, transforms represent the processing logic of a pipeline. Transforms can be used for a variety of processing operations such as joining data, computing values mathematically, filtering data output, or converting data from one format to another. For more information about pipelines, PCollections, transforms, and I/O sources and sinks, see Dataflow Programming Model.

The following diagram shows the pipeline operations for the log data stored in Cloud Storage:

The Cloud Dataflow pipeline has several steps

Though the diagram might look complex, Cloud Dataflow makes it easy to build and use the pipeline. The following sections describe the specific operations at each stage of the pipeline.

Receiving the data

The pipeline starts by consuming input from the Cloud Storage buckets that contain the logs from the three microservices. Each collection of logs becomes a PCollection of String elements, where each element corresponds to a single LogEntry object. In the following snippet, homeLogs, browseLogs, and locateLogs are of type PCollection<String>:

homeLogs = p.apply(TextIO.Read.named("homeLogsTextRead").from(options.getHomeLogSource()));
browseLogs = p.apply(TextIO.Read.named("browseLogsTextRead").from(options.getBrowseLogSource()));
locateLogs = p.apply(TextIO.Read.named("locateLogsTextRead").from(options.getLocateLogSource()));

To deal with the challenges of a continuously updating data set, the Dataflow SDK uses a technique called windowing. Windowing works by logically subdividing the data in a PCollection according to the timestamps of its individual elements. Because the source type is TextIO in this case, all the objects are initially read into a single global window, which is the default behavior.

Collecting the data into objects

The next step combines the individual microservice PCollections into a single PCollection by using the Flatten operation.

PCollection<String> allLogs = PCollectionList
  .of(homeLogs)
  .and(browseLogs)
  .and(locateLogs)
  .apply(Flatten.<String>pCollections());

This operation is useful because each source PCollection contains the same data type and uses the same, global windowing strategy. Although the sources and structure of each log are the same in this solution, you could extend this approach into one where source and structure are different.

With a single PCollection created, you can now process the individual String elements by using a custom transform that performs several steps on the log entry. Here are the steps:

A transform processes string messages to create log messages

  • Deserialize the JSON String into a Stackdriver Logging LogEntry Java object.
  • Extract the timestamp from the LogEntry metadata.
  • Extract the following individual fields from the log message by using regular expressions: timestamp, responseTime, httpStatusCode, httpMethod, source IP address, and destination endpoint. Use these fields to create a timestamped LogMessage custom object.
  • Output the LogMessage objects, into a new PCollection.

The following code performs the steps:

PCollection<LogMessage> allLogMessages = allLogs
  .apply(ParDo.named("allLogsToLogMessage").of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));

Aggregating the data by days

Recall that the goal is to process the elements on a daily basis to generate aggregated metrics based on logs for each day. To achieve this aggregation requires a windowing function that subdivides the data by day, which is possible because each LogMessage in the PCollection has a timestamp. After Cloud Dataflow partitions the PCollection along daily boundaries, operations that support windowed PCollections will honor the windowing scheme.

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
  .apply(Window.named("allLogMessageToDaily").<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));

With a single, windowed PCollection you can now compute aggregate daily metrics across all three multi-day log sources by running a single Cloud Dataflow job.

PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
  .apply(Combine.<String,Double,Double>perKey(new Max.MaxDoubleFn()));

PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
  .apply(Mean.<String,Double>perKey());

First, a transform takes LogMessage objects as input and then outputs a PCollection of key-value pairs that map the destination endpoints as keys to response time values. Using that PCollection, you can compute two aggregate metrics: maximum response time per destination and average response time per destination. Because the PCollection is still partitioned by day, the output of each computation will represent a single day’s log data. This means that you will have two final PCollections: one containing maximum response time per destination per day and one containing average response time per destination per day.

Computing aggregate daily metrics

Loading the data into BigQuery

The final step in the pipeline outputs the resulting PCollections to BigQuery for downstream analysis and data warehousing.

First, the pipeline transforms the PCollection that contains LogMessage objects for all log sources into a PCollection of BigQuery TableRow objects. This step is required in order to leverage the built-in support in Cloud Dataflow to use BigQuery as a sink for a pipeline.

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
  .apply(ParDo.named("logMessageToTableRow").of(new LogMessageTableRowFn()));

BigQuery tables require defined schemas. For this solution, the schemas are defined in LogAnalyticsPipelineOptions.java by using a default-value annotation. For example, the schema for the maximum-response-time table is defined as follows:

@Default.String("destination:STRING,aggResponseTime:FLOAT")

An operation on the PCollections that contain the aggregated response-time values converts them into PCollections of TableRow objects, applying the appropriate schemas and creating the tables, if they're missing.

logsAsTableRows.apply(BigQueryIO.Write
  .named("allLogsToBigQuery")
  .to(options.getAllLogsTableName())
  .withSchema(allLogsTableSchema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

This solution always appends new data to the existing data. This is an appropriate choice because this pipeline runs on a periodic basis to analyze new log data. However, it's possible to truncate the existing table data or only write to the table if it's empty, if one of those options makes more sense in a different scenario.

Querying the data

The BigQuery console lets you run queries against the output data and connect to third-party business intelligence tools such as Tableau and QlikView for additional analysis.

The BigQuery console runs a query against the log data

Using a streaming pipeline

The sample includes support for running the pipeline in either batch or streaming mode. It takes only a few steps to change the pipeline from batch to streaming. First, the Stackdriver Logging setup exports logging information to Cloud Pub/Sub instead of Cloud Storage. The next step is to change the input sources in the Cloud Dataflow pipeline from Cloud Storage to Cloud Pub/Sub topic subscriptions. You need one subscription for each input source.

The Cloud Pub/Sub pipeline uses subscriptions

You can see the SDK commands in use in logging.sh.

PCollections created from Cloud Pub/Sub input data use an unbounded global window. However, the individual entries already include timestamps. This means that it's not necessary to extract timestamp data from the Stackdriver Logging LogEntry object; just extract the log timestamps to create the custom LogMessage objects.

When using Cloud Pub/Sub pipeline you can extract timestamps from the logs

The rest of the pipeline stays as-is, including downstream flatten, transformation, aggregation, and output operations.

Running the pipeline

The steps to configure, build, and deploy this Cloud Dataflow pipeline, along with the steps required to deploy the microservices and configure Stackdriver Logging exports can be found in the tutorial. When you run the Cloud Dataflow job, you can use the Google Cloud Platform Console to monitor progress and view information about each stage in the pipeline.

The following image shows the console user interface while running an example pipeline:

The Cloud Platform Console shows a running Cloud Dataflow job

Extending the solution

The pipeline and set of operations described in this solution can be extended in a variety of different ways. The most obvious extensions would be to perform additional aggregations across the LogMessage data. For example, if session or anonymized user information were included in the log output, you could create aggregations around user activity. You could also use the ApproximateQuantiles transform to generate a distribution of response times.

Resources and cost

This tutorial uses several billable components of Google Cloud Platform, including:

  • Container Engine for deploying microservices
  • Stackdriver Logging to receive and export logs
  • Cloud Storage for storing exported logs in batch mode
  • Cloud Pub/Sub for streaming exported logs in streaming mode
  • Cloud Dataflow for processing log data
  • BigQuery for storing processing output and supporting rich queries on that output

The cost of running this tutorial will vary depending on run time. Use the pricing calculator to generate a cost estimate based on your projected usage. New Cloud Platform users may be eligible for a free trial.

Tutorial

You can get the complete contents of the tutorial, including setup instructions and source code, from GitHub at https://github.com/GoogleCloudPlatform/processing-logs-using-dataflow.

Next steps

  • Try out other Google Cloud Platform features for yourself. Have a look at our tutorials.
  • Learn how to use Google Cloud Platform products to build end-to-end solutions.

Send feedback about...