このドキュメントでは、Dataplex Explore を使用して、小売トランザクション データセット内の異常を検出する方法について説明します。
データ探索ワークベンチ(Explore)を使用すると、データ アナリストはリアルタイムで大規模なデータセットをインタラクティブにクエリして探索できます。Explore は、データから分析情報を取得するのに役立ち、Cloud Storage と BigQuery に保存されているデータをクエリできます。Explore はサーバーレス Spark プラットフォームを使用するため、基盤となるインフラストラクチャを管理してスケーリングする必要はありません。
目標
このチュートリアルでは、次のタスクを行う方法を説明します。
- Explore の Spark SQL Workbench を使用して、Spark SQL クエリを作成し、実行する。
- JupyterLab ノートブックを使用して結果を表示します。
- ノートブックで繰り返し実行をスケジュールし、異常値のデータをモニタリングします。
費用
このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
-
Google Cloud プロジェクトを作成または選択します。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_ID
は、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_ID
は、実際の Google Cloud プロジェクト名に置き換えます。
-
- Google Cloud CLI をインストールします。
-
gcloud CLI を初期化するには:
gcloud init
-
Google Cloud プロジェクトを作成または選択します。
-
Google Cloud プロジェクトを作成します。
gcloud projects create PROJECT_ID
PROJECT_ID
は、作成する Google Cloud プロジェクトの名前に置き換えます。 -
作成した Google Cloud プロジェクトを選択します。
gcloud config set project PROJECT_ID
PROJECT_ID
は、実際の Google Cloud プロジェクト名に置き換えます。
-
データ探索のためのデータの準備
Parquet ファイル
retail_offline_sales_march
をダウンロードします。次のように
offlinesales_curated
という Cloud Storage バケットを作成します。- Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。
- [バケットを作成] をクリックします。
- [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
- [作成] をクリックします。
ファイル システムからオブジェクトをアップロードするの手順に従って、ダウンロードした
offlinesales_march_parquet
ファイルを、作成したofflinesales_curated
Cloud Storage バケットにアップロードします。レイクを作成するの手順に従って、Dataplex レイクを作成して
operations
という名前を付けます。operations
レイクで、ゾーンを追加するの手順に従ってゾーンを追加し、procurement
という名前を付けます。procurement
ゾーンで、アセットを追加するの手順に従って、アセットとして作成したofflinesales_curated
Cloud Storage バケットを追加します。
探索するテーブルを選択する
Google Cloud コンソールで、Dataplex の [探索] ページに移動します。
[レイク] フィールドで
operations
レイクを選択します。operations
レイクをクリックします。procurement
ゾーンに移動し、テーブルをクリックしてメタデータを調べます。次の図では、選択した調達ゾーンに
Offline
というテーブルがあります。このテーブルには、メタデータorderid
、product
、quantityordered
、unitprice
、orderdate
、purchaseaddress
があります。Spark SQL エディタで、
[追加] をクリックします。Spark SQL スクリプトが表示されます。省略可: スクリプトを分割タブビューで開き、メタデータと新しいスクリプトを並べて表示します。新しいスクリプトタブで
[さらに表示] を選択し、[タブを右に分割] または [タブを左に分割] を選択します。
データを確認する
環境は、Spark SQL クエリとノートブックをレイク内で実行するためのサーバーレス コンピューティング リソースを提供します。 Spark SQL クエリを作成する前に、クエリを実行する環境を作成します。
次の SparkSQL クエリを使用してデータを探索します。SparkSQL エディタで、[新しいスクリプト] ペインにクエリを入力します。
テーブルの 10 行のサンプル
次のクエリを入力します。
select * from procurement.offlinesales where orderid != 'orderid' limit 10;
[実行] をクリックします。
データセット内のトランザクションの合計数を取得する
次のクエリを入力します。
select count(*) from procurement.offlinesales where orderid!='orderid';
[実行] をクリックします。
データセット内のさまざまな商品タイプの数を確認する
次のクエリを入力します。
select count(distinct product) from procurement.offlinesales where orderid!='orderid';
[実行] をクリックします。
トランザクション値が大きい商品を見つける
商品の種類と平均販売価格ごとに分類することで、トランザクション値が大きい商品を把握できます。
次のクエリを入力します。
select product,avg(quantityordered * unitprice) as avg_sales_amount from procurement.offlinesales where orderid!='orderid' group by product order by avg_sales_amount desc;
[実行] をクリックします。
次の画像には、product
という列を使用して avg_sales_amount
という列に表示されるトランザクション値が大きいセールス アイテムを識別する Results
ペインが表示されています。
変動係数を使用して異常を検出する
最後のクエリは、ノートパソコンの平均トランザクション額が高いことを示しています。次のクエリは、データセット内の異常ではないノートパソコン トランザクションを検出する方法を示しています。
次のクエリでは、「変動係数」指標 rsd_value
を使用して、値の分散が平均値よりも低くない、異常ではないトランザクションを見つけます。変動係数が小さいほど、異常が少ないことを示します。
次のクエリを入力します。
WITH stats AS ( SELECT product, AVG(quantityordered * unitprice) AS avg_value, STDDEV(quantityordered * unitprice) / AVG(quantityordered * unitprice) AS rsd_value FROM procurement.offlinesales GROUP BY product) SELECT orderid, orderdate, product, (quantityordered * unitprice) as sales_amount, ABS(1 - (quantityordered * unitprice)/ avg_value) AS distance_from_avg FROM procurement.offlinesales INNER JOIN stats USING (product) WHERE rsd_value <= 0.2 ORDER BY distance_from_avg DESC LIMIT 10
[実行] をクリックします。
スクリプトの結果を確認します。
次の図では、[結果] ペインで商品という列を使用して、変動係数 0.2 以内のトランザクション値を持つ販売商品を識別しています。
JupyterLab ノートブックを使用して異常を可視化する
大規模な異常を検出して可視化するための ML モデルを構築します。
別のタブでノートブックを開き、読み込まれるまで待ちます。Spark SQL クエリを実行したセッションは続行されます。
必要なパッケージをインポートし、トランザクション データが含まれる BigQuery 外部テーブルに接続します。次のコードを実行します。
from google.cloud import bigquery from google.api_core.client_options import ClientOptions import os import warnings warnings.filterwarnings('ignore') import pandas as pd project = os.environ['GOOGLE_CLOUD_PROJECT'] options = ClientOptions(quota_project_id=project) client = bigquery.Client(client_options=options) client = bigquery.Client() #Load data into DataFrame sql = '''select * from procurement.offlinesales limit 100;''' df = client.query(sql).to_dataframe()
分離フォレスト アルゴリズムを実行して、データセット内の異常を検出します。
to_model_columns = df.columns[2:4] from sklearn.ensemble import IsolationForest clf=IsolationForest(n_estimators=100, max_samples='auto', contamination=float(.12), \ max_features=1.0, bootstrap=False, n_jobs=-1, random_state=42, verbose=0) clf.fit(df[to_model_columns]) pred = clf.predict(df[to_model_columns]) df['anomaly']=pred outliers=df.loc[df['anomaly']==-1] outlier_index=list(outliers.index) #print(outlier_index) #Find the number of anomalies and normal points here points classified -1 are anomalous print(df['anomaly'].value_counts())
Matplotlib の可視化を使用して、予測された異常をプロットします。
import numpy as np from sklearn.decomposition import PCA pca = PCA(2) pca.fit(df[to_model_columns]) res=pd.DataFrame(pca.transform(df[to_model_columns])) Z = np.array(res) plt.title("IsolationForest") plt.contourf( Z, cmap=plt.cm.Blues_r) b1 = plt.scatter(res[0], res[1], c='green', s=20,label="normal points") b1 =plt.scatter(res.iloc[outlier_index,0],res.iloc[outlier_index,1], c='green',s=20, edgecolor="red",label="predicted outliers") plt.legend(loc="upper right") plt.show()
この画像は、異常値が赤でハイライト表示されたトランザクション データを示しています。
ノートブックのスケジュールを設定する
データ探索では、定期的に実行するノートブックをスケジュールできます。手順に従って Jupyter Notebook をスケジュールします。
Dataplex は、ノートブックを定期的に実行するスケジューリング タスクを作成します。タスクの進行状況をモニタリングするには、[スケジュールを表示] をクリックします。
ノートブックを共有またはエクスポートする
Explore では、IAM 権限を使用して、ノートブックを組織内の他のユーザーと共有できます。
ロールを確認します。このノートブックのユーザーに対して、Dataplex 閲覧者(roles/dataplex.viewer
)、Dataplex 編集者(roles/dataplex.editor
)、Dataplex 管理者(roles/dataplex.admin
)のロールを付与または取り消します。ノートブックを共有した後、レイクレベルで閲覧者または編集者のロールを持つユーザーは、レイクに移動して、共有ノートブックで作業できます。
ノートブックを共有またはエクスポートするには、ノートブックを共有するまたはノートブックをエクスポートするをご覧ください。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトの削除
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
リソースを個別に削除する
-
バケットを削除します。
gcloud storage buckets delete BUCKET_NAME
-
インスタンスを削除します。
gcloud compute instances delete INSTANCE_NAME
次のステップ
- Dataplex Explore の詳細を確認する。
- スクリプトとノートブックのスケジュールを設定する