使用 Dataflow、BigQuery ML 和 Cloud Data Loss Prevention 构建安全的异常检测解决方案

本教程介绍如何为电信网络构建基于机器学习的安全网络异常检测解决方案。这类解决方案可用于帮助识别信息安全威胁。

本教程面向数据工程师和科学家,并假定您已掌握以下基本知识:

参考架构

下图显示了用于构建基于机器学习的网络异常检测系统的组件。Pub/Sub 和 Cloud Storage 充当数据源。Dataflow 聚合利用 DLP API 标记化的数据并从这些数据中提取特征。BigQuery ML 根据这些特征创建 k-means 聚类模型,而 Dataflow 会识别离群值。

使用 Dataflow 和 BigQuery ML 的异常检测参考架构。

目标

  • 创建 Pub/Sub 主题和订阅以生成合成 NetFlow 日志数据。
  • 使用 Dataflow 聚合 NetFlow 日志数据并从这些数据中提取特征。
  • 创建 BigQuery ML k-means 聚类模型。
  • 使用 DLP API 将敏感数据标记化。
  • 使用归一化且经过训练的数据创建 Dataflow 流水线,以进行实时离群值检测。

费用

本教程使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用量来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  5. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  6. 在 Cloud Console 中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在 Cloud Console 的底部启动,并显示命令行提示符。Cloud Shell 是一个已安装 Cloud SDK 的 Shell 环境,其中包括 gcloud 命令行工具以及已为当前项目设置的值。该会话可能需要几秒钟时间来完成初始化。

  7. 您可以从 Cloud Shell 运行本教程中的所有命令。
  8. 在 Cloud Shell 中,启用 BigQuery、Dataflow、Cloud Storage 和 DLP API。

    gcloud services enable dlp.googleapis.com bigquery.googleapis.com \
      dataflow.googleapis.com storage-component.googleapis.com \
      pubsub.googleapis.com cloudbuild.googleapis.com
    

使用 Dataflow 和 Pub/Sub 生成合成数据

在本部分中,您将创建 Pub/Sub 主题和订阅,以通过触发自动的 Dataflow 流水线来生成合成 NetFlow 日志数据。

创建 Pub/Sub 主题和订阅

    在 Cloud Shell 中,创建 Pub/Sub 主题和订阅:
    export PROJECT_ID=$(gcloud config get-value project)
    export TOPIC_ID=TOPIC_ID
    export SUBSCRIPTION_ID=SUBSCRIPTION_ID
    gcloud pubsub topics create $TOPIC_ID
    gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC_ID 
    替换以下内容:
    • TOPIC_ID:Pub/Sub 主题的名称
    • SUBSCRIPTION_ID:Pub/Sub 订阅的名称

触发合成数据生成流水线

  1. 在 Cloud Shell 中,克隆 GitHub 代码库:

    git clone https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection.git
    cd df-ml-anomaly-detection
    
  2. 如需允许自动提交作业,请向 Cloud Build 服务帐号授予 Dataflow 权限:

    export PROJECT_NUMBER=$(gcloud projects list --filter=${PROJECT_ID} \
      --format="value(PROJECT_NUMBER)")
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
      --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
      --role roles/dataflow.admin
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
      --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
      --role roles/compute.instanceAdmin.v1
    
    gcloud projects add-iam-policy-binding ${PROJECT_ID} \
      --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
      --role roles/iam.serviceAccountUser
    
    
  3. 启动合成数据生成流水线:

    gcloud builds submit . --machine-type=n1-highcpu-8 \
      --config scripts/cloud-build-data-generator.yaml \
      --substitutions _TOPIC_ID=${TOPIC_ID}
    

    由于代码包很大,因此您必须使用高内存机器类型。在本教程中,请使用 machine-type=n1-highcpu-8

  4. 验证日志数据已在订阅中发布:

    gcloud pubsub subscriptions pull ${SUBSCRIPTION_ID} --auto-ack --limit 1 >> raw_log.txt
    cat raw_log.txt
    

    输出包含填充了随机值的 NetFlow 日志架构字段的子集,类似如下内容:

    {
     \"subscriberId\": \"mharper\",
     \"srcIP\": \"12.0.9.4",
     \"dstIP\": \"12.0.1.2\",
     \"srcPort\": 5000,
     \"dstPort\": 3000,
     \"txBytes\": 15,
     \"rxBytes\": 40,
     \"startTime\": 1570276550,
     \"endTime\": 1570276559,
     \"tcpFlag\": 0,
     \"protocolName\": \"tcp\",
     \"protocolNumber\": 0
    }
    

提取特征并查找离群数据

在本部分中,您将创建 BigQuery 表来存储由异常检测流水线处理的特征和异常数据。

创建 BigQuery 表以存储特征和离群数据

  1. 在 Cloud Shell 中,创建一个 BigQuery 数据集:

    export DATASET_NAME=DATASET_NAME
    bq --location=US mk -d \
      --description "Network Logs Dataset" \
      ${DATASET_NAME}
    
  2. 创建 BigQuery 表:

    bq mk -t --schema src/main/resources/aggr_log_table_schema.json \
      --time_partitioning_type=DAY \
      --clustering_fields="dst_subnet,subscriber_id" \
      --description "Network Log Feature Table" \
      ${PROJECT_ID}:${DATASET_NAME}.cluster_model_data
    
    bq mk -t --schema src/main/resources/outlier_table_schema.json \
      --description "Network Log Outlier Table" \
      ${PROJECT_ID}:${DATASET_NAME}.outlier_data
    
    bq mk -t --schema src/main/resources/normalized_centroid_data_schema.json \
      --description "Sample Normalized Data" \
      ${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data
    

    生成下表:

    • cluster_model_data:存储用于创建模型的特征值的聚类分区表。
    • outlier_data:存储异常的异常值表。
    • normalized_centroid_data:预填充通过示例模型创建的归一化数据的表。
  3. 将形心数据加载到表中:

    bq load \
      --source_format=NEWLINE_DELIMITED_JSON \
      ${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data \
      gs://df-ml-anomaly-detection-mock-data/sample_model/normalized_centroid_data.json src/main/resources/normalized_centroid_data_schema.json
    

创建并触发 Dataflow Flex 模板

在本部分中,您将创建 Dataflow Flex 模板以触发异常检测流水线。

  1. 在 Cloud Shell 中,在您的项目中创建一个 Docker 映像:

    gcloud auth configure-docker
    gradle jib --image=gcr.io/${PROJECT_ID}/df-ml-anomaly-detection:latest -DmainClass=com.google.solutions.df.log.aggregations.SecureLogAggregationPipeline
    
  2. 将 Flex 模板配置文件上传到 Cloud Storage 存储分区中:

    export DF_TEMPLATE_CONFIG_BUCKET=${PROJECT_ID}-DF_TEMPLATE_CONFIG
    gsutil mb -c standard -l REGION gs://${DF_TEMPLATE_CONFIG_BUCKET}
    cat << EOF | gsutil cp - gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json
    {"image": "gcr.io/${PROJECT_ID}/df-ml-anomaly-detection",
    "sdk_info": {"language": "JAVA"}
    }
    EOF
    

    替换以下内容:

    • PROJECT_ID:您的 Cloud 项目 ID
    • DF_TEMPLATE_CONFIG:您的 Dataflow Flex 模板配置文件的 Cloud Storage 存储分区的名称
    • REGION:要在其中创建 Cloud Storage 存储分区的区域
  3. 创建一个 SQL 文件以将归一化的模型数据作为流水线参数传递:

    echo "SELECT * FROM \`${PROJECT_ID}.${DATASET_NAME}.normalized_centroid_data\`" > normalized_cluster_data.sql
    gsutil cp normalized_cluster_data.sql gs://${DF_TEMPLATE_CONFIG_BUCKET}/
    
  4. 运行异常检测流水线:

    gcloud beta dataflow flex-template run "anomaly-detection" \
    --project=${PROJECT_ID} \
    --region=us-central1 \
    --template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=5,\
    maxNumWorkers=5,\
    workerMachineType=n1-highmem-4,\
    subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\
    tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\
    batchFrequency=2,\
    customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\
    outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\
    workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\
    diskSizeGb=5,\
    windowInterval=10,\
    writeMethod=FILE_LOADS,\
    streaming=true
    
  5. 在 Cloud Console 中,转到 Dataflow 页面。

    转到 Dataflow 页面

  6. 点击 netflow-anomaly-detection-date +%Y%m%d-%H%M%S-%N` 作业。这时会出现类似于以下内容的 Dataflow 流水线表示法:

Dataflow 监控界面中的异常检测流水线作业视图。

发布离群值消息以进行测试

您可以发布消息以验证系统已在流水线中正确检测到离群值消息。

  1. 在 Cloud Shell 中,发布以下消息:

    gcloud pubsub topics publish ${TOPIC_ID} --message \
    "{\"subscriberId\": \"00000000000000000\",  \
    \"srcIP\": \"12.0.9.4\", \
    \"dstIP\": \"12.0.1.3\", \
    \"srcPort\": 5000, \
    \"dstPort\": 3000, \
    \"txBytes\": 150000, \
    \"rxBytes\": 40000, \
    \"startTime\": 1570276550, \
    \"endTime\": 1570276550, \
    \"tcpFlag\": 0, \
    \"protocolName\": \"tcp\", \
    \"protocolNumber\": 0}"
    

    请注意,与针对合成数据设置的范围(100 到 500 字节)相比,传输字节数 (txBytes) 和接收字节数 (rxBytes) 就异常大。此消息可能表明有安全风险需要验证。

  2. 大约 1 分钟后,请验证异常已识别并存储在 BigQuery 表中:

    export OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time
    FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data`
    WHERE subscriber_id like "0%" limit 1'
    bq query --nouse_legacy_sql $OUTLIER_TABLE_QUERY >> outlier_orig.txt
    cat outlier_orig.txt
    

    输出内容类似如下:

    +---------------+--------------+----------------------------+
    | subscriber_id |  dst_subnet  |   transaction_time |
    +---------------+--------------+----------------------------+
    | 00000000000| 12.0.1.3/22 | 2020-07-09 21:29:36.571000 |
    +---------------+--------------+----------------------------+
    

使用 BigQuery ML 创建 k-means 聚类模型

  1. 在 Cloud Console 中,转到 BigQuery 查询编辑器页面。

    转到查询编辑器

  2. 从特征表中选择训练数据,并使用 BigQuery ML 创建 k-means 聚类模型:

    --> temp table for training data
    #standardSQL
    CREATE OR REPLACE TABLE DATASET_NAME.train_data as
    (SELECT * FROM DATASET_NAME.cluster_model_data
    WHERE _PARTITIONDATE BETWEEN START_DATE AND END_DATE
    AND NOT IS_NAN(avg_tx_bytes)
    AND NOT IS_NAN(avg_rx_bytes)
    AND NOT IS_NAN(avg_duration))
    limit 100000;
    
    --> create a model using BigQuery ML
    #standardSQL
    CREATE OR REPLACE MODEL DATASET_NAME.log_cluster options(model_type='kmeans', standardize_features = true) AS
    SELECT * EXCEPT (transaction_time,subscriber_id,number_of_unique_ips, number_of_unique_ports, dst_subnet)
    FROM DATASET_NAME.train_data;
    

    替换以下内容:

    • START_DATEEND_DATE:当前日期
    • DATASET_NAME:您之前创建的数据集
  3. 将每个聚类的数据归一化:

    --> create normalize table for each centroid
    #standardSQL
    CREATE OR REPLACE TABLE DATASET_NAME.normalized_centroid_data as(
    with centroid_details AS (
    SELECT centroid_id,array_agg(struct(feature as name, round(numerical_value,1) as value)
    order by centroid_id) AS cluster
    from ML.CENTROIDS(model DATASET_NAME.log_cluster)
    group by centroid_id
    ),
    cluster as (select centroid_details.centroid_id as centroid_id,
    (select value from unnest(cluster) where name = 'number_of_records') AS number_of_records,
    (select value from unnest(cluster) where name = 'max_tx_bytes') AS max_tx_bytes,
    (select value from unnest(cluster) where name = 'min_tx_bytes') AS min_tx_bytes,
    (select value from unnest(cluster) where name = 'avg_tx_bytes') AS avg_tx_bytes,
    (select value from unnest(cluster) where name = 'max_rx_bytes') AS max_rx_bytes,
    (select value from unnest(cluster) where name = 'min_rx_bytes') AS min_rx_bytes,
    (select value from unnest(cluster) where name = 'avg_rx_bytes') AS avg_rx_bytes,
    (select value from unnest(cluster) where name = 'max_duration') AS max_duration,
    (select value from unnest(cluster) where name = 'min_duration') AS min_duration,
    (select value from unnest(cluster) where name = 'avg_duration') AS avg_duration
    FROM centroid_details order by centroid_id asc),
    predict as
    (select * from ML.PREDICT(model DATASET_NAME.log_cluster,
    (select * from DATASET_NAME.train_data)))
    select c.centroid_id as centroid_id,
    (stddev((p.number_of_records-c.number_of_records)+(p.max_tx_bytes-c.max_tx_bytes)+(p.min_tx_bytes-c.min_tx_bytes)+(p.avg_tx_bytes-c.min_tx_bytes)+(p.max_rx_bytes-c.max_rx_bytes)+(p.min_rx_bytes-c.min_rx_bytes)+      (p.avg_rx_bytes-c.min_rx_bytes)
    +(p.max_duration-c.max_duration)+(p.min_duration-c.min_duration)+(p.avg_duration-c.avg_duration)))
    as normalized_dest, any_value(c.number_of_records) as number_of_records,any_value(c.max_tx_bytes) as max_tx_bytes,  any_value(c.min_tx_bytes) as min_tx_bytes , any_value(c.avg_tx_bytes) as   avg_tx_bytes,any_value(c.max_rx_bytes) as max_rx_bytes,   any_value(c.min_tx_bytes) as min_rx_bytes ,any_value(c.avg_rx_bytes) as avg_rx_bytes,  any_value(c.avg_duration) as avg_duration,any_value(c.max_duration)
    as max_duration , any_value(c.min_duration) as min_duration
    from predict as p
    inner join cluster as c on c.centroid_id = p.centroid_id
    group by c.centroid_id);
    

    此查询通过在输入和形心向量之间使用标准偏差函数为每个聚类计算归一化距离。换句话说,它实现以下公式:

    stddev(input_value_x-centroid_value_x)+(input_value_y-centroid_value_y)+(..))

  4. 验证 normalized_centroid_data 表:

    #standardSQL
    SELECT * from DATASET_NAME.normalized_centroid_data
    

    此语句的结果是一个针对每个形心 ID 计算得出的归一化距离表:

    每个 k-means 聚类的归一化数据。

使用 Cloud DLP 对数据进行去标识化

在本部分中,通过传递一个额外参数来对 subscriber_id 列中的国际移动用户识别码 (IMSI) 进行去标识化,以重复使用流水线。

  1. 在 Cloud Shell 中,创建加密密钥:

    export TEK=$(openssl rand -base64 32); echo ${TEK}
    a3ecrQAQJJ8oxVO8TZ/odlfjcujhWXjU/Xg5lEFiw5M=
    
  2. 如需启动代码编辑器,请在 Cloud Shell 窗口的工具栏上,点击打开编辑器

  3. 点击文件 > 新文件,然后创建一个名为 deid_template.json 的文件。

  4. 将以下 JSON 代码块复制到新文件中:

    {
      "deidentifyTemplate": {
        "displayName": "Config to de-identify IMEI Number",
        "description": "IMEI Number masking transformation",
        "deidentifyConfig": {
          "recordTransformations": {
            "fieldTransformations": [
              {
                "fields": [
                  {
                    "name": "subscriber_id"
                  }
                ],
                "primitiveTransformation": {
                  "cryptoDeterministicConfig": {
                    "cryptoKey": {
                      "unwrapped": {
                        "key": "CRYPTO_KEY"
                      }
                    },
                    "surrogateInfoType": {
                      "name": "IMSI_TOKEN"
                    }
                  }
                }
              }
            ]
          }
        }
      },
      "templateId": "dlp-deid-subid"
    }
    

    CRYPTO_KEY 替换为您先前创建的加密密钥。最佳做法是将 Cloud KMS 封装的密钥用于生产工作负载。保存文件。

  5. 在 Cloud Shell 工具栏中,点击打开终端

  6. 在 Cloud Shell 终端中,创建一个 Cloud DLP 去标识化模板:

    export DLP_API_ROOT_URL="https://dlp.googleapis.com"
    export DEID_TEMPLATE_API="${DLP_API_ROOT_URL}/v2/projects/${PROJECT_ID}/deidentifyTemplates"
    export DEID_CONFIG="@deid_template.json"
    
    export ACCESS_TOKEN=$(gcloud auth print-access-token)
    curl -X POST -H "Content-Type: application/json" \
       -H "Authorization: Bearer ${ACCESS_TOKEN}" \
       "${DEID_TEMPLATE_API}" \
       -d "${DEID_CONFIG}"
    

    这将在您的 Cloud 项目中创建一个具有以下名称的模板:

    "name": "projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-sub-id"

  7. 停止您在上一步中触发的流水线:

    gcloud dataflow jobs list --filter="name=anomaly-detection" --state=active
    
  8. 使用 Cloud DLP 将模板名称去标识化以触发异常检测流水线:

    gcloud beta dataflow flex-template run "anomaly-detection-with-dlp" \
    --project=${PROJECT_ID} \
    --region=us-central1 \
    --template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=5,\
    maxNumWorkers=5,\
    workerMachineType=n1-highmem-4,\
    subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\
    tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\
    batchFrequency=2,\
    customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\
    outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\
    workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\
    diskSizeGb=5,\
    windowInterval=10,\
    writeMethod=FILE_LOADS,\
    streaming=true,\
    deidTemplateName=projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-subid
    
  9. 查询离群值表以验证用户识别码是否已成功去标识化:

    export DLP_OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time
    FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data`
    ORDER BY transaction_time DESC'
    
    bq query --nouse_legacy_sql $DLP_OUTLIER_TABLE_QUERY >> outlier_deid.txt
    
    cat outlier_deid.txt
    

    输出内容类似如下:

    +---------------+--------------+----------------------------+
    | subscriber_id |  dst_subnet  |      transaction_time      |
    +---------------+--------------+----------------------------+
    | IMSI_TOKEN(64):AcZD2U2v//QiKkGzbFCm29pv5cqVi3Db09Z6CNt5cQSevBKRQvgdDfacPQIRY1dc| 12.0.1.3/22 | 2020-07-09 21:29:36.571000 |
    +---------------+--------------+----------------------------+
    

    如果用户识别码已去标识化,则 subscriber_id 列将不再是原始用户识别码(即 00000000000)。

清除数据

如果您不打算继续学习本系列教程,那么避免产生费用的最简单方法是删除您为本教程创建的 Cloud 项目。或者,您也可以删除各个资源。

删除项目

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤