使用 Cloud Dataflow 大規模處理記錄

Google Cloud Platform (GCP) 提供可擴充的基礎架構,方便您處理大型及多樣性記錄分析作業。本教學課程說明如何使用 GCP 建構分析管道,以處理多個來源的記錄項目。您可以根據需求合併記錄資料,以利擷取實用的資訊,並留存從資料導出的深入分析,然後進一步用於分析、審查和報告。

總覽

隨著您的應用程式越來越複雜,針對從記錄檔擷取的資料進行深入分析,也將變得更具挑戰性。記錄的來源日益增多,因此也增加了收集和查詢實用資訊的困難度。如要建構、操作和維護您自己的基礎架構,以利大規模分析記錄資料,則需具備執行分散式系統和儲存空間的廣泛專業知識。這種專用基礎架構通常代表一次性資本支出,導致容量固定,而且難以將最初的投資再加以擴充。這些限制會拖慢從資料產生實用可行性深入分析的速度,進而影響您的業務。

這個解決方案說明如何使用 GCP 產品來克服上述限制,如下圖所示。

解決方案使用數個 GCP 元件

在這個解決方案中,會在 Google Kubernetes Engine (GKE) 上執行一組實作網站的範例微服務。 Stackdriver Logging 會從這些服務收集記錄,然後將記錄儲存至 Cloud Storage 值區。 隨後由 Cloud Dataflow 擷取中繼資料並進行基本匯總運算來處理相關記錄。Cloud Dataflow 管道設計為每日處理記錄元素,根據每日的記錄產生伺服器回應時間的匯總指標。最後將 Cloud Dataflow 的輸出載入 BigQuery 表格,並在其中進行分析以產生可行的商務情報。本解決方案也會說明如何將管道變更為執行「串流模式」,以進行低延遲時間的非同步記錄處理作業。

本教學課程將提供一個範例 Cloud Dataflow 管道、一個網頁應用程式範例、設定資訊,以及執行範例的相關步驟。

費用

本教學課程使用下列 Google Cloud Platform 計費元件:

  • 部署微服務的 GKE。
  • 接收及匯出記錄的 Logging。
  • 以批次模式儲存匯出記錄的 Cloud Storage。
  • 以串流模式串流匯出記錄的 Cloud Pub/Sub。
  • 處理記錄資料的 Cloud Dataflow。
  • 儲存處理輸出及支援針對該輸出執行各類查詢的 BigQuery。

您可以使用 Pricing Calculator,根據您的預測使用量來產生預估費用。 初次使用 GCP 的使用者可能符合申請免費試用的資格。

完成此教學課程後,您可刪除已建立的資源以免繼續計費。詳情請參閱清除所用資源一節。

事前準備

  1. 登入您的 Google 帳戶。

    如果您沒有帳戶,請申請新帳戶

  2. 選取或建立 Google Cloud Platform 專案。

    前往「Manage resources」(管理資源) 頁面

  3. 請確認您已啟用 Google Cloud Platform 專案的計費功能。

    瞭解如何啟用計費功能

  4. 啟用BigQuery, Cloud Storage, Cloud Pub/Sub, Cloud Dataflow, GKE and Logging API。

    啟用 API

  5. 建立 Stackdriver 工作區。如要進一步瞭解工作區,請參閱管理工作區一文。

    前往 Stackdriver

設定您的環境

在本教學課程中,您將使用 Cloud Shell 輸入指令。Cloud Shell 可讓您在 Google Cloud Platform 主控台中存取指令列,其中包括 Cloud SDK 以及在 GCP 中進行開發所需的其他工具。Cloud Shell 會以視窗的形式顯示在 GCP 主控台的底部。初始化可能需要幾分鐘的時間,但該視窗會立即顯示。

如何使用 Cloud Shell 來設定您的環境並複製此教學課程中使用的 Git 存放區:

  1. 在 GCP 主控台中,開啟 Cloud Shell。

    開啟 Cloud Shell

  2. 確認您使用的是剛才建立的專案。請將 [YOUR_PROJECT_ID] 替換成您新建立的 GCP 專案。

    gcloud config set project [YOUR_PROJECT_ID]
    
  3. 設定預設運算區域。基於本教學課程的用途,請設定為 us-east1。如果您要部署到實際工作環境,請部署到您選擇的地區

    export REGION=us-east1
    gcloud config set compute/region $REGION
    

複製範例存放區

  • 複製內有本教學課程所用指令碼和應用程式邏輯的存放區。

    git clone https://github.com/GoogleCloudPlatform/processing-logs-using-dataflow.git
    cd processing-logs-using-dataflow/services
    

設定環境變數

# name your bucket
export PROJECT_ID=[YOUR_PROJECT_ID]
# name your GKE cluster
export CLUSTER_NAME=cluster-processing-logs-using-dataflow

# name the bucket for this tutorial
export BUCKET_NAME=${PROJECT_ID}-processing-logs-using-dataflow

# name the logging sink for this tutorial
export SINK_NAME=sink-processing-logs-using-dataflow

# name the logging sink for this tutorial
export DATASET_NAME=processing_logs_using_dataflow

在新的 Google Kubernetes Engine 叢集上部署範例應用程式

# create the cluster and deploy sample services
./cluster.sh $PROJECT_ID $CLUSTER_NAME up

關於範例應用程式部署

部署範例會製作購物應用程式的模型。在此範例中,使用者可以造訪零售網站的首頁、瀏覽個別產品,然後嘗試在附近的實體店找到產品。這個應用程式由 HomeServiceBrowseServiceLocateService 這三種微服務組成。每項服務均可從共用命名空間的 API 端點取得。使用者可透過將 /home/browse/locate 附加至基本網址來存取服務。

應用程式設定為將傳入 HTTP 要求記錄到 stdout

使用 Google Kubernetes Engine 搭配 Stackdriver Logging

這個範例是在 Kubernetes Engine 叢集中執行微服務,而該叢集是由一組 Compute Engine 執行個體或執行 Kubernetes 的「節點」組成。根據預設,GKE 會將每個節點設定為提供監控、健康狀態檢查和集中記錄等各種服務。本解決方案會利用 Logging 的這項內建支援功能,從各個微服務傳送記錄至 Cloud Storage。如果不採用應用程式將資訊記錄至檔案,您也可以設定為使用 Kubernetes 進行叢集層級記錄 (不過這部分不在本解決方案的說明範圍)。

每項微服務都是在叢集的個別 pod 上執行,而每個 pod 則是在節點上執行,並藉由使用 GKE 服務呈現為單一 HTTP 端點。

微服務是在個別節點上執行。

叢集中的每個節點都會執行 Stackdriver Logging 代理程式來擷取記錄訊息。Logging 中有記錄檔之後,指令碼會從 Cloud SDK 使用可用的 Logging 支援,自動將記錄檔匯出至 Cloud Storage 值區。

請注意,您也可以透過記錄檢視器,將記錄設為匯出至 Cloud Storage。由於匯出多個記錄檔需要 Cloud SDK,因此這個解決方案會採用 Cloud SDK。

如果使用 Cloud Storage 當做記錄檔匯出目標,系統會以小時為單位,將類型為 LogEntry 的記錄項目分批儲存於個別的 JSON 檔案中。這些結構化的 Logging 項目包含指定建立每個記錄訊息的時機、產生訊息的資源或執行個體,以及訊息的嚴重性等級等其他中繼資料。在下列 Logging 項目範例的 structPayload.log 元素中,您可以看到微服務產生的原始記錄訊息。

 {
    "insertId": "ugjuig3j77zdi",
    "labels": {
        "compute.googleapis.com/resource_name": "fluentd-gcp-v3.2.0-9q4tr",
        "container.googleapis.com/namespace_name": "default",
        "container.googleapis.com/pod_name": "browse-service-rm7v9",
        "container.googleapis.com/stream": "stdout"
    },
    "logName": "projects/processing-logs-at-scale/logs/browse-service",
    "receiveTimestamp": "2019-03-09T00:33:30.489218596Z",
    "resource": {
        "labels": {
            "cluster_name": "cluster-processing-logs-using-dataflow",
            "container_name": "browse-service",
            "instance_id": "640697565266753757",
            "namespace_id": "default",
            "pod_id": "browse-service-rm7v9",
            "project_id": "processing-logs-at-scale",
            "zone": "us-east1-d"
        },
        "type": "container"
    },
    "severity": "INFO",
    "textPayload": "[GIN] 2019/03/09 - 00:33:23 | 200 |     190.726µs |      10.142.0.6 | GET      /browse/product/1\n",
    "timestamp": "2019-03-09T00:33:23.743466177Z"
 }

設定記錄功能

當服務已部署完成,且叢集開始執行之後,您就可以為應用程式設定記錄功能。

首先請為叢集取得憑證,而 kubectl 可取得用來設定 Stackdriver Logging 匯出接收器的服務名稱。

gcloud container clusters get-credentials  $CLUSTER_NAME --region $REGION

在程式碼存放區中,services/logging.sh 會針對批次或串流模式設定必要元件。這個指令碼可接受以下參數:

logging.sh [YOUR_PROJECT_ID] [BUCKET_NAME] [streaming|batch] [up|down]

為配合本教學課程,請啟動批次記錄:

./logging.sh $PROJECT_ID $BUCKET_NAME batch up

以下步驟說明執行批次模式時所用的指令:

  1. 建立 Cloud Storage 值區。

    gsutil -q mb gs://[BUCKET_NAME]

  2. 允許 Stackdriver Logging 存取值區。

    gsutil -q acl ch -g cloud-logs@google.com:O gs://[BUCKET_NAME]

  3. 針對每個微服務,使用接收器來設定 Stackdriver 匯出。

    gcloud logging sinks create [SINK_NAME] \ storage.googleapis.com/[BUCKET_NAME] \ --log-filter="kubernetes.home_service..." --project=[YOUR_PROJECT_ID]

更新目的地權限

在這個情況下,您無法在建立接收器時修改目的地權限,也就是您 Cloud Storage 值區的權限。您必須變更 Cloud Storage 值區的權限設定,才能授予接收器寫入權限。

如要更新 Cloud Storage 值區權限:

  1. 識別接收器的「Writer Identity」(寫入者身分)

    1. 前往「Logs Viewer」(記錄檢視器) 頁面:

      前往記錄檢視器頁面

    2. 在左側選單中,選取 [Exports] (匯出) 來查看接收器摘要,包括接收器的「Writer Identity」(寫入者身分)

    3. 重要事項:這 3 個接收器有各自的服務帳戶電子郵件,您必須為每一個電子郵件授予 Cloud Storage 值區的權限。

  2. 從 GCP 主控台按一下 [Storage] (儲存空間) > [Browser] (瀏覽器)

    前往瀏覽器

  3. 如要開啟詳細資料視圖,請按值區名稱。

  4. 選取 [Permissions] (權限),然後按一下 [Add members] (新增成員)

  5. 將「Role」(角色) 設定為 Storage Object Creator,然後輸入接收器的寫入者身分。

詳情請參閱目的地權限

您可以使用下列指令來查看記錄物件路徑:

gsutil ls gs://processing-logs-at-scale-processing-logs-using-dataflow/ | grep service

等輸出結果包含下方全部三個項目時,您就可以繼續執行資料管道:

 gs://processing-logs-at-scale-processing-logs-using-dataflow/browse-service/
 gs://processing-logs-at-scale-processing-logs-using-dataflow/home-service/
 gs://processing-logs-at-scale-processing-logs-using-dataflow/locate-service/

建立 BigQuery 資料集

bq mk $DATASET_NAME

在應用程式服務上產生一些負載

安裝 Apache HTTP 伺服器公用程式

您可使用 Apache HTTP 伺服器基準化工具 (ab),在應用程式服務中產生負載。

sudo apt-get update

sudo apt-get install -y apache2-utils

load.sh 殼層指令碼會要求 HomeServiceBrowseServiceLocateService 回應,藉此在微服務上產生負載。

單一負載集包含一個對主畫面服務的要求,以及二十 (20) 個分別對瀏覽和定位服務的要求。

以下選項將會產生一千 (1000) 個負載集 (已啟用設為 3 個同時要求的並行服務)

cd ../services
./load.sh 1000 3

請讓這個指令執行數分鐘,等候系統產生足夠的記錄。

啟動 Cloud Dataflow 管道

等服務收到足夠的流量之後,您就可以啟動 Dataflow 管道。

為配合本教學課程,Cloud Datalflow 管道將會以批次模式執行。pipeline.sh 殼層指令碼會手動啟動管道。

cd ../dataflow
./pipeline.sh $PROJECT_ID $DATASET_NAME $BUCKET_NAME run

Cloud Dataflow 管道簡介

Cloud Dataflow 可用來執行各種資料處理工作。Cloud Dataflow SDK 提供的是可呈現任何大小的資料集的整合式資料模型,包括來源持續更新而產生的大型資料集或無限資料集;因此很適合用來處理此解決方案中的記錄檔資料。Cloud Dataflow 代管服務可執行批次與串流工作。這表示您可以使用單一程式碼基底來進行非同步或同步的即時事件導向資料處理。

Dataflow SDK 經由名為 PCollection 的專用集合類別提供簡單的資料呈現。SDK 則透過 PTransform 類別提供內建和自訂資料轉換。在 Cloud Dataflow 中,轉換代表管道的處理邏輯。轉換可用來進行各種處理作業,例如彙整資料、為值進行數學運算、篩選資料輸出或轉換資料格式。如要進一步瞭解管道、PCollection、轉換以及 I/O 來源和接收器,請參閱 Dataflow 程式設計模型頁面。

下圖顯示針對 Cloud Storage 上儲存的記錄資料進行的管道作業:

管道的作業步驟。

圖表看起來可能很複雜,但透過 Cloud Dataflow 即可輕鬆地建構及使用管道。以下幾節說明管道每個階段的具體操作。

接收資料

管道一開始會使用來自 Cloud Storage 值區的輸入,其中包含三項微服務的記錄。每個記錄集合都會成為 String 元素的 PCollection,其中每個元素均對應至單一 LogEntry 物件。在下列程式碼片段中,homeLogsbrowseLogslocateLogs 屬於 PCollection<String>: 類型:

homeLogs = p.apply("homeLogsTextRead", TextIO.read().from(options.getHomeLogSource()));
browseLogs = p.apply("browseLogsTextRead", TextIO.read().from(options.getBrowseLogSource()));
locateLogs = p.apply("locateLogsTextRead", TextIO.read().from(options.getLocateLogSource()));

Dataflow SDK 採用稱為時間區間設定的技術,以因應持續更新的資料集帶來的挑戰。「時間區間設定」的運作方式是依據 PCollection 中個別元素的時間戳記,以邏輯方式細分其中的資料。由於這個範例中的來源類型是 TextIO,所以系統一開始就將所有物件讀入單一全域時間區間 (預設行為)。

將資料收集到物件中

下一步會透過 Flatten 作業,將個別的微服務 PCollection 合併至單一 PCollection。

PCollection<String> allLogs = PCollectionList
  .of(homeLogs)
  .and(browseLogs)
  .and(locateLogs)
  .apply(Flatten.<String>pCollections());

每個來源 PCollection 都含有相同的資料類型,而且採用相同的全域時間區間設定策略,因此非常適合進行這項作業。雖然這套解決方案中每個記錄都擁有相同的來源和結構,但您可在來源和結構不同的情況下延伸使用這種方法。

建立單一 PCollection 後,您現在可以使用自訂轉換,針對記錄項目執行若干步驟,以處理個別 String 元素。步驟如下圖所示:

轉換會處理字串訊息以產生記錄訊息。

  • 將 JSON 字串取消序列化成 Stackdriver Logging LogEntry Java 物件。
  • LogEntry 中繼資料擷取時間戳記。
  • 使用規則運算式從記錄訊息擷取下列個別欄位:timestampresponseTimehttpStatusCodehttpMethodsource IP 位址和 destination 端點。使用這些欄位建立加上時間戳記的 LogMessage 自訂物件。
  • LogMessage 物件輸出到新的 PCollection

後續程式碼執行的步驟如下:

PCollection<LogMessage> allLogMessages = allLogs
  .apply("allLogsToLogMessage", ParDo.of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));

按日匯總資料

請回想一下,我們的目標是每日處理元素,以根據每日的記錄產生匯總指標。如要達成這項匯總目標,則需採用按日細分資料的時間區間設定功能,藉助 PCollection 中每個 LogMessage 都擁有的時間戳記來完成。Cloud Dataflow 沿著每日界線將 PCollection 分區之後,支援以時間區間設定 PCollections 的作業則會遵循時間區間設定配置。

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
  .apply("allLogMessageToDaily", Window.<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));

如果已有採用時間區間設定的單一 PCollection,您現在就能針對所有三個多日記錄來源執行單一 Cloud Dataflow 工作,以計算每日匯總指標。

PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
  .apply(Max.<String>doublesPerKey());
 // .apply(Combine.<String,Double,Double>perKey(new Max.doublesPerKey()));

PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
  .apply(Mean.<String,Double>perKey());

首先,轉換作業會將 LogMessage 物件做為輸入,然後輸出鍵/值組合的 PCollection,將目標端點當做鍵,對應至回應時間值,如下圖所示。

計算每日匯總指標。

您可以透過該 PCollection 計算以下兩種匯總指標:每個目標的最大回應時間,以及每個目標的平均回應時間。PCollection 仍是按日分割,因此每筆運算的輸出都會顯示為單日的記錄資料。這表示您最終會擁有兩個 PCollection:一個包含每日每個目標的最大回應時間,另一個則包含每日每個目標的平均回應時間。

將資料載入 BigQuery

管道的最後一個步驟會將產生的 PCollection 輸出至 BigQuery,以供下游分析和資料倉儲使用。

首先,管道會將包含所有記錄來源之 LogMessage 物件的 PCollection 轉換為 BigQuery TableRow 物件的 PCollection。為了利用 Cloud Dataflow 的內建支援以使用 BigQuery 做為管道的接收器,必須執行此步驟。

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
  .apply("logMessageToTableRow", ParDo.of(new LogMessageTableRowFn()));

BigQuery 資料表必須已定義結構定義。本解決方案會透過預設值註解在 LogAnalyticsPipelineOptions.java 中定義結構定義。比方說,最大回應時間資料表的結構定義如下定義:

@Default.String("destination:STRING,aggResponseTime:FLOAT")

針對包含匯總回應時間值的 PCollection 執行操作,將其轉換成 TableRow 物件的 PCollection 時,則會套用適當的結構定義並建立資料表 (如有遺失)。

logsAsTableRows.apply("allLogsToBigQuery", BigQueryIO.writeTableRows()
  .to(options.getAllLogsTableName())
  .withSchema(allLogsTableSchema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

本解決方案一律會在現有資料附加新資料。此管道會定期執行以分析新的記錄資料,因此很適合選擇這種做法。但是在某些情境下,如果選擇其他做法更具意義,那麼也可能會截斷現有資料表的資料,或是僅將資料寫入空白的資料表中。

從 BigQuery 查詢資料

BigQuery 主控台可讓您針對輸出資料執行查詢,以及連線至 TableauQlikView 等第三方商業智慧工具以進行其他分析。

  1. 在 GCP 主控台中,開啟 BigQuery。

    開啟 BigQuery

  2. 按一下專案 processing-logs-at-scale,然後按一下資料集 processing_logs_using_dataflow

  3. 選取 [all_logs_table],在資料窗格中選取 [Preview] (預覽),然後查看「all logs」(所有記錄檔) 資料表中的資料範例。

  4. 在「Query editor」(查詢編輯器) 中輸入以下查詢:

    SELECT *
    FROM `processing_logs_using_dataflow.max_response_time_table`
    ORDER BY aggResponseTime DESC
    LIMIT 100;
    
  5. 如要執行查詢,請按一下 [Run] (執行)

    BigQuery 主控台會針對記錄資料執行查詢。

使用串流管道

範例包括支援採用批次或串流模式來執行管道。只要幾個步驟,即可將管道從批次變更為串流。首先,Stackdriver Logging 設定會將記錄資訊匯出至 Cloud Pub/Sub (而非 Cloud Storage)。下一步則是將 Cloud Dataflow 管道中的輸入來源從 Cloud Storage 變更為 Cloud Pub/Sub 主題訂閱項目。每個輸入來源都必須有一個訂閱項目。

Cloud Pub/Sub 管道會使用訂閱。

您可以查看在 logging.sh 中使用的 SDK 指令。

從 Cloud Pub/Sub 輸入資料建立的 PCollection 會使用不受限的全域時間區間。但是,個別項目已包含時間戳記。這表示不需要從 Stackdriver Logging LogEntry 物件擷取時間戳記資料,只要擷取記錄時間戳記,即可建立自訂的 LogMessage 物件。

使用 Cloud Pub/Sub 管道時,您可以從記錄擷取時間戳記

管道的下游整併、轉換、匯總及輸出作業等其餘部分則會保持原樣。

監控管道

執行 Cloud Dataflow 工作時,您可以使用 Google Cloud Platform 主控台,監控管道中各個階段的進度及查看相關資訊。

下圖顯示執行範例管道時的 GCP 主控台:

GCP 主控台會顯示正在執行的 Cloud Dataflow 工作。

清除

刪除專案

  1. 前往 GCP 主控台的「Projects」(專案) 頁面。

    前往專案頁面

  2. 在專案清單中選取要刪除的專案,然後按一下 [Delete] (刪除)
  3. 在對話方塊中輸入專案 ID,按一下 [Shut down] (關閉) 即可刪除專案。

刪除所有元件

確保特定環境變數仍維持為在設定期間使用的值。

  1. 刪除 BigQuery 資料集:

    bq rm $DATASET_NAME
    
  2. 停用 Cloud Logging 匯出功能。這個步驟將刪除匯出項目及指定的 Cloud Storage 值區:

    cd ../services
    ./logging.sh $PROJECT_ID $BUCKET_NAME batch down
    
  3. 刪除用來執行範例網頁應用程式的 Compute Engine 叢集:

    /cluster.sh $PROJECT_ID $CLUSTER_NAME down
    

擴充解決方案

您可透過多種不同的方式擴充本解決方案提及的管道和作業組合。最顯而易見的擴充就是針對 LogMessage 資料執行其他匯總作業。舉例來說,如果記錄輸出包含工作階段或匿名的使用者資訊,即可建立和使用者活動相關的匯總資料。您也可以使用 ApproximateQuantiles 轉換來產生回應時間的分佈情形。

後續步驟

  • 歡迎自行試用其他 Google Cloud Platform 功能。請參考我們的教學課程
  • 瞭解如何使用 Google Cloud Platform 產品建構端對端解決方案
本頁內容對您是否有任何幫助?請提供意見:

傳送您對下列選項的寶貴意見...

這個網頁
解決方案