Raccogliere i log di AWS Elastic MapReduce

Supportato in:

Questo documento spiega come importare i log di AWS Elastic MapReduce (EMR) in Google Security Operations. AWS EMR è una piattaforma big data cloud-native che elabora rapidamente grandi quantità di dati. L'integrazione dei log di EMR in Google SecOps ti consente di analizzare l'attività del cluster e rilevare potenziali minacce alla sicurezza.

Prima di iniziare

  • Assicurati di avere un'istanza Google SecOps.
  • Assicurati di disporre dell'accesso con privilegi ad AWS.

Configura il bucket Amazon S3

  1. Crea un bucket Amazon S3 seguendo questa guida dell'utente: Creare un bucket
  2. Salva Nome e Regione del bucket per utilizzarli in un secondo momento.
  3. Crea un utente seguendo questa guida dell'utente: Creare un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi un tag di descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per utilizzarle in un secondo momento.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Criteri di autorizzazione.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Collega direttamente i criteri.
  17. Cerca e seleziona i criteri AmazonS3FullAccess e CloudWatchLogsFullAccess.
  18. Fai clic su Avanti.
  19. Fai clic su Aggiungi autorizzazioni.

Configurare AWS EMR per inoltrare i log

  1. Accedi alla AWS Management Console.
  2. Nella barra di ricerca, digita EMR e seleziona Amazon EMR dall'elenco dei servizi.
  3. Fai clic su Cluster.
  4. Individua e seleziona il cluster EMR per cui vuoi attivare il logging.
  5. Fai clic su Modifica nella pagina Dettagli cluster.
  6. Nella schermata Modifica cluster, vai alla sezione Logging.
  7. Seleziona Attiva il logging.
  8. Specifica il bucket S3 in cui verranno archiviati i log.
  9. Specifica l'URI S3 nel formato s3://your-bucket-name/ (in questo modo tutti i log di EMR verranno archiviati nella radice del bucket).
  10. Seleziona i seguenti tipi di log:
    • Step logs
    • Application logs
    • YARN logs
    • System logs
    • HDFS Logs (se utilizzi Hadoop)
  11. Fai clic su Salva.

Configura un feed in Google SecOps per importare i log di AWS EMR

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su Aggiungi nuovo.
  3. Nel campo Nome feed, inserisci un nome per il feed (ad esempio Log AWS EMR).
  4. Seleziona Amazon S3 come Tipo di origine.
  5. Seleziona AWS EMR come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:

    • Regione: la regione in cui si trova il bucket Amazon S3.
    • URI S3: l'URI del bucket.
      • s3://your-log-bucket-name/
        • Sostituisci your-log-bucket-name con il nome effettivo del bucket.
    • L'URI è una: seleziona Directory o Directory che include sottodirectory.
    • Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.

    • ID chiave di accesso: la chiave di accesso utente con accesso al bucket S3.

    • Chiave di accesso segreta: la chiave segreta dell'utente con accesso al bucket S3.

    • Spazio dei nomi degli asset: lo spazio dei nomi degli asset.

    • Etichette di importazione: l'etichetta da applicare agli eventi di questo feed.

  8. Fai clic su Avanti.

  9. Rivedi la configurazione del nuovo feed nella schermata Concludi e poi fai clic su Invia.

Tabella di mappatura UDM

Campo log Mappatura UDM Logica
app_id additional.fields[].key Il valore "APP" viene assegnato tramite il parser
app_id additional.fields[].value.string_value Mappato direttamente dal campo APP nel log non elaborato.
app_name additional.fields[].key Il valore "APPNAME" viene assegnato tramite il parser
app_name additional.fields[].value.string_value Mappato direttamente dal campo APPNAME nel log non elaborato.
blockid additional.fields[].key Il valore "blockid" viene assegnato tramite il parser
blockid additional.fields[].value.string_value Mappato direttamente dal campo blockid nel log non elaborato.
bytes network.received_bytes Mappato direttamente dal campo bytes nel log non elaborato, convertito in un numero intero non firmato.
cliID additional.fields[].key Il valore "cliID" viene assegnato tramite il parser
cliID additional.fields[].value.string_value Mappato direttamente dal campo cliID nel log non elaborato.
cmd target.process.command_line Mappato direttamente dal campo cmd nel log non elaborato.
comp_name additional.fields[].key Il valore "COMP" viene assegnato tramite il parser
comp_name additional.fields[].value.string_value Mappato direttamente dal campo COMP nel log non elaborato.
configuration_version additional.fields[].key Il valore "configuration_version" viene assegnato tramite il parser
configuration_version additional.fields[].value.string_value Mappato direttamente dal campo configuration_version nel log non elaborato, convertito in una stringa.
containerID additional.fields[].key Il valore "containerID" viene assegnato tramite il parser
containerID additional.fields[].value.string_value Mappato direttamente dal campo CONTAINERID nel log non elaborato.
description security_result.description Mappato direttamente dal campo description nel log non elaborato.
dfs.FSNamesystem.* additional.fields[].key La chiave viene generata concatenando "dfs.FSNamesystem." con la chiave dei dati JSON.
dfs.FSNamesystem.* additional.fields[].value.string_value Il valore viene mappato direttamente dal valore corrispondente nell'oggetto JSON dfs.FSNamesystem, convertito in una stringa.
duration additional.fields[].key Il valore "duration" viene assegnato tramite il parser
duration additional.fields[].value.string_value Mappato direttamente dal campo duration nel log non elaborato.
duration network.session_duration.seconds Mappato direttamente dal campo duration nel log non elaborato, convertito in un numero intero.
environment additional.fields[].key Il valore "environment" viene assegnato tramite il parser
environment additional.fields[].value.string_value Mappato direttamente dal campo environment nel log non elaborato. Estratte dal campo ip_port utilizzando grok e la manipolazione di stringhe. Estratto dal campo ip_port utilizzando grok e la manipolazione di stringhe, convertito in un numero intero.
event_type metadata.event_type Determinato dalla logica dell'analizzatore in base alla presenza di informazioni principal e target. Può essere NETWORK_CONNECTION, USER_RESOURCE_ACCESS, STATUS_UPDATE o GENERIC_EVENT.
file_path target.file.full_path Mappato direttamente dal campo file_path nel log non elaborato.
host principal.hostname Mappato direttamente dal campo host nel log non elaborato.
host target.hostname Mappato direttamente dal campo host nel log non elaborato.
host_ip principal.ip Mappato direttamente dal campo host_ip nel log non elaborato.
host_port principal.port Mappato direttamente dal campo host_port nel log non elaborato, convertito in un numero intero.
http_url target.url Mappato direttamente dal campo http_url nel log non elaborato.
index additional.fields[].key Il valore "index" viene assegnato tramite il parser
index additional.fields[].value.string_value Mappato direttamente dal campo index nel log non elaborato.
kind metadata.product_event_type Mappato direttamente dal campo kind nel log non elaborato. Il valore "AWS_EMR" viene assegnato tramite il parser Il valore "AWS EMR" viene assegnato tramite il parser Il valore "AMAZON" viene assegnato tramite il parser
offset additional.fields[].key Il valore "offset" viene assegnato tramite il parser
offset additional.fields[].value.string_value Mappato direttamente dal campo offset nel log non elaborato.
op metadata.product_event_type Mappato direttamente dal campo op o OPERATION nel log non elaborato.
proto network.application_protocol Estratto dal campo http_url utilizzando grok, convertito in maiuscolo.
puppet_version additional.fields[].key Il valore "puppet_version" viene assegnato tramite il parser
puppet_version additional.fields[].value.string_value Mappato direttamente dal campo puppet_version nel log non elaborato.
queue_name additional.fields[].key Il valore "queue_name" viene assegnato tramite il parser
queue_name additional.fields[].value.string_value Mappato direttamente dal campo queue_name nel log non elaborato.
report_format additional.fields[].key Il valore "report_format" viene assegnato tramite il parser
report_format additional.fields[].value.string_value Mappato direttamente dal campo report_format nel log non elaborato, convertito in una stringa.
resource additional.fields[].key Il valore "resource" viene assegnato tramite il parser
resource additional.fields[].value.string_value Mappato direttamente dal campo resource nel log non elaborato.
result security_result.action_details Mappato direttamente dal campo RESULT nel log non elaborato.
security_id additional.fields[].key Il valore "security_id" viene assegnato tramite il parser
security_id additional.fields[].value.string_value Mappato direttamente dal campo security_id nel log non elaborato.
severity security_result.severity Mappato dal campo severity nel log non elaborato. INFO è mappato a INFORMATIONAL, WARN è mappato a MEDIUM.
srvID additional.fields[].key Il valore "srvID" viene assegnato tramite il parser
srvID additional.fields[].value.string_value Mappato direttamente dal campo srvID nel log non elaborato.
status additional.fields[].key Il valore "status" viene assegnato tramite il parser
status additional.fields[].value.string_value Mappato direttamente dal campo status nel log non elaborato.
summary security_result.summary Mappato direttamente dal campo summary nel log non elaborato.
target_app target.application Mappato direttamente dal campo TARGET nel log non elaborato.
target_ip target.ip Mappato direttamente dal campo target_ip o IP nel log non elaborato.
target_port target.port Mappato direttamente dal campo target_port nel log non elaborato, convertito in un numero intero.
timestamp metadata.event_timestamp Mappato direttamente dal campo timestamp nel log non elaborato, analizzato come timestamp ISO8601.
timestamp event.timestamp Mappato direttamente dal campo timestamp nel log non elaborato, analizzato come timestamp ISO8601.
trade_date additional.fields[].key Il valore "trade_date" viene assegnato tramite il parser
trade_date additional.fields[].value.string_value Mappato direttamente dal campo trade_date nel log non elaborato.
transaction_uuid additional.fields[].key Il valore "transaction_uuid" viene assegnato tramite il parser
transaction_uuid additional.fields[].value.string_value Mappato direttamente dal campo transaction_uuid nel log non elaborato.
type additional.fields[].key Il valore "type" viene assegnato tramite il parser
type additional.fields[].value.string_value Mappato direttamente dal campo type nel log non elaborato.
user target.user.userid Mappato direttamente dal campo USER o ugi nel log non elaborato.

Modifiche

2023-12-19

  • Correzione di bug: sono stati corretti i risultati inaffidabili per il pattern Grok.

2023-10-30

  • Parser appena creato.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.