Java 工作模式

電子商務應用程式範例示範如何使用 Dataflow 導入串流資料分析和即時 AI 的最佳做法。這個範例包含工作模式,可說明完成 Java 程式設計工作的最佳方式。建立電子商務應用程式時,通常需要這些工作。

應用程式包含下列 Java 工作模式:

使用 Apache Beam 結構定義處理結構化資料

您可以使用 Apache Beam 結構定義,簡化結構化資料的處理程序。

將物件轉換為「列」,可產生非常乾淨的 Java 程式碼,讓您更輕鬆地建構有向無環圖 (DAG)。您也可以在建立的 Analytics 陳述式中,將物件屬性參照為欄位,不必呼叫方法。

範例

CountViewsPerProduct.java

使用 JsonToRow 轉換 JSON 資料

在 Dataflow 中處理 JSON 字串是常見需求。舉例來說,系統會處理從網路應用程式擷取的點擊流資訊串流中的 JSON 字串。如要處理 JSON 字串,您需要在管道處理期間,將其轉換為 Rowsplain old Java objects (POJOs)

您可以使用 Apache Beam 內建的轉換 JsonToRow,將 JSON 字串轉換為 Rows。不過,如果您想要建立佇列來處理未成功傳送的訊息,則需要另外建立,請參閱「將無法處理的資料排入佇列,以進行進一步的分析」。

如要使用 AutoValue 將 JSON 字串轉換為 POJO,請使用 @DefaultSchema(AutoValueSchema.class) 註解註冊型別的結構定義,然後使用 Convert 公用程式類別。產生的程式碼類似於下列程式碼:

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

如要進一步瞭解如何從不同的 Java 型別推斷結構定義,請參閱「建立結構定義」。

如果 JsonToRow 無法處理您的資料,可以改用 Gson。Gson 在預設處理資料時相當寬鬆,因此您可能需要在資料轉換程序中加入更多驗證。

範例

使用 AutoValue 程式碼產生器產生 POJO

Apache Beam 結構定義通常是管道中物件的最佳表示方式,因為這類結構定義可讓您處理結構化資料。不過,有時仍需要一般 Java 物件 (POJO),例如處理鍵/值物件或處理物件狀態時。手動建構 POJO 時,您必須為 equals()hashcode() 方法編寫覆寫程式碼,這不僅費時,還容易出錯。不正確的覆寫可能會導致應用程式行為不一致或資料遺失。

如要產生 POJO,請使用 AutoValue 類別建構工具。這個選項可確保使用必要的覆寫,並避免潛在錯誤。Apache Beam 程式碼集內大量使用 AutoValue,因此如果您想使用 Java 在 Dataflow 上開發 Apache Beam 管道,熟悉這個類別建構工具會很有幫助。

您也可以在新增 @DefaultSchema(AutoValueSchema.class) 註解時,使用 Apache Beam 架構。AutoValue詳情請參閱建立結構定義一文。

如要進一步瞭解 AutoValue,請參閱「為何要使用 AutoValue?」和 AutoValue 說明文件

範例

Clickstream.java

將無法處理的資料排入佇列,以進行進一步的分析

在正式版系統中,處理有問題的資料非常重要。盡可能在串流中驗證及修正資料。如果無法修正,請將值記錄到未處理的訊息佇列 (有時稱為「無法傳送的郵件佇列」),以供日後分析。從一種格式轉換為另一種格式的資料時,通常會發生問題,例如將 JSON 字串轉換為「資料列」

如要解決這個問題,請使用多輸出轉換,將包含未處理資料的元素傳輸至另一個 PCollection,以進行進一步分析。這項處理作業很常見,您可能會想在管道中的許多位置使用。請盡量讓轉換夠通用,可在多個位置使用。首先,請建立錯誤物件,包裝常見屬性,包括原始資料。接著,建立具有多個目的地選項的接收器轉換。

範例

依序套用資料驗證轉換

從外部系統收集的資料通常需要清理。盡可能在管道中修正有問題的資料。視需要將資料傳送至佇列以供進一步分析

由於單一訊息可能有多個問題需要修正,請規劃所需的有向無環圖 (DAG)。如果元素包含多個有缺陷的資料,請務必確保元素會經過適當的轉換。

舉例來說,假設某個元素具有下列值,且這兩個值都不應為空值:

{"itemA": null,"itemB": null}

請確認元素會透過轉換流程,修正以下潛在問題:

badElements.apply(fixItemA).apply(fixItemB)

您的管道可能會有更多連續步驟,但融合有助於盡量減少導入的處理負擔。

範例

ValidateAndCorrectCSEvt.java

使用 DoFn.StartBundle 對外部服務進行微批次呼叫

您可能需要在管道中叫用外部 API。由於管道會將工作分配給許多運算資源,因此針對流經系統的每個元素進行單一呼叫,可能會導致外部服務端點不堪負荷。如果您未套用任何縮減函式,就特別容易發生這個問題。

為避免發生這個問題,請批次呼叫外部系統。

您可以使用 GroupByKey 轉換或 Apache Beam Timer API 批次處理呼叫。不過,這兩種方法都需要重組,這會造成一些處理負擔,而且需要魔術數字來判斷鍵盤空間。

請改用 StartBundleFinishBundle 生命週期元素,將資料分批處理。使用這些選項時,不需要重組資料。

這個選項的缺點是,套件大小會由執行器的實作項目動態決定,取決於管道及其工作站目前執行的作業。在串流模式中,通常會使用較小的套件。Dataflow 捆綁作業會受到後端因素影響,例如分片使用情形、特定鍵可用的資料量,以及管道的輸送量。

範例

EventItemCorrectionService.java

使用適當的側邊輸入模式來擴充資料

在串流分析應用程式中,資料通常會加入額外資訊,以利進一步分析。舉例來說,如果您有交易的商店 ID,可能想加入商店位置的相關資訊。通常是透過擷取元素,並從查閱表匯入資訊,來新增這類額外資訊。

如果對照表變更速度緩慢且大小較小,將表格做為實作 Map<K,V> 介面的單例類別帶入管道,效果會很好。這個選項可避免每個元素都進行 API 呼叫來查詢。在管道中加入表格副本後,您必須定期更新,確保內容為最新狀態。

如要處理更新速度緩慢的側邊輸入,請使用 Apache Beam Side input patterns

快取

側邊輸入會載入記憶體,因此會自動快取。

您可以使用 --setWorkerCacheMb 選項設定快取大小。

您可以在 DoFn 執行個體之間共用快取,並使用外部觸發條件重新整理快取。

範例

SlowMovingStoreLocationDimension.java