Raccogliere i log di AWS Elastic MapReduce
Questo documento spiega come importare i log di AWS Elastic MapReduce (EMR) in Google Security Operations. AWS EMR è una piattaforma di big data nativa per il cloud che elabora rapidamente grandi quantità di dati. L'integrazione dei log EMR in Google SecOps ti consente di analizzare l'attività del cluster e rilevare potenziali minacce alla sicurezza.
Prima di iniziare
Assicurati di soddisfare i seguenti prerequisiti:
- Istanza Google SecOps
- Accesso privilegiato ad AWS
Configura il bucket Amazon S3
- Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
- Salva il nome e la regione del bucket per utilizzarli in un secondo momento.
- Crea un utente seguendo questa guida: Creazione di un utente IAM.
- Seleziona l'utente creato.
- Seleziona la scheda Credenziali di sicurezza.
- Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
- Seleziona Servizio di terze parti come Caso d'uso.
- Fai clic su Avanti.
- (Facoltativo) Aggiungi un tag di descrizione.
- Fai clic su Crea chiave di accesso.
- Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per un utilizzo successivo.
- Fai clic su Fine.
- Seleziona la scheda Autorizzazioni.
- Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
- Seleziona Aggiungi autorizzazioni.
- Seleziona Allega direttamente i criteri.
- Cerca e seleziona i criteri AmazonS3FullAccess e CloudWatchLogsFullAccess.
- Fai clic su Avanti.
- Fai clic su Aggiungi autorizzazioni.
Come configurare AWS EMR per inoltrare i log
- Accedi alla console di gestione AWS.
- Nella barra di ricerca, digita EMR e seleziona Amazon EMR dall'elenco dei servizi.
- Fai clic su Cluster.
- Trova e seleziona il cluster EMR per cui vuoi abilitare la registrazione.
- Fai clic su Modifica nella pagina Dettagli cluster.
- Nella schermata Modifica cluster, vai alla sezione Logging.
- Seleziona Attiva il logging.
- Specifica il bucket S3 in cui verranno archiviati i log.
- Specifica l'URI S3 nel formato
s3://your-bucket-name/
(in questo modo tutti i log EMR verranno archiviati nella radice del bucket). - Seleziona i seguenti tipi di log:
Step logs
Application logs
YARN logs
System logs
HDFS Logs
(se utilizzi Hadoop)
- Fai clic su Salva.
Configurare i feed
Esistono due diversi punti di accesso per configurare i feed nella piattaforma Google SecOps:
- Impostazioni SIEM > Feed > Aggiungi nuovo feed
- Hub dei contenuti > Pacchetti di contenuti > Inizia
Come configurare il feed AWS EMR
- Fai clic sul pacchetto Amazon Cloud Platform.
- Individua il tipo di log AWS EMR.
Specifica i valori nei seguenti campi.
- Tipo di origine: Amazon SQS V2
- Nome coda: il nome della coda SQS da cui leggere
- URI S3: l'URI del bucket.
s3://your-log-bucket-name/
- Sostituisci
your-log-bucket-name
con il nome effettivo del bucket S3.
- Sostituisci
Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze di importazione.
Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
ID chiave di accesso alla coda SQS: una chiave di accesso all'account che è una stringa alfanumerica di 20 caratteri.
Chiave di accesso segreta della coda SQS: una chiave di accesso all'account che è una stringa alfanumerica di 40 caratteri.
Opzioni avanzate
- Nome feed: un valore precompilato che identifica il feed.
- Spazio dei nomi dell'asset: lo spazio dei nomi associato al feed.
- Etichette di importazione: etichette applicate a tutti gli eventi di questo feed.
Fai clic su Crea feed.
Per ulteriori informazioni sulla configurazione di più feed per diversi tipi di log all'interno di questa famiglia di prodotti, consulta Configurare i feed per prodotto.
Tabella di mappatura UDM
Campo log | Mappatura UDM | Logic |
---|---|---|
app_id |
additional.fields[].key |
Il valore "APP" viene assegnato tramite il parser |
app_id |
additional.fields[].value.string_value |
Mappato direttamente dal campo APP nel log non elaborato. |
app_name |
additional.fields[].key |
Il valore "APPNAME" viene assegnato tramite il parser |
app_name |
additional.fields[].value.string_value |
Mappato direttamente dal campo APPNAME nel log non elaborato. |
blockid |
additional.fields[].key |
Il valore "blockid" viene assegnato tramite il parser |
blockid |
additional.fields[].value.string_value |
Mappato direttamente dal campo blockid nel log non elaborato. |
bytes |
network.received_bytes |
Mappato direttamente dal campo bytes nel log non elaborato, convertito in un numero intero senza segno. |
cliID |
additional.fields[].key |
Il valore "cliID" viene assegnato tramite il parser |
cliID |
additional.fields[].value.string_value |
Mappato direttamente dal campo cliID nel log non elaborato. |
cmd |
target.process.command_line |
Mappato direttamente dal campo cmd nel log non elaborato. |
comp_name |
additional.fields[].key |
Il valore "COMP" viene assegnato tramite il parser |
comp_name |
additional.fields[].value.string_value |
Mappato direttamente dal campo COMP nel log non elaborato. |
configuration_version |
additional.fields[].key |
Il valore "configuration_version" viene assegnato tramite il parser |
configuration_version |
additional.fields[].value.string_value |
Mappato direttamente dal campo configuration_version nel log non elaborato, convertito in una stringa. |
containerID |
additional.fields[].key |
Il valore "containerID" viene assegnato tramite il parser |
containerID |
additional.fields[].value.string_value |
Mappato direttamente dal campo CONTAINERID nel log non elaborato. |
description |
security_result.description |
Mappato direttamente dal campo description nel log non elaborato. |
dfs.FSNamesystem.* |
additional.fields[].key |
La chiave viene generata concatenando "dfs.FSNamesystem." con la chiave dei dati JSON. |
dfs.FSNamesystem.* |
additional.fields[].value.string_value |
Il valore viene mappato direttamente dal valore corrispondente nell'oggetto JSON dfs.FSNamesystem , convertito in una stringa. |
duration |
additional.fields[].key |
Il valore "duration" viene assegnato tramite il parser |
duration |
additional.fields[].value.string_value |
Mappato direttamente dal campo duration nel log non elaborato. |
duration |
network.session_duration.seconds |
Mappato direttamente dal campo duration nel log non elaborato, convertito in un numero intero. |
environment |
additional.fields[].key |
Il valore "environment" viene assegnato tramite il parser |
environment |
additional.fields[].value.string_value |
Mappato direttamente dal campo environment nel log non elaborato. Estratto dal campo ip_port utilizzando grok e la manipolazione delle stringhe. Estratto dal campo ip_port utilizzando grok e la manipolazione delle stringhe, convertito in un numero intero. |
event_type |
metadata.event_type |
Determinato dalla logica del parser in base alla presenza delle informazioni principal e target . Può essere NETWORK_CONNECTION , USER_RESOURCE_ACCESS , STATUS_UPDATE o GENERIC_EVENT . |
file_path |
target.file.full_path |
Mappato direttamente dal campo file_path nel log non elaborato. |
host |
principal.hostname |
Mappato direttamente dal campo host nel log non elaborato. |
host |
target.hostname |
Mappato direttamente dal campo host nel log non elaborato. |
host_ip |
principal.ip |
Mappato direttamente dal campo host_ip nel log non elaborato. |
host_port |
principal.port |
Mappato direttamente dal campo host_port nel log non elaborato, convertito in un numero intero. |
http_url |
target.url |
Mappato direttamente dal campo http_url nel log non elaborato. |
index |
additional.fields[].key |
Il valore "index" viene assegnato tramite il parser |
index |
additional.fields[].value.string_value |
Mappato direttamente dal campo index nel log non elaborato. |
kind |
metadata.product_event_type |
Mappato direttamente dal campo kind nel log non elaborato. Il valore "AWS_EMR" viene assegnato tramite il parser. Il valore "AWS EMR" viene assegnato tramite il parser. Il valore "AMAZON" viene assegnato tramite il parser. |
offset |
additional.fields[].key |
Il valore "offset" viene assegnato tramite il parser |
offset |
additional.fields[].value.string_value |
Mappato direttamente dal campo offset nel log non elaborato. |
op |
metadata.product_event_type |
Mappato direttamente dal campo op o OPERATION nel log non elaborato. |
proto |
network.application_protocol |
Estratto dal campo http_url utilizzando grok, convertito in maiuscolo. |
puppet_version |
additional.fields[].key |
Il valore "puppet_version" viene assegnato tramite il parser |
puppet_version |
additional.fields[].value.string_value |
Mappato direttamente dal campo puppet_version nel log non elaborato. |
queue_name |
additional.fields[].key |
Il valore "queue_name" viene assegnato tramite il parser |
queue_name |
additional.fields[].value.string_value |
Mappato direttamente dal campo queue_name nel log non elaborato. |
report_format |
additional.fields[].key |
Il valore "report_format" viene assegnato tramite il parser |
report_format |
additional.fields[].value.string_value |
Mappato direttamente dal campo report_format nel log non elaborato, convertito in una stringa. |
resource |
additional.fields[].key |
Il valore "resource" viene assegnato tramite il parser |
resource |
additional.fields[].value.string_value |
Mappato direttamente dal campo resource nel log non elaborato. |
result |
security_result.action_details |
Mappato direttamente dal campo RESULT nel log non elaborato. |
security_id |
additional.fields[].key |
Il valore "security_id" viene assegnato tramite il parser |
security_id |
additional.fields[].value.string_value |
Mappato direttamente dal campo security_id nel log non elaborato. |
severity |
security_result.severity |
Mappato dal campo severity nel log non elaborato. INFO è mappato a INFORMATIONAL , WARN è mappato a MEDIUM . |
srvID |
additional.fields[].key |
Il valore "srvID" viene assegnato tramite il parser |
srvID |
additional.fields[].value.string_value |
Mappato direttamente dal campo srvID nel log non elaborato. |
status |
additional.fields[].key |
Il valore "status" viene assegnato tramite il parser |
status |
additional.fields[].value.string_value |
Mappato direttamente dal campo status nel log non elaborato. |
summary |
security_result.summary |
Mappato direttamente dal campo summary nel log non elaborato. |
target_app |
target.application |
Mappato direttamente dal campo TARGET nel log non elaborato. |
target_ip |
target.ip |
Mappato direttamente dal campo target_ip o IP nel log non elaborato. |
target_port |
target.port |
Mappato direttamente dal campo target_port nel log non elaborato, convertito in un numero intero. |
timestamp |
metadata.event_timestamp |
Mappato direttamente dal campo timestamp nel log non elaborato, analizzato come timestamp ISO8601. |
timestamp |
event.timestamp |
Mappato direttamente dal campo timestamp nel log non elaborato, analizzato come timestamp ISO8601. |
trade_date |
additional.fields[].key |
Il valore "trade_date" viene assegnato tramite il parser |
trade_date |
additional.fields[].value.string_value |
Mappato direttamente dal campo trade_date nel log non elaborato. |
transaction_uuid |
additional.fields[].key |
Il valore "transaction_uuid" viene assegnato tramite il parser |
transaction_uuid |
additional.fields[].value.string_value |
Mappato direttamente dal campo transaction_uuid nel log non elaborato. |
type |
additional.fields[].key |
Il valore "type" viene assegnato tramite il parser |
type |
additional.fields[].value.string_value |
Mappato direttamente dal campo type nel log non elaborato. |
user |
target.user.userid |
Mappato direttamente dal campo USER o ugi nel log non elaborato. |
Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.