Collecter les journaux AWS Elastic MapReduce
Ce document explique comment ingérer les journaux AWS Elastic MapReduce (EMR) dans Google Security Operations. AWS EMR est une plate-forme de big data native cloud qui traite rapidement de grandes quantités de données. Intégrer les journaux EMR à Google SecOps vous permet d'analyser l'activité du cluster et de détecter les menaces de sécurité potentielles.
Avant de commencer
- Assurez-vous de disposer d'une instance Google SecOps.
- Assurez-vous de disposer d'un accès privilégié à AWS.
Configurer le bucket Amazon S3
- Créez un bucket Amazon S3 en suivant ce guide de l'utilisateur: Créer un bucket.
- Enregistrez le nom et la région du bucket pour une utilisation ultérieure.
- Créez un utilisateur en suivant le guide de l'utilisateur Créer un utilisateur IAM.
- Sélectionnez l'utilisateur créé.
- Sélectionnez l'onglet Informations d'identification de sécurité.
- Cliquez sur Créer une clé d'accès dans la section Clés d'accès.
- Sélectionnez Service tiers comme Cas d'utilisation.
- Cliquez sur Suivant.
- Facultatif: ajoutez une balise de description.
- Cliquez sur Créer une clé d'accès.
- Cliquez sur Download CSV file (Télécharger le fichier CSV) pour enregistrer la clé d'accès et la clé d'accès secrète pour les utiliser ultérieurement.
- Cliquez sur OK.
- Sélectionnez l'onglet Autorisations.
- Cliquez sur Ajouter des autorisations dans la section Règles d'autorisation.
- Sélectionnez Ajouter des autorisations.
- Sélectionnez Joindre directement des règles.
- Recherchez et sélectionnez les règles AmazonS3FullAccess et CloudWatchLogsFullAccess.
- Cliquez sur Suivant.
- Cliquez sur Ajouter des autorisations.
Configurer AWS EMR pour transférer les journaux
- Connectez-vous à l'AWS Management Console.
- Dans la barre de recherche, saisissez EMR, puis sélectionnez Amazon EMR dans la liste des services.
- Cliquez sur Clusters.
- Recherchez et sélectionnez le cluster EMR pour lequel vous souhaitez activer la journalisation.
- Cliquez sur Modifier sur la page Détails du cluster.
- Sur l'écran Edit Cluster (Modifier le cluster), accédez à la section Logging (Journalisation).
- Sélectionnez Activer la journalisation.
- Spécifiez le bucket S3 dans lequel les journaux seront stockés.
- Spécifiez l'URI S3 au format
s3://your-bucket-name/
(tous les journaux EMR seront stockés à la racine du bucket). - Sélectionnez les types de journaux suivants :
Step logs
Application logs
YARN logs
System logs
HDFS Logs
(si vous utilisez Hadoop)
- Cliquez sur Enregistrer.
Configurer un flux dans Google SecOps pour ingérer les journaux AWS EMR
- Accédez à SIEM Settings > Feeds (Paramètres du SIEM > Flux).
- Cliquez sur Ajouter.
- Dans le champ Nom du flux, saisissez un nom pour le flux (par exemple, Journaux AWS EMR).
- Sélectionnez Amazon S3 comme Type de source.
- Sélectionnez AWS EMR comme Type de journal.
- Cliquez sur Suivant.
Spécifiez les valeurs des paramètres d'entrée suivants:
- Region (Région) : région dans laquelle se trouve le bucket Amazon S3.
- URI S3: URI du bucket.
s3://your-log-bucket-name/
- Remplacez
your-log-bucket-name
par le nom réel du bucket.
- Remplacez
- Un URI est: sélectionnez Répertoire ou Répertoire incluant des sous-répertoires.
Options de suppression de la source: sélectionnez l'option de suppression en fonction de vos préférences.
ID de clé d'accès: clé d'accès utilisateur ayant accès au bucket S3.
Clé d'accès secrète: clé secrète de l'utilisateur ayant accès au bucket S3.
Espace de noms des éléments: espace de noms des éléments.
Libellés d'ingestion: libellé à appliquer aux événements de ce flux.
Cliquez sur Suivant.
Vérifiez la configuration de votre nouveau flux dans l'écran Finaliser, puis cliquez sur Envoyer.
Tableau de mappage UDM
Champ de journal | Mappage UDM | Logique |
---|---|---|
app_id |
additional.fields[].key |
La valeur "APP" est attribuée via l'analyseur |
app_id |
additional.fields[].value.string_value |
Mappé directement à partir du champ APP dans le journal brut. |
app_name |
additional.fields[].key |
La valeur "APPNAME" est attribuée via l'analyseur |
app_name |
additional.fields[].value.string_value |
Mappé directement à partir du champ APPNAME dans le journal brut. |
blockid |
additional.fields[].key |
La valeur "blockid" est attribuée via l'analyseur |
blockid |
additional.fields[].value.string_value |
Mappé directement à partir du champ blockid dans le journal brut. |
bytes |
network.received_bytes |
Mappé directement à partir du champ bytes dans le journal brut, converti en entier non signé. |
cliID |
additional.fields[].key |
La valeur "cliID" est attribuée via l'analyseur |
cliID |
additional.fields[].value.string_value |
Mappé directement à partir du champ cliID dans le journal brut. |
cmd |
target.process.command_line |
Mappé directement à partir du champ cmd dans le journal brut. |
comp_name |
additional.fields[].key |
La valeur "COMP" est attribuée via l'analyseur |
comp_name |
additional.fields[].value.string_value |
Mappé directement à partir du champ COMP dans le journal brut. |
configuration_version |
additional.fields[].key |
La valeur "configuration_version" est attribuée via l'analyseur |
configuration_version |
additional.fields[].value.string_value |
Mappé directement à partir du champ configuration_version dans le journal brut, converti en chaîne. |
containerID |
additional.fields[].key |
La valeur "containerID" est attribuée via l'analyseur |
containerID |
additional.fields[].value.string_value |
Mappé directement à partir du champ CONTAINERID dans le journal brut. |
description |
security_result.description |
Mappé directement à partir du champ description dans le journal brut. |
dfs.FSNamesystem.* |
additional.fields[].key |
La clé est générée en concatenant "dfs.FSNamesystem." avec la clé des données JSON. |
dfs.FSNamesystem.* |
additional.fields[].value.string_value |
La valeur est mappée directement à partir de la valeur correspondante dans l'objet JSON dfs.FSNamesystem , convertie en chaîne. |
duration |
additional.fields[].key |
La valeur "duration" est attribuée via l'analyseur |
duration |
additional.fields[].value.string_value |
Mappé directement à partir du champ duration dans le journal brut. |
duration |
network.session_duration.seconds |
Mappé directement à partir du champ duration dans le journal brut, converti en entier. |
environment |
additional.fields[].key |
La valeur "environment" est attribuée via l'analyseur |
environment |
additional.fields[].value.string_value |
Mappé directement à partir du champ environment dans le journal brut. Extrait du champ ip_port à l'aide de grok et de la manipulation de chaînes. Extrait du champ ip_port à l'aide de grok et de la manipulation de chaînes, converti en entier. |
event_type |
metadata.event_type |
Déterminé par la logique de l'analyseur en fonction de la présence d'informations principal et target . Il peut s'agir de NETWORK_CONNECTION , USER_RESOURCE_ACCESS , STATUS_UPDATE ou GENERIC_EVENT . |
file_path |
target.file.full_path |
Mappé directement à partir du champ file_path dans le journal brut. |
host |
principal.hostname |
Mappé directement à partir du champ host dans le journal brut. |
host |
target.hostname |
Mappé directement à partir du champ host dans le journal brut. |
host_ip |
principal.ip |
Mappé directement à partir du champ host_ip dans le journal brut. |
host_port |
principal.port |
Mappé directement à partir du champ host_port dans le journal brut, converti en entier. |
http_url |
target.url |
Mappé directement à partir du champ http_url dans le journal brut. |
index |
additional.fields[].key |
La valeur "index" est attribuée via l'analyseur |
index |
additional.fields[].value.string_value |
Mappé directement à partir du champ index dans le journal brut. |
kind |
metadata.product_event_type |
Mappé directement à partir du champ kind dans le journal brut. La valeur "AWS_EMR" est attribuée via l'analyseur La valeur "AWS EMR" est attribuée via l'analyseur La valeur "AMAZON" est attribuée via l'analyseur |
offset |
additional.fields[].key |
La valeur "offset" est attribuée via l'analyseur |
offset |
additional.fields[].value.string_value |
Mappé directement à partir du champ offset dans le journal brut. |
op |
metadata.product_event_type |
Mappé directement à partir du champ op ou OPERATION dans le journal brut. |
proto |
network.application_protocol |
Extrait du champ http_url à l'aide de grok, converti en majuscules. |
puppet_version |
additional.fields[].key |
La valeur "puppet_version" est attribuée via l'analyseur |
puppet_version |
additional.fields[].value.string_value |
Mappé directement à partir du champ puppet_version dans le journal brut. |
queue_name |
additional.fields[].key |
La valeur "queue_name" est attribuée via l'analyseur |
queue_name |
additional.fields[].value.string_value |
Mappé directement à partir du champ queue_name dans le journal brut. |
report_format |
additional.fields[].key |
La valeur "report_format" est attribuée via l'analyseur |
report_format |
additional.fields[].value.string_value |
Mappé directement à partir du champ report_format dans le journal brut, converti en chaîne. |
resource |
additional.fields[].key |
La valeur "resource" est attribuée via l'analyseur |
resource |
additional.fields[].value.string_value |
Mappé directement à partir du champ resource dans le journal brut. |
result |
security_result.action_details |
Mappé directement à partir du champ RESULT dans le journal brut. |
security_id |
additional.fields[].key |
La valeur "security_id" est attribuée via l'analyseur |
security_id |
additional.fields[].value.string_value |
Mappé directement à partir du champ security_id dans le journal brut. |
severity |
security_result.severity |
Mappé à partir du champ severity dans le journal brut. INFO est mappé sur INFORMATIONAL , WARN est mappé sur MEDIUM . |
srvID |
additional.fields[].key |
La valeur "srvID" est attribuée via l'analyseur |
srvID |
additional.fields[].value.string_value |
Mappé directement à partir du champ srvID dans le journal brut. |
status |
additional.fields[].key |
La valeur "status" est attribuée via l'analyseur |
status |
additional.fields[].value.string_value |
Mappé directement à partir du champ status dans le journal brut. |
summary |
security_result.summary |
Mappé directement à partir du champ summary dans le journal brut. |
target_app |
target.application |
Mappé directement à partir du champ TARGET dans le journal brut. |
target_ip |
target.ip |
Mappé directement à partir du champ target_ip ou IP dans le journal brut. |
target_port |
target.port |
Mappé directement à partir du champ target_port dans le journal brut, converti en entier. |
timestamp |
metadata.event_timestamp |
Mappé directement à partir du champ timestamp dans le journal brut, analysé en tant qu'horodatage ISO8601. |
timestamp |
event.timestamp |
Mappé directement à partir du champ timestamp dans le journal brut, analysé en tant qu'horodatage ISO8601. |
trade_date |
additional.fields[].key |
La valeur "trade_date" est attribuée via l'analyseur |
trade_date |
additional.fields[].value.string_value |
Mappé directement à partir du champ trade_date dans le journal brut. |
transaction_uuid |
additional.fields[].key |
La valeur "transaction_uuid" est attribuée via l'analyseur |
transaction_uuid |
additional.fields[].value.string_value |
Mappé directement à partir du champ transaction_uuid dans le journal brut. |
type |
additional.fields[].key |
La valeur "type" est attribuée via l'analyseur |
type |
additional.fields[].value.string_value |
Mappé directement à partir du champ type dans le journal brut. |
user |
target.user.userid |
Mappé directement à partir du champ USER ou ugi dans le journal brut. |
Modifications
2023-12-19
- Correction de bug: correction des résultats incohérents pour le modèle Grok.
2023-10-30
- Analyseur nouvellement créé.
Vous avez encore besoin d'aide ? Obtenez des réponses de membres de la communauté et de professionnels Google SecOps.