Collecter les journaux AWS Elastic MapReduce

Compatible avec:

Ce document explique comment ingérer les journaux AWS Elastic MapReduce (EMR) dans Google Security Operations. AWS EMR est une plate-forme de big data native cloud qui traite rapidement de grandes quantités de données. Intégrer les journaux EMR à Google SecOps vous permet d'analyser l'activité du cluster et de détecter les menaces de sécurité potentielles.

Avant de commencer

  • Assurez-vous de disposer d'une instance Google SecOps.
  • Assurez-vous de disposer d'un accès privilégié à AWS.

Configurer le bucket Amazon S3

  1. Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur: Créer un bucket.
  2. Enregistrez le nom et la région du bucket pour une utilisation ultérieure.
  3. Créez un utilisateur en suivant le guide de l'utilisateur Créer un utilisateur IAM.
  4. Sélectionnez l'utilisateur créé.
  5. Sélectionnez l'onglet Informations d'identification de sécurité.
  6. Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
  7. Sélectionnez Service tiers comme Cas d'utilisation.
  8. Cliquez sur Suivant.
  9. Facultatif: ajoutez une balise de description.
  10. Cliquez sur Créer une clé d'accès.
  11. Cliquez sur Download CSV file (Télécharger le fichier CSV) pour enregistrer la clé d'accès et la clé d'accès secrète pour les utiliser ultérieurement.
  12. Cliquez sur OK.
  13. Sélectionnez l'onglet Autorisations.
  14. Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
  15. Sélectionnez Ajouter des autorisations.
  16. Sélectionnez Joindre directement des règles.
  17. Recherchez et sélectionnez les règles AmazonS3FullAccess et CloudWatchLogsFullAccess.
  18. Cliquez sur Suivant.
  19. Cliquez sur Ajouter des autorisations.

Configurer AWS EMR pour transférer les journaux

  1. Connectez-vous à l'AWS Management Console.
  2. Dans la barre de recherche, saisissez EMR, puis sélectionnez Amazon EMR dans la liste des services.
  3. Cliquez sur Clusters.
  4. Recherchez et sélectionnez le cluster EMR pour lequel vous souhaitez activer la journalisation.
  5. Cliquez sur Modifier sur la page Détails du cluster.
  6. Sur l'écran Edit Cluster (Modifier le cluster), accédez à la section Logging (Journalisation).
  7. Sélectionnez Activer la journalisation.
  8. Spécifiez le bucket S3 dans lequel les journaux seront stockés.
  9. Spécifiez l'URI S3 au format s3://your-bucket-name/ (tous les journaux EMR seront stockés à la racine du bucket).
  10. Sélectionnez les types de journaux suivants :
    • Step logs
    • Application logs
    • YARN logs
    • System logs
    • HDFS Logs (si vous utilisez Hadoop)
  11. Cliquez sur Enregistrer.

Configurer un flux dans Google SecOps pour ingérer les journaux AWS EMR

  1. Accédez à SIEM Settings > Feeds (Paramètres du SIEM > Flux).
  2. Cliquez sur Ajouter.
  3. Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, Journaux AWS EMR).
  4. Sélectionnez Amazon S3 comme Type de source.
  5. Sélectionnez AWS EMR comme Type de journal.
  6. Cliquez sur Suivant.
  7. Spécifiez les valeurs des paramètres d'entrée suivants:

    • Region (Région) : région dans laquelle se trouve le bucket Amazon S3.
    • URI S3: URI du bucket.
      • s3://your-log-bucket-name/
        • Remplacez your-log-bucket-name par le nom réel du bucket.
    • Un URI est: sélectionnez Répertoire ou Répertoire incluant des sous-répertoires.
    • Options de suppression de la source: sélectionnez l'option de suppression en fonction de vos préférences.

    • ID de clé d'accès: clé d'accès utilisateur ayant accès au bucket S3.

    • Clé d'accès secrète: clé secrète de l'utilisateur ayant accès au bucket S3.

    • Espace de noms des éléments: espace de noms des éléments.

    • Libellés d'ingestion: libellé à appliquer aux événements de ce flux.

  8. Cliquez sur Suivant.

  9. Vérifiez la configuration de votre nouveau flux dans l'écran Finaliser, puis cliquez sur Envoyer.

Tableau de mappage UDM

Champ de journal Mappage UDM Logique
app_id additional.fields[].key La valeur "APP" est attribuée via l'analyseur
app_id additional.fields[].value.string_value Mappé directement à partir du champ APP dans le journal brut.
app_name additional.fields[].key La valeur "APPNAME" est attribuée via l'analyseur
app_name additional.fields[].value.string_value Mappé directement à partir du champ APPNAME dans le journal brut.
blockid additional.fields[].key La valeur "blockid" est attribuée via l'analyseur
blockid additional.fields[].value.string_value Mappé directement à partir du champ blockid dans le journal brut.
bytes network.received_bytes Mappé directement à partir du champ bytes dans le journal brut, converti en entier non signé.
cliID additional.fields[].key La valeur "cliID" est attribuée via l'analyseur
cliID additional.fields[].value.string_value Mappé directement à partir du champ cliID dans le journal brut.
cmd target.process.command_line Mappé directement à partir du champ cmd dans le journal brut.
comp_name additional.fields[].key La valeur "COMP" est attribuée via l'analyseur
comp_name additional.fields[].value.string_value Mappé directement à partir du champ COMP dans le journal brut.
configuration_version additional.fields[].key La valeur "configuration_version" est attribuée via l'analyseur
configuration_version additional.fields[].value.string_value Mappé directement à partir du champ configuration_version dans le journal brut, converti en chaîne.
containerID additional.fields[].key La valeur "containerID" est attribuée via l'analyseur
containerID additional.fields[].value.string_value Mappé directement à partir du champ CONTAINERID dans le journal brut.
description security_result.description Mappé directement à partir du champ description dans le journal brut.
dfs.FSNamesystem.* additional.fields[].key La clé est générée en concatenant "dfs.FSNamesystem." avec la clé des données JSON.
dfs.FSNamesystem.* additional.fields[].value.string_value La valeur est mappée directement à partir de la valeur correspondante dans l'objet JSON dfs.FSNamesystem, convertie en chaîne.
duration additional.fields[].key La valeur "duration" est attribuée via l'analyseur
duration additional.fields[].value.string_value Mappé directement à partir du champ duration dans le journal brut.
duration network.session_duration.seconds Mappé directement à partir du champ duration dans le journal brut, converti en entier.
environment additional.fields[].key La valeur "environment" est attribuée via l'analyseur
environment additional.fields[].value.string_value Mappé directement à partir du champ environment dans le journal brut. Extrait du champ ip_port à l'aide de grok et de la manipulation de chaînes. Extrait du champ ip_port à l'aide de grok et de la manipulation de chaînes, converti en entier.
event_type metadata.event_type Déterminé par la logique de l'analyseur en fonction de la présence d'informations principal et target. Il peut s'agir de NETWORK_CONNECTION, USER_RESOURCE_ACCESS, STATUS_UPDATE ou GENERIC_EVENT.
file_path target.file.full_path Mappé directement à partir du champ file_path dans le journal brut.
host principal.hostname Mappé directement à partir du champ host dans le journal brut.
host target.hostname Mappé directement à partir du champ host dans le journal brut.
host_ip principal.ip Mappé directement à partir du champ host_ip dans le journal brut.
host_port principal.port Mappé directement à partir du champ host_port dans le journal brut, converti en entier.
http_url target.url Mappé directement à partir du champ http_url dans le journal brut.
index additional.fields[].key La valeur "index" est attribuée via l'analyseur
index additional.fields[].value.string_value Mappé directement à partir du champ index dans le journal brut.
kind metadata.product_event_type Mappé directement à partir du champ kind dans le journal brut. La valeur "AWS_EMR" est attribuée via l'analyseur La valeur "AWS EMR" est attribuée via l'analyseur La valeur "AMAZON" est attribuée via l'analyseur
offset additional.fields[].key La valeur "offset" est attribuée via l'analyseur
offset additional.fields[].value.string_value Mappé directement à partir du champ offset dans le journal brut.
op metadata.product_event_type Mappé directement à partir du champ op ou OPERATION dans le journal brut.
proto network.application_protocol Extrait du champ http_url à l'aide de grok, converti en majuscules.
puppet_version additional.fields[].key La valeur "puppet_version" est attribuée via l'analyseur
puppet_version additional.fields[].value.string_value Mappé directement à partir du champ puppet_version dans le journal brut.
queue_name additional.fields[].key La valeur "queue_name" est attribuée via l'analyseur
queue_name additional.fields[].value.string_value Mappé directement à partir du champ queue_name dans le journal brut.
report_format additional.fields[].key La valeur "report_format" est attribuée via l'analyseur
report_format additional.fields[].value.string_value Mappé directement à partir du champ report_format dans le journal brut, converti en chaîne.
resource additional.fields[].key La valeur "resource" est attribuée via l'analyseur
resource additional.fields[].value.string_value Mappé directement à partir du champ resource dans le journal brut.
result security_result.action_details Mappé directement à partir du champ RESULT dans le journal brut.
security_id additional.fields[].key La valeur "security_id" est attribuée via l'analyseur
security_id additional.fields[].value.string_value Mappé directement à partir du champ security_id dans le journal brut.
severity security_result.severity Mappé à partir du champ severity dans le journal brut. INFO est mappé sur INFORMATIONAL, WARN est mappé sur MEDIUM.
srvID additional.fields[].key La valeur "srvID" est attribuée via l'analyseur
srvID additional.fields[].value.string_value Mappé directement à partir du champ srvID dans le journal brut.
status additional.fields[].key La valeur "status" est attribuée via l'analyseur
status additional.fields[].value.string_value Mappé directement à partir du champ status dans le journal brut.
summary security_result.summary Mappé directement à partir du champ summary dans le journal brut.
target_app target.application Mappé directement à partir du champ TARGET dans le journal brut.
target_ip target.ip Mappé directement à partir du champ target_ip ou IP dans le journal brut.
target_port target.port Mappé directement à partir du champ target_port dans le journal brut, converti en entier.
timestamp metadata.event_timestamp Mappé directement à partir du champ timestamp dans le journal brut, analysé en tant qu'horodatage ISO8601.
timestamp event.timestamp Mappé directement à partir du champ timestamp dans le journal brut, analysé en tant qu'horodatage ISO8601.
trade_date additional.fields[].key La valeur "trade_date" est attribuée via l'analyseur
trade_date additional.fields[].value.string_value Mappé directement à partir du champ trade_date dans le journal brut.
transaction_uuid additional.fields[].key La valeur "transaction_uuid" est attribuée via l'analyseur
transaction_uuid additional.fields[].value.string_value Mappé directement à partir du champ transaction_uuid dans le journal brut.
type additional.fields[].key La valeur "type" est attribuée via l'analyseur
type additional.fields[].value.string_value Mappé directement à partir du champ type dans le journal brut.
user target.user.userid Mappé directement à partir du champ USER ou ugi dans le journal brut.

Modifications

2023-12-19

  • Correction de bug: correction des résultats incohérents pour le modèle Grok.

2023-10-30

  • Analyseur nouvellement créé.

Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.