Recolha registos do Akamai Cloud Monitor
Este documento explica como ingerir registos do Akamai Cloud Monitor (equilibrador de carga, modelador de tráfego, ADC) no Google Security Operations através do AWS S3. A Akamai envia eventos JSON para o seu ponto final HTTPS; um recetor API Gateway + Lambda escreve os eventos para o S3 (JSONL, gz). O analisador transforma os registos JSON em UDM. Extrai campos do payload JSON, faz conversões de tipos de dados, muda o nome dos campos para corresponderem ao esquema da UDM e processa a lógica específica para campos personalizados e construção de URLs. Também incorpora o processamento de erros e a lógica condicional com base na presença de campos.
Antes de começar
Certifique-se de que tem os seguintes pré-requisitos:
- Instância do Google SecOps
- Acesso privilegiado ao Akamai Control Center e ao Property Manager
- Acesso privilegiado à AWS*(S3, IAM, Lambda, API Gateway)
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
- Guarde o nome e a região do contentor para referência futura (por exemplo,
akamai-cloud-monitor
). - Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como o Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente
- Pesquise e selecione a política AmazonS3FullAccess.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3 (Lambda)
- Na consola da AWS, aceda a IAM > Políticas > Criar política > JSON e cole a política abaixo.
Política JSON (substitua
akamai-cloud-monitor
pelo nome do seu contentor do S3):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutAkamaiObjects", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::akamai-cloud-monitor/*" } ] }
Clique em Seguinte > Criar política.
Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.
Anexe a política JSON.
Dê o nome
WriteAkamaiCMToS3Role
à função e clique em Criar função.
Crie a função Lambda
Definição | Valor |
---|---|
Nome | akamai_cloud_monitor_to_s3 |
Runtime | Python 3.13 |
Arquitetura | x86_64 |
Função de execução | WriteAkamaiCMToS3Role |
Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (
akamai_cloud_monitor_to_s3.py
):#!/usr/bin/env python3 # Lambda: Receive Akamai Cloud Monitor POST, write JSONL (gz) to S3 import os, json, gzip, io, uuid, base64, datetime as dt import boto3 S3_BUCKET = os.environ["S3_BUCKET_NAME"] S3_PREFIX = os.environ.get("S3_PREFIX", "akamai/cloud-monitor/json/").strip("/") + "/" INGEST_TOKEN = os.environ.get("INGEST_TOKEN") # optional shared secret in URL query (?token=...) s3 = boto3.client("s3") def _write_jsonl_gz(objs: list) -> str: key = f"{dt.datetime.utcnow():%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="w") as gz: for o in objs: gz.write((json.dumps(o, separators=(",", ":")) + "n").encode()) buf.seek(0) s3.upload_fileobj( buf, S3_BUCKET, f"{S3_PREFIX}{key}", ExtraArgs={ "ContentType": "application/json", "ContentEncoding": "gzip", }, ) return f"s3://{S3_BUCKET}/{S3_PREFIX}{key}" def _parse_records_from_event(event) -> list: # HTTP API (Lambda proxy) event: body is a JSON string body = event.get("body") or "" if event.get("isBase64Encoded"): body = base64.b64decode(body).decode("utf-8", "replace") try: data = json.loads(body) except Exception: # accept line-delimited JSON as pass-through try: return [json.loads(line) for line in body.splitlines() if line.strip()] except Exception: return [] if isinstance(data, list): return data if isinstance(data, dict): return [data] return [] def lambda_handler(event, context=None): # Optional shared-secret verification via query parameter (?token=...) if INGEST_TOKEN: qs = event.get("queryStringParameters") or {} token = qs.get("token") if token != INGEST_TOKEN: return {"statusCode": 403, "body": "forbidden"} records = _parse_records_from_event(event) if not records: return {"statusCode": 204, "body": "no content"} key = _write_jsonl_gz(records) return { "statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps({"ok": True, "s3_key": key, "count": len(records)}), }
Aceda a Configuração > Variáveis de ambiente > Editar.
Clique em Adicionar nova variável de ambiente e defina os seguintes valores:
Variáveis de ambiente
Chave Exemplo S3_BUCKET_NAME
akamai-cloud-monitor
S3_PREFIX
akamai/cloud-monitor/json/
INGEST_TOKEN
random-shared-secret
Aceda a Configuração > Configuração geral.
Clique em Editar e defina Limite de tempo para 5 minutos (300 segundos).
Clique em Guardar.
Crie um Amazon API Gateway (ponto final HTTPS para a Akamai)
- Na consola da AWS, aceda a API Gateway > Criar API.
- Selecione API HTTP > Criar.
- Indique os seguintes detalhes de configuração:
- Integrações: escolha Lambda e selecione
akamai_cloud_monitor_to_s3
. - Routes: adicione ANY
/{proxy+}
ou crie uma rota específica (por exemplo, POST/akamai/cloud-monitor
). - Fases: crie ou use $default.
- Integrações: escolha Lambda e selecione
- Implemente a API e copie o URL de invocação (por exemplo,
https://abc123.execute-api.<region>.amazonaws.com
).
Configure o Akamai Cloud Monitor para enviar registos
- No Akamai Control Center, abra a sua propriedade no Property Manager.
- Clique em Adicionar regra > escolha Gestão na nuvem.
- Adicione a instrumentação do Cloud Monitor e selecione os conjuntos de dados necessários.
- Adicione a Entrega de dados do Cloud Monitor.
- Nome do anfitrião de fornecimento: introduza o URL de invocação da API Gateway (por exemplo,
abc123.execute-api.<region>.amazonaws.com
). - Caminho do URL de fornecimento: o seu caminho mais um token de consulta opcional, por exemplo:
/akamai/cloud-monitor?token=<INGEST_TOKEN>
.
- Nome do anfitrião de fornecimento: introduza o URL de invocação da API Gateway (por exemplo,
- Guarde e ative a versão da propriedade.
Configure um feed no Google SecOps para carregar o Akamai Cloud Monitor (JSON do S3)
- Aceda a Definições do SIEM > Feeds.
- Clique em Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
Akamai Cloud Monitor — S3
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Akamai Cloud Monitor como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://akamai-cloud-monitor/akamai/cloud-monitor/json/
- Opções de eliminação da origem: se pretende eliminar ficheiros e/ou diretórios após a transferência.
- Idade máxima do ficheiro: inclui ficheiros modificados no último número de dias. A predefinição é 180 dias.
- ID da chave de acesso: uma chave de acesso alfanumérica de 20 carateres (por exemplo, AKIAIOSFODNN7EXAMPLE).
- Chave de acesso secreta: uma chave de acesso secreta alfanumérica de 40 carateres (por exemplo, wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY).
- Espaço de nomes de recursos:
akamai.cloud_monitor
- Etiquetas de carregamento: as etiquetas são adicionadas a todos os eventos deste feed (por exemplo,
source=akamai_cloud_monitor
,format=json
).
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Tabela de mapeamento da UDM
Campo de registo | Mapeamento de UDM | Lógica |
---|---|---|
accLang |
network.http.user_agent |
Mapeado diretamente se não for "-" ou uma string vazia. |
city |
principal.location.city |
Mapeado diretamente se não for "-" ou uma string vazia. |
cliIP |
principal.ip |
Mapeado diretamente se não for uma string vazia. |
country |
principal.location.country_or_region |
Mapeado diretamente se não for "-" ou uma string vazia. |
cp |
additional.fields |
Mapeado como um par de chave-valor com a chave "cp". |
customField |
about.ip , about.labels , src.ip |
Analisados como pares de chave-valor. Processamento especial de "eIp" e "pIp" para mapear respetivamente para src.ip e about.ip . Outras chaves são mapeadas como etiquetas em about . |
errorCode |
security_result.summary , security_result.severity |
Se estiver presente, define security_result.severity como "ERROR" e mapeia o valor para security_result.summary . |
geo.city |
principal.location.city |
Mapeado diretamente se city for "-" ou uma string vazia. |
geo.country |
principal.location.country_or_region |
Mapeado diretamente se country for "-" ou uma string vazia. |
geo.lat |
principal.location.region_latitude |
Mapeado diretamente, convertido em flutuante. |
geo.long |
principal.location.region_longitude |
Mapeado diretamente, convertido em flutuante. |
geo.region |
principal.location.state |
Mapeados diretamente. |
id |
metadata.product_log_id |
Mapeado diretamente se não for uma string vazia. |
message.cliIP |
principal.ip |
Mapeado diretamente se cliIP for uma string vazia. |
message.fwdHost |
principal.hostname |
Mapeados diretamente. |
message.reqHost |
target.hostname , target.url |
Usado para criar target.url e extrair target.hostname . |
message.reqLen |
network.sent_bytes |
Mapeado diretamente, convertido em número inteiro não assinado se totalBytes estiver vazio ou for "-". |
message.reqMethod |
network.http.method |
Mapeado diretamente se reqMethod for uma string vazia. |
message.reqPath |
target.url |
Anexado a target.url . |
message.reqPort |
target.port |
Mapeado diretamente, convertido em número inteiro se reqPort for uma string vazia. |
message.respLen |
network.received_bytes |
Mapeado diretamente, convertido em número inteiro não assinado. |
message.sslVer |
network.tls.version |
Mapeados diretamente. |
message.status |
network.http.response_code |
Mapeado diretamente, convertido em número inteiro se statusCode estiver vazio ou for "-". |
message.UA |
network.http.user_agent |
Mapeado diretamente se UA for "-" ou uma string vazia. |
network.asnum |
additional.fields |
Mapeado como um par de chave-valor com a chave "asnum". |
network.edgeIP |
intermediary.ip |
Mapeados diretamente. |
network.network |
additional.fields |
Mapeado como um par de chave-valor com a chave "network". |
network.networkType |
additional.fields |
Mapeado como um par de chave-valor com a chave "networkType". |
proto |
network.application_protocol |
Usado para determinar network.application_protocol . |
queryStr |
target.url |
Anexado a target.url se não for "-" ou uma string vazia. |
referer |
network.http.referral_url , about.hostname |
Mapeado diretamente se não for "-". O nome do anfitrião extraído é mapeado para about.hostname . |
reqHost |
target.hostname , target.url |
Usado para criar target.url e extrair target.hostname . |
reqId |
metadata.product_log_id , network.session_id |
Mapeado diretamente se id for uma string vazia. Também mapeado para network.session_id . |
reqMethod |
network.http.method |
Mapeado diretamente se não for uma string vazia. |
reqPath |
target.url |
Anexado a target.url se não for "-". |
reqPort |
target.port |
Mapeado diretamente, convertido em número inteiro. |
reqTimeSec |
metadata.event_timestamp , timestamp |
Usado para definir a data/hora do evento. |
start |
metadata.event_timestamp , timestamp |
Usado para definir a data/hora do evento se reqTimeSec for uma string vazia. |
statusCode |
network.http.response_code |
Mapeado diretamente, convertido em número inteiro se não for "-" ou uma string vazia. |
tlsVersion |
network.tls.version |
Mapeados diretamente. |
totalBytes |
network.sent_bytes |
Mapeado diretamente, convertido em número inteiro não assinado se não estiver vazio ou for "-". |
type |
metadata.product_event_type |
Mapeados diretamente. |
UA |
network.http.user_agent |
Mapeado diretamente se não for "-" ou uma string vazia. |
version |
metadata.product_version |
Mapeados diretamente. |
xForwardedFor |
principal.ip |
Mapeado diretamente se não for "-" ou uma string vazia. |
(Lógica do analisador) | metadata.vendor_name |
Definido como "Akamai". |
(Lógica do analisador) | metadata.product_name |
Definido como "DataStream". |
(Lógica do analisador) | metadata.event_type |
Definido como "NETWORK_HTTP". |
(Lógica do analisador) | metadata.product_version |
Definido como "2" se version for uma string vazia. |
(Lógica do analisador) | metadata.log_type |
Definido como "AKAMAI_CLOUD_MONITOR". |
(Lógica do analisador) | network.application_protocol |
Determinado a partir de proto ou message.proto . Definido como "HTTPS" se qualquer um contiver "HTTPS" (sem distinção entre maiúsculas e minúsculas) ou "HTTP" caso contrário. |
(Lógica do analisador) | security_result.severity |
Definido como "INFORMATIONAL" se errorCode for "-" ou uma string vazia. |
(Lógica do analisador) | target.url |
Construído a partir de protocol , reqHost (ou message.reqHost ), reqPath (ou message.reqPath ) e queryStr . |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.