Dataflow、BigQuery ML、Cloud Data Loss Prevention を使用した安全な異常検出ソリューションの構築

このチュートリアルでは、通信ネットワーク向けに安全な ML ベースのネットワーク異常検出ソリューションを作成する方法について説明します。この種のソリューションは、サイバー セキュリティの脅威の特定に使用されます。

このチュートリアルは、データ エンジニアとデータ サイエンティストを対象としています。また、以下に関する基本的な知識があることを前提としています。

リファレンス アーキテクチャ

次の図は、ML ベースのネットワーク異常検出システムの構築に使用されるコンポーネントを示しています。Pub/Sub と Cloud Storage はデータソースとして機能します。Dataflow は、DLP API でトークン化されたデータから特徴を集約して抽出します。BigQuery ML は、特徴から K 平均法クラスタリング モデルを作成し、Dataflow で外れ値を識別します。

Dataflow と BigQuery ML を使用した異常検出のリファレンス アーキテクチャ。

目標

  • Pub/Sub トピックとサブスクリプションを作成して、NetFlow の合成ログデータを生成する。
  • Dataflow を使用して、NetFlow ログデータから特徴を集計して抽出する。
  • BigQuery ML の K 平均法クラスタリング モデルを作成する。
  • DLP API を使用して、センシティブ データをトークン化する。
  • 正規化されトレーニングされたデータを使用して、リアルタイムで外れ値を検出する Dataflow パイプラインを作成する。

料金

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  5. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  6. Cloud Console で、Cloud Shell をアクティブにします。

    Cloud Shell をアクティブにする

    Cloud Console の下部にある Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。gcloud コマンドライン ツールなどの Cloud SDK がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  7. このチュートリアルでは、Cloud Shell からすべてのコマンドを実行します。
  8. Cloud Shell で、BigQuery、Dataflow、Cloud Storage、DLP API を有効にします。

    gcloud services enable dlp.googleapis.com bigquery.googleapis.com \
      dataflow.googleapis.com storage-component.googleapis.com \
      pubsub.googleapis.com cloudbuild.googleapis.com
    

Dataflow と Pub/Sub を使用した合成データの生成

このセクションでは、自動化された Dataflow パイプラインをトリガーして、合成 NetFlow ログデータを生成する Pub/Sub トピックとサブスクリプションを作成します。

Pub/Sub トピックとサブスクリプションを作成する

    Cloud Shell で、Pub/Sub トピックとサブスクリプションを作成します。
    export PROJECT_ID=$(gcloud config get-value project)
    export TOPIC_ID=TOPIC_ID
    export SUBSCRIPTION_ID=SUBSCRIPTION_ID
    gcloud pubsub topics create $TOPIC_ID
    gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC_ID 
    次のように置き換えます。
    • TOPIC_ID: Pub/Sub トピックの名前。
    • SUBSCRIPTION_ID: Pub/Sub サブスクリプションの名前。

合成データ生成パイプラインをトリガーする

  1. Cloud Shell で GitHub リポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection.git
    cd df-ml-anomaly-detection
    
  2. ジョブの自動送信を有効にするには、Cloud Build サービス アカウントに Dataflow の権限を付与します。

    export PROJECT_NUMBER=$(gcloud projects list --filter=${PROJECT_ID} \
      --format="value(PROJECT_NUMBER)")
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
      --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
      --role roles/dataflow.admin
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
      --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
      --role roles/compute.instanceAdmin.v1
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
      --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
      --role roles/iam.serviceAccountUser
    
    
  3. 合成データ生成パイプラインを開始します。

    gcloud builds submit . --machine-type=n1-highcpu-8 \
      --config scripts/cloud-build-data-generator.yaml \
      --substitutions _TOPIC_ID=${TOPIC_ID}
    

    コード パッケージが大きいため、ハイメモリ マシンタイプを使用する必要があります。このチュートリアルでは、machine-type=n1-highcpu-8 を使用します。

  4. サブスクリプションでログデータがパブリッシュされていることを確認します。

    gcloud pubsub subscriptions pull ${SUBSCRIPTION_ID} --auto-ack --limit 1 >> raw_log.txt
    cat raw_log.txt
    

    出力には、次のようにランダムな値が入力された NetFlow ログスキーマ フィールドのサブセットが含まれます。

    {
     \"subscriberId\": \"mharper\",
     \"srcIP\": \"12.0.9.4",
     \"dstIP\": \"12.0.1.2\",
     \"srcPort\": 5000,
     \"dstPort\": 3000,
     \"txBytes\": 15,
     \"rxBytes\": 40,
     \"startTime\": 1570276550,
     \"endTime\": 1570276559,
     \"tcpFlag\": 0,
     \"protocolName\": \"tcp\",
     \"protocolNumber\": 0
    }
    

特徴の抽出と外れ値のデータの検出

このセクションでは、異常検出パイプラインによって処理された特徴と外れ値のデータを保存する BigQuery テーブルを作成します。

特徴と外れ値データの保存用 BigQuery テーブルを作成する

  1. Cloud Shell で、BigQuery データセットを作成します。

    export DATASET_NAME=DATASET_NAME
    bq --location=US mk -d \
      --description "Network Logs Dataset" \
      ${DATASET_NAME}
    
  2. BigQuery テーブルを作成します。

    bq mk -t --schema src/main/resources/aggr_log_table_schema.json \
      --time_partitioning_type=DAY \
      --clustering_fields="dst_subnet,subscriber_id" \
      --description "Network Log Feature Table" \
      ${PROJECT_ID}:${DATASET_NAME}.cluster_model_data
    
    bq mk -t --schema src/main/resources/outlier_table_schema.json \
      --description "Network Log Outlier Table" \
      ${PROJECT_ID}:${DATASET_NAME}.outlier_data
    
    bq mk -t --schema src/main/resources/normalized_centroid_data_schema.json \
      --description "Sample Normalized Data" \
      ${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data
    

    次のテーブルが生成されます。

    • cluster_model_data: モデル作成の特徴値を保存するクラスタ化パーティション分割テーブル。
    • outlier_data: 異常を保存する外れ値テーブル。
    • normalized_centroid_data: サンプルモデルから正規化されたデータが事前に入力されるテーブル。
  3. セントロイド データをテーブルに読み込みます。

    bq load \
      --source_format=NEWLINE_DELIMITED_JSON \
      ${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data \
      gs://df-ml-anomaly-detection-mock-data/sample_model/normalized_centroid_data.json src/main/resources/normalized_centroid_data_schema.json
    

Dataflow Flex テンプレートを作成してトリガーする

このセクションでは、Dataflow Flex テンプレートを作成して、異常検出パイプラインをトリガーします。

  1. Cloud Shell で、プロジェクトに Docker イメージを作成します。

    gcloud auth configure-docker
    gradle jib --image=gcr.io/${PROJECT_ID}/df-ml-anomaly-detection:latest -DmainClass=com.google.solutions.df.log.aggregations.SecureLogAggregationPipeline
    
  2. Flex テンプレート構成ファイルを Cloud Storage バケットにアップロードします。

    export DF_TEMPLATE_CONFIG_BUCKET=${PROJECT_ID}-DF_TEMPLATE_CONFIG
    gsutil mb -c standard -l REGION gs://${DF_TEMPLATE_CONFIG_BUCKET}
    cat << EOF | gsutil cp - gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json
    {"image": "gcr.io/${PROJECT_ID}/df-ml-anomaly-detection",
    "sdk_info": {"language": "JAVA"}
    }
    EOF
    

    次のように置き換えます。

    • PROJECT_ID: 実際の Cloud プロジェクトの ID
    • DF_TEMPLATE_CONFIG: Dataflow Flex テンプレート構成ファイルの Cloud Storage バケットの名前
    • REGION: Cloud Storage バケットを作成するリージョン
  3. 正規化されたモデルデータをパイプライン パラメータとして渡す SQL ファイルを作成します。

    echo "SELECT * FROM \`${PROJECT_ID}.${DATASET_NAME}.normalized_centroid_data\`" > normalized_cluster_data.sql
    gsutil cp normalized_cluster_data.sql gs://${DF_TEMPLATE_CONFIG_BUCKET}/
    
  4. 異常検出パイプラインを実行します。

    gcloud beta dataflow flex-template run "anomaly-detection" \
    --project=${PROJECT_ID} \
    --region=us-central1 \
    --template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=5,\
    maxNumWorkers=5,\
    workerMachineType=n1-highmem-4,\
    subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\
    tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\
    batchFrequency=2,\
    customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\
    outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\
    workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\
    diskSizeGb=5,\
    windowInterval=10,\
    writeMethod=FILE_LOADS,\
    streaming=true
    
  5. Cloud Console で、Dataflow ページに移動します。

    Dataflow ページに移動

  6. netflow-anomaly-detection-date +%Y%m%d-%H%M%S-%N` ジョブをクリックします。次のような Dataflow パイプラインの表現が表示されます。

Dataflow モニタリング ユーザー インターフェースの異常検出パイプライン ジョブビュー。

テストの外れ値のメッセージをパブリッシュする

メッセージをパブリッシュして、外れ値のメッセージがパイプラインで正しく検出されていることを確認できます。

  1. Cloud Shell で、次のメッセージをパブリッシュします。

    gcloud pubsub topics publish ${TOPIC_ID} --message \
    "{\"subscriberId\": \"00000000000000000\",  \
    \"srcIP\": \"12.0.9.4\", \
    \"dstIP\": \"12.0.1.3\", \
    \"srcPort\": 5000, \
    \"dstPort\": 3000, \
    \"txBytes\": 150000, \
    \"rxBytes\": 40000, \
    \"startTime\": 1570276550, \
    \"endTime\": 1570276550, \
    \"tcpFlag\": 0, \
    \"protocolName\": \"tcp\", \
    \"protocolNumber\": 0}"
    

    合成データの設定量(100~500 バイト)と比較して、送信バイト数(txBytes)と受信バイト数(rxBytes)が異常に多いことがわかります。このメッセージによって、セキュリティ リスクの検証が促される可能性があります。

  2. 1、2 分後、異常が識別され BigQuery テーブルに保存されたことを確認します。

    export OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time
    FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data`
    WHERE subscriber_id like "0%" limit 1'
    bq query --nouse_legacy_sql $OUTLIER_TABLE_QUERY >> outlier_orig.txt
    cat outlier_orig.txt
    

    出力は次のようになります。

    +---------------+--------------+----------------------------+
    | subscriber_id |  dst_subnet  |   transaction_time |
    +---------------+--------------+----------------------------+
    | 00000000000| 12.0.1.3/22 | 2020-07-09 21:29:36.571000 |
    +---------------+--------------+----------------------------+
    

BigQuery ML を使用した K 平均法クラスタリング モデルの作成

  1. Cloud Console で、BigQuery の [クエリエディタ] ページに移動します。

    [クエリエディタ] に移動

  2. 特徴テーブルからトレーニング データを選択し、BigQuery ML を使用して K 平均法クラスタリング モデルを作成します。

    --> temp table for training data
    #standardSQL
    CREATE OR REPLACE TABLE DATASET_NAME.train_data as
    (SELECT * FROM DATASET_NAME.cluster_model_data
    WHERE _PARTITIONDATE BETWEEN START_DATE AND END_DATE
    AND NOT IS_NAN(avg_tx_bytes)
    AND NOT IS_NAN(avg_rx_bytes)
    AND NOT IS_NAN(avg_duration))
    limit 100000;
    
    --> create a model using BigQuery ML
    #standardSQL
    CREATE OR REPLACE MODEL DATASET_NAME.log_cluster options(model_type='kmeans', standardize_features = true) AS
    SELECT * EXCEPT (transaction_time,subscriber_id,number_of_unique_ips, number_of_unique_ports, dst_subnet)
    FROM DATASET_NAME.train_data;
    

    次のように置き換えます。

    • START_DATEEND_DATE: 現在の日付
    • DATASET_NAME: 先ほど作成したデータセット
  3. クラスタごとにデータを正規化します。

    --> create normalize table for each centroid
    #standardSQL
    CREATE OR REPLACE TABLE DATASET_NAME.normalized_centroid_data as(
    with centroid_details AS (
    SELECT centroid_id,array_agg(struct(feature as name, round(numerical_value,1) as value)
    order by centroid_id) AS cluster
    from ML.CENTROIDS(model DATASET_NAME.log_cluster)
    group by centroid_id
    ),
    cluster as (select centroid_details.centroid_id as centroid_id,
    (select value from unnest(cluster) where name = 'number_of_records') AS number_of_records,
    (select value from unnest(cluster) where name = 'max_tx_bytes') AS max_tx_bytes,
    (select value from unnest(cluster) where name = 'min_tx_bytes') AS min_tx_bytes,
    (select value from unnest(cluster) where name = 'avg_tx_bytes') AS avg_tx_bytes,
    (select value from unnest(cluster) where name = 'max_rx_bytes') AS max_rx_bytes,
    (select value from unnest(cluster) where name = 'min_rx_bytes') AS min_rx_bytes,
    (select value from unnest(cluster) where name = 'avg_rx_bytes') AS avg_rx_bytes,
    (select value from unnest(cluster) where name = 'max_duration') AS max_duration,
    (select value from unnest(cluster) where name = 'min_duration') AS min_duration,
    (select value from unnest(cluster) where name = 'avg_duration') AS avg_duration
    FROM centroid_details order by centroid_id asc),
    predict as
    (select * from ML.PREDICT(model DATASET_NAME.log_cluster,
    (select * from DATASET_NAME.train_data)))
    select c.centroid_id as centroid_id,
    (stddev((p.number_of_records-c.number_of_records)+(p.max_tx_bytes-c.max_tx_bytes)+(p.min_tx_bytes-c.min_tx_bytes)+(p.avg_tx_bytes-c.min_tx_bytes)+(p.max_rx_bytes-c.max_rx_bytes)+(p.min_rx_bytes-c.min_rx_bytes)+      (p.avg_rx_bytes-c.min_rx_bytes)
    +(p.max_duration-c.max_duration)+(p.min_duration-c.min_duration)+(p.avg_duration-c.avg_duration)))
    as normalized_dest, any_value(c.number_of_records) as number_of_records,any_value(c.max_tx_bytes) as max_tx_bytes,  any_value(c.min_tx_bytes) as min_tx_bytes , any_value(c.avg_tx_bytes) as   avg_tx_bytes,any_value(c.max_rx_bytes) as max_rx_bytes,   any_value(c.min_tx_bytes) as min_rx_bytes ,any_value(c.avg_rx_bytes) as avg_rx_bytes,  any_value(c.avg_duration) as avg_duration,any_value(c.max_duration)
    as max_duration , any_value(c.min_duration) as min_duration
    from predict as p
    inner join cluster as c on c.centroid_id = p.centroid_id
    group by c.centroid_id);
    

    このクエリは、入力ベクトルとセントロイド ベクトル間の標準偏差関数を使用して、クラスタごとに正規化された距離を計算します。つまり、次の式を実装します。

    stddev(input_value_x-centroid_value_x)+(input_value_y-centroid_value_y)+(..))

  4. normalized_centroid_data テーブルを検証します。

    #standardSQL
    SELECT * from DATASET_NAME.normalized_centroid_data
    

    このステートメントの結果は、各セントロイド ID について算出、正規化された距離のテーブルになります。

    K 平均法クラスタごとに正規化されたデータ。

Cloud DLP を使用したデータの匿名化

このセクションでは、追加のパラメータを渡してパイプラインを再利用し、subscriber_id 列の国際モバイル サブスクライバー ID(IMSI)番号を匿名化します。

  1. Cloud Shell で、暗号鍵を作成します。

    export TEK=$(openssl rand -base64 32); echo ${TEK}
    a3ecrQAQJJ8oxVO8TZ/odlfjcujhWXjU/Xg5lEFiw5M=
    
  2. コードエディタを起動するには、Cloud Shell ウィンドウのツールバーで [エディタを開く] をクリックします。

  3. [ファイル] > [新しいファイル] をクリックし、deid_template.json という名前のファイルを作成します。

  4. 次の JSON ブロックを新しいファイルにコピーします。

    {
      "deidentifyTemplate": {
        "displayName": "Config to de-identify IMEI Number",
        "description": "IMEI Number masking transformation",
        "deidentifyConfig": {
          "recordTransformations": {
            "fieldTransformations": [
              {
                "fields": [
                  {
                    "name": "subscriber_id"
                  }
                ],
                "primitiveTransformation": {
                  "cryptoDeterministicConfig": {
                    "cryptoKey": {
                      "unwrapped": {
                        "key": "CRYPTO_KEY"
                      }
                    },
                    "surrogateInfoType": {
                      "name": "IMSI_TOKEN"
                    }
                  }
                }
              }
            ]
          }
        }
      },
      "templateId": "dlp-deid-subid"
    }
    

    CRYPTO_KEY は、前に作成した暗号鍵に置き換えます。本番環境のワークロードには、Cloud KMS でラップされた鍵を使用することをおすすめします。ファイルを保存します。

  5. Cloud Shell ツールバーの [ターミナルを開く] をクリックします。

  6. Cloud Shell ターミナルで、Cloud DLP 匿名化テンプレートを作成します。

    export DLP_API_ROOT_URL="https://dlp.googleapis.com"
    export DEID_TEMPLATE_API="${DLP_API_ROOT_URL}/v2/projects/${PROJECT_ID}/deidentifyTemplates"
    export DEID_CONFIG="@deid_template.json"
    
    export ACCESS_TOKEN=$(gcloud auth print-access-token)
    curl -X POST -H "Content-Type: application/json" \
       -H "Authorization: Bearer ${ACCESS_TOKEN}" \
       "${DEID_TEMPLATE_API}" \
       -d "${DEID_CONFIG}"
    

    これにより、Cloud プロジェクトに次の名前のテンプレートが作成されます。

    "name": "projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-sub-id"

  7. 前の手順でトリガーしたパイプラインを停止します。

    gcloud dataflow jobs list --filter="name=anomaly-detection" --state=active
    
  8. Cloud DLP で匿名化されたテンプレート名を使用して、異常検出パイプラインをトリガーします。

    gcloud beta dataflow flex-template run "anomaly-detection-with-dlp" \
    --project=${PROJECT_ID} \
    --region=us-central1 \
    --template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=5,\
    maxNumWorkers=5,\
    workerMachineType=n1-highmem-4,\
    subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\
    tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\
    batchFrequency=2,\
    customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\
    outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\
    workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\
    diskSizeGb=5,\
    windowInterval=10,\
    writeMethod=FILE_LOADS,\
    streaming=true,\
    deidTemplateName=projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-subid
    
  9. 外れ値テーブルにクエリを実行して、サブスクライバー ID が正常に匿名化されたことを確認します。

    export DLP_OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time
    FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data`
    ORDER BY transaction_time DESC'
    
    bq query --nouse_legacy_sql $DLP_OUTLIER_TABLE_QUERY >> outlier_deid.txt
    
    cat outlier_deid.txt
    

    出力は次のようになります。

    +---------------+--------------+----------------------------+
    | subscriber_id |  dst_subnet  |      transaction_time      |
    +---------------+--------------+----------------------------+
    | IMSI_TOKEN(64):AcZD2U2v//QiKkGzbFCm29pv5cqVi3Db09Z6CNt5cQSevBKRQvgdDfacPQIRY1dc| 12.0.1.3/22 | 2020-07-09 21:29:36.571000 |
    +---------------+--------------+----------------------------+
    

    サブスクライバー ID が匿名化されると、subscriber_id 列は元のサブスクライバー ID(00000000000)ではなくなります。

クリーンアップ

シリーズのチュートリアルを続行する予定がない場合、課金を停止する最も簡単な方法は、チュートリアル用に作成した Cloud プロジェクトを削除することです。また、リソースを個別に削除することもできます。

プロジェクトの削除

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ