データの異常の検出


このドキュメントでは、Dataplex Explore を使用して小売取引データセットの異常を検出する方法について説明します。

データ探索ワークベンチ(Explore)を使用すると、データ アナリストはリアルタイムで大規模なデータセットをインタラクティブにクエリして探索できます。Explore は、データから分析情報を取得するのに役立ち、Cloud Storage と BigQuery に保存されているデータをクエリできます。Explore はサーバーレス Spark プラットフォームを使用するため、基盤となるインフラストラクチャを管理してスケーリングする必要はありません。

目標

このチュートリアルでは、次のタスクを行う方法を説明します。

  • Explore の Spark SQL Workbench を使用して、Spark SQL クエリを作成して実行します。
  • JupyterLab ノートブックを使用して結果を表示します。
  • ノートブックを定期的に実行するようにスケジュール設定すると、データの異常をモニタリングできます。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

データ探索用のデータを準備する

  1. Parquet ファイル(retail_offline_sales_march)をダウンロードします。

    Parquet ファイルをダウンロードする

  2. 次のように、offlinesales_curated という Cloud Storage バケットを作成します。

    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets page

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a name that meets the bucket naming requirements.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select a storage class.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.

  3. ファイル システムからオブジェクトをアップロードするの手順に沿って、ダウンロードした offlinesales_march_parquet ファイルを作成した offlinesales_curated Cloud Storage バケットにアップロードします。

  4. レイクを作成するの手順に従って、Dataplex レイクを作成して operations という名前を付けます。

  5. ゾーンを追加するの手順に沿って、operations レイクにゾーンを追加し、procurement という名前を付けます。

  6. procurement ゾーンで、アセットを追加するの手順に従って、アセットとして作成した offlinesales_curated Cloud Storage バケットを追加します。

探索するテーブルを選択する

  1. Google Cloud コンソールで Dataplex の [Explore] ページに移動します。

  2. [レイク] フィールドで、operations レイクを選択します。

  3. operations レイクをクリックします。

  4. procurement ゾーンに移動し、テーブルをクリックしてメタデータを探索します。

    次の図では、選択した調達ゾーンに Offline というテーブルがあります。このテーブルには、メタデータ orderidproductquantityorderedunitpriceorderdatepurchaseaddress があります。

    探索するテーブルを選択する

  5. Spark SQL エディタで、 [追加] をクリックします。Spark SQL スクリプトが表示されます。

  6. 省略可: スクリプトを分割タブビューで開き、メタデータと新しいスクリプトを並べて表示します。新しいスクリプトタブで [さらに表示] を選択し、[タブを右に分割] または [タブを左に分割] を選択します。

データを確認する

環境は、Spark SQL クエリとノートブックをレイク内で実行するためのサーバーレス コンピューティング リソースを提供します。 Spark SQL クエリを作成する前に、クエリを実行する環境を作成します。

次の SparkSQL クエリを使用してデータを探索します。SparkSQL エディタで、[新しいスクリプト] ペインにクエリを入力します。

テーブルの 10 行のサンプル

  1. 次のクエリを入力します。

    select * from procurement.offlinesales where orderid != 'orderid' limit 10;
    
  2. [実行] をクリックします。

データセット内のトランザクションの合計数を取得する

  1. 次のクエリを入力します。

    select count(*) from procurement.offlinesales where orderid!='orderid';
    
  2. [実行] をクリックします。

データセット内のさまざまな商品タイプの数を確認する

  1. 次のクエリを入力します。

    select count(distinct product) from procurement.offlinesales where orderid!='orderid';
    
  2. [実行] をクリックします。

トランザクション値が大きい商品を見つける

販売を商品タイプと平均販売価格別に分類して、トランザクション値が大きい商品を把握します。

  1. 次のクエリを入力します。

    select product,avg(quantityordered * unitprice) as avg_sales_amount from procurement.offlinesales where orderid!='orderid' group by product order by avg_sales_amount desc;
    
  2. [実行] をクリックします。

次の画像には、product という列を使用して avg_sales_amount という列に表示されるトランザクション値が大きいセールス アイテムを識別する Results ペインが表示されています。

スクリプトの結果を確認します。

変動係数を使用して異常を検出する

前回のクエリでは、ノートパソコンの平均トランザクション量の値が高いことがわかりました。次のクエリは、データセット内の異常ではないノートパソコンのトランザクションを検出する方法を示しています。

次のクエリは、指標「変動係数」rsd_value を使用して、値の分散が平均値と比較して低い、異常ではないトランザクションを検索します。変動係数が低いほど、異常値が少ないことを示します。

  1. 次のクエリを入力します。

    WITH stats AS (
    SELECT product,
          AVG(quantityordered * unitprice)  AS avg_value,
          STDDEV(quantityordered * unitprice) / AVG(quantityordered * unitprice) AS rsd_value
    FROM procurement.offlinesales
    GROUP BY product)
    SELECT orderid, orderdate, product, (quantityordered * unitprice) as sales_amount,
        ABS(1 - (quantityordered * unitprice)/ avg_value) AS distance_from_avg
    FROM procurement.offlinesales INNER JOIN stats USING (product)
    WHERE rsd_value <= 0.2
    ORDER BY distance_from_avg DESC
    LIMIT 10
    
  2. [実行] をクリックします。

  3. スクリプトの結果を確認します。

    次の画像の [結果] ペインでは、product という列を使用して、変動係数 0.2 の範囲内のトランザクション値を持つセールス アイテムを特定しています。

    スクリプトの結果を確認します。

JupyterLab ノートブックを使用して異常を可視化する

ML モデルを構築して、異常を大規模に検出して可視化します。

  1. ノートブックを作成する

  2. ノートブックを別のタブで開き、読み込みが完了するまで待ちます。Spark SQL クエリを実行したセッションは続行されます。

  3. 必要なパッケージをインポートし、トランザクション データを含む BigQuery 外部テーブルに接続します。次のコードを実行します。

    from google.cloud import bigquery
    from google.api_core.client_options import ClientOptions
    import os
    import warnings
    warnings.filterwarnings('ignore')
    import pandas as pd
    
    project = os.environ['GOOGLE_CLOUD_PROJECT']
    options = ClientOptions(quota_project_id=project)
    client = bigquery.Client(client_options=options)
    client = bigquery.Client()
    
    #Load data into DataFrame
    
    sql = '''select * from procurement.offlinesales limit 100;'''
    df = client.query(sql).to_dataframe()
    
  4. 分離フォレスト アルゴリズムを実行して、データセット内の異常を検出します。

    to_model_columns = df.columns[2:4]
    from sklearn.ensemble import IsolationForest
    clf=IsolationForest(n_estimators=100, max_samples='auto', contamination=float(.12), \
                            max_features=1.0, bootstrap=False, n_jobs=-1, random_state=42, verbose=0)
    clf.fit(df[to_model_columns])
    pred = clf.predict(df[to_model_columns])
    df['anomaly']=pred
    outliers=df.loc[df['anomaly']==-1]
    outlier_index=list(outliers.index)
    #print(outlier_index)
    #Find the number of anomalies and normal points here points classified -1 are anomalous
    print(df['anomaly'].value_counts())
    
  5. Matplotlib の可視化を使用して、予測された異常をプロットします。

    import numpy as np
    from sklearn.decomposition import PCA
    pca = PCA(2)
    pca.fit(df[to_model_columns])
    res=pd.DataFrame(pca.transform(df[to_model_columns]))
    Z = np.array(res)
    plt.title("IsolationForest")
    plt.contourf( Z, cmap=plt.cm.Blues_r)
    b1 = plt.scatter(res[0], res[1], c='green',
                    s=20,label="normal points")
    b1 =plt.scatter(res.iloc[outlier_index,0],res.iloc[outlier_index,1], c='green',s=20,  edgecolor="red",label="predicted outliers")
    plt.legend(loc="upper right")
    plt.show()
    

この画像は、異常値が赤でハイライト表示されたトランザクション データを示しています。

異常値が赤でハイライト表示されたトランザクション データ

ノートブックのスケジュールを設定する

Explore では、ノートブックを定期的に実行するようにスケジュール設定できます。手順に沿って、作成した Jupyter Notebook のスケジュールを設定します。

Dataplex は、ノートブックを定期的に実行するスケジュール設定タスクを作成します。タスクの進行状況をモニタリングするには、[スケジュールを表示] をクリックします。

ノートブックを共有またはエクスポートする

Explore では、IAM 権限を使用して組織内の他のユーザーとノートブックを共有できます。

ロールを確認します。このノートブックのユーザーに対して、Dataplex 閲覧者(roles/dataplex.viewer)、Dataplex 編集者(roles/dataplex.editor)、Dataplex 管理者(roles/dataplex.admin)のロールを付与または取り消します。ノートブックを共有した後、レイクレベルで閲覧者または編集者のロールを持つユーザーは、レイクに移動して共有ノートブックで作業できます。

ノートブックを共有またはエクスポートするには、ノートブックを共有するまたはノートブックをエクスポートするをご覧ください。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトを削除する

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

リソースを個別に削除する

  1. バケットを削除します。
    gcloud storage buckets delete BUCKET_NAME
  2. インスタンスを削除します。
    gcloud compute instances delete INSTANCE_NAME

次のステップ