This document describes how to use Dataplex Universal Catalog Explore to detect anomalies in a retail transaction dataset.
The data exploration workbench, or Explore, lets data analysts interactively query and explore large datasets in real time. Explore helps you derive insights from your data and lets you query data stored in Cloud Storage and BigQuery. Explore uses a serverless Spark platform, so you don't need to manage or scale the underlying infrastructure.
Objectives
This tutorial shows you how to complete the following tasks:
- Use Explore's Spark SQL workbench to write and run Spark SQL queries.
- Use a JupyterLab notebook to view the results.
- Schedule recurring runs of the notebook so you can monitor the data for anomalies.
Costs
In this document, you use the following billable components of Google Cloud:
To generate a cost estimate based on your projected usage, use the Pricing Calculator.
When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Create or select a Google Cloud project.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
- Make sure that billing is enabled for your Google Cloud project.
Prepare the data to explore
Download the Parquet file retail_offline_sales_march. Then create a Cloud Storage bucket called offlinesales_curated as follows:
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create.
- On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
- In the Get started section, do the following:
  - Enter a globally unique name that meets the bucket naming requirements.
  - To add a bucket label, expand the Labels section, click Add label, and specify a key and a value for your label.
- In the Choose where to store your data section, do the following:
  - Select a Location type.
  - From the Location type drop-down menu, choose a location where your bucket's data is permanently stored.
  - If you select the dual-region location type, you can also choose to enable turbo replication by using the relevant checkbox.
  - To set up cross-bucket replication, select Add cross-bucket replication via Storage Transfer Service and follow these steps:
    - In the Bucket menu, select a bucket.
    - In the Replication settings section, click Configure to configure settings for the replication job. The Configure cross-bucket replication pane appears.
    - To filter objects to replicate by object name prefix, enter a prefix that you want to include or exclude objects from, then click Add a prefix.
    - To set a storage class for the replicated objects, select a storage class from the Storage class menu. If you skip this step, the replicated objects will use the destination bucket's storage class by default.
    - Click Done.
- In the Choose how to store your data section, do the following:
  - Select a default storage class for the bucket or Autoclass for automatic storage class management of your bucket's data.
  - To enable hierarchical namespace, in the Optimize storage for data-intensive workloads section, select Enable hierarchical namespace on this bucket.
- In the Choose how to control access to objects section, select whether or not your bucket enforces public access prevention, and select an access control method for your bucket's objects.
- In the Choose how to protect object data section, do the following:
  - Select any of the options under Data protection that you want to set for your bucket.
    - To enable soft delete, click the Soft delete policy (For data recovery) checkbox, and specify the number of days you want to retain objects after deletion.
    - To set Object Versioning, click the Object versioning (For version control) checkbox, and specify the maximum number of versions per object and the number of days after which the noncurrent versions expire.
    - To enable the retention policy on objects and buckets, click the Retention (For compliance) checkbox, and then do the following:
      - To enable Object Retention Lock, click the Enable object retention checkbox.
      - To enable Bucket Lock, click the Set bucket retention policy checkbox, and choose a unit of time and a length of time for your retention period.
  - To choose how your object data will be encrypted, expand the Data encryption section and select a data encryption method.
- Click Create.
Upload the downloaded offlinesales_march_parquet file to the offlinesales_curated Cloud Storage bucket that you created by following the steps in Upload objects from a filesystem.
Create a Dataplex Universal Catalog lake and name it operations by following the steps in Create a lake.
In the operations lake, add a zone and name it procurement by following the steps in Add a zone.
In the procurement zone, add the offlinesales_curated Cloud Storage bucket that you created as an asset by following the steps in Add an asset.
Select the table to explore
In the Google Cloud console, go to the Dataplex Universal Catalog Explore page.
In the Lake field, select the operations lake.
Click the operations lake.
Navigate to the procurement zone and click the table to explore its metadata. In the following image, the selected procurement zone contains a table called Offline, which has the metadata: orderid, product, quantityordered, unitprice, orderdate, and purchaseaddress.
In the Spark SQL Editor, click Add. A Spark SQL script appears.
Optional: Open the script in split tab view to see the metadata and the new script side by side. Click More in the new script's tab and select Split Tab to Right or Split Tab to Left.
Explore the data
An environment provides serverless compute resources for your Spark SQL queries and notebooks to run within a lake. Before you write Spark SQL queries, create an environment in which to run your queries.
Explore your data by using the following Spark SQL queries. In the Spark SQL Editor, enter the query in the New Script pane.
Sample 10 rows of the table
Enter the following query:
select * from procurement.offlinesales where orderid != 'orderid' limit 10;
Click Run.
Get the total number of transactions in the dataset
Enter the following query:
select count(*) from procurement.offlinesales where orderid!='orderid';
Click Run.
Find the number of distinct product types in the dataset
Enter the following query:
select count(distinct product) from procurement.offlinesales where orderid!='orderid';
Click Run.
Find the products with a large transaction value
Get a sense of which products have large transaction values by breaking down the sales by product type and average selling price.
Enter the following query:
select product, avg(quantityordered * unitprice) as avg_sales_amount
from procurement.offlinesales
where orderid != 'orderid'
group by product
order by avg_sales_amount desc;
Click Run.
The following image shows the Results pane, which uses a column named product to identify the sales items with large transaction values, shown in a column named avg_sales_amount.
Detect anomalies using the coefficient of variation
The previous query showed that laptops have a high average transaction amount. The following query shows how to detect laptop transactions in the dataset that are not anomalous.
The query uses the coefficient of variation metric, rsd_value, to find transactions that are not unusual, where the spread of values is low compared to the average value. A lower coefficient of variation means fewer anomalies.
Enter the following query:
WITH stats AS (
  SELECT product,
         AVG(quantityordered * unitprice) AS avg_value,
         STDDEV(quantityordered * unitprice) / AVG(quantityordered * unitprice) AS rsd_value
  FROM procurement.offlinesales
  GROUP BY product
)
SELECT orderid,
       orderdate,
       product,
       (quantityordered * unitprice) AS sales_amount,
       ABS(1 - (quantityordered * unitprice) / avg_value) AS distance_from_avg
FROM procurement.offlinesales
INNER JOIN stats USING (product)
WHERE rsd_value <= 0.2
ORDER BY distance_from_avg DESC
LIMIT 10
Click Run.
View the script results.
In the following image, the Results pane uses a column named product to identify the sales items whose transaction values are within a coefficient of variation of 0.2.
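For reference, the coefficient of variation computed as rsd_value in the query is simply the standard deviation divided by the mean of the sales amount for each product. The following is a minimal, self-contained pandas sketch of the same calculation; the product names and prices are hypothetical placeholders, not values from the dataset.

# Minimal sketch of the coefficient of variation (std / mean) per product.
# The rows below are hypothetical placeholders used only for illustration.
import pandas as pd

sample = pd.DataFrame({
    'product': ['Laptop', 'Laptop', 'Laptop', 'Monitor', 'Monitor'],
    'quantityordered': [1, 1, 2, 2, 3],
    'unitprice': [600.0, 610.0, 595.0, 150.0, 400.0],
})
sample['sales_amount'] = sample['quantityordered'] * sample['unitprice']

cv = (
    sample.groupby('product')['sales_amount']
          .agg(['mean', 'std'])
          .assign(rsd_value=lambda s: s['std'] / s['mean'])
)
# Products with a low rsd_value (for example <= 0.2) have a narrow spread
# around their mean, so their transactions are rarely anomalous.
print(cv)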
Visualize anomalies using a JupyterLab notebook
Build a machine learning model to detect and visualize anomalies at scale.
Open a notebook in a separate tab and wait for it to load. The session in which you ran the Spark SQL queries continues to run.
Import the necessary packages and connect to the BigQuery external table that contains the transaction data. Run the following code:
from google.cloud import bigquery
from google.api_core.client_options import ClientOptions
import os
import warnings
warnings.filterwarnings('ignore')
import pandas as pd

project = os.environ['GOOGLE_CLOUD_PROJECT']
options = ClientOptions(quota_project_id=project)
client = bigquery.Client(client_options=options)

# Load data into a DataFrame
sql = '''select * from procurement.offlinesales limit 100;'''
df = client.query(sql).to_dataframe()
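Before modeling, it can help to confirm what actually came back from the external table. The following optional check is not part of the original notebook; the column names are the ones from the Offline table described earlier, and the cast is only needed if discovery typed the numeric columns as strings (an assumption, suggested by the orderid != 'orderid' filter used in the earlier queries).

# Optional sanity check: inspect the loaded rows and column types.
print(df.head())
print(df.dtypes)

# Assumption: if a stray header row or string typing slipped through
# discovery, drop it and cast the numeric columns used by the model.
df = df[df['orderid'] != 'orderid'].copy()
df['quantityordered'] = pd.to_numeric(df['quantityordered'], errors='coerce')
df['unitprice'] = pd.to_numeric(df['unitprice'], errors='coerce')
df = df.dropna(subset=['quantityordered', 'unitprice'])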
Run the isolation forest algorithm to discover the anomalies in the dataset:
to_model_columns = df.columns[2:4]

from sklearn.ensemble import IsolationForest

clf = IsolationForest(n_estimators=100, max_samples='auto', contamination=float(.12),
                      max_features=1.0, bootstrap=False, n_jobs=-1, random_state=42, verbose=0)
clf.fit(df[to_model_columns])
pred = clf.predict(df[to_model_columns])
df['anomaly'] = pred
outliers = df.loc[df['anomaly'] == -1]
outlier_index = list(outliers.index)
#print(outlier_index)

# Find the number of anomalies and normal points; points classified as -1 are anomalous
print(df['anomaly'].value_counts())
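If you also want to rank transactions by how anomalous they are, rather than only labeling them, the fitted model exposes an anomaly score through scikit-learn's decision_function. This is an optional addition, not part of the original notebook; lower scores are more anomalous.

# Optional: rank transactions by anomaly score (lower = more anomalous).
df['anomaly_score'] = clf.decision_function(df[to_model_columns])
print(df.sort_values('anomaly_score')[['orderid', 'product', 'anomaly_score']].head(10))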
Plot the predicted anomalies by using a Matplotlib visualization:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

# Project the two model columns onto two principal components for plotting
pca = PCA(2)
pca.fit(df[to_model_columns])
res = pd.DataFrame(pca.transform(df[to_model_columns]))
Z = np.array(res)

plt.title("IsolationForest")
plt.contourf(Z, cmap=plt.cm.Blues_r)
b1 = plt.scatter(res[0], res[1], c='green', s=20, label="normal points")
b1 = plt.scatter(res.iloc[outlier_index, 0], res.iloc[outlier_index, 1],
                 c='green', s=20, edgecolor="red", label="predicted outliers")
plt.legend(loc="upper right")
plt.show()
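In addition to the plot, you can print the flagged transactions themselves. A minimal sketch, assuming quantityordered and unitprice are numeric; the sales_amount column is computed here for readability and is not part of the table.

# Optional: list the flagged transactions with their sales amount.
flagged = df.loc[df['anomaly'] == -1].copy()
flagged['sales_amount'] = flagged['quantityordered'] * flagged['unitprice']
print(flagged[['orderid', 'orderdate', 'product', 'sales_amount']]
      .sort_values('sales_amount', ascending=False))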
The image shows the transaction data, with the anomalous data highlighted in red.
Schedule the notebook
With Explore, you can schedule a notebook to run periodically. Follow the steps to schedule the Jupyter notebook that you created.
Dataplex Universal Catalog creates a scheduled task that runs your notebook periodically. To monitor the task progress, click View schedules.
Share or export the notebook
You can share the notebook with others in your organization by using IAM permissions.
Review the roles. Grant or revoke the Dataplex Universal Catalog Viewer (roles/dataplex.viewer), Dataplex Universal Catalog Editor (roles/dataplex.editor), and Dataplex Universal Catalog Administrator (roles/dataplex.admin) roles for users of this notebook. After you share the notebook, users with the Viewer or Editor role at the lake level can navigate to the lake and work on the shared notebook.
Clean up
To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.
Delete the project
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Delete individual resources
- Delete the bucket:
  gcloud storage buckets delete BUCKET_NAME
- Delete the instance:
  gcloud compute instances delete INSTANCE_NAME