Coletar registros de auditoria da DigiCert
Este documento explica como ingerir registros de auditoria da DigiCert no Google Security Operations usando o Amazon S3.
Antes de começar
- Instância do Google SecOps
- Acesso privilegiado ao DigiCert CertCentral (chave de API com função de administrador)
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Receber a chave de API e o ID do relatório da DigiCert
- No CertCentral, acesse Conta > Chaves de API e crie a chave de API (
X-DC-DEVKEY
). - Em Relatórios > Biblioteca de relatórios, crie um relatório de Registro de auditoria no formato JSON e anote o ID do relatório (UUID).
- Também é possível encontrar o ID de um relatório no histórico.
Configurar o bucket do AWS S3 e o IAM para o Google SecOps
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
- Salve o Nome e a Região do bucket para referência futura (por exemplo,
digicert-logs
). - Crie um usuário seguindo este guia: Como criar um usuário do IAM.
- Selecione o usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como o Caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para uso posterior.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise e selecione a política AmazonS3FullAccess.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar a política e o papel do IAM para uploads do S3
- Acesse Console da AWS > IAM > Políticas > Criar política > guia JSON.
Insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDigiCertObjects", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::digicert-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json" } ] }
- Substitua
digicert-logs
se você tiver inserido um nome de bucket diferente.
- Substitua
Clique em Próxima > Criar política.
Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.
Anexe a política recém-criada.
Nomeie a função como
WriteDigicertToS3Role
e clique em Criar função.
Criar a função Lambda
- No console da AWS, acesse Lambda > Functions > Create function.
- Clique em Criar do zero.
Informe os seguintes detalhes de configuração:
Configuração Valor Nome digicert_audit_logs_to_s3
Ambiente de execução Python 3.13 Arquitetura x86_64 Função de execução WriteDigicertToS3Role
Depois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (
digicert_audit_logs_to_s3.py
):#!/usr/bin/env python3 import datetime as dt, gzip, io, json, os, time, uuid, zipfile from typing import Any, Dict, Iterable, List, Tuple from urllib import request, parse, error import boto3 from botocore.exceptions import ClientError API_BASE = "https://api.digicert.com/reports/v1" USER_AGENT = "secops-digicert-reports/1.0" s3 = boto3.client("s3") def _now() -> dt.datetime: return dt.datetime.now(dt.timezone.utc) def _http(method: str, url: str, api_key: str, body: bytes | None = None, timeout: int = 30, max_retries: int = 5) -> Tuple[int, Dict[str,str], bytes]: headers = {"X-DC-DEVKEY": api_key, "Content-Type": "application/json", "User-Agent": USER_AGENT} attempt, backoff = 0, 1.0 while True: req = request.Request(url=url, method=method, headers=headers, data=body) try: with request.urlopen(req, timeout=timeout) as resp: status, h = resp.status, {k.lower(): v for k, v in resp.headers.items()} data = resp.read() if 500 <= status <= 599 and attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue return status, h, data except error.HTTPError as e: status, h = e.code, {k.lower(): v for k, v in (e.headers or {}).items()} if status == 429 and attempt < max_retries: ra = h.get("retry-after"); delay = float(ra) if ra and ra.isdigit() else backoff attempt += 1; time.sleep(delay); backoff *= 2; continue if 500 <= status <= 599 and attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue raise except error.URLError: if attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue raise def start_report_run(api_key: str, report_id: str, timeout: int) -> None: st, _, body = _http("POST", f"{API_BASE}/report/{report_id}/run", api_key, b"{}", timeout) if st not in (200, 201): raise RuntimeError(f"Start run failed: {st} {body[:200]!r}") def list_report_history(api_key: str, *, status_filter: str | None = None, report_type: str | None = None, limit: int = 100, sort_by: str = "report_start_date", sort_direction: str = "DESC", timeout: int = 30, offset: int = 0) -> Dict[str, Any]: qs = {"limit": str(limit), "offset": str(offset), "sort_by": sort_by, "sort_direction": sort_direction} if status_filter: qs["status"] = status_filter if report_type: qs["report_type"] = report_type st, _, body = _http("GET", f"{API_BASE}/report/history?{parse.urlencode(qs)}", api_key, timeout=timeout) if st != 200: raise RuntimeError(f"History failed: {st} {body[:200]!r}") return json.loads(body.decode("utf-8")) def find_ready_run(api_key: str, report_id: str, started_not_before: dt.datetime, timeout: int, max_wait_seconds: int, poll_interval: int) -> str: deadline = time.time() + max_wait_seconds while time.time() < deadline: hist = list_report_history(api_key, status_filter="READY", report_type="audit-logs", limit=200, timeout=timeout).get("report_history", []) for it in hist: if it.get("report_identifier") != report_id or not it.get("report_run_identifier"): continue try: rsd = dt.datetime.strptime(it.get("report_start_date",""), "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.timezone.utc) except Exception: rsd = started_not_before if rsd + dt.timedelta(seconds=60) >= started_not_before: return it["report_run_identifier"] time.sleep(poll_interval) raise TimeoutError("READY run not found in time") def get_json_rows(api_key: str, report_id: str, run_id: str, timeout: int) -> List[Dict[str, Any]]: st, h, body = _http("GET", f"{API_BASE}/report/{report_id}/{run_id}/json", api_key, timeout=timeout) if st != 200: raise RuntimeError(f"Get JSON failed: {st} {body[:200]!r}") if "application/zip" in h.get("content-type","").lower() or body[:2] == b"PK": with zipfile.ZipFile(io.BytesIO(body)) as zf: name = next((n for n in zf.namelist() if n.lower().endswith(".json")), None) if not name: raise RuntimeError("ZIP has no JSON") rows = json.loads(zf.read(name).decode("utf-8")) else: rows = json.loads(body.decode("utf-8")) if not isinstance(rows, list): raise RuntimeError("Unexpected JSON format") return rows def load_state(bucket: str, key: str) -> Dict[str, Any]: try: return json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")) except ClientError as e: if e.response["Error"]["Code"] in ("NoSuchKey","404"): return {} raise def save_state(bucket: str, key: str, state: Dict[str, Any]) -> None: s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode("utf-8"), ContentType="application/json") def write_ndjson_gz(bucket: str, prefix: str, rows: Iterable[Dict[str, Any]], run_id: str) -> str: ts = _now().strftime("%Y/%m/%d/%H%M%S") key = f"{prefix}/{ts}-digicert-audit-{run_id[:8]}-{uuid.uuid4().hex}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="wb") as gz: for r in rows: gz.write((json.dumps(r, separators=(',',':')) + "\n").encode("utf-8")) s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue(), ContentType="application/x-ndjson", ContentEncoding="gzip") return key def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: api_key = os.environ["DIGICERT_API_KEY"] report_id = os.environ["DIGICERT_REPORT_ID"] bucket = os.environ["S3_BUCKET"] prefix = os.environ.get("S3_PREFIX", "digicert/logs").rstrip("/") state_key = os.environ.get("STATE_KEY", f"{prefix}/state.json") max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "300")) poll_int = int(os.environ.get("POLL_INTERVAL", "10")) timeout = int(os.environ.get("REQUEST_TIMEOUT", "30")) state = load_state(bucket, state_key) if state_key else {} last_run = state.get("last_run_id") started = _now() start_report_run(api_key, report_id, timeout) run_id = find_ready_run(api_key, report_id, started, timeout, max_wait, poll_int) if last_run and last_run == run_id: return {"status":"skip", "report_run_identifier": run_id} rows = get_json_rows(api_key, report_id, run_id, timeout) key = write_ndjson_gz(bucket, prefix, rows, run_id) if state_key: save_state(bucket, state_key, {"last_run_id": run_id, "last_success_at": _now().isoformat(), "last_s3_key": key, "rows_count": len(rows)}) return {"status":"ok", "report_identifier": report_id, "report_run_identifier": run_id, "rows": len(rows), "s3_key": key}
Acesse Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.
Insira as seguintes variáveis de ambiente, substituindo pelos seus valores:
Chave Exemplo S3_BUCKET
digicert-logs
S3_PREFIX
digicert/logs/
STATE_KEY
digicert/logs/state.json
DIGICERT_API_KEY
xxxxxxxxxxxxxxxxxxxxxxxx
DIGICERT_REPORT_ID
88de5e19-ec57-4d70-865d-df953b062574
REQUEST_TIMEOUT
30
POLL_INTERVAL
10
MAX_WAIT_SECONDS
300
Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.
Selecione a guia Configuração.
No painel Configuração geral, clique em Editar.
Mude Tempo limite para 15 minutos (900 segundos) e clique em Salvar.
Criar uma programação do EventBridge
- Acesse Amazon EventBridge > Scheduler > Criar programação.
- Informe os seguintes detalhes de configuração:
- Programação recorrente: Taxa (
1 hour
). - Destino: sua função Lambda.
- Nome:
digicert-audit-1h
.
- Programação recorrente: Taxa (
- Clique em Criar programação.
Opcional: criar um usuário e chaves do IAM somente leitura para o Google SecOps
- No console da AWS, acesse IAM > Usuários e clique em Adicionar usuários.
- Informe os seguintes detalhes de configuração:
- Usuário: insira um nome exclusivo (por exemplo,
secops-reader
) - Tipo de acesso: selecione Chave de acesso - Acesso programático.
- Clique em Criar usuário.
- Usuário: insira um nome exclusivo (por exemplo,
- Anexe a política de leitura mínima (personalizada): Usuários > selecione
secops-reader
> Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política No editor JSON, insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Name =
secops-reader-policy
.Clique em Criar política > pesquisar/selecionar > Próxima > Adicionar permissões.
Crie uma chave de acesso para
secops-reader
: Credenciais de segurança > Chaves de acesso > Criar chave de acesso Criar chave de acesso**.Faça o download do CSV (esses valores são inseridos no feed).
Configurar um feed no Google SecOps para ingerir registros da DigiCert
- Acesse Configurações do SIEM > Feeds.
- Clique em Adicionar novo feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
DigiCert Audit Logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Digicert como o Tipo de registro.
- Clique em Próxima.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://digicert-logs/digicert/logs/
- Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
- Idade máxima do arquivo: padrão de 180 dias.
- ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
- Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
- Namespace do recurso: o namespace do recurso.
- Rótulos de ingestão: o rótulo a ser aplicado aos eventos deste feed.
- URI do S3:
- Clique em Próxima.
- Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.