Coletar registros do AWS Elastic MapReduce
Este documento explica como transferir os registros do AWS Elastic MapReduce (EMR) para o Google Security Operations. O AWS EMR é uma plataforma de Big Data nativa da nuvem que processa grandes quantidades de dados rapidamente. A integração dos registros de EMR ao Google SecOps permite analisar a atividade do cluster e detectar possíveis ameaças à segurança.
Antes de começar
- Verifique se você tem uma instância do Google SecOps.
- Verifique se você tem acesso privilegiado à AWS.
Configurar o bucket do Amazon S3
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Criar um bucket.
- Salve o Nome e a Região do bucket para uso futuro.
- Crie um usuário seguindo este guia: Como criar um usuário do IAM.
- Selecione o Usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como o caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para uso futuro.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise e selecione as políticas AmazonS3FullAccess e CloudWatchLogsFullAccess.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar o AWS EMR para encaminhar registros
- Faça login no Console de Gerenciamento da AWS.
- Na barra de pesquisa, digite EMR e selecione Amazon EMR na lista de serviços.
- Clique em Clusters.
- Encontre e selecione o cluster do EMR em que você quer ativar o registro.
- Clique em Editar na página Detalhes do cluster.
- Na tela Editar cluster, acesse a seção Registro.
- Selecione Ativar a geração de registros.
- Especifique o bucket do S3 em que os registros serão armazenados.
- Especifique o URI do S3 no formato
s3://your-bucket-name/
. Isso vai armazenar todos os registros do EMR na raiz do bucket. - Selecione os seguintes tipos de registro:
Step logs
Application logs
YARN logs
System logs
HDFS Logs
(se você estiver usando o Hadoop)
- Clique em Salvar.
Configurar um feed no Google SecOps para ingerir registros do AWS EMR
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo.
- No campo Nome do feed, insira um nome para o feed (por exemplo, AWS EMR Logs).
- Selecione Amazon S3 como o Tipo de origem.
- Selecione AWS EMR como o Tipo de registro.
- Clique em Próxima.
Especifique valores para os seguintes parâmetros de entrada:
- Região: a região em que o bucket do Amazon S3 está localizado.
- URI do S3: o URI do bucket.
s3://your-log-bucket-name/
- Substitua
your-log-bucket-name
pelo nome real do bucket.
- Substitua
- O URI é: selecione Diretório ou Diretório que inclui subdiretórios.
Opções de exclusão da origem: selecione a opção de exclusão de acordo com sua preferência.
ID da chave de acesso: a chave de acesso do usuário com acesso ao bucket do S3.
Chave de acesso secreta: a chave secreta do usuário com acesso ao bucket do S3.
Namespace de recursos: o namespace de recursos.
Rótulos de ingestão: o rótulo a ser aplicado aos eventos desse feed.
Clique em Próxima.
Revise a configuração do novo feed na tela Finalizar e clique em Enviar.
Tabela de mapeamento do UDM
Campo de registro | Mapeamento do UDM | Lógica |
---|---|---|
app_id |
additional.fields[].key |
O valor "APP" é atribuído pelo analisador |
app_id |
additional.fields[].value.string_value |
Mapeado diretamente do campo APP no registro bruto. |
app_name |
additional.fields[].key |
O valor "APPNAME" é atribuído pelo analisador |
app_name |
additional.fields[].value.string_value |
Mapeado diretamente do campo APPNAME no registro bruto. |
blockid |
additional.fields[].key |
O valor "blockid" é atribuído pelo analisador |
blockid |
additional.fields[].value.string_value |
Mapeado diretamente do campo blockid no registro bruto. |
bytes |
network.received_bytes |
Mapeado diretamente do campo bytes no registro bruto, convertido em um número inteiro não assinado. |
cliID |
additional.fields[].key |
O valor "cliID" é atribuído pelo analisador |
cliID |
additional.fields[].value.string_value |
Mapeado diretamente do campo cliID no registro bruto. |
cmd |
target.process.command_line |
Mapeado diretamente do campo cmd no registro bruto. |
comp_name |
additional.fields[].key |
O valor "COMP" é atribuído pelo analisador |
comp_name |
additional.fields[].value.string_value |
Mapeado diretamente do campo COMP no registro bruto. |
configuration_version |
additional.fields[].key |
O valor "configuration_version" é atribuído pelo analisador |
configuration_version |
additional.fields[].value.string_value |
Mapeado diretamente do campo configuration_version no registro bruto, convertido em uma string. |
containerID |
additional.fields[].key |
O valor "containerID" é atribuído pelo analisador |
containerID |
additional.fields[].value.string_value |
Mapeado diretamente do campo CONTAINERID no registro bruto. |
description |
security_result.description |
Mapeado diretamente do campo description no registro bruto. |
dfs.FSNamesystem.* |
additional.fields[].key |
A chave é gerada concatenando "dfs.FSNamesystem." com a chave dos dados JSON. |
dfs.FSNamesystem.* |
additional.fields[].value.string_value |
O valor é mapeado diretamente do valor correspondente no objeto JSON dfs.FSNamesystem , convertido em uma string. |
duration |
additional.fields[].key |
O valor "duration" é atribuído pelo analisador |
duration |
additional.fields[].value.string_value |
Mapeado diretamente do campo duration no registro bruto. |
duration |
network.session_duration.seconds |
Mapeado diretamente do campo duration no registro bruto, convertido em um número inteiro. |
environment |
additional.fields[].key |
O valor "ambiente" é atribuído pelo analisador |
environment |
additional.fields[].value.string_value |
Mapeado diretamente do campo environment no registro bruto. Extraídos do campo ip_port usando grok e manipulação de strings. Extraídos do campo ip_port usando grok e manipulação de strings, convertidos em um número inteiro. |
event_type |
metadata.event_type |
Determinado pela lógica do analisador com base na presença de informações principal e target . Pode ser NETWORK_CONNECTION , USER_RESOURCE_ACCESS , STATUS_UPDATE ou GENERIC_EVENT . |
file_path |
target.file.full_path |
Mapeado diretamente do campo file_path no registro bruto. |
host |
principal.hostname |
Mapeado diretamente do campo host no registro bruto. |
host |
target.hostname |
Mapeado diretamente do campo host no registro bruto. |
host_ip |
principal.ip |
Mapeado diretamente do campo host_ip no registro bruto. |
host_port |
principal.port |
Mapeado diretamente do campo host_port no registro bruto, convertido em um número inteiro. |
http_url |
target.url |
Mapeado diretamente do campo http_url no registro bruto. |
index |
additional.fields[].key |
O valor "index" é atribuído pelo analisador |
index |
additional.fields[].value.string_value |
Mapeado diretamente do campo index no registro bruto. |
kind |
metadata.product_event_type |
Mapeado diretamente do campo kind no registro bruto. O valor "AWS_EMR" é atribuído pelo analisador O valor "AWS EMR" é atribuído pelo analisador O valor "AMAZON" é atribuído pelo analisador |
offset |
additional.fields[].key |
O valor "offset" é atribuído pelo analisador |
offset |
additional.fields[].value.string_value |
Mapeado diretamente do campo offset no registro bruto. |
op |
metadata.product_event_type |
Mapeado diretamente do campo op ou OPERATION no registro bruto. |
proto |
network.application_protocol |
Extraídos do campo http_url usando grok, convertidos em letras maiúsculas. |
puppet_version |
additional.fields[].key |
O valor "puppet_version" é atribuído pelo analisador |
puppet_version |
additional.fields[].value.string_value |
Mapeado diretamente do campo puppet_version no registro bruto. |
queue_name |
additional.fields[].key |
O valor "queue_name" é atribuído pelo analisador |
queue_name |
additional.fields[].value.string_value |
Mapeado diretamente do campo queue_name no registro bruto. |
report_format |
additional.fields[].key |
O valor "report_format" é atribuído pelo analisador |
report_format |
additional.fields[].value.string_value |
Mapeado diretamente do campo report_format no registro bruto, convertido em uma string. |
resource |
additional.fields[].key |
O valor "resource" é atribuído pelo analisador |
resource |
additional.fields[].value.string_value |
Mapeado diretamente do campo resource no registro bruto. |
result |
security_result.action_details |
Mapeado diretamente do campo RESULT no registro bruto. |
security_id |
additional.fields[].key |
O valor "security_id" é atribuído pelo analisador |
security_id |
additional.fields[].value.string_value |
Mapeado diretamente do campo security_id no registro bruto. |
severity |
security_result.severity |
Mapeado do campo severity no registro bruto. INFO é mapeado para INFORMATIONAL , WARN é mapeado para MEDIUM . |
srvID |
additional.fields[].key |
O valor "srvID" é atribuído pelo analisador |
srvID |
additional.fields[].value.string_value |
Mapeado diretamente do campo srvID no registro bruto. |
status |
additional.fields[].key |
O valor "status" é atribuído pelo analisador |
status |
additional.fields[].value.string_value |
Mapeado diretamente do campo status no registro bruto. |
summary |
security_result.summary |
Mapeado diretamente do campo summary no registro bruto. |
target_app |
target.application |
Mapeado diretamente do campo TARGET no registro bruto. |
target_ip |
target.ip |
Mapeado diretamente do campo target_ip ou IP no registro bruto. |
target_port |
target.port |
Mapeado diretamente do campo target_port no registro bruto, convertido em um número inteiro. |
timestamp |
metadata.event_timestamp |
Mapeado diretamente do campo timestamp no registro bruto, analisado como um carimbo de data/hora ISO8601. |
timestamp |
event.timestamp |
Mapeado diretamente do campo timestamp no registro bruto, analisado como um carimbo de data/hora ISO8601. |
trade_date |
additional.fields[].key |
O valor "trade_date" é atribuído pelo analisador |
trade_date |
additional.fields[].value.string_value |
Mapeado diretamente do campo trade_date no registro bruto. |
transaction_uuid |
additional.fields[].key |
O valor "transaction_uuid" é atribuído pelo analisador |
transaction_uuid |
additional.fields[].value.string_value |
Mapeado diretamente do campo transaction_uuid no registro bruto. |
type |
additional.fields[].key |
O valor "type" é atribuído pelo analisador |
type |
additional.fields[].value.string_value |
Mapeado diretamente do campo type no registro bruto. |
user |
target.user.userid |
Mapeado diretamente do campo USER ou ugi no registro bruto. |
Alterações
2023-12-19
- Correção de bug: correção dos resultados instáveis para o padrão Grok.
2023-10-30
- Parser recém-criado.
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.