このドキュメントでは、Dataplex Explore を使用して小売取引データセットの異常を検出する方法について説明します。
データ探索ワークベンチ(Explore)を使用すると、データ アナリストはリアルタイムで大規模なデータセットをインタラクティブにクエリして探索できます。Explore は、データから分析情報を取得するのに役立ち、Cloud Storage と BigQuery に保存されているデータをクエリできます。Explore はサーバーレス Spark プラットフォームを使用するため、基盤となるインフラストラクチャを管理してスケーリングする必要はありません。
目標
このチュートリアルでは、次のタスクを行う方法を説明します。
- Explore の Spark SQL Workbench を使用して、Spark SQL クエリを作成して実行します。
- JupyterLab ノートブックを使用して結果を表示します。
- ノートブックを定期的に実行するようにスケジュール設定すると、データの異常をモニタリングできます。
料金
このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
データ探索用のデータを準備する
Parquet ファイル
retail_offline_sales_march
をダウンロードします。次のように、
offlinesales_curated
という Cloud Storage バケットを作成します。- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click Create bucket.
- On the Create a bucket page, enter your bucket information. To go to the next
step, click Continue.
- For Name your bucket, enter a name that meets the bucket naming requirements.
-
For Choose where to store your data, do the following:
- Select a Location type option.
- Select a Location option.
- For Choose a default storage class for your data, select a storage class.
- For Choose how to control access to objects, select an Access control option.
- For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
- Click Create.
ファイル システムからオブジェクトをアップロードするの手順に沿って、ダウンロードした
offlinesales_march_parquet
ファイルを作成したofflinesales_curated
Cloud Storage バケットにアップロードします。レイクを作成するの手順に従って、Dataplex レイクを作成して
operations
という名前を付けます。ゾーンを追加するの手順に沿って、
operations
レイクにゾーンを追加し、procurement
という名前を付けます。procurement
ゾーンで、アセットを追加するの手順に従って、アセットとして作成したofflinesales_curated
Cloud Storage バケットを追加します。
探索するテーブルを選択する
Google Cloud コンソールで Dataplex の [Explore] ページに移動します。
[レイク] フィールドで、
operations
レイクを選択します。operations
レイクをクリックします。procurement
ゾーンに移動し、テーブルをクリックしてメタデータを調べます。次の図では、選択した調達ゾーンに
Offline
というテーブルがあります。このテーブルには、メタデータorderid
、product
、quantityordered
、unitprice
、orderdate
、purchaseaddress
があります。Spark SQL エディタで、
[追加] をクリックします。Spark SQL スクリプトが表示されます。省略可: スクリプトを分割タブビューで開き、メタデータと新しいスクリプトを並べて表示します。新しいスクリプトタブで
[さらに表示] を選択し、[タブを右に分割] または [タブを左に分割] を選択します。
データを確認する
環境は、Spark SQL クエリとノートブックをレイク内で実行するためのサーバーレス コンピューティング リソースを提供します。 Spark SQL クエリを作成する前に、クエリを実行する環境を作成します。
次の SparkSQL クエリを使用してデータを探索します。SparkSQL エディタで、[新しいスクリプト] ペインにクエリを入力します。
テーブルの 10 行のサンプル
次のクエリを入力します。
select * from procurement.offlinesales where orderid != 'orderid' limit 10;
[実行] をクリックします。
データセット内のトランザクションの合計数を取得する
次のクエリを入力します。
select count(*) from procurement.offlinesales where orderid!='orderid';
[実行] をクリックします。
データセット内のさまざまな商品タイプの数を確認する
次のクエリを入力します。
select count(distinct product) from procurement.offlinesales where orderid!='orderid';
[実行] をクリックします。
取引額の大きい商品を見つける
販売を商品タイプと平均販売価格別に分類して、取引額の大きい商品を把握します。
次のクエリを入力します。
select product,avg(quantityordered * unitprice) as avg_sales_amount from procurement.offlinesales where orderid!='orderid' group by product order by avg_sales_amount desc;
[実行] をクリックします。
次の画像には、product
という列を使用して avg_sales_amount
という列に表示されるトランザクション値が大きいセールス アイテムを識別する Results
ペインが表示されています。
変動係数を使用して異常を検出する
前回のクエリでは、ノートパソコンの平均トランザクション額が高いことがわかりました。次のクエリは、データセット内の異常ではないノートパソコンのトランザクションを検出する方法を示しています。
次のクエリは、指標「変動係数」rsd_value
を使用して、値の分散が平均値と比較して低い、異常ではないトランザクションを検索します。変動係数が低いほど、異常値が少ないことを示します。
次のクエリを入力します。
WITH stats AS ( SELECT product, AVG(quantityordered * unitprice) AS avg_value, STDDEV(quantityordered * unitprice) / AVG(quantityordered * unitprice) AS rsd_value FROM procurement.offlinesales GROUP BY product) SELECT orderid, orderdate, product, (quantityordered * unitprice) as sales_amount, ABS(1 - (quantityordered * unitprice)/ avg_value) AS distance_from_avg FROM procurement.offlinesales INNER JOIN stats USING (product) WHERE rsd_value <= 0.2 ORDER BY distance_from_avg DESC LIMIT 10
[実行] をクリックします。
スクリプトの結果を確認します。
次の画像の [結果] ペインでは、product という列を使用して、トランザクション値が変動係数 0.2 の範囲内にあるセールス アイテムを特定しています。
JupyterLab ノートブックを使用して異常を可視化する
ML モデルを構築して、大規模な異常を検出して可視化します。
ノートブックを別のタブで開き、読み込みが完了するまで待ちます。Spark SQL クエリを実行したセッションは続行されます。
必要なパッケージをインポートし、トランザクション データを含む BigQuery 外部テーブルに接続します。次のコードを実行します。
from google.cloud import bigquery from google.api_core.client_options import ClientOptions import os import warnings warnings.filterwarnings('ignore') import pandas as pd project = os.environ['GOOGLE_CLOUD_PROJECT'] options = ClientOptions(quota_project_id=project) client = bigquery.Client(client_options=options) client = bigquery.Client() #Load data into DataFrame sql = '''select * from procurement.offlinesales limit 100;''' df = client.query(sql).to_dataframe()
分離森林アルゴリズムを実行して、データセット内の異常を検出します。
to_model_columns = df.columns[2:4] from sklearn.ensemble import IsolationForest clf=IsolationForest(n_estimators=100, max_samples='auto', contamination=float(.12), \ max_features=1.0, bootstrap=False, n_jobs=-1, random_state=42, verbose=0) clf.fit(df[to_model_columns]) pred = clf.predict(df[to_model_columns]) df['anomaly']=pred outliers=df.loc[df['anomaly']==-1] outlier_index=list(outliers.index) #print(outlier_index) #Find the number of anomalies and normal points here points classified -1 are anomalous print(df['anomaly'].value_counts())
Matplotlib 可視化を使用して、予測された異常をプロットします。
import numpy as np from sklearn.decomposition import PCA pca = PCA(2) pca.fit(df[to_model_columns]) res=pd.DataFrame(pca.transform(df[to_model_columns])) Z = np.array(res) plt.title("IsolationForest") plt.contourf( Z, cmap=plt.cm.Blues_r) b1 = plt.scatter(res[0], res[1], c='green', s=20,label="normal points") b1 =plt.scatter(res.iloc[outlier_index,0],res.iloc[outlier_index,1], c='green',s=20, edgecolor="red",label="predicted outliers") plt.legend(loc="upper right") plt.show()
この画像は、異常値が赤でハイライト表示されたトランザクション データを示しています。
ノートブックのスケジュールを設定する
Explore では、ノートブックを定期的に実行するようにスケジュール設定できます。手順に沿って、作成した Jupyter Notebook のスケジュールを設定します。
Dataplex は、ノートブックを定期的に実行するスケジュール設定タスクを作成します。タスクの進行状況をモニタリングするには、[スケジュールを表示] をクリックします。
ノートブックを共有またはエクスポートする
Explore では、IAM 権限を使用して組織内の他のユーザーとノートブックを共有できます。
ロールを確認します。このノートブックのユーザーに対して、Dataplex 閲覧者(roles/dataplex.viewer
)、Dataplex 編集者(roles/dataplex.editor
)、Dataplex 管理者(roles/dataplex.admin
)のロールを付与または取り消します。ノートブックを共有した後、レイクレベルで閲覧者または編集者のロールを持つユーザーは、レイクに移動して共有ノートブックで作業できます。
ノートブックを共有またはエクスポートするには、ノートブックを共有するまたはノートブックをエクスポートするをご覧ください。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトを削除する
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
リソースを個別に削除する
-
バケットを削除します。
gcloud storage buckets delete BUCKET_NAME
-
インスタンスを削除します。
gcloud compute instances delete INSTANCE_NAME
次のステップ
- Dataplex Explore の詳細を確認する。
- スクリプトとノートブックのスケジュールを設定する