Recolha registos do Akamai Cloud Monitor

Compatível com:

Este documento explica como ingerir registos do Akamai Cloud Monitor (equilibrador de carga, modelador de tráfego, ADC) no Google Security Operations através do AWS S3. A Akamai envia eventos JSON para o seu ponto final HTTPS; um recetor API Gateway + Lambda escreve os eventos para o S3 (JSONL, gz). O analisador transforma os registos JSON em UDM. Extrai campos do payload JSON, faz conversões de tipos de dados, muda o nome dos campos para corresponderem ao esquema da UDM e processa a lógica específica para campos personalizados e construção de URLs. Também incorpora o processamento de erros e a lógica condicional com base na presença de campos.

Antes de começar

Certifique-se de que tem os seguintes pré-requisitos:

  • Instância do Google SecOps
  • Acesso privilegiado ao Akamai Control Center e ao Property Manager
  • Acesso privilegiado à AWS*(S3, IAM, Lambda, API Gateway)

Configure o contentor do AWS S3 e o IAM para o Google SecOps

  1. Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
  2. Guarde o nome e a região do contentor para referência futura (por exemplo, akamai-cloud-monitor).
  3. Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
  4. Selecione o utilizador criado.
  5. Selecione o separador Credenciais de segurança.
  6. Clique em Criar chave de acesso na secção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Exemplo de utilização.
  8. Clicar em Seguinte.
  9. Opcional: adicione uma etiqueta de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
  12. Clique em Concluído.
  13. Selecione o separador Autorizações.
  14. Clique em Adicionar autorizações na secção Políticas de autorizações.
  15. Selecione Adicionar autorizações.
  16. Selecione Anexar políticas diretamente
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clicar em Seguinte.
  19. Clique em Adicionar autorizações.

Configure a política e a função de IAM para carregamentos do S3 (Lambda)

  1. Na consola da AWS, aceda a IAM > Políticas > Criar política > JSON e cole a política abaixo.
  2. Política JSON (substitua akamai-cloud-monitor pelo nome do seu contentor do S3):

    {
    "Version": "2012-10-17",
    "Statement": [
        {
        "Sid": "AllowPutAkamaiObjects",
        "Effect": "Allow",
        "Action": ["s3:PutObject"],
        "Resource": "arn:aws:s3:::akamai-cloud-monitor/*"
        }
    ]
    }
    
  3. Clique em Seguinte > Criar política.

  4. Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.

  5. Anexe a política JSON.

  6. Dê o nome WriteAkamaiCMToS3Role à função e clique em Criar função.

Crie a função Lambda

Definição Valor
Nome akamai_cloud_monitor_to_s3
Runtime Python 3.13
Arquitetura x86_64
Função de execução WriteAkamaiCMToS3Role
  1. Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (akamai_cloud_monitor_to_s3.py):

    #!/usr/bin/env python3
    # Lambda: Receive Akamai Cloud Monitor POST, write JSONL (gz) to S3
    
    import os, json, gzip, io, uuid, base64, datetime as dt
    import boto3
    
    S3_BUCKET  = os.environ["S3_BUCKET_NAME"]
    S3_PREFIX  = os.environ.get("S3_PREFIX", "akamai/cloud-monitor/json/").strip("/") + "/"
    INGEST_TOKEN = os.environ.get("INGEST_TOKEN")  # optional shared secret in URL query (?token=...)
    
    s3 = boto3.client("s3")
    
    def _write_jsonl_gz(objs: list) -> str:
        key = f"{dt.datetime.utcnow():%Y/%m/%d}/akamai-cloud-monitor-{uuid.uuid4()}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for o in objs:
                gz.write((json.dumps(o, separators=(",", ":")) + "n").encode())
        buf.seek(0)
        s3.upload_fileobj(
            buf,
            S3_BUCKET,
            f"{S3_PREFIX}{key}",
            ExtraArgs={
                "ContentType": "application/json",
                "ContentEncoding": "gzip",
            },
        )
        return f"s3://{S3_BUCKET}/{S3_PREFIX}{key}"
    
    def _parse_records_from_event(event) -> list:
        # HTTP API (Lambda proxy) event: body is a JSON string
        body = event.get("body") or ""
        if event.get("isBase64Encoded"):
            body = base64.b64decode(body).decode("utf-8", "replace")
        try:
            data = json.loads(body)
        except Exception:
            # accept line-delimited JSON as pass-through
            try:
                return [json.loads(line) for line in body.splitlines() if line.strip()]
            except Exception:
                return []
        if isinstance(data, list):
            return data
        if isinstance(data, dict):
            return [data]
        return []
    
    def lambda_handler(event, context=None):
        # Optional shared-secret verification via query parameter (?token=...)
        if INGEST_TOKEN:
            qs = event.get("queryStringParameters") or {}
            token = qs.get("token")
            if token != INGEST_TOKEN:
                return {"statusCode": 403, "body": "forbidden"}
    
        records = _parse_records_from_event(event)
        if not records:
            return {"statusCode": 204, "body": "no content"}
    
        key = _write_jsonl_gz(records)
        return {
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps({"ok": True, "s3_key": key, "count": len(records)}),
        }
    
  2. Aceda a Configuração > Variáveis de ambiente > Editar.

  3. Clique em Adicionar nova variável de ambiente e defina os seguintes valores:

    Variáveis de ambiente

    Chave Exemplo
    S3_BUCKET_NAME akamai-cloud-monitor
    S3_PREFIX akamai/cloud-monitor/json/
    INGEST_TOKEN random-shared-secret
  4. Aceda a Configuração > Configuração geral.

  5. Clique em Editar e defina Limite de tempo para 5 minutos (300 segundos).

  6. Clique em Guardar.

Crie um Amazon API Gateway (ponto final HTTPS para a Akamai)

  1. Na consola da AWS, aceda a API Gateway > Criar API.
  2. Selecione API HTTP > Criar.
  3. Indique os seguintes detalhes de configuração:
    • Integrações: escolha Lambda e selecione akamai_cloud_monitor_to_s3.
    • Routes: adicione ANY /{proxy+} ou crie uma rota específica (por exemplo, POST /akamai/cloud-monitor).
    • Fases: crie ou use $default.
  4. Implemente a API e copie o URL de invocação (por exemplo, https://abc123.execute-api.<region>.amazonaws.com).

Configure o Akamai Cloud Monitor para enviar registos

  1. No Akamai Control Center, abra a sua propriedade no Property Manager.
  2. Clique em Adicionar regra > escolha Gestão na nuvem.
  3. Adicione a instrumentação do Cloud Monitor e selecione os conjuntos de dados necessários.
  4. Adicione a Entrega de dados do Cloud Monitor.
    • Nome do anfitrião de fornecimento: introduza o URL de invocação da API Gateway (por exemplo, abc123.execute-api.<region>.amazonaws.com).
    • Caminho do URL de fornecimento: o seu caminho mais um token de consulta opcional, por exemplo: /akamai/cloud-monitor?token=<INGEST_TOKEN>.
  5. Guarde e ative a versão da propriedade.

Configure um feed no Google SecOps para carregar o Akamai Cloud Monitor (JSON do S3)

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em Adicionar novo feed.
  3. No campo Nome do feed, introduza um nome para o feed (por exemplo, Akamai Cloud Monitor — S3).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione Akamai Cloud Monitor como o Tipo de registo.
  6. Clicar em Seguinte.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://akamai-cloud-monitor/akamai/cloud-monitor/json/
    • Opções de eliminação da origem: se pretende eliminar ficheiros e/ou diretórios após a transferência.
    • Idade máxima do ficheiro: inclui ficheiros modificados no último número de dias. A predefinição é 180 dias.
    • ID da chave de acesso: uma chave de acesso alfanumérica de 20 carateres (por exemplo, AKIAIOSFODNN7EXAMPLE).
    • Chave de acesso secreta: uma chave de acesso secreta alfanumérica de 40 carateres (por exemplo, wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY).
    • Espaço de nomes de recursos: akamai.cloud_monitor
    • Etiquetas de carregamento: as etiquetas são adicionadas a todos os eventos deste feed (por exemplo, source=akamai_cloud_monitor, format=json).
  8. Clicar em Seguinte.
  9. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Tabela de mapeamento da UDM

Campo de registo Mapeamento de UDM Lógica
accLang network.http.user_agent Mapeado diretamente se não for "-" ou uma string vazia.
city principal.location.city Mapeado diretamente se não for "-" ou uma string vazia.
cliIP principal.ip Mapeado diretamente se não for uma string vazia.
country principal.location.country_or_region Mapeado diretamente se não for "-" ou uma string vazia.
cp additional.fields Mapeado como um par de chave-valor com a chave "cp".
customField about.ip, about.labels, src.ip Analisados como pares de chave-valor. Processamento especial de "eIp" e "pIp" para mapear respetivamente para src.ip e about.ip. Outras chaves são mapeadas como etiquetas em about.
errorCode security_result.summary, security_result.severity Se estiver presente, define security_result.severity como "ERROR" e mapeia o valor para security_result.summary.
geo.city principal.location.city Mapeado diretamente se city for "-" ou uma string vazia.
geo.country principal.location.country_or_region Mapeado diretamente se country for "-" ou uma string vazia.
geo.lat principal.location.region_latitude Mapeado diretamente, convertido em flutuante.
geo.long principal.location.region_longitude Mapeado diretamente, convertido em flutuante.
geo.region principal.location.state Mapeados diretamente.
id metadata.product_log_id Mapeado diretamente se não for uma string vazia.
message.cliIP principal.ip Mapeado diretamente se cliIP for uma string vazia.
message.fwdHost principal.hostname Mapeados diretamente.
message.reqHost target.hostname, target.url Usado para criar target.url e extrair target.hostname.
message.reqLen network.sent_bytes Mapeado diretamente, convertido em número inteiro não assinado se totalBytes estiver vazio ou for "-".
message.reqMethod network.http.method Mapeado diretamente se reqMethod for uma string vazia.
message.reqPath target.url Anexado a target.url.
message.reqPort target.port Mapeado diretamente, convertido em número inteiro se reqPort for uma string vazia.
message.respLen network.received_bytes Mapeado diretamente, convertido em número inteiro não assinado.
message.sslVer network.tls.version Mapeados diretamente.
message.status network.http.response_code Mapeado diretamente, convertido em número inteiro se statusCode estiver vazio ou for "-".
message.UA network.http.user_agent Mapeado diretamente se UA for "-" ou uma string vazia.
network.asnum additional.fields Mapeado como um par de chave-valor com a chave "asnum".
network.edgeIP intermediary.ip Mapeados diretamente.
network.network additional.fields Mapeado como um par de chave-valor com a chave "network".
network.networkType additional.fields Mapeado como um par de chave-valor com a chave "networkType".
proto network.application_protocol Usado para determinar network.application_protocol.
queryStr target.url Anexado a target.url se não for "-" ou uma string vazia.
referer network.http.referral_url, about.hostname Mapeado diretamente se não for "-". O nome do anfitrião extraído é mapeado para about.hostname.
reqHost target.hostname, target.url Usado para criar target.url e extrair target.hostname.
reqId metadata.product_log_id, network.session_id Mapeado diretamente se id for uma string vazia. Também mapeado para network.session_id.
reqMethod network.http.method Mapeado diretamente se não for uma string vazia.
reqPath target.url Anexado a target.url se não for "-".
reqPort target.port Mapeado diretamente, convertido em número inteiro.
reqTimeSec metadata.event_timestamp, timestamp Usado para definir a data/hora do evento.
start metadata.event_timestamp, timestamp Usado para definir a data/hora do evento se reqTimeSec for uma string vazia.
statusCode network.http.response_code Mapeado diretamente, convertido em número inteiro se não for "-" ou uma string vazia.
tlsVersion network.tls.version Mapeados diretamente.
totalBytes network.sent_bytes Mapeado diretamente, convertido em número inteiro não assinado se não estiver vazio ou for "-".
type metadata.product_event_type Mapeados diretamente.
UA network.http.user_agent Mapeado diretamente se não for "-" ou uma string vazia.
version metadata.product_version Mapeados diretamente.
xForwardedFor principal.ip Mapeado diretamente se não for "-" ou uma string vazia.
(Lógica do analisador) metadata.vendor_name Definido como "Akamai".
(Lógica do analisador) metadata.product_name Definido como "DataStream".
(Lógica do analisador) metadata.event_type Definido como "NETWORK_HTTP".
(Lógica do analisador) metadata.product_version Definido como "2" se version for uma string vazia.
(Lógica do analisador) metadata.log_type Definido como "AKAMAI_CLOUD_MONITOR".
(Lógica do analisador) network.application_protocol Determinado a partir de proto ou message.proto. Definido como "HTTPS" se qualquer um contiver "HTTPS" (sem distinção entre maiúsculas e minúsculas) ou "HTTP" caso contrário.
(Lógica do analisador) security_result.severity Definido como "INFORMATIONAL" se errorCode for "-" ou uma string vazia.
(Lógica do analisador) target.url Construído a partir de protocol, reqHost (ou message.reqHost), reqPath (ou message.reqPath) e queryStr.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.