使用 Cloud Dataflow 大規模處理記錄

Google Cloud Platform 提供可擴充的基礎架構,方便您處理大型及多樣性記錄分析作業。在本解決方案中,您會學到如何使用 Cloud Platform 建構分析管道,以處理多個來源的記錄項目。您可以合併記錄資料以協助擷取實用資訊,並留存從資料導出的深入分析,以用來進行分析、審查和報告。

總覽

隨著您的應用程序越來越複雜,從記錄擷取資料中收集深入分析也會變得更具挑戰性。記錄的來源日益增多,因此也增加了收集和查詢實用資訊的困難度。如要建構、操作和維護您自己的基礎架構,以利大規模分析記錄資料,則需具備執行分散式系統和儲存空間的廣泛專業知識。這種專用基礎架構通常代表一次性資本支出,導致容量固定,而且很難超出一開始的投資予以擴充。這些限制會拖慢從資料產生實用可行性深入分析的速度,進而影響您的業務。

這套解決方案將示範如何使用 Cloud Platform 克服上述限制。解決方案在 Google Kubernetes Engine 上執行一組實作網站的範例微服務。 Stackdriver Logging 會從這些服務收集記錄,然後將記錄儲存至 Google Cloud Storage 值區。 隨後由 Google Cloud Dataflow 擷取中繼資料以及進行基本匯總運算以處理這些記錄。Cloud Dataflow 管道設計為每日處理記錄元素,根據每日的記錄產生伺服器回應時間的匯總指標。最後將 Cloud Dataflow 的輸出載入 Google BigQuery 表格,並在其中進行分析以產生商業智慧。本解決方案也會說明如何將管道變更為執行「串流模式」,以進行低延遲時間的非同步記錄處理作業。

解決方案使用幾項 Cloud Platform 元件

隨附的教學課程會提供範例 Cloud Dataflow 管道、範例網路應用程式、設定資訊以及執行範例的步驟。

關於應用程式

部署範例會製作購物應用程式的模型。在此範例中,使用者可以造訪零售網站的首頁、瀏覽個別產品,然後嘗試在附近的實體店找到產品。此應用程式由下列三種微服務組成:HomeServiceBrowseServiceLocateService。每項服務均可從共用命名空間的 API 端點取得。使用者可透過將 /home/browse/locate 附加至基本網址來存取服務。

應用程式設定為將傳入 HTTP 要求記錄到 stdout

使用 Kubernetes Engine 搭配 Stackdriver Logging

此範例是在 Kubernetes Engine 叢集中執行微服務,而該叢集是由一組 Google Compute Engine 執行個體或執行 Kubernetes 的「節點」組成。根據預設,Kubernetes Engine 會將每個節點設定為提供監控、健康狀態檢查和集中記錄等各種服務。本解決方案會利用 Stackdriver Logging 的這項內建支援功能,從各個微服務傳送記錄至 Cloud Storage。如果不採用應用程式將資訊記錄至檔案,您也可以設定使用 Kubernetes 進行叢集層級記錄 (但是本解決方案中不提供說明)。

每項微服務都是在叢集的個別 pod 上執行,而每個 pod 則是在節點上執行,並藉由使用 Kubernetes Engine 服務呈現為單一 HTTP 端點。

微服務是在個別節點上執行

叢集中的每個節點都會執行擷取記錄訊息的 Stackdriver Logging 代理程式。Stackdriver Logging 中的記錄日漸完備後,指令碼會利用 Cloud SDK 中提供的 Stackdriver Logging 支援,自動匯出記錄至 Cloud Storage 值區。在 logging.sh 中,範例程式執行的指令類似下方範例:

# Create a Cloud Storage Bucket
gsutil -q mb gs://BUCKET_NAME

# Allow Stackdriver Logging access to the bucket
gsutil -q acl ch -g cloud-logs@google.com:O gs://BUCKET_NAME

# For each microservice, set up Stackdriver Logging exports
gcloud beta logging sinks create SINK_NAME \
  storage.googleapis.com/BUCKET_NAME \
  --log=”kubernetes.home_service…” --project=PROJECT_ID

請注意,您也可以透過記錄檢視器,將記錄設為匯出至 Cloud Storage。本解決方案會採用 SDK,因為那是匯出多個記錄的必要做法。

如果使用 Cloud Storage 當做記錄匯出目標,系統會以小時為單位,將類型為 LogEntry 的記錄項目分批儲存於個別的 JSON 檔案中。這些結構化的 Cloud Logging 項目包含指定建立每個記錄訊息的時機、產生訊息的資源或執行個體,以及訊息的嚴重性等級之類的其他中繼資料。在以下的 Stackdriver Logging 項目範例中,您可於 structPayload.log 元素中查看微服務產生的原始記錄訊息:

{
  "metadata": {
    "projectId": "...",
    "serviceName": "compute.googleapis.com",
    "zone": "us-central1-f",
    "labels": {
      "compute.googleapis.com/resource_id": "4154944251817867710",
      "compute.googleapis.com/resource_type": "instance"
    },
    "timestamp": "2015-09-22T20:01:13Z"
  },
  "insertId": "2015-09-22|13:01:17.636360-07|10.106.196.103|1124257302",
  "log": "kubernetes.browse-service-iaus6_default_sample-browse-service",
  "structPayload": {
    "stream": "stdout",
    "log": "2015/09/22 - 20:01:13 | 404 | 176ns | 10.160.0.1:34790 | GET /browse/46"
  }
}

建立 Cloud Dataflow 管道

Cloud Dataflow 是一項簡單卻強大的系統,可用來執行各種資料處理工作。Dataflow SDK 提供用以表示任何大小的資料集的整合式資料模型,包括持續更新的資料來源不斷產生的資料;因此很適合用來處理本解決方案中的記錄資料。Dataflow 代管服務可執行批次與串流工作。這表示您可以使用單一程式碼基底來進行非同步或同步的即時事件導向資料處理。

Dataflow SDK 經由名為 PCollection 的專用集合類別提供簡單的資料呈現。SDK 則透過 PTransform 類別提供內建和自訂資料轉換。在 Cloud Dataflow 中,轉換代表管道的處理邏輯。轉換可用來進行各種處理作業,例如彙整資料、為值進行數學運算、篩選資料輸出或轉換資料格式。如要進一步瞭解管道、PCollection、轉換以及 I/O 來源和接收器,請參閱 Dataflow 程式設計模型頁面。

下圖顯示針對 Cloud Storage 上儲存的記錄資料進行的管道作業:

Cloud Dataflow 管道具備幾個步驟

圖表看起來可能很複雜,但透過 Cloud Dataflow 即可輕鬆地建構及使用管道。以下幾節說明管道每個階段的具體操作。

接收資料

管道一開始會使用來自 Cloud Storage 值區的輸入,其中包含三項微服務的記錄。每個記錄集合都會成為 String 元素的 PCollection,其中每個元素均對應至單一 LogEntry 物件。在下列程式碼片段中,homeLogsbrowseLogslocateLogs 屬於 PCollection<String>: 類型:

homeLogs = p.apply(TextIO.Read.named("homeLogsTextRead").from(options.getHomeLogSource()));
browseLogs = p.apply(TextIO.Read.named("browseLogsTextRead").from(options.getBrowseLogSource()));
locateLogs = p.apply(TextIO.Read.named("locateLogsTextRead").from(options.getLocateLogSource()));

Dataflow SDK 採用稱為時間區間設定的技術,以因應持續更新的資料集帶來的挑戰。「時間區間設定」的運作方式是依據 PCollection 中個別元素的時間戳記以邏輯方式細分其中的資料。這個範例中的來源類型是 TextIO,因此一開始就會將所有物件讀取至單一全域時間區間 (預設行為)。

將資料收集到物件中

下一步會透過 Flatten 作業,將個別的微服務 PCollection 合併至單一 PCollection。

PCollection<String> allLogs = PCollectionList
  .of(homeLogs)
  .and(browseLogs)
  .and(locateLogs)
  .apply(Flatten.<String>pCollections());

每個來源 PCollection 都含有相同的資料類型,而且採用相同的全域時間區間設定策略,因此非常適合進行這項作業。雖然這套解決方案中每個記錄都擁有相同的來源和結構,但您可在來源和結構不同的情況下延伸使用這種方法。

建立單一 PCollection 後,您現在可以使用自訂轉換,針對記錄項目執行若干步驟,以處理個別 String 元素。步驟如下:

轉換會處理字串訊息以產生記錄訊息

  • 將 JSON 字串取消序列化成 Stackdriver Logging LogEntry Java 物件。
  • LogEntry 中繼資料擷取時間戳記。
  • 使用規則運算式從記錄訊息擷取下列個別欄位:timestampresponseTimehttpStatusCodehttpMethodsource IP 位址和 destination 端點。使用這些欄位建立加上時間戳記的 LogMessage 自訂物件。
  • LogMessage 物件輸出到新的 PCollection

後續程式碼執行的步驟如下:

PCollection<LogMessage> allLogMessages = allLogs
  .apply(ParDo.named("allLogsToLogMessage").of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));

按日匯總資料

請回想一下,我們的目標是每日處理元素,以根據每日的記錄產生匯總指標。如要達成這項匯總目標,則需採用按日細分資料的時間區間設定功能,藉助 PCollection 中每個 LogMessage 都擁有的時間戳記來完成。Cloud Dataflow 沿著每日界線將 PCollection 分區之後,則支援以時間區間設定 PCollections 的作業將會遵循時間區間設定配置。

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
  .apply(Window.named("allLogMessageToDaily").<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));

如果已有採用時間區間設定的單一 PCollection,您現在就能針對所有三個多日記錄來源執行單一 Cloud Dataflow 工作,以計算每日匯總指標。

PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
  .apply(Combine.<String,Double,Double>perKey(new Max.MaxDoubleFn()));

PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
  .apply(Mean.<String,Double>perKey());

首先,轉換作業會將 LogMessage 物件當做輸入,然後輸出鍵/值組合的 PCollection,將目標端點當做鍵對應至回應時間值。您可以透過該 PCollection 計算以下兩種匯總指標:每個目標的最大回應時間,以及每個目標的平均回應時間。PCollection 仍是按日分割,因此每筆運算的輸出都會顯示為單日的記錄資料。這表示您最終會擁有兩個 PCollection:一個包含每日每個目標的最大回應時間,另一個則包含每日每個目標的平均回應時間。

計算每日匯總指標

將資料載入 BigQuery

管道的最後一個步驟會將產生的 PCollection 輸出至 BigQuery,以供下游分析和資料倉儲使用。

首先,管道會將包含所有記錄來源之 LogMessage 物件的 PCollection 轉換為 BigQuery TableRow 物件的 PCollection。為了利用 Cloud Dataflow 的內建支援以使用 BigQuery 做為管道的接收器,必須執行此步驟。

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
  .apply(ParDo.named("logMessageToTableRow").of(new LogMessageTableRowFn()));

BigQuery 資料表必須已定義結構定義。本解決方案會透過預設值註解在 LogAnalyticsPipelineOptions.java 中定義結構定義。比方說,最大回應時間資料表的結構定義如下定義:

@Default.String("destination:STRING,aggResponseTime:FLOAT")

針對包含匯總回應時間值的 PCollection 執行操作,將其轉換成 TableRow 物件的 PCollection 時,則會套用適當的結構定義並建立資料表 (如有遺失)。

logsAsTableRows.apply(BigQueryIO.Write
  .named("allLogsToBigQuery")
  .to(options.getAllLogsTableName())
  .withSchema(allLogsTableSchema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

本解決方案一律會在現有資料附加新資料。此管道會定期執行以分析新的記錄資料,因此很適合選擇這種做法。但是在某些情境下,如果選擇其他做法更具意義,那麼也可能會截斷現有資料表的資料,或是僅將資料寫入空白的資料表中。

查詢資料

BigQuery 主控台可讓您針對輸出資料執行查詢,以及連線至 TableauQlikView 等第三方商業智慧工具以進行其他分析。

BigQuery 主控台會針對記錄資料執行查詢

使用串流管道

範例包括支援採用批次或串流模式來執行管道。只要幾個步驟,即可將管道從批次變更為串流。首先,Stackdriver Logging 設定會將記錄資訊匯出至 Cloud Pub/Sub (而非 Cloud Storage)。下一步則是將 Cloud Dataflow 管道中的輸入來源從 Cloud Storage 變更為 Cloud Pub/Sub 主題訂閱項目。每個輸入來源都必須有一個訂閱項目。

Cloud Pub/Sub 管道會使用訂閱

您可以查看在 logging.sh 中使用的 SDK 指令。

從 Cloud Pub/Sub 輸入資料建立的 PCollection 會使用不受限的全域時間區間。但是,個別項目已包含時間戳記。這表示不需要從 Stackdriver Logging LogEntry 物件擷取時間戳記資料,只要擷取記錄時間戳記,即可建立自訂的 LogMessage 物件。

使用 Cloud Pub/Sub 管道時,您可以從記錄擷取時間戳記

管道的下游整併、轉換、匯總及輸出作業等其餘部分則會保持原樣。

執行管道

您可在教學課程中進一步瞭解設定、建構和部署這個 Cloud Dataflow 管道的步驟,以及部署微服務以及設定 Stackdriver Logging 匯出的必要步驟。執行 Cloud Dataflow 工作時,您可以使用 Google Cloud Platform 主控台監控管道中各個階段的進度及查看相關資訊。

下圖顯示執行範例管道時的主控台使用者介面:

GCP 主控台會顯示正在執行的 Cloud Dataflow 工作

擴充解決方案

您可透過多種不同的方式擴充本解決方案提及的管道和作業組合。最顯而易見的擴充就是針對 LogMessage 資料執行其他匯總作業。舉例來說,如果記錄輸出包含工作階段或匿名的使用者資訊,即可建立和使用者活動相關的匯總資料。您也可以使用 ApproximateQuantiles 轉換來產生回應時間的分佈情形。

資源與費用

本教學課程課程使用幾個 Google Cloud Platform 收費元件,包括:

  • 部署微服務的 Kubernetes Engine
  • 接收及匯出記錄的 Stackdriver Logging
  • 以批次模式儲存匯出記錄的 Cloud Storage
  • 以串流模式串流匯出記錄的 Cloud Pub/Sub
  • 處理記錄資料的 Cloud Dataflow
  • 儲存處理輸出及支援針對該輸出執行各類查詢的 BigQuery

執行本教學課程的費用將會根據執行時間而有所差異。使用 Pricing Calculator 根據您的預測使用量來產生預估費用。新的 Cloud Platform 使用者可能符合申請免費試用的資格。

教學課程

您可以從 GitHub 取得教學課程的完整內容,包括設定的操作說明和原始碼,網址為 https://github.com/GoogleCloudPlatform/processing-logs-using-dataflow

後續步驟

  • 自行試用其他 Google Cloud Platform 功能。請參考我們的教學課程
  • 瞭解如何使用 Google Cloud Platform 產品建構端對端解決方案
本頁內容對您是否有任何幫助?請提供意見:

傳送您對下列選項的寶貴意見...

這個網頁
解決方案