Processing logs at scale using Cloud Dataflow

Google Cloud Platform (GCP) provides the scalable infrastructure you need to handle large and diverse log-analysis operations. This tutorial shows how to use GCP to build analytical pipelines that process log entries from multiple sources. You combine the log data in ways that help you extract meaningful information and persist insights derived from the data, which can be used for analysis, review, and reporting.

Overview

As your application grows more complex, deriving insights from the data captured in your logs becomes more challenging. Logs come from an increasing number of sources, so they can be hard to collate and query for useful information. Building, operating, and maintaining your own infrastructure to analyze log data at scale can require extensive expertise in running distributed systems and storage. This kind of dedicated infrastructure often represents a one-time capital expense, resulting in fixed capacity, which makes it hard to scale beyond the initial investment. These limitations can impact your business because they lead to slowdowns in generating meaningful, actionable insights from your data.

This solution shows you how to move past these limitations by using GCP products, as illustrated in the following diagram.

The solution uses several GCP components

In this solution, a set of sample microservices run on Google Kubernetes Engine (GKE) to implement a website. Stackdriver Logging collects logs from these services and then saves them to Cloud Storage buckets. Cloud Dataflow then processes the logs by extracting metadata and computing basic aggregations. The Cloud Dataflow pipeline is designed to process the log elements daily to generate aggregate metrics for server response times, based on the logs for each day. Finally, the output from Cloud Dataflow is loaded into BigQuery tables, where it can be analyzed to provide business intelligence. This solution also explains how you can change the pipeline to run in streaming mode, for low-latency, asynchronous log processing.

This tutorial provides a sample Cloud Dataflow pipeline, a sample web application, configuration information, and steps to run the sample.

Costs

This tutorial uses the following billable components of Google Cloud Platform:

  • GKE for deploying microservices.
  • Logging to receive and export logs.
  • Cloud Storage for storing exported logs in batch mode.
  • Cloud Pub/Sub for streaming exported logs in streaming mode.
  • Cloud Dataflow for processing log data.
  • BigQuery for storing processing output and supporting rich queries on that output.

To generate a cost estimate based on your projected usage, use the pricing calculator. New GCP users might be eligible for a free trial.

When you finish this tutorial, you can avoid continued billing by deleting the resources you created. For more information, see Cleaning up.

Before you begin

  1. Sign in to your Google Account.

    If you don't already have one, sign up for a new account.

  2. Select or create a GCP project.

    Go to the project selector page

  3. Make sure that billing is enabled for your Google Cloud Platform project. Learn how to enable billing.

  4. Enable the BigQuery, Cloud Storage, Cloud Pub/Sub, Cloud Dataflow, GKE, and Logging APIs.

    Enable the APIs

  5. Create a Stackdriver Workspace. For more information about Workspaces, see Managing Workspaces.

    GO TO Stackdriver

Setting up your environment

In this tutorial, you use Cloud Shell to enter commands. Cloud Shell gives you access to the command line in the GCP Console, and includes the Cloud SDK and other tools that you need to develop in GCP. Cloud Shell appears as a window at the bottom of the GCP Console. It can take several minutes to initialize, but the window appears immediately.

To use Cloud Shell to set up your environment and to clone the git repository used in this tutorial:

  1. In the GCP Console, open Cloud Shell.

    OPEN Cloud Shell

  2. Make sure you are working in the project you just created. Replace [YOUR_PROJECT_ID] with the ID of your newly created GCP project.

    gcloud config set project [YOUR_PROJECT_ID]
    
  3. Set the default compute region. For the purposes of this tutorial, it is us-east1. If you are deploying to a production environment, use a region of your choice.

    export REGION=us-east1
    gcloud config set compute/region $REGION
    

Cloning the sample repository

  • Clone the repository containing the scripts and application logic you will use in this tutorial.

    git clone https://github.com/GoogleCloudPlatform/processing-logs-using-dataflow.git
    cd processing-logs-using-dataflow/services
    

Configure environment variables

# set your project ID
export PROJECT_ID=[YOUR_PROJECT_ID]
# name your GKE cluster
export CLUSTER_NAME=cluster-processing-logs-using-dataflow

# name the bucket for this tutorial
export BUCKET_NAME=${PROJECT_ID}-processing-logs-using-dataflow

# name the logging sink for this tutorial
export SINK_NAME=sink-processing-logs-using-dataflow

# name the BigQuery dataset for this tutorial
export DATASET_NAME=processing_logs_using_dataflow

Deploy the sample application on a new Google Kubernetes Engine cluster

# create the cluster and deploy sample services
./cluster.sh $PROJECT_ID $CLUSTER_NAME up

About the sample application deployment

The sample deployment models a shopping app. In this sample, users can visit the homepage of a retail site, browse for individual products, and then try to locate the products in nearby brick-and-mortar stores. The app consists of three microservices: HomeService, BrowseService, and LocateService. Each service is available from an API endpoint in a shared namespace. Users access the services by appending /home, /browse, and /locate to the base URL.

The application is configured to log incoming HTTP requests to stdout.

Using Google Kubernetes Engine with Stackdriver Logging

In this example, the microservices run in a Kubernetes Engine cluster, which is a group of Compute Engine instances, or nodes, that run Kubernetes. By default, GKE configures each node to provide a number of services, including monitoring, health checking, and centralized logging. This solution uses this built-in support for Logging to send logs from each microservice to Cloud Storage. For applications that log information to files instead (not covered by this solution), you can configure cluster-level logging with Kubernetes as an alternative.

Each microservice runs on an individual pod in the cluster. Each pod runs on a node and is exposed as a single HTTP endpoint by using GKE services.

Microservices run on individual nodes.

Each node in the cluster runs a Stackdriver Logging agent that captures the log messages. After the logs become available in Logging, a script automatically exports the logs to a Cloud Storage bucket by using the Logging support available from the Cloud SDK.

Note that you can also configure logs to be exported to Cloud Storage by using the Logs Viewer. This solution uses the Cloud SDK because it's required when exporting multiple logs.

When using Cloud Storage as a logs export destination, log entries of type LogEntry are saved in hourly batches in individual JSON files. These structured Logging entries include additional metadata that specifies when each log message was created, which resource or instance generated it, what its severity level is, and so on. In the following example of a Logging entry, you can see the original log message that the microservice generated in the textPayload element:

 {
    "insertId": "ugjuig3j77zdi",
    "labels": {
        "compute.googleapis.com/resource_name": "fluentd-gcp-v3.2.0-9q4tr",
        "container.googleapis.com/namespace_name": "default",
        "container.googleapis.com/pod_name": "browse-service-rm7v9",
        "container.googleapis.com/stream": "stdout"
    },
    "logName": "projects/processing-logs-at-scale/logs/browse-service",
    "receiveTimestamp": "2019-03-09T00:33:30.489218596Z",
    "resource": {
        "labels": {
            "cluster_name": "cluster-processing-logs-using-dataflow",
            "container_name": "browse-service",
            "instance_id": "640697565266753757",
            "namespace_id": "default",
            "pod_id": "browse-service-rm7v9",
            "project_id": "processing-logs-at-scale",
            "zone": "us-east1-d"
        },
        "type": "container"
    },
    "severity": "INFO",
    "textPayload": "[GIN] 2019/03/09 - 00:33:23 | 200 |     190.726µs |      10.142.0.6 | GET      /browse/product/1\n",
    "timestamp": "2019-03-09T00:33:23.743466177Z"
 }

Set up logging

After the cluster is running and the services are deployed, you can configure logging for the application.

First, get the credentials for the cluster, because kubectl is used to obtain the service names needed to configure the Stackdriver Logging export sinks.

gcloud container clusters get-credentials $CLUSTER_NAME --region $REGION

In the code repository, services/logging.sh sets up the necessary components for either batch or streaming mode. The script accepts these parameters:

logging.sh [YOUR_PROJECT_ID] [BUCKET_NAME] [streaming|batch] [up|down]

For the purposes of this tutorial, start batch logging:

./logging.sh $PROJECT_ID $BUCKET_NAME batch up

The following steps illustrate the commands that the script runs in batch mode:

  1. Create a Cloud Storage bucket.

    gsutil -q mb gs://[BUCKET_NAME]

  2. Allow Stackdriver Logging access to the bucket.

    gsutil -q acl ch -g cloud-logs@google.com:O gs://[BUCKET_NAME]

  3. For each microservice, set up Stackdriver exports using a sink.

    gcloud logging sinks create [SINK_NAME] \
        storage.googleapis.com/[BUCKET_NAME] \
        --log-filter="kubernetes.home_service..." \
        --project=[YOUR_PROJECT_ID]

Update destination permissions

The permissions of the destination, in this case, your Cloud Storage bucket, aren't modified when you create a sink. You must change the permission settings of your Cloud Storage bucket to grant write permission to your sink.

To update the permissions on your Cloud Storage bucket:

  1. Identify your sink's Writer Identity:

    1. Go to the Logs Viewer page:

      Go to the Logs Viewer page

    2. Select Exports in the left menu to see a summary of your sinks, including the sink's Writer Identity.

    3. Important: Each of the three sinks has a separate service account email address that must be granted permissions on the Cloud Storage bucket.

  2. From the GCP Console, click Storage > Browser:

    Go to Browser

  3. To open the detailed view, click the name of your bucket.

  4. Select Permissions and click Add members.

  5. Set the Role to Storage Object Creator and enter your sink's writer identity.

See Destination permissions for more information.

You can check for the log object paths by using the following command:

gsutil ls gs://processing-logs-at-scale-processing-logs-using-dataflow/ | grep service

When the output contains all three entries, you can proceed with the steps to run the data pipeline:

 gs://processing-logs-at-scale-processing-logs-using-dataflow/browse-service/
 gs://processing-logs-at-scale-processing-logs-using-dataflow/home-service/
 gs://processing-logs-at-scale-processing-logs-using-dataflow/locate-service/

Create the BigQuery dataset

bq mk $DATASET_NAME

Generate some load on the application services

Install the Apache HTTP server utilities

You will use the Apache HTTP server benchmarking tool (ab) to generate the load on the services.

sudo apt-get update

sudo apt-get install -y apache2-utils

The load.sh shell script generates load on the microservices by requesting responses from HomeService, BrowseService, and LocateService.

A single load set consists of one request to the home service and twenty requests each to the browse and locate services.

The following command generates one thousand load sets, with concurrency set to 3 simultaneous requests:

cd ../services
./load.sh 1000 3

Let this run for several minutes to allow a sufficient number of log entries to be created.

Start the Cloud Dataflow pipeline

After you have allowed a sufficient amount of traffic to reach the services, you can start the Cloud Dataflow pipeline.

For the purpose of this tutorial, the Cloud Dataflow pipeline is run in batch mode. The pipeline.sh shell script manually starts the pipeline.

cd ../dataflow
./pipeline.sh $PROJECT_ID $DATASET_NAME $BUCKET_NAME run

Understanding the Cloud Dataflow pipeline

Cloud Dataflow can be used for many kinds of data processing tasks. The Cloud Dataflow SDK provides a unified data model that can represent a data set of any size, including an unbounded or infinite data set from a continuously updating data source; it's well-suited for working with the log data in this solution. The Cloud Dataflow managed service can run both batch and streaming jobs. This means that you can use a single codebase for asynchronous or synchronous, real-time, event-driven data processing.

The Cloud Dataflow SDK provides simple data representations through a specialized collection class named PCollection. The SDK provides built-in and custom data transforms through the PTransform class. In Cloud Dataflow, transforms represent the processing logic of a pipeline. Transforms can be used for a variety of processing operations such as joining data, computing values mathematically, filtering data output, or converting data from one format to another. For more information about pipelines, PCollections, transforms, and I/O sources and sinks, see Dataflow Programming Model.
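
To make these concepts concrete, the following minimal sketch shows a Pipeline, a PCollection, and a PTransform applied with ParDo. It is not taken from the solution code; it assumes the Apache Beam Java SDK that Cloud Dataflow runs, and it uses a few hypothetical sample log lines:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class MinimalPipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // A bounded PCollection of raw log lines (hypothetical sample data).
    PCollection<String> lines = p.apply(Create.of(
        "GET /home 200", "GET /browse/product/1 200", "GET /locate/product/1 500"));

    // A PTransform (here, a ParDo) that keeps only successful requests.
    PCollection<String> successes = lines.apply("FilterSuccess",
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (c.element().endsWith(" 200")) {
              c.output(c.element());
            }
          }
        }));

    p.run().waitUntilFinish();
  }
}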

The following diagram shows the pipeline operations for the log data stored in Cloud Storage:

The operations steps for the pipeline.

Though the diagram might look complex, Cloud Dataflow makes it easy to build and use the pipeline. The following sections describe the specific operations at each stage of the pipeline.

Receiving the data

The pipeline starts by consuming input from the Cloud Storage buckets that contain the logs from the three microservices. Each collection of logs becomes a PCollection of String elements, where each element corresponds to a single LogEntry object. In the following snippet, homeLogs, browseLogs, and locateLogs are of type PCollection<String>:

homeLogs = p.apply("homeLogsTextRead", TextIO.read().from(options.getHomeLogSource()));
browseLogs = p.apply("browseLogsTextRead", TextIO.read().from(options.getBrowseLogSource()));
locateLogs = p.apply("locateLogsTextRead", TextIO.read().from(options.getLocateLogSource()));

To deal with the challenges of a continuously updating data set, the Dataflow SDK uses a technique called windowing. Windowing works by logically subdividing the data in a PCollection according to the timestamps of its individual elements. Because the source type is TextIO in this case, all the objects are initially read into a single global window, which is the default behavior.

Collecting the data into objects

The next step combines the individual microservice PCollections into a single PCollection using the Flatten operation.

PCollection<String> allLogs = PCollectionList
  .of(homeLogs)
  .and(browseLogs)
  .and(locateLogs)
  .apply(Flatten.<String>pCollections());

This operation is useful because each source PCollection contains the same data type and uses the same, global windowing strategy. Although the sources and structure of each log are the same in this solution, you could extend this approach into one where source and structure are different.

With a single PCollection created, you can now process the individual String elements by using a custom transform that performs several steps on each log entry. The steps are illustrated in the following diagram:

A transform processes string messages to create log messages.

  • Deserialize the JSON string into a Stackdriver Logging LogEntry Java object.
  • Extract the timestamp from the LogEntry metadata.
  • Extract the following individual fields from the log message by using regular expressions: timestamp, responseTime, httpStatusCode, httpMethod, source IP address, and destination endpoint. Use these fields to create a timestamped LogMessage custom object.
  • Output the LogMessage objects into a new PCollection.

The following code performs the steps:

PCollection<LogMessage> allLogMessages = allLogs
  .apply("allLogsToLogMessage", ParDo.of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));
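
The following is a simplified, hypothetical sketch of what a DoFn such as EmitLogMessageFn might look like. The actual implementation lives in the sample repository; the Gson-based JSON parsing, the LogMessage constructor, and the named regex groups shown here are assumptions made for illustration:

import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

class EmitLogMessageFn extends DoFn<String, LogMessage> {
  private final boolean outputWithTimestamp;
  private final String logRegexPattern;

  EmitLogMessageFn(boolean outputWithTimestamp, String logRegexPattern) {
    this.outputWithTimestamp = outputWithTimestamp;
    this.logRegexPattern = logRegexPattern;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Deserialize the Stackdriver Logging entry and read its payload and timestamp
    // (timestamp parsing is simplified; sub-millisecond precision is truncated).
    JsonObject entry = JsonParser.parseString(c.element()).getAsJsonObject();
    String payload = entry.get("textPayload").getAsString();
    Instant timestamp = Instant.parse(entry.get("timestamp").getAsString());

    // Extract the HTTP fields from the payload with the configured regex
    // (this sketch assumes the pattern defines these named capture groups).
    Matcher m = Pattern.compile(logRegexPattern).matcher(payload);
    if (!m.find()) {
      return; // Skip lines that don't match the expected log format.
    }
    LogMessage message = new LogMessage(timestamp,
        m.group("httpMethod"),
        m.group("destination"),
        Integer.parseInt(m.group("httpStatusCode")),
        Double.parseDouble(m.group("responseTime")));

    // Attach the log timestamp so that downstream daily windowing can use it.
    if (outputWithTimestamp) {
      c.outputWithTimestamp(message, timestamp);
    } else {
      c.output(message);
    }
  }
}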

Aggregating the data by days

Recall that the goal is to process the elements on a daily basis to generate aggregated metrics based on logs for each day. Achieving this aggregation requires a windowing function that subdivides the data by day, which is possible because each LogMessage in the PCollection has a timestamp. After Cloud Dataflow partitions the PCollection along daily boundaries, operations that support windowed PCollections honor the windowing scheme.

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
  .apply("allLogMessageToDaily", Window.<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));

With a single, windowed PCollection you can now compute aggregate daily metrics across all three multi-day log sources by running a single Cloud Dataflow job.

PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
  .apply(Max.<String>doublesPerKey());

PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
  .apply(Mean.<String,Double>perKey());

First, a transform takes LogMessage objects as input and then outputs a PCollection of key-value pairs that map the destination endpoints as keys to response time values, as illustrated in the following diagram.

Computing aggregate daily metrics.

Using that PCollection, you can compute two aggregate metrics: maximum response time per destination and average response time per destination. Because the PCollection is still partitioned by day, the output of each computation will represent a single day’s log data. This means that you will have two final PCollections: one containing maximum response time per destination per day and one containing average response time per destination per day.
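
For reference, that first transform might look like the following hedged sketch. The destResponseTimeCollection name matches the earlier snippet, but the LogMessage accessor names here are assumptions:

PCollection<KV<String, Double>> destResponseTimeCollection = allLogMessagesDaily
  .apply("logMessageToDestRespTime", ParDo.of(new DoFn<LogMessage, KV<String, Double>>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // Key: destination endpoint; value: response time for that request.
      c.output(KV.of(c.element().getDestination(), c.element().getResponseTime()));
    }
  }));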

Loading the data into BigQuery

The final step in the pipeline outputs the resulting PCollections to BigQuery for downstream analysis and data warehousing.

First, the pipeline transforms the PCollection that contains LogMessage objects for all log sources into a PCollection of BigQuery TableRow objects. This step is required to take advantage of Cloud Dataflow's built-in support for using BigQuery as a pipeline sink.

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
  .apply("logMessageToTableRow", ParDo.of(new LogMessageTableRowFn()));

BigQuery tables require defined schemas. For this solution, the schemas are defined in LogAnalyticsPipelineOptions.java by using a default-value annotation. For example, the schema for the maximum-response-time table is defined as follows:

@Default.String("destination:STRING,aggResponseTime:FLOAT")
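
In context, that annotation might sit on a getter in the pipeline options interface, roughly as in the following sketch (the getter and setter names here are assumptions; the actual definitions are in LogAnalyticsPipelineOptions.java):

public interface LogAnalyticsPipelineOptions extends PipelineOptions {
  @Description("BigQuery table schema for the maximum-response-time table")
  @Default.String("destination:STRING,aggResponseTime:FLOAT")
  String getMaxRespTimeTableSchema();
  void setMaxRespTimeTableSchema(String schema);
}

The comma-separated name:TYPE pairs in the schema string can then be parsed into a BigQuery TableSchema when the pipeline is constructed.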

An operation on the PCollections that contain the aggregated response-time values converts them into PCollections of TableRow objects, applying the appropriate schemas and creating the tables if they're missing.

logsAsTableRows.apply("allLogsToBigQuery", BigQueryIO.writeTableRows()
  .to(options.getAllLogsTableName())
  .withSchema(allLogsTableSchema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

This solution always appends new data to the existing data. This is an appropriate choice because this pipeline runs on a periodic basis to analyze new log data. However, it's possible to truncate the existing table data or only write to the table if it's empty, if one of those options makes more sense in a different scenario.
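
For example, the following hypothetical variant (not part of the solution code) would overwrite the table on each run instead of appending; similarly, WRITE_EMPTY writes only if the destination table is empty:

logsAsTableRows.apply("allLogsToBigQuery", BigQueryIO.writeTableRows()
  .to(options.getAllLogsTableName())
  .withSchema(allLogsTableSchema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));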

Querying the data from BigQuery

The BigQuery console lets you run queries against the output data and connect to third-party business intelligence tools such as Tableau and QlikView for additional analysis.

  1. In the GCP Console, open BigQuery.

    OPEN BigQuery

  2. Click the project processing-logs-at-scale and then click the dataset processing_logs_using_dataflow.

  3. Select all_logs_table, and then in the data pane, select Preview to view a sample of the data in the table.

  4. In the Query editor enter the following query:

    SELECT *
    FROM `processing_logs_using_dataflow.max_response_time_table`
    ORDER BY aggResponseTime DESC
    LIMIT 100;
    
  5. To run the query, click Run.

    The BigQuery console runs a query against the log data.

Using a streaming pipeline

The sample includes support for running the pipeline in either batch or streaming mode. It takes only a few steps to change the pipeline from batch to streaming. First, the Stackdriver Logging setup exports logging information to Cloud Pub/Sub instead of Cloud Storage. The next step is to change the input sources in the Cloud Dataflow pipeline from Cloud Storage to Cloud Pub/Sub topic subscriptions. You need one subscription for each input source.

The Cloud Pub/Sub pipeline uses subscriptions.

You can see the SDK commands in use in logging.sh.

PCollections created from Cloud Pub/Sub input data use an unbounded global window. However, the individual entries already include timestamps. This means that it's not necessary to extract timestamp data from the Stackdriver Logging LogEntry object; just extract the log timestamps to create the custom LogMessage objects.

When using the Cloud Pub/Sub pipeline, you can extract timestamps from the logs.
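
Concretely, the batch TextIO reads shown earlier could be replaced with reads from Cloud Pub/Sub subscriptions, roughly as in the following sketch (the subscription option names are assumptions):

homeLogs = p.apply("homeLogsPubsubRead",
    PubsubIO.readStrings().fromSubscription(options.getHomeLogSubscription()));
browseLogs = p.apply("browseLogsPubsubRead",
    PubsubIO.readStrings().fromSubscription(options.getBrowseLogSubscription()));
locateLogs = p.apply("locateLogsPubsubRead",
    PubsubIO.readStrings().fromSubscription(options.getLocateLogSubscription()));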

The rest of the pipeline stays as-is, including downstream flatten, transformation, aggregation, and output operations.

Monitoring the pipeline

When you run the Cloud Dataflow job, you can use the Google Cloud Platform Console to monitor progress and view information about each stage in the pipeline.

The following image shows the GCP Console while running an example pipeline:

The GCP Console shows a running Cloud Dataflow job.

Cleaning up

Delete the project

  1. In the GCP Console, go to the Manage resources page.

    Go to the Manage resources page

  2. In the project list, select the project that you want to delete and click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete all the components

Make sure that the environment variables are still set to the values used during setup.

  1. Delete the BigQuery dataset:

    bq rm $DATASET_NAME
    
  2. Deactivate the Stackdriver Logging exports. This step deletes the exports and the specified Cloud Storage bucket:

    cd ../services
    ./logging.sh $PROJECT_ID $BUCKET_NAME batch down
    
  3. Delete the GKE cluster used to run the sample web applications:

    ./cluster.sh $PROJECT_ID $CLUSTER_NAME down
    

Extending the solution

The pipeline and set of operations described in this solution can be extended in a variety of different ways. The most obvious extensions would be to perform additional aggregations across the LogMessage data. For example, if session or anonymized user information were included in the log output, you could create aggregations around user activity. You could also use the ApproximateQuantiles transform to generate a distribution of response times.

What's next

  • Try out other Google Cloud Platform features for yourself. Have a look at our tutorials.
  • Learn how to use Google Cloud Platform products to build end-to-end solutions.