Processing logs at scale using Cloud Dataflow


Google Cloud provides the scalable infrastructure you need to handle large and diverse log-analysis operations. This tutorial shows how to use Google Cloud to build analytical pipelines that process log entries from multiple sources. You combine the log data in ways that help you extract meaningful information and persist insights derived from the data, which can be used for analysis, review, and reporting.


As your application grows more complex, deriving insights from the data captured in your logs becomes more challenging. Logs come from an increasing number of sources, so they can be hard to collate and query for useful information. Building, operating, and maintaining your own infrastructure to analyze log data at scale can require extensive expertise in running distributed systems and storage. This kind of dedicated infrastructure often represents a one-time capital expense, resulting in fixed capacity, which makes it hard to scale beyond the initial investment. These limitations can impact your business because they lead to slowdowns in generating meaningful, actionable insights from your data.

This solution shows you how to move past these limitations by using Google Cloud products, as illustrated in the following diagram.

The solution uses several Google Cloud components

In this solution, a set of sample microservices run on Google Kubernetes Engine (GKE) to implement a website. Cloud Logging collects logs from these services and then saves them to Cloud Storage buckets. Dataflow then processes the logs by extracting metadata and computing basic aggregations. The Dataflow pipeline is designed to process the log elements daily to generate aggregate metrics for server response times, based on the logs for each day. Finally, the output from Dataflow is loaded into BigQuery tables, where it can be analyzed to provide business intelligence. This solution also explains how you can change the pipeline to run in streaming mode, for low-latency, asynchronous log processing.

This tutorial provides a sample Dataflow pipeline, a sample web application, configuration information, and steps to run the sample.


This tutorial uses the following billable components of Google Cloud:

  • GKE for deploying microservices.
  • Cloud Logging to receive and export logs.
  • Cloud Storage for storing exported logs in batch mode.
  • Pub/Sub for streaming exported logs in streaming mode.
  • Dataflow for processing log data.
  • BigQuery for storing processing output and supporting rich queries on that output.

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish this tutorial, you can avoid continued billing by deleting the resources you created. For more information, see Clean up.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  4. Enable the BigQuery, Cloud Storage, Pub/Sub, Dataflow, GKE and Logging APIs.

    Enable the APIs


Setting up your environment

In this tutorial, you use Cloud Shell to enter commands. Cloud Shell gives you access to the command line in the Google Cloud console, and includes the Google Cloud CLI and other tools that you need to develop in Google Cloud. Cloud Shell appears as a window at the bottom of the Google Cloud console. It can take several minutes to initialize, but the window appears immediately.

To use Cloud Shell to set up your environment and to clone the git repository used in this tutorial:

  1. In the Google Cloud console, open Cloud Shell.

    Open Cloud Shell

  2. Make sure you are working in the project you created. Replace [YOUR_PROJECT_ID] with the ID of your newly created Google Cloud project.

    gcloud config set project [YOUR_PROJECT_ID]
  3. Set the default compute region. For the purposes of this tutorial, the region is us-east1. If you are deploying to a production environment, deploy to a region of your choice.

    export REGION=us-east1
    gcloud config set compute/region $REGION

Cloning the sample repository

  • Clone the repository containing the scripts and application logic you use in this tutorial.

    git clone
    cd processing-logs-using-dataflow/services

Configure environment variables

# set the project ID used to build resource names
export PROJECT_ID=$(gcloud config get-value project)

# name your GKE cluster
export CLUSTER_NAME=cluster-processing-logs-using-dataflow

# name the bucket for this tutorial
export BUCKET_NAME=${PROJECT_ID}-processing-logs-using-dataflow

# name the logging sink for this tutorial
export SINK_NAME=sink-processing-logs-using-dataflow

# name the BigQuery dataset for this tutorial
export DATASET_NAME=processing_logs_using_dataflow

Deploy the sample application on a new Google Kubernetes Engine cluster

# create the cluster and deploy sample services

About the sample application deployment

The sample deployment models a shopping app. In this sample, users can visit the homepage of a retail site, browse for individual products, and then try to locate the products in nearby brick-and-mortar stores. The app consists of three microservices: HomeService, BrowseService, and LocateService. Each service is available from an API endpoint in a shared namespace. Users access the services by appending /home, /browse, and /locate to the base URL.

The application is configured to log incoming HTTP requests to stdout.

Using Google Kubernetes Engine with Cloud Logging

In this example, the microservices run in a Kubernetes Engine cluster, which is a group of Compute Engine instances, or nodes, that run Kubernetes. By default, GKE configures each node to provide a number of services, including monitoring, health checking, and centralized logging. This solution uses this built-in support for Logging to send logs from each microservice to Cloud Storage. As an alternative for applications that log information to files, not covered by this solution, you can configure cluster-level logging with Kubernetes.

Each microservice runs on an individual Pod in the cluster. Each Pod runs on a node and is exposed as a single HTTP endpoint by using GKE services.

Microservices run on individual nodes.

Each node in the cluster runs a Cloud Logging agent that captures the log messages. After the logs become available in Logging, a script automatically routes the logs to a Cloud Storage bucket by using the Logging support available from the gcloud CLI.

You can also configure logs to be exported to Cloud Storage by using the Logs Explorer. This solution uses the gcloud CLI because it's required when exporting multiple logs.

When you use Cloud Storage as a logs export destination, log entries of type LogEntry are saved in hourly batches in individual JSON files. These structured Logging entries include additional metadata that specifies when each log message was created, which resource or instance generated it, what its severity level is, and so on. In the following example of a Logging entry, the textPayload element contains the original log message that the microservice generated:

    {
        "insertId": "ugjuig3j77zdi",
        "labels": {
            "": "fluentd-gcp-v3.2.0-9q4tr",
            "": "default",
            "": "browse-service-rm7v9",
            "": "stdout"
        },
        "logName": "projects/processing-logs-at-scale/logs/browse-service",
        "receiveTimestamp": "2019-03-09T00:33:30.489218596Z",
        "resource": {
            "labels": {
                "cluster_name": "cluster-processing-logs-using-dataflow",
                "container_name": "browse-service",
                "instance_id": "640697565266753757",
                "namespace_id": "default",
                "pod_id": "browse-service-rm7v9",
                "project_id": "processing-logs-at-scale",
                "zone": "us-east1-d"
            },
            "type": "container"
        },
        "severity": "INFO",
        "textPayload": "[GIN] 2019/03/09 - 00:33:23 | 200 |     190.726µs | | GET      /browse/product/1\n",
        "timestamp": "2019-03-09T00:33:23.743466177Z"
    }

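To window and aggregate these entries, a pipeline needs at least the timestamp and textPayload values from each line. The following Java sketch pulls string fields out of an exported JSON line by using a deliberately naive regular expression. It is an illustration under stated assumptions: it ignores escaped quotes and returns the first occurrence of the key. It is not how the sample pipeline parses entries, because the sample deserializes them into LogEntry objects.

```java
import java.time.Instant;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Naive extraction of string fields from one exported LogEntry JSON line.
// Assumption: the value contains no escaped quotes. A real pipeline should
// deserialize the JSON with a proper parser instead.
final class LogEntryFields {

    // Returns the first string value stored under a key such as "timestamp".
    static Optional<String> stringField(String json, String field) {
        Pattern pattern = Pattern.compile(
                "\"" + Pattern.quote(field) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher matcher = pattern.matcher(json);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    // Parses the RFC 3339 "timestamp" field (nanosecond precision) into an Instant.
    static Optional<Instant> eventTime(String json) {
        return stringField(json, "timestamp").map(Instant::parse);
    }
}
```

Because the key is matched together with its surrounding quotes, looking up "timestamp" does not match "receiveTimestamp". Instant.parse accepts nanosecond-precision values such as 2019-03-09T00:33:23.743466177Z.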
Set up logging

After the cluster is running, and the services are deployed, you can configure logging for the application.

First, get the credentials for the cluster, because kubectl is used to obtain the service names needed to configure the Cloud Logging export sinks.

gcloud container clusters get-credentials $CLUSTER_NAME --region $REGION

In the code repository, services/ sets up the necessary components for either batch or streaming mode. The script accepts these parameters: [YOUR_PROJECT_ID] [BUCKET_NAME] [streaming|batch] [up|down]

For the purposes of this tutorial, start batch logging:


The following steps demonstrate the commands run for the batch mode:

  1. Create a Cloud Storage bucket.

    gsutil -q mb gs://[BUCKET_NAME]

  2. Allow Cloud Logging access to the bucket.

    gsutil -q acl ch -g gs://[BUCKET_NAME]

  3. For each microservice, set up Cloud Logging exports using a sink.

    gcloud logging sinks create [SINK_NAME] \[BUCKET_NAME] \ --log-filter="kubernetes.home_service..." --project=[YOUR_PROJECT_ID]

Update destination permissions

The permissions of the destination, in this case, your Cloud Storage bucket, aren't modified when you create a sink. You must change the permission settings of your Cloud Storage bucket to grant write permission to your sink.

To update the permissions on your Cloud Storage bucket:

  1. Identify your sink's writer identity:

    1. Go to the Logs Router page:

      Go to the Logs Router page

      The Logs Router page contains a summary of your sinks, including the sink's writer identity.

    2. IMPORTANT: Each of the three (3) sinks has its own writer identity (a separate service account email), and each one must be granted permissions on the Cloud Storage bucket.

  2. From the Google Cloud console, click Storage > Browser:

    Go to Browser

  3. To open the detailed view, click the name of your bucket.

  4. Select Permissions and click Add principals.

  5. Set the Role to Storage Object Creator and enter your sink's writer identity.

See Destination permissions for more information.

To check the log object paths, run the following command:

gsutil ls gs://$BUCKET_NAME | grep service

When the output contains all three entries, you can proceed with the steps to run the data pipeline:


Create the BigQuery dataset


Generate some load on the application services

Install the Apache HTTP server utilities

You use the Apache HTTP server benchmarking tool (ab) to generate the load on the services.

sudo apt-get update

sudo apt-get install -y apache2-utils

The shell script generates load on the microservices by requesting responses from HomeService, BrowseService and LocateService.

A single load set comprises one request to the home service and twenty (20) requests each to the browse and locate services.
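To get a feel for the volume, the following Java sketch lists the requests in one load set and counts the total. It is illustrative only. The request paths are hypothetical and are built from the /home, /browse, and /locate endpoints described earlier. The actual load script might request different URLs.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of one load set: one request to the home service plus twenty
// requests each to the browse and locate services. Paths are hypothetical.
final class LoadSetSketch {
    static final int BROWSE_REQUESTS = 20;
    static final int LOCATE_REQUESTS = 20;

    // Returns the request paths for a single load set.
    static List<String> loadSetPaths() {
        List<String> paths = new ArrayList<>();
        paths.add("/home");
        for (int i = 1; i <= BROWSE_REQUESTS; i++) {
            paths.add("/browse/product/" + i);   // hypothetical product IDs
        }
        for (int i = 1; i <= LOCATE_REQUESTS; i++) {
            paths.add("/locate/" + i);           // hypothetical path shape
        }
        return paths;
    }

    // Total number of requests sent for a given number of load sets.
    static long totalRequests(int loadSets) {
        return (long) loadSets * loadSetPaths().size();
    }
}
```

Each load set is 1 + 20 + 20 = 41 requests. For the 1000 load sets used in the next step, the total is 1000 × 41 = 41,000 requests.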

The following command generates one thousand (1000) load sets, with concurrency set to three (3) simultaneous requests:

cd ../services
./ 1000 3

Let this run for several minutes to create a sufficient amount of logs.

Start the Dataflow pipeline

After a sufficient amount of traffic has reached the services, you can start the Dataflow pipeline.

For the purpose of this tutorial, the Dataflow pipeline is run in batch mode. The shell script manually starts the pipeline.

cd ../dataflow

Understanding the Dataflow pipeline

Dataflow can be used for many kinds of data processing tasks. The Dataflow SDK provides a unified data model that can represent a data set of any size, including an unbounded or infinite data set from a continuously updating data source; it's well-suited for working with the log data in this solution. The Dataflow managed service can run both batch and streaming jobs. This means that you can use a single codebase for asynchronous or synchronous, real-time, event-driven data processing.

The Dataflow SDK provides simple data representations through a specialized collection class named PCollection. The SDK provides built-in and custom data transforms through the PTransform class. In Dataflow, transforms represent the processing logic of a pipeline. Transforms can be used for a variety of processing operations such as joining data, computing values mathematically, filtering data output, or converting data from one format to another. For more information about pipelines, PCollections, transforms, and I/O sources and sinks, see Dataflow Programming Model.

The following diagram shows the pipeline operations for the log data stored in Cloud Storage:

The operations steps for the pipeline.

Though the diagram might look complex, Dataflow makes it easy to build and use the pipeline. The following sections describe the specific operations at each stage of the pipeline.

Receiving the data

The pipeline starts by consuming input from the Cloud Storage buckets that contain the logs from the three microservices. Each collection of logs becomes a PCollection of String elements, where each element corresponds to a single LogEntry object. In the following snippet, homeLogs, browseLogs, and locateLogs are of type PCollection<String>:

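// Each TextIO read emits one String element per line (one LogEntry per line).
// The option getters that supply the Cloud Storage paths are assumed names.
homeLogs = p.apply("homeLogsTextRead", TextIO.read().from(options.getHomeLogSource()));
browseLogs = p.apply("browseLogsTextRead", TextIO.read().from(options.getBrowseLogSource()));
locateLogs = p.apply("locateLogsTextRead", TextIO.read().from(options.getLocateLogSource()));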

To deal with the challenges of a continuously updating data set, the Dataflow SDK uses a technique called windowing. Windowing works by logically subdividing the data in a PCollection according to the timestamps of its individual elements. Because the source type is TextIO in this case, all the objects are initially read into a single global window, which is the default behavior.

Collecting the data into objects

The next step combines the individual microservice PCollections into a single PCollection using the Flatten operation.

PCollection<String> allLogs = PCollectionList.of(homeLogs).and(browseLogs).and(locateLogs)
  .apply(Flatten.<String>pCollections());

This operation is useful because each source PCollection contains the same data type and uses the same, global windowing strategy. Although the sources and structure of each log are the same in this solution, you could extend this approach into one where source and structure are different.
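Conceptually, Flatten behaves like a union that keeps every element of every input. The following Java sketch mimics that behavior on in-memory lists. It is not the Beam API, and the method name is hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// In-memory analogue of Flatten: merges collections of the same element
// type into one, keeping duplicates. This sketch happens to preserve input
// order, but a distributed Flatten gives no such guarantee.
final class FlattenSketch {
    @SafeVarargs
    static <T> List<T> flatten(List<T>... inputs) {
        return Stream.of(inputs)
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }
}
```

In a real pipeline, Beam's Flatten also requires its inputs to share a compatible windowing strategy. The three globally windowed log collections meet that requirement.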

With a single PCollection created, you can now process the individual String elements by using a custom transform that performs several steps on the log entry. Here are the steps illustrated in the following diagram:

A transform processes string messages to create log messages.

  • Deserialize the JSON string into a Cloud Logging LogEntry Java object.
  • Extract the timestamp from the LogEntry metadata.
  • Extract the following individual fields from the log message by using regular expressions: timestamp, responseTime, httpStatusCode, httpMethod, source IP address, and destination endpoint. Use these fields to create a timestamped LogMessage custom object.
  • Output the LogMessage objects into a new PCollection.

The following code performs the steps:

PCollection<LogMessage> allLogMessages = allLogs
  .apply("allLogsToLogMessage", ParDo.of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));
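The pipeline options supply the regular expression itself, and it isn't shown here. As a hedged illustration, the following plain-Java parser handles lines in the GIN format seen in the sample textPayload. Several parts of this sketch are assumptions rather than the sample's own LogMessage code: the pattern, the hypothetical ParsedLogMessage class, the assumption that the server logs in UTC, and the conversion of latencies to milliseconds.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for the sample's LogMessage class.
final class ParsedLogMessage {
    final Instant timestamp;
    final int httpStatusCode;
    final double responseTimeMs;
    final String sourceIp;      // empty when the client IP is missing
    final String httpMethod;
    final String destination;

    ParsedLogMessage(Instant timestamp, int httpStatusCode, double responseTimeMs,
                     String sourceIp, String httpMethod, String destination) {
        this.timestamp = timestamp;
        this.httpStatusCode = httpStatusCode;
        this.responseTimeMs = responseTimeMs;
        this.sourceIp = sourceIp;
        this.httpMethod = httpMethod;
        this.destination = destination;
    }
}

final class GinLogParser {
    // Assumed layout: [GIN] date - time | status | latency | client IP | METHOD path
    private static final Pattern GIN_LINE = Pattern.compile(
            "^\\[GIN\\] (\\d{4}/\\d{2}/\\d{2} - \\d{2}:\\d{2}:\\d{2}) \\|\\s*(\\d{3})\\s*\\|"
            + "\\s*([0-9.]+)(ns|[\\u00b5\\u03bc]s|ms|s)\\s*\\|\\s*([^|\\s]*)\\s*\\|"
            + "\\s*([A-Z]+)\\s+(\\S+)");

    private static final DateTimeFormatter GIN_TIME =
            DateTimeFormatter.ofPattern("yyyy/MM/dd - HH:mm:ss");

    // Returns an empty Optional for lines that don't match the pattern.
    static Optional<ParsedLogMessage> parse(String line) {
        Matcher m = GIN_LINE.matcher(line);
        if (!m.find()) {
            return Optional.empty();
        }
        // Assumption: the server writes its log timestamps in UTC.
        Instant ts = LocalDateTime.parse(m.group(1), GIN_TIME).toInstant(ZoneOffset.UTC);
        double millis = toMillis(Double.parseDouble(m.group(3)), m.group(4));
        return Optional.of(new ParsedLogMessage(ts, Integer.parseInt(m.group(2)), millis,
                m.group(5), m.group(6), m.group(7)));
    }

    // Converts a Go-style duration value and unit to milliseconds.
    private static double toMillis(double value, String unit) {
        switch (unit) {
            case "ns": return value / 1_000_000.0;
            case "ms": return value;
            case "s":  return value * 1000.0;
            default:   return value / 1000.0; // microseconds
        }
    }
}
```

Go prints durations of a minute or longer in forms such as 1m2.5s. Those don't match this pattern, so the parser skips them.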

Aggregating the data by days

Recall that the goal is to process the elements on a daily basis to generate aggregated metrics based on the logs for each day. Achieving this aggregation requires a windowing function that subdivides the data by day, which is possible because each LogMessage in the PCollection has a timestamp. After Dataflow partitions the PCollection along daily boundaries, operations that support windowed PCollections honor the windowing scheme.

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
  .apply("allLogMessageToDaily", Window.<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));
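FixedWindows.of(Duration.standardDays(1)) places each element in the one-day window that contains its timestamp. The following sketch reproduces that assignment with java.time types for illustration. It assumes the default zero offset, under which windows are aligned to the Unix epoch and therefore start at midnight UTC.

```java
import java.time.Duration;
import java.time.Instant;

// Plain-Java model of fixed-window assignment (not the Beam API).
// A window covers [start, start + size); with a zero offset, windows are
// aligned to the Unix epoch, so one-day windows start at midnight UTC.
final class FixedWindowSketch {

    static Instant windowStart(Instant timestamp, Duration size) {
        long sizeMillis = size.toMillis();
        long t = timestamp.toEpochMilli();
        // floorMod keeps the result correct for timestamps before the epoch.
        return Instant.ofEpochMilli(t - Math.floorMod(t, sizeMillis));
    }

    static Instant windowEnd(Instant timestamp, Duration size) {
        return windowStart(timestamp, size).plus(size);
    }
}
```

Every LogMessage stamped on 2019-03-09 (UTC) therefore lands in the window [2019-03-09T00:00Z, 2019-03-10T00:00Z), regardless of which microservice produced it.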

With a single, windowed PCollection you can now compute aggregate daily metrics across all three multi-day log sources by running a single Dataflow job.

// Maximum and mean response time per destination, computed separately in each daily window.
PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
  .apply(Max.<String>doublesPerKey());

PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
  .apply(Mean.<String, Double>perKey());

First, a transform takes LogMessage objects as input and then outputs a PCollection of key-value pairs that map the destination endpoints as keys to response time values, as illustrated in the following diagram.

Computing aggregate daily metrics.

Using that PCollection, you can compute two aggregate metrics: maximum response time per destination and average response time per destination. Because the PCollection is still partitioned by day, the output of each computation represents a single day’s log data. This means that you have two PCollections: one containing maximum response time per destination per day and one containing average response time per destination per day.
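To make the two aggregations concrete, the following Java sketch groups hypothetical (timestamp, destination, response time) records by UTC day and destination. It then reads the maximum and mean from each group. It stands in for the per-key Max and Mean combiners running on a daily-windowed PCollection. The record class and the key format are assumptions.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.DoubleSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical minimal record: when a request happened, where it went,
// and how long the server took to respond.
final class ResponseSample {
    final Instant timestamp;
    final String destination;
    final double responseTime;

    ResponseSample(Instant timestamp, String destination, double responseTime) {
        this.timestamp = timestamp;
        this.destination = destination;
        this.responseTime = responseTime;
    }
}

final class DailyAggregates {
    // Composite key: UTC day (the daily window) plus destination endpoint.
    static String key(ResponseSample s) {
        LocalDate day = s.timestamp.atOffset(ZoneOffset.UTC).toLocalDate();
        return day + "|" + s.destination;
    }

    // One DoubleSummaryStatistics per key; getMax() and getAverage() give
    // the maximum and mean response time per destination per day.
    static Map<String, DoubleSummaryStatistics> summarize(List<ResponseSample> samples) {
        return samples.stream().collect(Collectors.groupingBy(
                DailyAggregates::key,
                TreeMap::new,
                Collectors.summarizingDouble((ResponseSample s) -> s.responseTime)));
    }
}
```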

Loading the data into BigQuery

The final step in the pipeline outputs the resulting PCollections to BigQuery for downstream analysis and data warehousing.

First, the pipeline transforms the PCollection that contains LogMessage objects for all log sources into a PCollection of BigQuery TableRow objects. This step is required in order to leverage the built-in support in Dataflow to use BigQuery as a sink for a pipeline.

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
  .apply("logMessageToTableRow", ParDo.of(new LogMessageTableRowFn()));

BigQuery tables require defined schemas. For this solution, the schemas are defined by using a default-value annotation. For example, the schema for the maximum-response-time table is defined as follows:


An operation on the PCollections that contain the aggregated response-time values converts them into PCollections of TableRow objects, applying the appropriate schemas and creating the tables, if they're missing.

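logsAsTableRows.apply("allLogsToBigQuery", BigQueryIO.writeTableRows() // allLogsTableSpec and allLogsSchema are assumed names
  .to(allLogsTableSpec).withSchema(allLogsSchema).withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED).withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));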
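A TableRow object is essentially a map from column names to values, so the shape of this conversion can be sketched without the BigQuery libraries. The following Java example assumes a hypothetical compact schema string of the form name:TYPE. The aggResponseTime column comes from the query later in this tutorial. The destination column name is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Illustration only: parses a hypothetical "name:TYPE,name:TYPE" schema
// string and builds a row map whose columns follow the schema order,
// which is the role TableRow plays in the pipeline.
final class SchemaSketch {

    static Map<String, String> parseSchema(String spec) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (String part : spec.split(",")) {
            String[] nameAndType = part.trim().split(":");
            if (nameAndType.length != 2) {
                throw new IllegalArgumentException("Bad field spec: " + part);
            }
            fields.put(nameAndType[0].trim(), nameAndType[1].trim().toUpperCase(Locale.ROOT));
        }
        return fields;
    }

    // Rejects columns that the schema does not declare; missing values become null.
    static Map<String, Object> toRow(Map<String, String> schema, Map<String, Object> values) {
        for (String column : values.keySet()) {
            if (!schema.containsKey(column)) {
                throw new IllegalArgumentException("Unknown column: " + column);
            }
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (String column : schema.keySet()) {
            row.put(column, values.get(column));
        }
        return row;
    }
}
```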

This solution always appends new data to the existing data. This is an appropriate choice because this pipeline runs on a periodic basis to analyze new log data. However, it's possible to truncate the existing table data or only write to the table if it's empty, if one of those options makes more sense in a different scenario.
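BigQuery calls these choices write dispositions: WRITE_APPEND, WRITE_TRUNCATE, and WRITE_EMPTY. The following Java sketch models their effect on an in-memory table so that the difference is easy to see. It is an illustration only and makes no claim about how BigQuery applies them internally.

```java
import java.util.ArrayList;
import java.util.List;

// In-memory model of the three write dispositions (illustration only).
enum WriteMode { WRITE_APPEND, WRITE_TRUNCATE, WRITE_EMPTY }

final class InMemoryTable<R> {
    private final List<R> rows = new ArrayList<>();

    void write(List<R> newRows, WriteMode mode) {
        switch (mode) {
            case WRITE_TRUNCATE:
                rows.clear();                 // replace any existing rows
                break;
            case WRITE_EMPTY:
                if (!rows.isEmpty()) {        // refuse to write into a non-empty table
                    throw new IllegalStateException("Table is not empty");
                }
                break;
            case WRITE_APPEND:
            default:
                break;                        // keep existing rows
        }
        rows.addAll(newRows);
    }

    // Returns a copy so that callers can't modify the table directly.
    List<R> rows() {
        return new ArrayList<>(rows);
    }
}
```

Each run of this pipeline covers new days of logs, so appending keeps the earlier results. Truncating would replace them with only the latest run's output.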

Querying the data from BigQuery

The BigQuery console lets you run queries against the output data and connect to third-party business intelligence tools such as Tableau and QlikView for additional analysis.

  1. In the Google Cloud console, open BigQuery.

    Open BigQuery

  2. Click your project (processing-logs-at-scale in this example), and then click the dataset processing_logs_using_dataflow.

  3. Select all_logs_table, and then in the data pane, select Preview to view a sample of the data in the all logs table.

  4. In the Query editor enter the following query:

    SELECT *
    FROM `processing_logs_using_dataflow.max_response_time_table`
    ORDER BY aggResponseTime DESC
    LIMIT 100;
  5. To run the query, click Run.

    The BigQuery console runs a query against the log data.

Using a streaming pipeline

The sample includes support for running the pipeline in either batch or streaming mode. It takes only a few steps to change the pipeline from batch to streaming. First, the Cloud Logging setup exports logging information to Pub/Sub instead of Cloud Storage. The next step is to change the input sources in the Dataflow pipeline from Cloud Storage to Pub/Sub topic subscriptions. You need one subscription for each input source.

The Pub/Sub pipeline uses subscriptions.

You can see the SDK commands in use in

PCollections created from Pub/Sub input data use an unbounded global window. However, the individual entries already include timestamps. This means that it's not necessary to extract timestamp data from the Cloud Logging LogEntry object; instead, extract the log timestamps to create the custom LogMessage objects.
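What matters in streaming mode is that the daily windows are based on each entry's own event timestamp, not on when the entry arrives. The following Java sketch assumes the RFC 3339 timestamp and receiveTimestamp values shown in the sample LogEntry. It checks whether an entry crossed midnight in transit. It is plain Java, not Pub/Sub or Beam code.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

// Event time ("timestamp") versus arrival time ("receiveTimestamp").
// Daily windows are assigned from the event time.
final class EventTimeSketch {

    // UTC calendar day of an RFC 3339 timestamp.
    static LocalDate utcDay(String rfc3339) {
        return Instant.parse(rfc3339).atOffset(ZoneOffset.UTC).toLocalDate();
    }

    // The day whose window the entry belongs to.
    static LocalDate windowDay(String timestamp) {
        return utcDay(timestamp);
    }

    // True when the entry was logged and received on different UTC days.
    static boolean crossesMidnight(String timestamp, String receiveTimestamp) {
        return !utcDay(timestamp).equals(utcDay(receiveTimestamp));
    }
}
```

An entry logged at 23:59:58 and received a few seconds after midnight still belongs to the earlier day's window.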

When using a Pub/Sub pipeline, you can extract timestamps from the logs.

The rest of the pipeline stays as-is, including downstream flatten, transformation, aggregation, and output operations.

Monitoring the pipeline

When you run the Dataflow job, you can use the Google Cloud console to monitor progress and view information about each stage in the pipeline.

The following image shows the Google Cloud console while running an example pipeline:

The Google Cloud console shows a running Dataflow job.

Cleaning up

Delete the project

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete all the components

Make sure that the environment variables are still set to the values that you used during setup.

  1. Delete the BigQuery dataset:

    bq rm -r -f -d $DATASET_NAME
  2. Deactivate the Cloud Logging exports. This step deletes the exports and the specified Cloud Storage bucket:

    cd ../services
    ./ $PROJECT_ID $BUCKET_NAME batch down
  3. Delete the GKE cluster used to run the sample web applications:

    gcloud container clusters delete $CLUSTER_NAME --region $REGION

Extending the solution

The pipeline and set of operations described in this solution can be extended in a variety of different ways. The most obvious extensions would be to perform additional aggregations across the LogMessage data. For example, if session or anonymized user information were included in the log output, you could create aggregations around user activity. You could also use the ApproximateQuantiles transform to generate a distribution of response times.
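ApproximateQuantiles estimates quantiles in a bounded amount of memory so that it scales to large, distributed inputs. For a small in-memory array, the exact values that it approximates can be computed directly, as in the following Java sketch. The sketch assumes the same convention as ApproximateQuantiles: numQuantiles evenly spaced values that start at the minimum and end at the maximum.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Exact, in-memory counterpart of a quantile summary of response times.
final class QuantileSketch {

    // Returns numQuantiles evenly spaced order statistics, from min to max.
    static List<Double> quantiles(double[] values, int numQuantiles) {
        if (values.length == 0 || numQuantiles < 2) {
            throw new IllegalArgumentException("need values and numQuantiles >= 2");
        }
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        List<Double> result = new ArrayList<>();
        for (int i = 0; i < numQuantiles; i++) {
            // Nearest element at fraction i / (numQuantiles - 1) of the sorted data.
            int index = (int) Math.round((double) i * (sorted.length - 1) / (numQuantiles - 1));
            result.add(sorted[index]);
        }
        return result;
    }
}
```

For example, quantiles(responseTimes, 5) returns the minimum, the three quartiles, and the maximum.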

What's next