このチュートリアルでは、通信ネットワーク向けに安全な ML ベースのネットワーク異常検出ソリューションを作成する方法について説明します。この種のソリューションは、サイバー セキュリティの脅威の特定に使用されます。
このチュートリアルは、データ エンジニアとデータ サイエンティストを対象としています。また、以下に関する基本的な知識があることを前提としています。
- Apache Beam Java SDK を使用して構築された Dataflow パイプライン
- 標準 SQL
- 基本的なシェル スクリプト
- K 平均法クラスタリング
リファレンス アーキテクチャ
次の図は、ML ベースのネットワーク異常検出システムの構築に使用されるコンポーネントを示しています。Pub/Sub と Cloud Storage はデータソースとして機能します。Dataflow は、DLP API でトークン化されたデータから特徴を集約して抽出します。BigQuery ML は、特徴から K 平均法クラスタリング モデルを作成し、Dataflow で外れ値を識別します。
目標
- Pub/Sub トピックとサブスクリプションを作成して、NetFlow の合成ログデータを生成する。
- Dataflow を使用して、NetFlow ログデータから特徴を集計して抽出する。
- BigQuery ML の K 平均法クラスタリング モデルを作成する。
- DLP API を使用して、センシティブ データをトークン化する。
- 正規化されトレーニングされたデータを使用して、リアルタイムで外れ値を検出する Dataflow パイプラインを作成する。
料金
このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。新しい Google Cloud ユーザーは無料トライアルをご利用いただけます。
始める前に
- Google アカウントにログインします。
Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する。
-
Cloud Console で、Cloud Shell をアクティブにします。
Cloud Console の下部にある Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。
gcloud
コマンドライン ツールなどの Cloud SDK がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。
このチュートリアルでは、Cloud Shell からすべてのコマンドを実行します。 - Cloud Shell で、BigQuery、Dataflow、Cloud Storage、DLP API を有効にします。
gcloud services enable dlp.googleapis.com bigquery.googleapis.com \ dataflow.googleapis.com storage-component.googleapis.com \ pubsub.googleapis.com cloudbuild.googleapis.com
Dataflow と Pub/Sub を使用した合成データの生成
このセクションでは、自動化された Dataflow パイプラインをトリガーして、合成 NetFlow ログデータを生成する Pub/Sub トピックとサブスクリプションを作成します。
Pub/Sub トピックとサブスクリプションを作成する
- Cloud Shell で、Pub/Sub トピックとサブスクリプションを作成します。
export PROJECT_ID=$(gcloud config get-value project) export TOPIC_ID=TOPIC_ID export SUBSCRIPTION_ID=SUBSCRIPTION_ID gcloud pubsub topics create $TOPIC_ID gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC_ID次のように置き換えます。
- TOPIC_ID: Pub/Sub トピックの名前。
- SUBSCRIPTION_ID: Pub/Sub サブスクリプションの名前。
合成データ生成パイプラインをトリガーする
Cloud Shell で GitHub リポジトリのクローンを作成します。
git clone https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection.git cd df-ml-anomaly-detection
ジョブの自動送信を有効にするには、Cloud Build サービス アカウントに Dataflow の権限を付与します。
export PROJECT_NUMBER=$(gcloud projects list --filter=${PROJECT_ID} \ --format="value(PROJECT_NUMBER)") gcloud projects add-iam-policy-binding ${PROJECT_ID} \ --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \ --role roles/dataflow.admin
合成データ生成パイプラインを開始します。
gcloud builds submit . --machine-type=n1-highcpu-8 \ --config scripts/cloud-build-data-generator.yaml \ --substitutions _TOPIC_ID=${TOPIC_ID}
コード パッケージが大きいため、ハイメモリ マシンタイプを使用する必要があります。このチュートリアルでは、
machine-type=n1-highcpu-8
を使用します。サブスクリプションでログデータがパブリッシュされていることを確認します。
gcloud pubsub subscriptions pull ${SUBSCRIPTION_ID} --auto-ack --limit 1 >> raw_log.txt cat raw_log.txt
出力には、次のようにランダムな値が入力された NetFlow ログスキーマ フィールドのサブセットが含まれます。
{ \"subscriberId\": \"mharper\", \"srcIP\": \"12.0.9.4", \"dstIP\": \"12.0.1.2\", \"srcPort\": 5000, \"dstPort\": 3000, \"txBytes\": 15, \"rxBytes\": 40, \"startTime\": 1570276550, \"endTime\": 1570276559, \"tcpFlag\": 0, \"protocolName\": \"tcp\", \"protocolNumber\": 0 }
特徴の抽出と外れ値のデータの検出
このセクションでは、異常検出パイプラインによって処理された特徴と外れ値のデータを保存する BigQuery テーブルを作成します。
特徴と外れ値データの保存用 BigQuery テーブルを作成する
Cloud Shell で、BigQuery データセットを作成します。
export DATASET_NAME=dataset-name bq --location=US mk -d \ --description "Network Logs Dataset" \ ${DATASET_NAME}
BigQuery テーブルを作成します。
bq mk -t --schema src/main/resources/aggr_log_table_schema.json \ --time_partitioning_type=DAY \ --clustering_fields="dst_subnet,subscriber_id" \ --description "Network Log Feature Table" \ ${PROJECT_ID}:${DATASET_NAME}.cluster_model_data bq mk -t --schema src/main/resources/outlier_table_schema.json \ --description "Network Log Outlier Table" \ ${PROJECT_ID}:${DATASET_NAME}.outlier_data bq mk -t --schema src/main/resources/normalized_centroid_data_schema.json \ --description "Sample Normalized Data" \ ${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data
次のテーブルが生成されます。
cluster_model_data
: モデル作成の特徴値を保存するクラスタ化パーティション分割テーブル。outlier_data
: 異常を保存する外れ値テーブル。normalized_centroid_data
: サンプルモデルから正規化されたデータが事前に入力されるテーブル。
セントロイド データをテーブルに読み込みます。
bq load \ --source_format=NEWLINE_DELIMITED_JSON \ ${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data \ gs://df-ml-anomaly-detection-mock-data/sample_model/normalized_centroid_data.json src/main/resources/normalized_centroid_data_schema.json
Dataflow Flex テンプレートを作成してトリガーする
このセクションでは、Dataflow Flex テンプレートを作成して、異常検出パイプラインをトリガーします。
Cloud Shell で、プロジェクトに Docker イメージを作成します。
gcloud auth configure-docker gradle jib --image=gcr.io/${PROJECT_ID}/df-ml-anomaly-detection:latest -DmainClass=com.google.solutions.df.log.aggregations.SecureLogAggregationPipeline
前に作成した Cloud Storage バケットに Flex テンプレート構成ファイルをアップロードします。
export DF_TEMPLATE_CONFIG_BUCKET=${PROJECT_ID}-DF_TEMPLATE_CONFIG gsutil mb -c standard -l REGION gs://${DF_TEMPLATE_CONFIG_BUCKET} cat << EOF | gsutil cp - gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json {"image": "gcr.io/${PROJECT_ID}/df-ml-anomaly-detection", "sdk_info": {"language": "JAVA"} } EOF
次のように置き換えます。
- PROJECT_ID: 実際の Cloud プロジェクトの ID
- DF_TEMPLATE_CONFIG: Dataflow Flex テンプレート構成ファイルの Cloud Storage バケットの名前
- REGION: Cloud Storage バケットを作成したリージョン
正規化されたモデルデータをパイプライン パラメータとして渡す SQL ファイルを作成します。
echo "SELECT * FROM \`${PROJECT_ID}.${DATASET_NAME}.normalized_centroid_data\`" > normalized_cluster_data.sql gsutil cp normalized_cluster_data.sql gs://${DF_TEMPLATE_CONFIG_BUCKET}/
異常検出パイプラインを実行します。
gcloud beta dataflow flex-template run "anomaly-detection" \ --project=${PROJECT_ID} \ --region=us-central1 \ --template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \ --parameters=autoscalingAlgorithm="NONE",\ numWorkers=5,\ maxNumWorkers=5,\ workerMachineType=n1-highmem-4,\ subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\ tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\ batchFrequency=2,\ customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\ tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\ clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\ outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\ inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\ workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\ diskSizeGb=5,\ windowInterval=10,\ writeMethod=FILE_LOADS,\ streaming=true
Cloud Console で、Dataflow ページに移動します。
netflow-anomaly-detection-date +%Y%m%d-%H%M%S-%N` ジョブをクリックします。次のような Dataflow パイプラインの表現が表示されます。
テストの外れ値のメッセージをパブリッシュする
メッセージをパブリッシュして、外れ値のメッセージがパイプラインで正しく検出されていることを確認できます。
Cloud Shell で、次のメッセージをパブリッシュします。
gcloud pubsub topics publish ${TOPIC_ID} --message \ "{\"subscriberId\": \"00000000000000000\", \ \"srcIP\": \"12.0.9.4\", \ \"dstIP\": \"12.0.1.3\", \ \"srcPort\": 5000, \ \"dstPort\": 3000, \ \"txBytes\": 150000, \ \"rxBytes\": 40000, \ \"startTime\": 1570276550, \ \"endTime\": 1570276550, \ \"tcpFlag\": 0, \ \"protocolName\": \"tcp\", \ \"protocolNumber\": 0}"
合成データの設定量(100~500 バイト)と比較して、送信バイト数(
txBytes
)と受信バイト数(rxBytes
)が異常に多いことがわかります。このメッセージによって、セキュリティ リスクの検証が促される可能性があります。1、2 分後、異常が識別され BigQuery テーブルに保存されたことを確認します。
export OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data` WHERE subscriber_id like "0%" limit 1' bq query --nouse_legacy_sql $OUTLIER_TABLE_QUERY >> outlier_orig.txt cat outlier_orig.txt
出力は次のようになります。
+---------------+--------------+----------------------------+ | subscriber_id | dst_subnet | transaction_time | +---------------+--------------+----------------------------+ | 00000000000| 12.0.1.3/22 | 2020-07-09 21:29:36.571000 | +---------------+--------------+----------------------------+
BigQuery ML を使用した K 平均法クラスタリング モデルの作成
Cloud Console で、BigQuery の [クエリエディタ] ページに移動します。
特徴テーブルからトレーニング データを選択し、BigQuery ML を使用して K 平均法クラスタリング モデルを作成します。
--> temp table for training data #standardSQL CREATE OR REPLACE TABLE DATASET_ID.train_data as (SELECT * FROM DATASET_ID.cluster_model_data WHERE _PARTITIONDATE BETWEEN START_DATE AND END_DATE AND NOT IS_NAN(avg_tx_bytes) AND NOT IS_NAN(avg_rx_bytes) AND NOT IS_NAN(avg_duration)) limit 100000; --> create a model using BigQuery ML #standardSQL CREATE OR REPLACE MODEL DATASET_ID.log_cluster options(model_type='kmeans', standardize_features = true) AS SELECT * EXCEPT (transaction_time,subscriber_id,number_of_unique_ips, number_of_unique_ports, dst_subnet) FROM DATASET_ID.train_data;
次のように置き換えます。
- START_DATE、END_DATE: 現在の日付
- DATASET_ID: 作成したデータセット ID
クラスタごとにデータを正規化します。
--> create normalize table for each centroid #standardSQL CREATE OR REPLACE TABLE DATASET_ID.normalized_centroid_data as( with centroid_details AS ( SELECT centroid_id,array_agg(struct(feature as name, round(numerical_value,1) as value) order by centroid_id) AS cluster from ML.CENTROIDS(model DATASET_ID.log_cluster) group by centroid_id ), cluster as (select centroid_details.centroid_id as centroid_id, (select value from unnest(cluster) where name = 'number_of_records') AS number_of_records, (select value from unnest(cluster) where name = 'max_tx_bytes') AS max_tx_bytes, (select value from unnest(cluster) where name = 'min_tx_bytes') AS min_tx_bytes, (select value from unnest(cluster) where name = 'avg_tx_bytes') AS avg_tx_bytes, (select value from unnest(cluster) where name = 'max_rx_bytes') AS max_rx_bytes, (select value from unnest(cluster) where name = 'min_rx_bytes') AS min_rx_bytes, (select value from unnest(cluster) where name = 'avg_rx_bytes') AS avg_rx_bytes, (select value from unnest(cluster) where name = 'max_duration') AS max_duration, (select value from unnest(cluster) where name = 'min_duration') AS min_duration, (select value from unnest(cluster) where name = 'avg_duration') AS avg_duration FROM centroid_details order by centroid_id asc), predict as (select * from ML.PREDICT(model DATASET_ID.log_cluster, (select * from DATASET_I.train_data))) select c.centroid_id as centroid_id, (stddev((p.number_of_records-c.number_of_records)+(p.max_tx_bytes-c.max_tx_bytes)+(p.min_tx_bytes-c.min_tx_bytes)+(p.avg_tx_bytes-c.min_tx_bytes)+(p.max_rx_bytes-c.max_rx_bytes)+(p.min_rx_bytes-c.min_rx_bytes)+ (p.avg_rx_bytes-c.min_rx_bytes) +(p.max_duration-c.max_duration)+(p.min_duration-c.min_duration)+(p.avg_duration-c.avg_duration))) as normalized_dest, any_value(c.number_of_records) as number_of_records,any_value(c.max_tx_bytes) as max_tx_bytes, any_value(c.min_tx_bytes) as min_tx_bytes , any_value(c.avg_tx_bytes) as avg_tx_bytes,any_value(c.max_rx_bytes) as max_rx_bytes, any_value(c.min_tx_bytes) as min_rx_bytes ,any_value(c.avg_rx_bytes) as avg_rx_bytes, any_value(c.avg_duration) as avg_duration,any_value(c.max_duration) as max_duration , any_value(c.min_duration) as min_duration from predict as p inner join cluster as c on c.centroid_id = p.centroid_id group by c.centroid_id);
このクエリは、入力ベクトルとセントロイド ベクトル間の標準偏差関数を使用して、クラスタごとに正規化された距離を計算します。つまり、次の式を実装します。
stddev(input_value_x-centroid_value_x)+(input_value_y-centroid_value_y)+(..))
normalized_centroid_data
テーブルを検証します。#standardSQL SELECT * from <DATASET_ID>.normalized_centroid_data
DATASET_ID は、作成したデータセットの ID に置き換えます。
このステートメントの結果は、各セントロイド ID について算出、正規化された距離のテーブルになります。
Cloud DLP を使用したデータの匿名化
このセクションでは、追加のパラメータを渡してパイプラインを再利用し、subscriber_id
列の国際モバイル サブスクライバー ID(IMSI)番号を匿名化します。
Cloud Shell で、暗号鍵を作成します。
export TEK=$(openssl rand -base64 32); echo ${TEK} a3ecrQAQJJ8oxVO8TZ/odlfjcujhWXjU/Xg5lEFiw5M=
コードエディタを起動するには、Cloud Shell ウィンドウのツールバーで [エディタを開く] edit をクリックします。
[ファイル] > [新しいファイル] をクリックし、
deid_template.json
という名前のファイルを作成します。次の JSON ブロックを新しいファイルにコピーします。
{ "deidentifyTemplate": { "displayName": "Config to de-identify IMEI Number", "description": "IMEI Number masking transformation", "deidentifyConfig": { "recordTransformations": { "fieldTransformations": [ { "fields": [ { "name": "subscriber_id" } ], "primitiveTransformation": { "cryptoDeterministicConfig": { "cryptoKey": { "unwrapped": { "key": "CRYPTO_KEY" } }, "surrogateInfoType": { "name": "IMSI_TOKEN" } } } } ] } } }, "templateId": "dlp-deid-subid" }
CRYPTO_KEY は、前に作成した暗号鍵に置き換えます。本番環境のワークロードには、Cloud KMS でラップされた鍵を使用することをおすすめします。ファイルを保存します。
Cloud Shell ツールバーの [ターミナルを開く] をクリックします。
Cloud Shell ターミナルで、Cloud DLP 匿名化テンプレートを作成します。
export DLP_API_ROOT_URL="https://dlp.googleapis.com" export DEID_TEMPLATE_API="${DLP_API_ROOT_URL}/v2/projects/${PROJECT_ID}/deidentifyTemplates" export DEID_CONFIG="@deid_template.json" export ACCESS_TOKEN=$(gcloud auth print-access-token) curl -X POST -H "Content-Type: application/json" \ -H "Authorization: Bearer ${ACCESS_TOKEN}" \ "${DEID_TEMPLATE_API}" \ -d "${DEID_CONFIG}"
これにより、Cloud プロジェクトに次の名前のテンプレートが作成されます。
"name": "projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-sub-id"
前の手順でトリガーしたパイプラインを停止します。
gcloud dataflow jobs list --filter="name=anomaly-detection" --state=active
Cloud DLP で匿名化されたテンプレート名を使用して、異常検出パイプラインをトリガーします。
gcloud beta dataflow flex-template run "anomaly-detection-with-dlp" \ --project=${PROJECT_ID} \ --region=us-central1 \ --template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \ --parameters=autoscalingAlgorithm="NONE",\ numWorkers=5,\ maxNumWorkers=5,\ workerMachineType=n1-highmem-4,\ subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\ tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\ batchFrequency=2,\ customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\ tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\ clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\ outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\ inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\ workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\ diskSizeGb=5,\ windowInterval=10,\ writeMethod=FILE_LOADS,\ streaming=true,\ deidTemplateName=projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-subid
外れ値テーブルにクエリを実行して、サブスクライバー ID が正常に匿名化されたことを確認します。
export DLP_OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data` ORDER BY transaction_time DESC' bq query --nouse_legacy_sql $DLP_OUTLIER_TABLE_QUERY >> outlier_deid.txt cat outlier_deid.txt
出力は次のようになります。
+---------------+--------------+----------------------------+ | subscriber_id | dst_subnet | transaction_time | +---------------+--------------+----------------------------+ | IMSI_TOKEN(64):AcZD2U2v//QiKkGzbFCm29pv5cqVi3Db09Z6CNt5cQSevBKRQvgdDfacPQIRY1dc| 12.0.1.3/22 | 2020-07-09 21:29:36.571000 | +---------------+--------------+----------------------------+
サブスクライバー ID が匿名化されると、
subscriber_id
列は元のサブスクライバー ID(00000000000
)ではなくなります。
クリーンアップ
シリーズのチュートリアルを続行する予定がない場合、課金を停止する最も簡単な方法は、チュートリアル用に作成した Cloud プロジェクトを削除することです。また、リソースを個別に削除することもできます。
プロジェクトの削除
- Cloud Console で [リソースの管理] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
次のステップ
- GitHub の Netflow ログの異常検出のリポにあるサンプルコードを確認する。
- 他の異常検出ソリューションの説明を確認する。
- Google Cloud のその他の機能を試す。チュートリアルをご覧ください。