使用 Cloud Dataflow 大规模处理日志

Google Cloud 提供处理大型多样化日志分析操作所需的可扩缩基础架构。本教程介绍如何使用 Google Cloud 构建可处理多个来源的日志条目的分析流水线。您应以适当的方式组合日志数据,帮助您提取有意义信息并持久保存从数据中获得的深度分析,以用于分析、审核和报告。

概览

随着您的应用变得越来越复杂,从日志捕获的数据中挖掘数据洞见变得更具挑战性。日志的来源越来越多,因此很难整理和查询有用的信息。此外,为了构建、操作和维护自己的基础架构来大规模地分析日志数据,可能需要在运行分布式系统和存储方面拥有丰富的专业知识。这种专用基础架构通常意味着一次性的资本支出,导致其容量固定,很难在最初投资的规模上进一步扩展。这些限制可能会影响您的业务,因为它们无法根据您的数据及时生成有意义、富有实用价值的分析洞见。

本解决方案向您展示如何使用 Google Cloud 产品克服这些限制,如下图所示。

本解决方案使用多个 Google Cloud 组件

在此解决方案中,一组示例微服务在 Google Kubernetes Engine (GKE) 上运行,以实现一个网站。Cloud Logging 从这些服务收集日志,再将日志保存到 Cloud Storage 存储分区。然后,Dataflow 通过提取元数据和计算基本汇总数据来处理日志。Dataflow 流水线旨在每天处理日志元素,根据每天的日志为服务器响应时间生成汇总指标。最后,Dataflow 的输出被加载到 BigQuery 表中,在此进行分析以提供商业智能。另外,此解决方案还说明了如何更改该流水线,使其以流式模式运行,以实现低延时的异步日志处理。

本教程提供了示例 Dataflow 流水线、示例 Web 应用、配置信息以及运行示例的步骤。

费用

本教程使用 Google Cloud 的以下收费组件:

  • 用于部署微服务的 GKE。
  • 用于接收和导出日志的 Cloud Logging。
  • 用于以批量模式存储导出的日志的 Cloud Storage
  • 用于以流式模式传输导出的日志的 Pub/Sub。
  • 用于处理日志数据的 Dataflow
  • 用于存储处理输出并支持对该输出执行富查询的 BigQuery。

您可使用价格计算器根据您的预计使用量来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册一个新帐号

  2. 在 Cloud Console 的项目选择器页面上,选择或创建 Cloud 项目。

    转到项目选择器页面

  3. 确保您的 Google Cloud 项目已启用结算功能。 了解如何确认您的项目已启用结算功能

  4. 启用 BigQuery, Cloud Storage, Pub/Sub, Dataflow, GKE and Logging API。

    启用 API

  5. 创建 Cloud Monitoring 工作区

    转至 Monitoring

设置环境

在本教程中,您将使用 Cloud Shell 输入命令。Cloud Shell 让您能够使用 Cloud Console 中的命令行,并包含在 Cloud Console 中进行开发所需的 Cloud SDK 和其他工具。Cloud Shell 显示为 Cloud Console 底部的一个窗口。初始化可能需要几分钟,但窗口会立即显示。

如需使用 Cloud Shell 设置环境并克隆本教程中使用的 git 代码库,请执行以下操作:

  1. 在 Cloud Console 中,打开 Cloud Shell。

    打开 Cloud Shell

  2. 确保您正在使用刚刚创建的项目。将 [YOUR_PROJECT_ID] 替换为您刚创建的 Google Cloud 项目。

    gcloud config set project [YOUR_PROJECT_ID]
    
  3. 设置默认计算地区。在本教程中,计算地区为 us-east1。如果要部署到生产环境,请部署到您选择的区域

    export REGION=us-east1
    gcloud config set compute/region $REGION
    

克隆示例代码库

  • 克隆包含本教程中使用的脚本和应用逻辑的代码库。

    git clone https://github.com/GoogleCloudPlatform/processing-logs-using-dataflow.git
    cd processing-logs-using-dataflow/services
    

配置环境变量

# name your bucket
export PROJECT_ID=[YOUR_PROJECT_ID]
# name your GKE cluster
export CLUSTER_NAME=cluster-processing-logs-using-dataflow

# name the bucket for this tutorial
export BUCKET_NAME=${PROJECT_ID}-processing-logs-using-dataflow

# name the logging sink for this tutorial
export SINK_NAME=sink-processing-logs-using-dataflow

# name the logging sink for this tutorial
export DATASET_NAME=processing_logs_using_dataflow

在新的 Google Kubernetes Engine 集群中部署示例应用

# create the cluster and deploy sample services
./cluster.sh $PROJECT_ID $CLUSTER_NAME up

关于示例应用部署

该示例部署模仿一个购物应用。在此示例中,用户可以访问零售网站的主页、浏览单个产品,然后尝试在附近的实体店中找到该产品。此应用包含三个微服务:HomeServiceBrowseServiceLocateService。每个服务都可以从共享命名空间中的 API 端点获得。用户通过将 /home/browse/locate 附加到基本网址末尾来访问这些服务。

应用被配置为将传入的 HTTP 请求记录到 stdout

将 Google Kubernetes Engine 与 Cloud Logging 搭配使用

在此示例中,微服务在 Kubernetes Engine 集群(一组运行 KubernetesCompute Engine 实例(亦称节点))中运行。默认情况下,GKE 会配置每个节点以提供多种服务,包括监控、运行状况检查和集中式日志记录。此解决方案使用对 Logging 的内置支持将每个微服务的日志发送到 Cloud Storage。您可以使用 Kubernetes 配置集群级层的日志记录,作为将日志信息记录到文件(此解决方案中未涵盖)的替代方法。

每个微服务都在集群中单独的 pod 上运行,每个 pod 都在一个节点上运行,并使用 GKE 服务公开为单个 HTTP 端点。

微服务在各个节点上运行。

集群中的每个节点都运行一个 Cloud Logging 代理,用于捕获日志消息。当 Logging 中提供日志后,脚本会使用 Cloud SDK 提供的 Logging 支持自动将日志导出到 Cloud Storage 存储分区。

请注意,您还可以使用日志查看器配置要导出到 Cloud Storage 的日志。此解决方案使用 Cloud SDK,因为导出多个日志时需要用到它。

将 Cloud Storage 用作日志导出目标时,LogEntry 类型的日志条目以每小时为一个批次,保存在一个 JSON 文件中。这些结构化 Logging 条目包括其他元数据,说明每条日志消息何时创建、生成它的资源或实例、其严重性级别等。在下面的 Logging 条目示例中,您可以在 structPayload.log 元素中看到微服务生成的原始日志消息:

 {
    "insertId": "ugjuig3j77zdi",
    "labels": {
        "compute.googleapis.com/resource_name": "fluentd-gcp-v3.2.0-9q4tr",
        "container.googleapis.com/namespace_name": "default",
        "container.googleapis.com/pod_name": "browse-service-rm7v9",
        "container.googleapis.com/stream": "stdout"
    },
    "logName": "projects/processing-logs-at-scale/logs/browse-service",
    "receiveTimestamp": "2019-03-09T00:33:30.489218596Z",
    "resource": {
        "labels": {
            "cluster_name": "cluster-processing-logs-using-dataflow",
            "container_name": "browse-service",
            "instance_id": "640697565266753757",
            "namespace_id": "default",
            "pod_id": "browse-service-rm7v9",
            "project_id": "processing-logs-at-scale",
            "zone": "us-east1-d"
        },
        "type": "container"
    },
    "severity": "INFO",
    "textPayload": "[GIN] 2019/03/09 - 00:33:23 | 200 |     190.726µs |      10.142.0.6 | GET      /browse/product/1\n",
    "timestamp": "2019-03-09T00:33:23.743466177Z"
 }

设置日志记录

集群运行且服务部署后,您可以配置应用的日志记录功能。

首先,获取集群的凭据,因为 kubectl 用于获取服务名称以配置 Cloud Logging 导出接收器。

gcloud container clusters get-credentials  $CLUSTER_NAME --region $REGION

在代码库中,services/logging.sh 为批量模式或流式模式设置必要的组件。该脚本接受以下参数:

logging.sh [YOUR_PROJECT_ID] [BUCKET_NAME] [streaming|batch] [up|down]

在本教程中,启动批量日志记录:

./logging.sh $PROJECT_ID $BUCKET_NAME batch up

以下步骤说明了批量模式的命令运行情况:

  1. 创建 Cloud Storage 存储分区。

    gsutil -q mb gs://[BUCKET_NAME]

  2. 允许 Cloud Logging 访问此存储分区。

    gsutil -q acl ch -g cloud-logs@google.com:O gs://[BUCKET_NAME]

  3. 对于每个微服务,使用接收器设置 Cloud Logging 导出。

    gcloud logging sinks create [SINK_NAME] \ storage.googleapis.com/[BUCKET_NAME] \ --log-filter="kubernetes.home_service..." --project=[YOUR_PROJECT_ID]

更新目标位置权限

创建接收器时,目标位置(在本例中为您的 Cloud Storage 存储分区)的权限不会修改。您必须更改 Cloud Storage 存储分区的权限设置,以便向接收器授予写入权限。

如需更新 Cloud Storage 存储分区的权限,请执行以下操作:

  1. 确认接收器的写入者身份

    1. 转到日志查看器页面:

      转到“日志查看器”页面

    2. 在左侧菜单中选择导出以查看接收器的摘要,包括接收器的写入者身份

    3. 重要提示:这三 (3) 个接收器中的每一个都有一个单独的服务帐号电子邮件,必须为这些电子邮件授予对 Cloud Storage 存储分区的权限

  2. 在 Cloud Console 中,点击存储 > 浏览器

    转到浏览器

  3. 如需打开详细视图,请点击存储分区的名称。

  4. 选择权限,然后点击添加成员

  5. 角色设置为 Storage Object Creator,并输入您的接收器的写入者身份。

如需了解详情,请参阅目标位置权限

可使用以下命令查看日志对象路径:

gsutil ls gs://$BUCKET_NAME | grep service

当输出包含所有三个条目时,您可以继续执行运行数据流水线的步骤:

 gs://$BUCKET_NAME/browse-service/
 gs://$BUCKET_NAME/home-service/
 gs://$BUCKET_NAME/locate-service/

创建 BigQuery 数据集

bq mk $DATASET_NAME

为应用服务生成一些负载

安装 Apache HTTP 服务器实用程序

可使用 Apache HTTP 服务器基准化分析工具 (ab) 在服务上生成负载。

sudo apt-get update

sudo apt-get install -y apache2-utils

load.sh Shell 脚本通过从 HomeServiceBrowseServiceLocateService 请求响应,在微服务上生成负载。

单个负载集包含一个对 HomeService 的请求,以及各二十 (20) 个浏览和定位服务请求。

以下选项将生成一千 (1000) 个负载集(并发设置为 3 个同时处理的请求)

cd ../services
./load.sh 1000 3

让它运行几分钟,以便创建足够数量的日志。

启动 Dataflow 流水线

允许足够多的流量到达服务后,您就可启动 Dataflow 流水线了。

在本教程中,Dataflow 流水线以批量模式运行。可使用 pipeline.sh Shell 脚本手动启动此流水线。

cd ../dataflow
./pipeline.sh $PROJECT_ID $DATASET_NAME $BUCKET_NAME run

了解 Dataflow 流水线

Dataflow 可用于执行多种数据处理任务。Dataflow SDK 提供一个统一的数据模型,可以表示任意大小的数据集,包括来自持续更新数据源的无界限或无限数据集,因此它非常适合处理此解决方案中的日志数据。Dataflow 托管式服务可以运行批量和流式作业。这意味着您可以使用单个代码库执行异步或同步、实时以及事件驱动型数据处理。

Dataflow SDK 通过名为 PCollection 的专用集合类提供简单的数据表示形式。此 SDK 通过 PTransform 类提供内置和自定义数据转换。在 Dataflow 中,转换表示流水线的处理逻辑。具体来说,转换可用于各种处理操作,例如联接数据、以数学方法计算值、过滤数据输出或将数据从一种格式转换为另一种格式。如需详细了解流水线、PCollection、转换,以及 I/O 来源和接收器,请参阅 Dataflow 编程模型

下图展示了存储在 Cloud Storage 中的日志数据的流水线操作:

流水线的操作步骤。

虽然此图可能看起来很复杂,但 Dataflow 可让您轻松构建和使用流水线。以下部分介绍了流水线每个阶段的具体操作。

接收数据

流水线始于提取 Cloud Storage 存储分区的输入,这些存储分区包含三项微服务的日志。每个日志集合都成为一个包含 String 元素的 PCollection 类实例,其中每个元素对应一个 LogEntry 对象。在以下代码段中,homeLogsbrowseLogslocateLogs 的类型为 PCollection<String>:

homeLogs = p.apply("homeLogsTextRead", TextIO.read().from(options.getHomeLogSource()));
browseLogs = p.apply("browseLogsTextRead", TextIO.read().from(options.getBrowseLogSource()));
locateLogs = p.apply("locateLogsTextRead", TextIO.read().from(options.getLocateLogSource()));

为了应对持续更新的数据集带来的挑战,Dataflow SDK 使用了一种称为窗口化选取的技术。窗口化选取根据 PCollection 中各个元素的时间戳从逻辑上细分其中的数据。由于在本示例中来源类型为 TextIO,因此所有对象最初会被读取到单个全局窗口,这是默认行为。

将数据收集到对象中

下一步通过使用 Flatten 操作将各个微服务 PCollection 组合成单个 PCollection。

PCollection<String> allLogs = PCollectionList
  .of(homeLogs)
  .and(browseLogs)
  .and(locateLogs)
  .apply(Flatten.<String>pCollections());

此操作很有用,因为每个来源 PCollection 包含相同的数据类型并使用相同的全局数据选取策略。虽然此解决方案中每个日志的来源和结构相同,但您也可以为来源和结构不同的日志使用此方法。

创建单个 PCollection 后,您现在可以使用对日志条目执行多个步骤的自定义转换来处理各个 String 元素。这些步骤如下图所示:

转换会处理字符串消息以创建日志消息。

  • 将 JSON 字符串反序列化为 Cloud Logging LogEntry Java 对象。
  • LogEntry 元数据中提取时间戳。
  • 使用正则表达式从日志消息中提取以下各个字段:timestampresponseTimehttpStatusCodehttpMethodsource IP 地址和 destination 端点。使用这些字段可以创建带时间戳的 LogMessage 自定义对象。
  • LogMessage 对象输出到新的 PCollection

以下代码执行这些步骤:

PCollection<LogMessage> allLogMessages = allLogs
  .apply("allLogsToLogMessage", ParDo.of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));

按天汇总数据

回想一下,我们的目标是每天处理元素,以根据每天的日志生成汇总指标。为了实现此汇总,需要一个窗口化选取函数,该函数按天细分数据。由于 PCollection 中的每个 LogMessage 均有一个时间戳,这是可以实现的。在 Dataflow 按天划分 PCollection 之后,支持窗口化 PCollection 的操作将遵循窗口化选取方案。

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
  .apply("allLogMessageToDaily", Window.<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));

有了单个窗口化的 PCollection,您现在就可以通过运行单个 Dataflow 作业来计算所有三个多天日志源的每日汇总指标。

PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
  .apply(Max.<String>doublesPerKey());
 // .apply(Combine.<String,Double,Double>perKey(new Max.doublesPerKey()));

PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
  .apply(Mean.<String,Double>perKey());

首先,转换将 LogMessage 对象作为输入,然后输出键值对形式的 PCollection,这样会将目标端点映射为键以响应时间值,如下图所示。

计算每日汇总指标。

使用该 PCollection,您可以计算两个汇总指标,即每个目标的最长响应时间和每个目标的平均响应时间。由于 PCollection 仍然按日划分,因此每个计算的输出将代表一天的日志数据。这意味着您将有两个最终的 PCollection,一个包含每个目标每天的最长响应时间,另一个包含每个目标每天的平均响应时间。

将数据加载到 BigQuery 中

流水线中的最后一步将生成的 PCollection 输出到 BigQuery,以进行下游分析和数据仓储。

首先,流水线将包含所有日志源的 LogMessage 对象的 PCollection 转换为 BigQuery TableRow 对象的 PCollection。需要完成此步骤才能利用 Cloud Dataflow 的内置支持将 BigQuery 用作流水线的接收器。

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
  .apply("logMessageToTableRow", ParDo.of(new LogMessageTableRowFn()));

BigQuery 表需要定义的架构。对于此解决方案,使用默认值注释在 LogAnalyticsPipelineOptions.java 中定义架构。例如,maximum-response-time 表的架构定义如下:

@Default.String("destination:STRING,aggResponseTime:FLOAT")

对包含汇总响应时间值的 PCollection 执行操作会将其转换为 TableRow 对象的 PCollection,应用适当的架构并创建表(如果该表尚不存在)。

logsAsTableRows.apply("allLogsToBigQuery", BigQueryIO.writeTableRows()
  .to(options.getAllLogsTableName())
  .withSchema(allLogsTableSchema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

此解决方案始终将新数据附加到现有数据。这是一个合适的选择,因为此流水线会定期运行以分析新的日志数据。但在其他场景中也可以截断现有的表数据,或者仅在表为空的情况下向表中写入数据(如果其中的某个方法更合适的话)。

查询 BigQuery 中的数据

借助 BigQuery 控制台,您可以针对输出数据运行查询,并连接到第三方商业智能工具(如 TableauQlikView)以进行其他分析。

  1. 在 Cloud Console 中,打开 BigQuery。

    打开 BigQuery

  2. 点击项目 processing-logs-at-scale,然后点击数据集 processing_logs_using_dataflow

  3. 选择 all_logs_table,然后在数据窗格中选择预览,查看“all logs”表中的数据示例

  4. 查询编辑器中输入以下查询:

    SELECT *
    FROM `processing_logs_using_dataflow.max_response_time_table`
    ORDER BY aggResponseTime DESC
    LIMIT 100;
    
  5. 如需运行查询,请点击运行

    BigQuery 控制台针对日志数据运行查询。

使用流式传输流水线

此示例支持以批量或流式模式运行流水线。将流水线从批量更改为流式只需几个步骤。首先,Cloud Logging 设置会将日志记录信息导出到 Pub/Sub 而不是 Cloud Storage。下一步是将 Dataflow 流水线中的输入来源从 Cloud Storage 更改为 Pub/Sub 主题订阅。请注意,每个输入来源都需要一个订阅。

Pub/Sub 流水线使用订阅。

您可以在 logging.sh 中看到使用的 SDK 命令。

根据 Pub/Sub 输入数据创建的 PCollection 使用无界限全局窗口。但是,单个条目已包含时间戳。这意味着没有必要从 Cloud Logging LogEntry 对象提取时间戳数据;相反,只需提取日志时间戳即可创建自定义 LogMessage 对象。

使用 Pub/Sub 流水线时,您可以从日志中提取时间戳

流水线的其余部分保持原样,包括下游展平、转换、汇总和输出操作。

监控流水线

运行 Dataflow 作业时,您可以使用 Google Cloud Console 监控进度并查看流水线中各个阶段的相关信息。

下图展示了运行示例流水线时的 Cloud Console:

Cloud Console 显示正在运行的 Dataflow 作业。

清理

删除项目

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”页面

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

删除所有组件

确保特定环境变量仍然设置为在设置过程中使用的值

  1. 删除 BigQuery 数据集:

    bq rm $DATASET_NAME
    
  2. 停用 ((logging_name)) 导出内容。此步骤将删除导出内容和指定的 Cloud Storage 存储分区:

    cd ../services
    ./logging.sh $PROJECT_ID $BUCKET_NAME batch down
    
  3. 删除用于运行示例 Web 应用的 Container Engine 集群:

    /cluster.sh $PROJECT_ID $CLUSTER_NAME down
    

扩展解决方案

本解决方案中介绍的流水线和操作集可以通过各种不同方式进行扩展。最显而易见的扩展是针对 LogMessage 数据执行其他汇总。例如,如果日志输出中包含会话或匿名用户信息,则可以围绕用户活动创建汇总。此外,您还可以使用 ApproximateQuantiles 转换生成响应时间的分布信息。

后续步骤

  • 试用其他 Google Cloud 功能。查阅我们的教程
  • 了解如何使用 Google Cloud 产品构建端到端解决方案