检测数据异常


本文档介绍了如何使用 Dataplex 探索功能检测零售交易数据集中的异常值。

借助数据探索工作台(简称“探索”),数据分析师可以实时地以交互方式查询和探索大型数据集。“探索”工具可帮助您从数据中获取数据洞见,并可让您查询存储在 Cloud Storage 和 BigQuery 中的数据。探索使用无服务器 Spark 平台,因此您无需管理和扩缩底层基础架构。

目标

本教程介绍如何完成以下任务:

  • 使用探索的 Spark SQL 工作台编写和执行 Spark SQL 查询。
  • 使用 JupyterLab 笔记本查看结果。
  • 安排笔记本定期执行,以便监控数据异常。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. 安装 Google Cloud CLI。
  7. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  8. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  9. 确保您的 Google Cloud 项目已启用结算功能

准备数据以进行探索

  1. 下载 Parquet 文件 retail_offline_sales_march

    下载 Parquet 文件

  2. 创建一个名为 offlinesales_curated 的 Cloud Storage 存储桶,如下所示:

    1. 在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。

      进入“存储桶”页面

    2. 点击创建存储分区
    3. 创建存储分区页面上,输入您的存储分区信息。要转到下一步,请点击继续
      • 指定存储分区的名称中,输入符合存储分区命名要求的名称。
      • 对于选择数据存储位置,执行以下操作:
        • 选择位置类型选项。
        • 选择位置选项。
      • 对于为数据选择一个默认存储类别,请选择一个存储类别
      • 对于选择如何控制对象的访问权限,请选择访问权限控制选项。
      • 对于高级设置(可选),请指定加密方法保留政策存储分区标签
    4. 点击创建

  3. 按照从文件系统上传对象中的步骤,将您下载的 offlinesales_march_parquet 文件上传到您创建的 offlinesales_curated Cloud Storage 存储桶。

  4. 按照创建数据湖中的步骤创建一个 Dataplex 数据湖,并将其命名为 operations

  5. operations 数据湖中,按照添加区域中的步骤添加一个区域并将其命名为 procurement

  6. 按照添加资产中的步骤,在 procurement 区域中,将您创建的 offlinesales_curated Cloud Storage 存储桶添加为资产。

选择要探索的表

  1. 在 Google Cloud 控制台中,转到 Dataplex 探索页面。

    前往“探索”页面

  2. 湖泊字段中,选择 operations 数据湖。

  3. 点击 operations 数据湖。

  4. 导航到 procurement 区域,然后点击该表以浏览其元数据。

    在下图中,所选采购区域有一个名为 Offline 的表,其中包含以下元数据:orderidproductquantityorderedunitpriceorderdatepurchaseaddress

    选择要探索的表

  5. Spark SQL Editor 中,点击 添加。此时会出现 Spark SQL 脚本。

  6. 可选:在拆分标签页视图中打开脚本,并排查看元数据和新脚本。点击新脚本标签页中的 更多,然后选择将标签页拆分到右侧将标签页拆分到左侧

探索数据

环境为 Spark SQL 查询和笔记本提供无服务器计算资源,以便在数据湖内运行。在编写 Spark SQL 查询之前,请创建环境以运行查询。

使用以下 SparkSQL 查询来探索数据。在 SparkSQL 编辑器新建脚本窗格中,输入查询。

对表的 10 行进行采样

  1. 输入以下查询:

    select * from procurement.offlinesales where orderid != 'orderid' limit 10;
    
  2. 点击运行

获取数据集中的总事务数

  1. 输入以下查询:

    select count(*) from procurement.offlinesales where orderid!='orderid';
    
  2. 点击运行

确定数据集中不同商品类型的数量

  1. 输入以下查询:

    select count(distinct product) from procurement.offlinesales where orderid!='orderid';
    
  2. 点击运行

查找交易价值较大的产品

通过按产品类型和平均售价对销量进行细分,了解哪些产品的交易价值较大。

  1. 输入以下查询:

    select product,avg(quantityordered * unitprice) as avg_sales_amount from procurement.offlinesales where orderid!='orderid' group by product order by avg_sales_amount desc;
    
  2. 点击运行

下图显示了一个 Results 窗格,该窗格使用 product 列来识别交易价值较大的销售商品,如 avg_sales_amount 列中所示。

查看脚本结果。

使用变异系数检测异常情况

最后一个查询显示笔记本电脑的平均交易金额较高。以下查询展示了如何检测数据集中未异常的笔记本电脑事务。

以下查询使用“变异系数”指标 rsd_value 来查找不异常的交易,其中值的差距低于平均值。变异系数越低,表示异常值越少。

  1. 输入以下查询:

    WITH stats AS (
    SELECT product,
          AVG(quantityordered * unitprice)  AS avg_value,
          STDDEV(quantityordered * unitprice) / AVG(quantityordered * unitprice) AS rsd_value
    FROM procurement.offlinesales
    GROUP BY product)
    SELECT orderid, orderdate, product, (quantityordered * unitprice) as sales_amount,
        ABS(1 - (quantityordered * unitprice)/ avg_value) AS distance_from_avg
    FROM procurement.offlinesales INNER JOIN stats USING (product)
    WHERE rsd_value <= 0.2
    ORDER BY distance_from_avg DESC
    LIMIT 10
    
  2. 点击运行

  3. 查看脚本结果。

    在下图中,“结果”窗格使用名为 product 的列来识别交易价值在变体系数为 0.2 范围内的销售商品。

    查看脚本结果。

使用 JupyterLab 笔记本直观呈现异常情况

构建机器学习模型,以大规模检测和直观呈现异常情况。

  1. 创建笔记本

  2. 在单独的标签页中打开笔记本,等待加载。您执行 Spark SQL 查询的会话将继续进行。

  3. 导入必要的软件包,并连接到包含交易数据的 BigQuery 外部表。运行以下代码:

    from google.cloud import bigquery
    from google.api_core.client_options import ClientOptions
    import os
    import warnings
    warnings.filterwarnings('ignore')
    import pandas as pd
    
    project = os.environ['GOOGLE_CLOUD_PROJECT']
    options = ClientOptions(quota_project_id=project)
    client = bigquery.Client(client_options=options)
    client = bigquery.Client()
    
    #Load data into DataFrame
    
    sql = '''select * from procurement.offlinesales limit 100;'''
    df = client.query(sql).to_dataframe()
    
  4. 运行隔离林算法,以发现数据集中的异常值:

    to_model_columns = df.columns[2:4]
    from sklearn.ensemble import IsolationForest
    clf=IsolationForest(n_estimators=100, max_samples='auto', contamination=float(.12), \
                            max_features=1.0, bootstrap=False, n_jobs=-1, random_state=42, verbose=0)
    clf.fit(df[to_model_columns])
    pred = clf.predict(df[to_model_columns])
    df['anomaly']=pred
    outliers=df.loc[df['anomaly']==-1]
    outlier_index=list(outliers.index)
    #print(outlier_index)
    #Find the number of anomalies and normal points here points classified -1 are anomalous
    print(df['anomaly'].value_counts())
    
  5. 使用 Matplotlib 可视化结果绘制预测的异常值:

    import numpy as np
    from sklearn.decomposition import PCA
    pca = PCA(2)
    pca.fit(df[to_model_columns])
    res=pd.DataFrame(pca.transform(df[to_model_columns]))
    Z = np.array(res)
    plt.title("IsolationForest")
    plt.contourf( Z, cmap=plt.cm.Blues_r)
    b1 = plt.scatter(res[0], res[1], c='green',
                    s=20,label="normal points")
    b1 =plt.scatter(res.iloc[outlier_index,0],res.iloc[outlier_index,1], c='green',s=20,  edgecolor="red",label="predicted outliers")
    plt.legend(loc="upper right")
    plt.show()
    

此图片显示了交易数据,并用红色突出显示了异常情况。

以红色突出显示的异常交易数据

安排笔记本

借助“探索”功能,您可以安排定期运行笔记本。按照相应步骤安排您创建的 Jupyter 笔记本

Dataplex 会创建一个调度任务来定期运行您的笔记本。如需监控任务进度,请点击查看时间表

共享或导出笔记本

借助“探索”功能,您可以使用 IAM 权限与组织中的其他人共享笔记本。

查看角色。向用户授予或撤消此笔记本的 Dataplex Viewer (roles/dataplex.viewer)、Dataplex Editor (roles/dataplex.editor) 和 Dataplex Administrator (roles/dataplex.admin) 角色。共享笔记本后,在数据湖级别具有 Viewer 或 Editor 角色的用户可以导航到数据湖并处理共享的笔记本。

如需共享或导出笔记本,请参阅共享笔记本导出笔记本

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

    删除 Google Cloud 项目:

    gcloud projects delete PROJECT_ID

逐个删除资源

  1. 删除存储分区:
    gcloud storage buckets delete BUCKET_NAME
  2. 删除实例:
    gcloud compute instances delete INSTANCE_NAME

后续步骤