本文档介绍了如何使用 Dataplex 探索功能检测零售交易数据集中的异常值。
借助数据探索工作台(简称“探索”),数据分析师可以实时地以交互方式查询和探索大型数据集。“探索”工具可帮助您从数据中获取数据洞见,并可让您查询存储在 Cloud Storage 和 BigQuery 中的数据。探索使用无服务器 Spark 平台,因此您无需管理和扩缩底层基础架构。
目标
本教程介绍如何完成以下任务:
- 使用探索的 Spark SQL 工作台编写和执行 Spark SQL 查询。
- 使用 JupyterLab 笔记本查看结果。
- 安排笔记本定期执行,以便监控数据异常。
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
- 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID
替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID
替换为您的 Google Cloud 项目 名称。
-
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID
替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID
替换为您的 Google Cloud 项目 名称。
-
准备数据以进行探索
下载 Parquet 文件
retail_offline_sales_march
。创建一个名为
offlinesales_curated
的 Cloud Storage 存储桶,如下所示:按照从文件系统上传对象中的步骤,将您下载的
offlinesales_march_parquet
文件上传到您创建的offlinesales_curated
Cloud Storage 存储桶。按照创建数据湖中的步骤创建一个 Dataplex 数据湖,并将其命名为
operations
。在
operations
数据湖中,按照添加区域中的步骤添加一个区域并将其命名为procurement
。按照添加资产中的步骤,在
procurement
区域中,将您创建的offlinesales_curated
Cloud Storage 存储桶添加为资产。
选择要探索的表
在 Google Cloud 控制台中,转到 Dataplex 探索页面。
在湖泊字段中,选择
operations
数据湖。点击
operations
数据湖。导航到
procurement
区域,然后点击该表以浏览其元数据。在下图中,所选采购区域有一个名为
Offline
的表,其中包含以下元数据:orderid
、product
、quantityordered
、unitprice
、orderdate
和purchaseaddress
。在 Spark SQL Editor 中,点击
添加。此时会出现 Spark SQL 脚本。可选:在拆分标签页视图中打开脚本,并排查看元数据和新脚本。点击新脚本标签页中的
更多,然后选择将标签页拆分到右侧或将标签页拆分到左侧。
探索数据
环境为 Spark SQL 查询和笔记本提供无服务器计算资源,以便在数据湖内运行。在编写 Spark SQL 查询之前,请创建环境以运行查询。
使用以下 SparkSQL 查询来探索数据。在 SparkSQL 编辑器的新建脚本窗格中,输入查询。
对表的 10 行进行采样
输入以下查询:
select * from procurement.offlinesales where orderid != 'orderid' limit 10;
点击运行。
获取数据集中的总事务数
输入以下查询:
select count(*) from procurement.offlinesales where orderid!='orderid';
点击运行。
确定数据集中不同商品类型的数量
输入以下查询:
select count(distinct product) from procurement.offlinesales where orderid!='orderid';
点击运行。
查找交易价值较大的产品
通过按产品类型和平均售价对销量进行细分,了解哪些产品的交易价值较大。
输入以下查询:
select product,avg(quantityordered * unitprice) as avg_sales_amount from procurement.offlinesales where orderid!='orderid' group by product order by avg_sales_amount desc;
点击运行。
下图显示了一个 Results
窗格,该窗格使用 product
列来识别交易价值较大的销售商品,如 avg_sales_amount
列中所示。
使用变异系数检测异常情况
最后一个查询显示笔记本电脑的平均交易金额较高。以下查询展示了如何检测数据集中未异常的笔记本电脑事务。
以下查询使用“变异系数”指标 rsd_value
来查找不异常的交易,其中值的差距低于平均值。变异系数越低,表示异常值越少。
输入以下查询:
WITH stats AS ( SELECT product, AVG(quantityordered * unitprice) AS avg_value, STDDEV(quantityordered * unitprice) / AVG(quantityordered * unitprice) AS rsd_value FROM procurement.offlinesales GROUP BY product) SELECT orderid, orderdate, product, (quantityordered * unitprice) as sales_amount, ABS(1 - (quantityordered * unitprice)/ avg_value) AS distance_from_avg FROM procurement.offlinesales INNER JOIN stats USING (product) WHERE rsd_value <= 0.2 ORDER BY distance_from_avg DESC LIMIT 10
点击运行。
查看脚本结果。
在下图中,“结果”窗格使用名为 product 的列来识别交易价值在变体系数为 0.2 范围内的销售商品。
使用 JupyterLab 笔记本直观呈现异常情况
构建机器学习模型,以大规模检测和直观呈现异常情况。
在单独的标签页中打开笔记本,等待加载。您执行 Spark SQL 查询的会话将继续进行。
导入必要的软件包,并连接到包含交易数据的 BigQuery 外部表。运行以下代码:
from google.cloud import bigquery from google.api_core.client_options import ClientOptions import os import warnings warnings.filterwarnings('ignore') import pandas as pd project = os.environ['GOOGLE_CLOUD_PROJECT'] options = ClientOptions(quota_project_id=project) client = bigquery.Client(client_options=options) client = bigquery.Client() #Load data into DataFrame sql = '''select * from procurement.offlinesales limit 100;''' df = client.query(sql).to_dataframe()
运行隔离林算法,以发现数据集中的异常值:
to_model_columns = df.columns[2:4] from sklearn.ensemble import IsolationForest clf=IsolationForest(n_estimators=100, max_samples='auto', contamination=float(.12), \ max_features=1.0, bootstrap=False, n_jobs=-1, random_state=42, verbose=0) clf.fit(df[to_model_columns]) pred = clf.predict(df[to_model_columns]) df['anomaly']=pred outliers=df.loc[df['anomaly']==-1] outlier_index=list(outliers.index) #print(outlier_index) #Find the number of anomalies and normal points here points classified -1 are anomalous print(df['anomaly'].value_counts())
使用 Matplotlib 可视化结果绘制预测的异常值:
import numpy as np from sklearn.decomposition import PCA pca = PCA(2) pca.fit(df[to_model_columns]) res=pd.DataFrame(pca.transform(df[to_model_columns])) Z = np.array(res) plt.title("IsolationForest") plt.contourf( Z, cmap=plt.cm.Blues_r) b1 = plt.scatter(res[0], res[1], c='green', s=20,label="normal points") b1 =plt.scatter(res.iloc[outlier_index,0],res.iloc[outlier_index,1], c='green',s=20, edgecolor="red",label="predicted outliers") plt.legend(loc="upper right") plt.show()
此图片显示了交易数据,并用红色突出显示了异常情况。
安排笔记本
借助“探索”功能,您可以安排定期运行笔记本。按照相应步骤安排您创建的 Jupyter 笔记本。
Dataplex 会创建一个调度任务来定期运行您的笔记本。如需监控任务进度,请点击查看时间表。
共享或导出笔记本
借助“探索”功能,您可以使用 IAM 权限与组织中的其他人共享笔记本。
查看角色。向用户授予或撤消此笔记本的 Dataplex Viewer (roles/dataplex.viewer
)、Dataplex Editor (roles/dataplex.editor
) 和 Dataplex Administrator (roles/dataplex.admin
) 角色。共享笔记本后,在数据湖级别具有 Viewer 或 Editor 角色的用户可以导航到数据湖并处理共享的笔记本。
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
删除 Google Cloud 项目:
gcloud projects delete PROJECT_ID
逐个删除资源
- 删除存储分区:
gcloud storage buckets delete BUCKET_NAME
- 删除实例:
gcloud compute instances delete INSTANCE_NAME
后续步骤
- 详细了解 Dataplex 探索。
- 安排脚本和笔记本。