Créer une solution sécurisée de détection d'anomalies à l'aide de Dataflow, BigQuery ML et Cloud Data Loss Prevention

Ce tutoriel décrit comment créer une solution de détection d'anomalies réseau sécurisée et basée sur le ML pour les réseaux de télécommunications. Ce type de solution permet d'identifier les menaces de cybersécurité.

Ce tutoriel est destiné aux ingénieurs de données et aux data scientists. Il part du principe que vous possédez des connaissances de base sur les éléments suivants :

Architecture de référence

Le schéma suivant montre les composants utilisés pour créer un système de détection d'anomalies réseau basé sur le ML. Pub/Sub et Cloud Storage servent de sources de données. Dataflow agrège et extrait des caractéristiques de ces données tokenisées à l'aide de l'API DLP. BigQuery ML crée un modèle de clustering en k-moyennes à partir de ces caractéristiques et Dataflow identifie les anomalies.

Architecture de référence du système de détection d'anomalies basé sur Dataflow et BigQuery ML.

Objectifs

  • Créer un sujet et un abonnement Pub/Sub pour générer des données de journal NetFlow synthétiques
  • Agréger et extraire des caractéristiques à partir des données de journal NetFlow à l'aide de Dataflow
  • Créer un modèle de clustering en k-moyennes avec BigQuery ML
  • Tokeniser les données sensibles avec l'API DLP
  • Créer un pipeline Dataflow pour la détection d'anomalies en temps réel à l'aide de données normalisées et entraînées

Coûts

Ce tutoriel utilise les composants facturables suivants de Google Cloud :

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

  1. Connectez-vous à votre compte Google.

    Si vous n'en possédez pas déjà un, vous devez en créer un.

  2. Dans Google Cloud Console, sur la page de sélection du projet, sélectionnez ou créez un projet Google Cloud.

    Accéder à la page de sélection du projet

  3. Assurez-vous que la facturation est activée pour votre projet Cloud. Découvrez comment vérifier que la facturation est activée pour votre projet.

  4. Dans Cloud Console, activez Cloud Shell.

    Activer Cloud Shell

    En bas de la fenêtre de Cloud Console, une session Cloud Shell démarre et affiche une invite de ligne de commande. Cloud Shell est un environnement shell dans lequel le SDK Cloud est déjà installé (y compris l'outil de ligne de commande gcloud), et dans lequel des valeurs sont déjà définies pour votre projet actuel. L'initialisation de la session peut prendre quelques secondes.

  5. Vous exécuterez toutes les commandes de ce tutoriel depuis Cloud Shell.
  6. Dans Cloud Shell, activez les API BigQuery, Dataflow, Cloud Storage et DLP.

    gcloud services enable dlp.googleapis.com bigquery.googleapis.com \
      dataflow.googleapis.com storage-component.googleapis.com \
      pubsub.googleapis.com cloudbuild.googleapis.com
    

Générer des données synthétiques à l'aide de Dataflow et de Pub/Sub

Dans cette section, vous allez créer un sujet et un abonnement Pub/Sub pour générer des données de journal NetFlow synthétiques en déclenchant un pipeline Dataflow automatisé.

Créer un sujet et un abonnement Pub/Sub

    Dans Cloud Shell, créez un sujet et un abonnement Pub/Sub :
    export PROJECT_ID=$(gcloud config get-value project)
    export TOPIC_ID=TOPIC_ID
    export SUBSCRIPTION_ID=SUBSCRIPTION_ID
    gcloud pubsub topics create $TOPIC_ID
    gcloud pubsub subscriptions create $SUBSCRIPTION_ID --topic=$TOPIC_ID 
    Remplacez les éléments suivants :
    • TOPIC_ID : nom du sujet Pub/Sub
    • SUBSCRIPTION_ID : nom de l'abonnement Pub/Sub

Déclencher le pipeline de génération de données synthétiques

  1. Dans Cloud Shell, clonez le dépôt GitHub.

    git clone https://github.com/GoogleCloudPlatform/df-ml-anomaly-detection.git
    cd df-ml-anomaly-detection
    
  2. Pour permettre l'envoi automatique de tâches, accordez des autorisations Dataflow à votre compte de service Cloud Build :

    export PROJECT_NUMBER=$(gcloud projects list --filter=${PROJECT_ID} \
      --format="value(PROJECT_NUMBER)")
      gcloud projects add-iam-policy-binding ${PROJECT_ID} \
      --member serviceAccount:${PROJECT_NUMBER}@cloudbuild.gserviceaccount.com \
      --role roles/dataflow.admin
    
  3. Démarrez le pipeline de génération de données synthétiques :

    gcloud builds submit . --machine-type=n1-highcpu-8 \
      --config scripts/cloud-build-data-generator.yaml \
      --substitutions _TOPIC_ID=${TOPIC_ID}
    

    En raison de la taille importante du package de code, vous devez exploiter un type de machine à haute capacité de mémoire. Pour ce tutoriel, utilisez le type machine-type=n1-highcpu-8.

  4. Vérifiez que les données de journal sont bien publiées dans l'abonnement :

    gcloud pubsub subscriptions pull ${SUBSCRIPTION_ID} --auto-ack --limit 1 >> raw_log.txt
    cat raw_log.txt
    

    Le résultat contient un sous-ensemble de champs de schéma de journal NetFlow avec des valeurs aléatoires, semblables à ce qui suit :

    {
     \"subscriberId\": \"mharper\",
     \"srcIP\": \"12.0.9.4",
     \"dstIP\": \"12.0.1.2\",
     \"srcPort\": 5000,
     \"dstPort\": 3000,
     \"txBytes\": 15,
     \"rxBytes\": 40,
     \"startTime\": 1570276550,
     \"endTime\": 1570276559,
     \"tcpFlag\": 0,
     \"protocolName\": \"tcp\",
     \"protocolNumber\": 0
    }
    

Extraire des caractéristiques et identifier les données aberrantes

Dans cette section, vous allez créer des tables BigQuery pour stocker les données de caractéristiques et les données aberrantes traitées par le pipeline de détection d'anomalies.

Créer des tables BigQuery pour stocker les données de caractéristiques et les données aberrantes

  1. Dans Cloud Shell, créez un ensemble de données BigQuery :

    export DATASET_NAME=dataset-name
    bq --location=US mk -d \
      --description "Network Logs Dataset" \
      ${DATASET_NAME}
    
  2. Créez des tables BigQuery :

    bq mk -t --schema src/main/resources/aggr_log_table_schema.json \
      --time_partitioning_type=DAY \
      --clustering_fields="dst_subnet,subscriber_id" \
      --description "Network Log Feature Table" \
      ${PROJECT_ID}:${DATASET_NAME}.cluster_model_data
    
    bq mk -t --schema src/main/resources/outlier_table_schema.json \
      --description "Network Log Outlier Table" \
      ${PROJECT_ID}:${DATASET_NAME}.outlier_data
    
    bq mk -t --schema src/main/resources/normalized_centroid_data_schema.json \
      --description "Sample Normalized Data" \
      ${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data
    

    Les tables suivantes sont générées :

    • cluster_model_data : table de partition en cluster qui stocke les valeurs de caractéristiques pour la création de modèles.
    • outlier_data : table des données aberrantes, qui stocke les anomalies.
    • normalized_centroid_data : table préremplie avec des données normalisées créées à partir d'un exemple de modèle.
  3. Chargez les données de centroïde dans les tables :

    bq load \
      --source_format=NEWLINE_DELIMITED_JSON \
      ${PROJECT_ID}:${DATASET_NAME}.normalized_centroid_data \
      gs://df-ml-anomaly-detection-mock-data/sample_model/normalized_centroid_data.json src/main/resources/normalized_centroid_data_schema.json
    

Créer et déclencher un modèle Flex Dataflow

Dans cette section, vous allez créer un modèle Flex Dataflow pour déclencher le pipeline de détection d'anomalies.

  1. À partir de Cloud Shell, créez une image Docker dans votre projet :

    gcloud auth configure-docker
    gradle jib --image=gcr.io/${PROJECT_ID}/df-ml-anomaly-detection:latest -DmainClass=com.google.solutions.df.log.aggregations.SecureLogAggregationPipeline
    
  2. Importez le fichier de configuration du modèle Flex dans le bucket Cloud Storage que vous avez créé précédemment :

    export DF_TEMPLATE_CONFIG_BUCKET=${PROJECT_ID}-DF_TEMPLATE_CONFIG
    gsutil mb -c standard -l REGION gs://${DF_TEMPLATE_CONFIG_BUCKET}
    cat << EOF | gsutil cp - gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json
    {"image": "gcr.io/${PROJECT_ID}/df-ml-anomaly-detection",
    "sdk_info": {"language": "JAVA"}
    }
    EOF
    

    Remplacez les éléments suivants :

    • PROJECT_ID : ID de votre projet Cloud
    • DF_TEMPLATE_CONFIG : nom du bucket Cloud Storage accueillant le fichier de configuration du modèle Flex Dataflow
    • REGION : région dans laquelle vous avez créé le bucket Cloud Storage
  3. Créez un fichier SQL pour transmettre les données de modèle normalisées en tant que paramètre de pipeline :

    echo "SELECT * FROM \`${PROJECT_ID}.${DATASET_NAME}.normalized_centroid_data\`" > normalized_cluster_data.sql
    gsutil cp normalized_cluster_data.sql gs://${DF_TEMPLATE_CONFIG_BUCKET}/
    
  4. Exécutez le pipeline de détection d'anomalies :

    gcloud beta dataflow flex-template run "anomaly-detection" \
    --project=${PROJECT_ID} \
    --region=us-central1 \
    --template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=5,\
    maxNumWorkers=5,\
    workerMachineType=n1-highmem-4,\
    subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\
    tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\
    batchFrequency=2,\
    customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\
    outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\
    workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\
    diskSizeGb=5,\
    windowInterval=10,\
    writeMethod=FILE_LOADS,\
    streaming=true
    
  5. Dans Cloud Console, accédez à la page Dataflow.

    Accéder à la page Dataflow

  6. Cliquez sur la tâche netflow-anomaly-detection-date +%Y%m%d-%H%M%S-%N`. Une représentation du pipeline Dataflow semblable à la suivante s'affiche :

Vue des tâches du pipeline de détection d&#39;anomalies dans l&#39;interface utilisateur de surveillance de Dataflow.

Publier un message aberrant à des fins de test

Vous pouvez publier un message pour vérifier que le message aberrant est bien détecté par le pipeline.

  1. Dans Cloud Shell, publiez le message suivant :

    gcloud pubsub topics publish ${TOPIC_ID} --message \
    "{\"subscriberId\": \"00000000000000000\",  \
    \"srcIP\": \"12.0.9.4\", \
    \"dstIP\": \"12.0.1.3\", \
    \"srcPort\": 5000, \
    \"dstPort\": 3000, \
    \"txBytes\": 150000, \
    \"rxBytes\": 40000, \
    \"startTime\": 1570276550, \
    \"endTime\": 1570276550, \
    \"tcpFlag\": 0, \
    \"protocolName\": \"tcp\", \
    \"protocolNumber\": 0}"
    

    Notez le nombre inhabituellement élevé d'octets transmis (txBytes) et reçus (rxBytes) par rapport à la plage (100 à 500 octets) configurée pour les données synthétiques. Ce message peut constituer un risque de sécurité, qui nécessite une validation.

  2. Au bout d'une minute environ, vérifiez que l'anomalie est bien identifiée et stockée dans la table BigQuery :

    export OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time
    FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data`
    WHERE subscriber_id like "0%" limit 1'
    bq query --nouse_legacy_sql $OUTLIER_TABLE_QUERY >> outlier_orig.txt
    cat outlier_orig.txt
    

    Le résultat ressemble à ce qui suit :

    +---------------+--------------+----------------------------+
    | subscriber_id |  dst_subnet  |   transaction_time |
    +---------------+--------------+----------------------------+
    | 00000000000| 12.0.1.3/22 | 2020-07-09 21:29:36.571000 |
    +---------------+--------------+----------------------------+
    

Créer un modèle de clustering en k-moyennes à l'aide de BigQuery ML

  1. Dans Cloud Console, accédez à la page Éditeur de requête de BigQuery.

    Accéder à l'éditeur de requête

  2. Sélectionnez les données d'entraînement dans la table de caractéristiques et créez un modèle de clustering en k-moyennes à l'aide de BigQuery ML :

    --> temp table for training data
    #standardSQL
    CREATE OR REPLACE TABLE DATASET_ID.train_data as
    (SELECT * FROM DATASET_ID.cluster_model_data
    WHERE _PARTITIONDATE BETWEEN START_DATE AND END_DATE
    AND NOT IS_NAN(avg_tx_bytes)
    AND NOT IS_NAN(avg_rx_bytes)
    AND NOT IS_NAN(avg_duration))
    limit 100000;
    
    --> create a model using BigQuery ML
    #standardSQL
    CREATE OR REPLACE MODEL DATASET_ID.log_cluster options(model_type='kmeans', standardize_features = true) AS
    SELECT * EXCEPT (transaction_time,subscriber_id,number_of_unique_ips, number_of_unique_ports, dst_subnet)
    FROM DATASET_ID.train_data;
    

    Remplacez les éléments suivants :

    • START_DATE et END_DATE : date actuelle
    • DATASET_ID : ID de l'ensemble de données créé
  3. Normalisez les données pour chaque cluster :

    --> create normalize table for each centroid
    #standardSQL
    CREATE OR REPLACE TABLE DATASET_ID.normalized_centroid_data as(
    with centroid_details AS (
    SELECT centroid_id,array_agg(struct(feature as name, round(numerical_value,1) as value)
    order by centroid_id) AS cluster
    from ML.CENTROIDS(model DATASET_ID.log_cluster)
    group by centroid_id
    ),
    cluster as (select centroid_details.centroid_id as centroid_id,
    (select value from unnest(cluster) where name = 'number_of_records') AS number_of_records,
    (select value from unnest(cluster) where name = 'max_tx_bytes') AS max_tx_bytes,
    (select value from unnest(cluster) where name = 'min_tx_bytes') AS min_tx_bytes,
    (select value from unnest(cluster) where name = 'avg_tx_bytes') AS avg_tx_bytes,
    (select value from unnest(cluster) where name = 'max_rx_bytes') AS max_rx_bytes,
    (select value from unnest(cluster) where name = 'min_rx_bytes') AS min_rx_bytes,
    (select value from unnest(cluster) where name = 'avg_rx_bytes') AS avg_rx_bytes,
    (select value from unnest(cluster) where name = 'max_duration') AS max_duration,
    (select value from unnest(cluster) where name = 'min_duration') AS min_duration,
    (select value from unnest(cluster) where name = 'avg_duration') AS avg_duration
    FROM centroid_details order by centroid_id asc),
    predict as
    (select * from ML.PREDICT(model DATASET_ID.log_cluster,
    (select * from DATASET_I.train_data)))
    select c.centroid_id as centroid_id,
    (stddev((p.number_of_records-c.number_of_records)+(p.max_tx_bytes-c.max_tx_bytes)+(p.min_tx_bytes-c.min_tx_bytes)+(p.avg_tx_bytes-c.min_tx_bytes)+(p.max_rx_bytes-c.max_rx_bytes)+(p.min_rx_bytes-c.min_rx_bytes)+      (p.avg_rx_bytes-c.min_rx_bytes)
    +(p.max_duration-c.max_duration)+(p.min_duration-c.min_duration)+(p.avg_duration-c.avg_duration)))
    as normalized_dest, any_value(c.number_of_records) as number_of_records,any_value(c.max_tx_bytes) as max_tx_bytes,  any_value(c.min_tx_bytes) as min_tx_bytes , any_value(c.avg_tx_bytes) as   avg_tx_bytes,any_value(c.max_rx_bytes) as max_rx_bytes,   any_value(c.min_tx_bytes) as min_rx_bytes ,any_value(c.avg_rx_bytes) as avg_rx_bytes,  any_value(c.avg_duration) as avg_duration,any_value(c.max_duration)
    as max_duration , any_value(c.min_duration) as min_duration
    from predict as p
    inner join cluster as c on c.centroid_id = p.centroid_id
    group by c.centroid_id);
    

    Cette requête calcule une distance normalisée pour chaque cluster en utilisant la fonction d'écart type entre le vecteur d'entrée et le vecteur centroïde. En d'autres termes, elle met en œuvre la formule suivante :

    stddev(input_value_x-centroid_value_x)+(input_value_y-centroid_value_y)+(..))

  4. Validez la table normalized_centroid_data :

    #standardSQL
    SELECT * from <DATASET_ID>.normalized_centroid_data
    

    Remplacez DATASET_ID par l'ID de l'ensemble de données créé.

    Le résultat de cette instruction est une table des distances normalisées calculées pour chaque ID de centroïde :

    Données normalisées pour chaque cluster en k-moyennes.

Anonymiser les données à l'aide de Cloud DLP

Dans cette section, vous allez réutiliser le pipeline en transmettant un paramètre supplémentaire pour anonymiser le numéro IMSI (International Mobile Subscriber Identity) dans la colonne subscriber_id.

  1. Dans Cloud Shell, créez une clé cryptographique :

    export TEK=$(openssl rand -base64 32); echo ${TEK}
    a3ecrQAQJJ8oxVO8TZ/odlfjcujhWXjU/Xg5lEFiw5M=
    
  2. Pour lancer l'éditeur de code, cliquez sur Ouvrir l'éditeur  dans la barre d'outils de la fenêtre Cloud Shell.

  3. Cliquez sur Fichier > Nouveau fichier et créez un fichier nommé deid_template.json.

  4. Copiez le bloc JSON suivant dans le nouveau fichier :

    {
      "deidentifyTemplate": {
        "displayName": "Config to de-identify IMEI Number",
        "description": "IMEI Number masking transformation",
        "deidentifyConfig": {
          "recordTransformations": {
            "fieldTransformations": [
              {
                "fields": [
                  {
                    "name": "subscriber_id"
                  }
                ],
                "primitiveTransformation": {
                  "cryptoDeterministicConfig": {
                    "cryptoKey": {
                      "unwrapped": {
                        "key": "CRYPTO_KEY"
                      }
                    },
                    "surrogateInfoType": {
                      "name": "IMSI_TOKEN"
                    }
                  }
                }
              }
            ]
          }
        }
      },
      "templateId": "dlp-deid-subid"
    }
    

    Remplacez CRYPTO_KEY par la clé cryptographique que vous avez créée précédemment. Il est recommandé d'utiliser une clé encapsulée par Cloud KMS pour les charges de travail de production. Enregistrez le fichier.

  5. Dans la barre d'outils Cloud Shell, cliquez sur Ouvrir le terminal.

  6. Dans le terminal Cloud Shell, créez un modèle d'anonymisation Cloud DLP :

    export DLP_API_ROOT_URL="https://dlp.googleapis.com"
    export DEID_TEMPLATE_API="${DLP_API_ROOT_URL}/v2/projects/${PROJECT_ID}/deidentifyTemplates"
    export DEID_CONFIG="@deid_template.json"
    
    export ACCESS_TOKEN=$(gcloud auth print-access-token)
    curl -X POST -H "Content-Type: application/json" \
       -H "Authorization: Bearer ${ACCESS_TOKEN}" \
       "${DEID_TEMPLATE_API}" \
       -d "${DEID_CONFIG}"
    

    Cette commande crée un modèle portant le nom suivant dans votre projet Cloud :

    "name": "projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-sub-id"

  7. Arrêtez le pipeline que vous avez déclenché précédemment :

    gcloud dataflow jobs list --filter="name=anomaly-detection" --state=active
    
  8. Déclenchez le pipeline de détection d'anomalies en spécifiant le nom du modèle d'anonymisation Cloud DLP :

    gcloud beta dataflow flex-template run "anomaly-detection-with-dlp" \
    --project=${PROJECT_ID} \
    --region=us-central1 \
    --template-file-gcs-location=gs://${DF_TEMPLATE_CONFIG_BUCKET}/dynamic_template_secure_log_aggr_template.json \
    --parameters=autoscalingAlgorithm="NONE",\
    numWorkers=5,\
    maxNumWorkers=5,\
    workerMachineType=n1-highmem-4,\
    subscriberId=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION_ID},\
    tableSpec=${PROJECT_ID}:${DATASET_NAME}.cluster_model_data,\
    batchFrequency=2,\
    customGcsTempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    tempLocation=gs://${DF_TEMPLATE_CONFIG_BUCKET}/temp,\
    clusterQuery=gs://${DF_TEMPLATE_CONFIG_BUCKET}/normalized_cluster_data.sql,\
    outlierTableSpec=${PROJECT_ID}:${DATASET_NAME}.outlier_data,\
    inputFilePattern=gs://df-ml-anomaly-detection-mock-data/flow_log*.json,\
    workerDiskType=compute.googleapis.com/projects/${PROJECT_ID}/zones/us-central1-b/diskTypes/pd-ssd,\
    diskSizeGb=5,\
    windowInterval=10,\
    writeMethod=FILE_LOADS,\
    streaming=true,\
    deidTemplateName=projects/${PROJECT_ID}/deidentifyTemplates/dlp-deid-subid
    
  9. Interrogez la table des données aberrantes pour vérifier que l'ID d'abonné a bien été anonymisé :

    export DLP_OUTLIER_TABLE_QUERY='SELECT subscriber_id,dst_subnet,transaction_time
    FROM `'${PROJECT_ID}.${DATASET_NAME}'.outlier_data`
    ORDER BY transaction_time DESC'
    
    bq query --nouse_legacy_sql $DLP_OUTLIER_TABLE_QUERY >> outlier_deid.txt
    
    cat outlier_deid.txt
    

    Le résultat ressemble à ce qui suit :

    +---------------+--------------+----------------------------+
    | subscriber_id |  dst_subnet  |      transaction_time      |
    +---------------+--------------+----------------------------+
    | IMSI_TOKEN(64):AcZD2U2v//QiKkGzbFCm29pv5cqVi3Db09Z6CNt5cQSevBKRQvgdDfacPQIRY1dc| 12.0.1.3/22 | 2020-07-09 21:29:36.571000 |
    +---------------+--------------+----------------------------+
    

    Si l'ID d'abonné a été anonymisé, la colonne subscriber_id ne montre plus l'ID d'abonné d'origine, à savoir 00000000000.

Effectuer un nettoyage

Si vous n'avez pas l'intention de continuer à suivre les tutoriels de cette série, le moyen le plus simple d'éviter la facturation consiste à supprimer le projet Cloud que vous avez créé pour ce tutoriel. Vous pouvez également supprimer les différentes ressources.

Supprimer le projet

  1. Dans Cloud Console, accédez à la page Gérer les ressources.

    Accéder à la page Gérer les ressources

  2. Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Étape suivante