データの異常の検出


このドキュメントでは、Dataplex Explore を使用して、小売トランザクション データセット内の異常を検出する方法について説明します。

データ探索ワークベンチ(Explore)を使用すると、データ アナリストはリアルタイムで大規模なデータセットをインタラクティブにクエリして探索できます。Explore は、データから分析情報を取得するのに役立ち、Cloud Storage と BigQuery に保存されているデータをクエリできます。Explore はサーバーレス Spark プラットフォームを使用するため、基盤となるインフラストラクチャを管理してスケーリングする必要はありません。

目標

このチュートリアルでは、次のタスクを行う方法を説明します。

  • Explore の Spark SQL Workbench を使用して、Spark SQL クエリを作成し、実行する。
  • JupyterLab ノートブックを使用して結果を表示します。
  • ノートブックで繰り返し実行をスケジュールし、異常値のデータをモニタリングします。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Google Cloud CLI をインストールします。
  7. gcloud CLI を初期化するには:

    gcloud init
  8. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  9. Google Cloud プロジェクトで課金が有効になっていることを確認します

データ探索のためのデータの準備

  1. Parquet ファイル retail_offline_sales_march をダウンロードします。

    Parquet ファイルをダウンロードする

  2. 次のように offlinesales_curated という Cloud Storage バケットを作成します。

    1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

      [バケット] ページに移動

    2. [バケットを作成] をクリックします。
    3. [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
      • [バケットに名前を付ける] で、バケット名の要件を満たす名前を入力します。
      • [データの保存場所の選択] で、次の操作を行います。
        • [ロケーション タイプ] オプションを選択します。
        • [ロケーション] オプションを選択します。
      • [データのデフォルトのストレージ クラスを選択する] で、ストレージ クラスを選択します。
      • [オブジェクトへのアクセスを制御する方法を選択する] で [アクセス制御] オプションを選択します。
      • [詳細設定(省略可)] には、暗号化メソッド保持ポリシー、またはバケットラベルを指定します。
    4. [作成] をクリックします。

  3. ファイル システムからオブジェクトをアップロードするの手順に従って、ダウンロードした offlinesales_march_parquet ファイルを、作成した offlinesales_curated Cloud Storage バケットにアップロードします。

  4. レイクを作成するの手順に従って、Dataplex レイクを作成して operations という名前を付けます。

  5. operations レイクで、ゾーンを追加するの手順に従ってゾーンを追加し、procurement という名前を付けます。

  6. procurement ゾーンで、アセットを追加するの手順に従って、アセットとして作成した offlinesales_curated Cloud Storage バケットを追加します。

探索するテーブルを選択する

  1. Google Cloud コンソールで、Dataplex の [探索] ページに移動します。

    Exploreへ移動

  2. [レイク] フィールドで operations レイクを選択します。

  3. operations レイクをクリックします。

  4. procurement ゾーンに移動し、テーブルをクリックしてメタデータを調べます。

    次の図では、選択した調達ゾーンに Offline というテーブルがあります。このテーブルには、メタデータ orderidproductquantityorderedunitpriceorderdatepurchaseaddress があります。

    探索するテーブルを選択する

  5. Spark SQL エディタで、 [追加] をクリックします。Spark SQL スクリプトが表示されます。

  6. 省略可: スクリプトを分割タブビューで開き、メタデータと新しいスクリプトを並べて表示します。新しいスクリプトタブで [さらに表示] を選択し、[タブを右に分割] または [タブを左に分割] を選択します。

データを確認する

環境は、Spark SQL クエリとノートブックをレイク内で実行するためのサーバーレス コンピューティング リソースを提供します。 Spark SQL クエリを作成する前に、クエリを実行する環境を作成します。

次の SparkSQL クエリを使用してデータを探索します。SparkSQL エディタで、[新しいスクリプト] ペインにクエリを入力します。

テーブルの 10 行のサンプル

  1. 次のクエリを入力します。

    select * from procurement.offlinesales where orderid != 'orderid' limit 10;
    
  2. [実行] をクリックします。

データセット内のトランザクションの合計数を取得する

  1. 次のクエリを入力します。

    select count(*) from procurement.offlinesales where orderid!='orderid';
    
  2. [実行] をクリックします。

データセット内のさまざまな商品タイプの数を確認する

  1. 次のクエリを入力します。

    select count(distinct product) from procurement.offlinesales where orderid!='orderid';
    
  2. [実行] をクリックします。

トランザクション値が大きい商品を見つける

商品の種類と平均販売価格ごとに分類することで、トランザクション値が大きい商品を把握できます。

  1. 次のクエリを入力します。

    select product,avg(quantityordered * unitprice) as avg_sales_amount from procurement.offlinesales where orderid!='orderid' group by product order by avg_sales_amount desc;
    
  2. [実行] をクリックします。

次の画像には、product という列を使用して avg_sales_amount という列に表示されるトランザクション値が大きいセールス アイテムを識別する Results ペインが表示されています。

スクリプトの結果を確認します。

変動係数を使用して異常を検出する

最後のクエリは、ノートパソコンの平均トランザクション額が高いことを示しています。次のクエリは、データセット内の異常ではないノートパソコン トランザクションを検出する方法を示しています。

次のクエリでは、「変動係数」指標 rsd_value を使用して、値の分散が平均値よりも低くない、異常ではないトランザクションを見つけます。変動係数が小さいほど、異常が少ないことを示します。

  1. 次のクエリを入力します。

    WITH stats AS (
    SELECT product,
          AVG(quantityordered * unitprice)  AS avg_value,
          STDDEV(quantityordered * unitprice) / AVG(quantityordered * unitprice) AS rsd_value
    FROM procurement.offlinesales
    GROUP BY product)
    SELECT orderid, orderdate, product, (quantityordered * unitprice) as sales_amount,
        ABS(1 - (quantityordered * unitprice)/ avg_value) AS distance_from_avg
    FROM procurement.offlinesales INNER JOIN stats USING (product)
    WHERE rsd_value <= 0.2
    ORDER BY distance_from_avg DESC
    LIMIT 10
    
  2. [実行] をクリックします。

  3. スクリプトの結果を確認します。

    次の図では、[結果] ペインで商品という列を使用して、変動係数 0.2 以内のトランザクション値を持つ販売商品を識別しています。

    スクリプトの結果を確認します。

JupyterLab ノートブックを使用して異常を可視化する

大規模な異常を検出して可視化するための ML モデルを構築します。

  1. ノートブックを作成する

  2. 別のタブでノートブックを開き、読み込まれるまで待ちます。Spark SQL クエリを実行したセッションは続行されます。

  3. 必要なパッケージをインポートし、トランザクション データが含まれる BigQuery 外部テーブルに接続します。次のコードを実行します。

    from google.cloud import bigquery
    from google.api_core.client_options import ClientOptions
    import os
    import warnings
    warnings.filterwarnings('ignore')
    import pandas as pd
    
    project = os.environ['GOOGLE_CLOUD_PROJECT']
    options = ClientOptions(quota_project_id=project)
    client = bigquery.Client(client_options=options)
    client = bigquery.Client()
    
    #Load data into DataFrame
    
    sql = '''select * from procurement.offlinesales limit 100;'''
    df = client.query(sql).to_dataframe()
    
  4. 分離フォレスト アルゴリズムを実行して、データセット内の異常を検出します。

    to_model_columns = df.columns[2:4]
    from sklearn.ensemble import IsolationForest
    clf=IsolationForest(n_estimators=100, max_samples='auto', contamination=float(.12), \
                            max_features=1.0, bootstrap=False, n_jobs=-1, random_state=42, verbose=0)
    clf.fit(df[to_model_columns])
    pred = clf.predict(df[to_model_columns])
    df['anomaly']=pred
    outliers=df.loc[df['anomaly']==-1]
    outlier_index=list(outliers.index)
    #print(outlier_index)
    #Find the number of anomalies and normal points here points classified -1 are anomalous
    print(df['anomaly'].value_counts())
    
  5. Matplotlib の可視化を使用して、予測された異常をプロットします。

    import numpy as np
    from sklearn.decomposition import PCA
    pca = PCA(2)
    pca.fit(df[to_model_columns])
    res=pd.DataFrame(pca.transform(df[to_model_columns]))
    Z = np.array(res)
    plt.title("IsolationForest")
    plt.contourf( Z, cmap=plt.cm.Blues_r)
    b1 = plt.scatter(res[0], res[1], c='green',
                    s=20,label="normal points")
    b1 =plt.scatter(res.iloc[outlier_index,0],res.iloc[outlier_index,1], c='green',s=20,  edgecolor="red",label="predicted outliers")
    plt.legend(loc="upper right")
    plt.show()
    

この画像は、異常値が赤でハイライト表示されたトランザクション データを示しています。

異常値が赤でハイライト表示されたトランザクション データ

ノートブックのスケジュールを設定する

データ探索では、定期的に実行するノートブックをスケジュールできます。手順に従って Jupyter Notebook をスケジュールします。

Dataplex は、ノートブックを定期的に実行するスケジューリング タスクを作成します。タスクの進行状況をモニタリングするには、[スケジュールを表示] をクリックします。

ノートブックを共有またはエクスポートする

Explore では、IAM 権限を使用して、ノートブックを組織内の他のユーザーと共有できます。

ロールを確認します。このノートブックのユーザーに対して、Dataplex 閲覧者(roles/dataplex.viewer)、Dataplex 編集者(roles/dataplex.editor)、Dataplex 管理者(roles/dataplex.admin)のロールを付与または取り消します。ノートブックを共有した後、レイクレベルで閲覧者または編集者のロールを持つユーザーは、レイクに移動して、共有ノートブックで作業できます。

ノートブックを共有またはエクスポートするには、ノートブックを共有するまたはノートブックをエクスポートするをご覧ください。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトの削除

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

リソースを個別に削除する

  1. バケットを削除します。
    gcloud storage buckets delete BUCKET_NAME
  2. インスタンスを削除します。
    gcloud compute instances delete INSTANCE_NAME

次のステップ