Raccogliere i log di AWS Elastic MapReduce
Questo documento spiega come importare i log di AWS Elastic MapReduce (EMR) in Google Security Operations. AWS EMR è una piattaforma big data cloud-native che elabora rapidamente grandi quantità di dati. L'integrazione dei log di EMR in Google SecOps ti consente di analizzare l'attività del cluster e rilevare potenziali minacce alla sicurezza.
Prima di iniziare
- Assicurati di avere un'istanza Google SecOps.
- Assicurati di disporre dell'accesso con privilegi ad AWS.
Configura il bucket Amazon S3
- Crea un bucket Amazon S3 seguendo questa guida dell'utente: Creare un bucket
- Salva Nome e Regione del bucket per utilizzarli in un secondo momento.
- Crea un utente seguendo questa guida dell'utente: Creare un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per utilizzarle in un secondo momento.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri di autorizzazione.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Collega direttamente i criteri.
- Cerca e seleziona i criteri AmazonS3FullAccess e CloudWatchLogsFullAccess.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Configurare AWS EMR per inoltrare i log
- Accedi alla AWS Management Console.
- Nella barra di ricerca, digita EMR e seleziona Amazon EMR dall'elenco dei servizi.
- Fai clic su Cluster.
- Individua e seleziona il cluster EMR per cui vuoi attivare il logging.
- Fai clic su Modifica nella pagina Dettagli cluster.
- Nella schermata Modifica cluster, vai alla sezione Logging.
- Seleziona Attiva il logging.
- Specifica il bucket S3 in cui verranno archiviati i log.
- Specifica l'URI S3 nel formato
s3://your-bucket-name/
(in questo modo tutti i log di EMR verranno archiviati nella radice del bucket). - Seleziona i seguenti tipi di log:
Step logs
Application logs
YARN logs
System logs
HDFS Logs
(se utilizzi Hadoop)
- Fai clic su Salva.
Configura un feed in Google SecOps per importare i log di AWS EMR
- Vai a Impostazioni SIEM > Feed.
- Fai clic su Aggiungi nuovo.
- Nel campo Nome feed, inserisci un nome per il feed (ad esempio Log AWS EMR).
- Seleziona Amazon S3 come Tipo di origine.
- Seleziona AWS EMR come Tipo di log.
- Fai clic su Avanti.
Specifica i valori per i seguenti parametri di input:
- Regione: la regione in cui si trova il bucket Amazon S3.
- URI S3: l'URI del bucket.
s3://your-log-bucket-name/
- Sostituisci
your-log-bucket-name
con il nome effettivo del bucket.
- Sostituisci
- L'URI è una: seleziona Directory o Directory che include sottodirectory.
Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
ID chiave di accesso: la chiave di accesso utente con accesso al bucket S3.
Chiave di accesso segreta: la chiave segreta dell'utente con accesso al bucket S3.
Spazio dei nomi degli asset: lo spazio dei nomi degli asset.
Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.
Fai clic su Avanti.
Rivedi la configurazione del nuovo feed nella schermata Concludi e poi fai clic su Invia.
Tabella di mappatura UDM
Campo log | Mappatura UDM | Logica |
---|---|---|
app_id |
additional.fields[].key |
Il valore "APP" viene assegnato tramite il parser |
app_id |
additional.fields[].value.string_value |
Mappato direttamente dal campo APP nel log non elaborato. |
app_name |
additional.fields[].key |
Il valore "APPNAME" viene assegnato tramite il parser |
app_name |
additional.fields[].value.string_value |
Mappato direttamente dal campo APPNAME nel log non elaborato. |
blockid |
additional.fields[].key |
Il valore "blockid" viene assegnato tramite il parser |
blockid |
additional.fields[].value.string_value |
Mappato direttamente dal campo blockid nel log non elaborato. |
bytes |
network.received_bytes |
Mappato direttamente dal campo bytes nel log non elaborato, convertito in un numero intero non firmato. |
cliID |
additional.fields[].key |
Il valore "cliID" viene assegnato tramite il parser |
cliID |
additional.fields[].value.string_value |
Mappato direttamente dal campo cliID nel log non elaborato. |
cmd |
target.process.command_line |
Mappato direttamente dal campo cmd nel log non elaborato. |
comp_name |
additional.fields[].key |
Il valore "COMP" viene assegnato tramite il parser |
comp_name |
additional.fields[].value.string_value |
Mappato direttamente dal campo COMP nel log non elaborato. |
configuration_version |
additional.fields[].key |
Il valore "configuration_version" viene assegnato tramite il parser |
configuration_version |
additional.fields[].value.string_value |
Mappato direttamente dal campo configuration_version nel log non elaborato, convertito in una stringa. |
containerID |
additional.fields[].key |
Il valore "containerID" viene assegnato tramite il parser |
containerID |
additional.fields[].value.string_value |
Mappato direttamente dal campo CONTAINERID nel log non elaborato. |
description |
security_result.description |
Mappato direttamente dal campo description nel log non elaborato. |
dfs.FSNamesystem.* |
additional.fields[].key |
La chiave viene generata concatenando "dfs.FSNamesystem." con la chiave dei dati JSON. |
dfs.FSNamesystem.* |
additional.fields[].value.string_value |
Il valore viene mappato direttamente dal valore corrispondente nell'oggetto JSON dfs.FSNamesystem , convertito in una stringa. |
duration |
additional.fields[].key |
Il valore "duration" viene assegnato tramite il parser |
duration |
additional.fields[].value.string_value |
Mappato direttamente dal campo duration nel log non elaborato. |
duration |
network.session_duration.seconds |
Mappato direttamente dal campo duration nel log non elaborato, convertito in un numero intero. |
environment |
additional.fields[].key |
Il valore "environment" viene assegnato tramite il parser |
environment |
additional.fields[].value.string_value |
Mappato direttamente dal campo environment nel log non elaborato. Estratte dal campo ip_port utilizzando grok e la manipolazione di stringhe. Estratto dal campo ip_port utilizzando grok e la manipolazione di stringhe, convertito in un numero intero. |
event_type |
metadata.event_type |
Determinato dalla logica dell'analizzatore in base alla presenza di informazioni principal e target . Può essere NETWORK_CONNECTION , USER_RESOURCE_ACCESS , STATUS_UPDATE o GENERIC_EVENT . |
file_path |
target.file.full_path |
Mappato direttamente dal campo file_path nel log non elaborato. |
host |
principal.hostname |
Mappato direttamente dal campo host nel log non elaborato. |
host |
target.hostname |
Mappato direttamente dal campo host nel log non elaborato. |
host_ip |
principal.ip |
Mappato direttamente dal campo host_ip nel log non elaborato. |
host_port |
principal.port |
Mappato direttamente dal campo host_port nel log non elaborato, convertito in un numero intero. |
http_url |
target.url |
Mappato direttamente dal campo http_url nel log non elaborato. |
index |
additional.fields[].key |
Il valore "index" viene assegnato tramite il parser |
index |
additional.fields[].value.string_value |
Mappato direttamente dal campo index nel log non elaborato. |
kind |
metadata.product_event_type |
Mappato direttamente dal campo kind nel log non elaborato. Il valore "AWS_EMR" viene assegnato tramite il parser Il valore "AWS EMR" viene assegnato tramite il parser Il valore "AMAZON" viene assegnato tramite il parser |
offset |
additional.fields[].key |
Il valore "offset" viene assegnato tramite il parser |
offset |
additional.fields[].value.string_value |
Mappato direttamente dal campo offset nel log non elaborato. |
op |
metadata.product_event_type |
Mappato direttamente dal campo op o OPERATION nel log non elaborato. |
proto |
network.application_protocol |
Estratto dal campo http_url utilizzando grok, convertito in maiuscolo. |
puppet_version |
additional.fields[].key |
Il valore "puppet_version" viene assegnato tramite il parser |
puppet_version |
additional.fields[].value.string_value |
Mappato direttamente dal campo puppet_version nel log non elaborato. |
queue_name |
additional.fields[].key |
Il valore "queue_name" viene assegnato tramite il parser |
queue_name |
additional.fields[].value.string_value |
Mappato direttamente dal campo queue_name nel log non elaborato. |
report_format |
additional.fields[].key |
Il valore "report_format" viene assegnato tramite il parser |
report_format |
additional.fields[].value.string_value |
Mappato direttamente dal campo report_format nel log non elaborato, convertito in una stringa. |
resource |
additional.fields[].key |
Il valore "resource" viene assegnato tramite il parser |
resource |
additional.fields[].value.string_value |
Mappato direttamente dal campo resource nel log non elaborato. |
result |
security_result.action_details |
Mappato direttamente dal campo RESULT nel log non elaborato. |
security_id |
additional.fields[].key |
Il valore "security_id" viene assegnato tramite il parser |
security_id |
additional.fields[].value.string_value |
Mappato direttamente dal campo security_id nel log non elaborato. |
severity |
security_result.severity |
Mappato dal campo severity nel log non elaborato. INFO è mappato a INFORMATIONAL , WARN è mappato a MEDIUM . |
srvID |
additional.fields[].key |
Il valore "srvID" viene assegnato tramite il parser |
srvID |
additional.fields[].value.string_value |
Mappato direttamente dal campo srvID nel log non elaborato. |
status |
additional.fields[].key |
Il valore "status" viene assegnato tramite il parser |
status |
additional.fields[].value.string_value |
Mappato direttamente dal campo status nel log non elaborato. |
summary |
security_result.summary |
Mappato direttamente dal campo summary nel log non elaborato. |
target_app |
target.application |
Mappato direttamente dal campo TARGET nel log non elaborato. |
target_ip |
target.ip |
Mappato direttamente dal campo target_ip o IP nel log non elaborato. |
target_port |
target.port |
Mappato direttamente dal campo target_port nel log non elaborato, convertito in un numero intero. |
timestamp |
metadata.event_timestamp |
Mappato direttamente dal campo timestamp nel log non elaborato, analizzato come timestamp ISO8601. |
timestamp |
event.timestamp |
Mappato direttamente dal campo timestamp nel log non elaborato, analizzato come timestamp ISO8601. |
trade_date |
additional.fields[].key |
Il valore "trade_date" viene assegnato tramite il parser |
trade_date |
additional.fields[].value.string_value |
Mappato direttamente dal campo trade_date nel log non elaborato. |
transaction_uuid |
additional.fields[].key |
Il valore "transaction_uuid" viene assegnato tramite il parser |
transaction_uuid |
additional.fields[].value.string_value |
Mappato direttamente dal campo transaction_uuid nel log non elaborato. |
type |
additional.fields[].key |
Il valore "type" viene assegnato tramite il parser |
type |
additional.fields[].value.string_value |
Mappato direttamente dal campo type nel log non elaborato. |
user |
target.user.userid |
Mappato direttamente dal campo USER o ugi nel log non elaborato. |
Modifiche
2023-12-19
- Correzione di bug: sono stati corretti i risultati inaffidabili per il pattern Grok.
2023-10-30
- Parser appena creato.
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.