Recolha registos de auditoria da DigiCert
Este documento explica como introduzir registos de auditoria da DigiCert no Google Security Operations através do Amazon S3.
Antes de começar
- Instância do Google SecOps
- Acesso privilegiado ao DigiCert CertCentral (chave da API com função de administrador)
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Obtenha a chave da API e o ID do relatório da DigiCert
- No CertCentral, aceda a Conta > Chaves da API e crie a chave da API (
X-DC-DEVKEY
). - Em Relatórios > Biblioteca de relatórios, crie um relatório de Registo de auditoria com o formato JSON e tome nota do respetivo ID do relatório (UUID).
- Também pode encontrar o ID de um relatório existente através do histórico de relatórios.
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
- Guarde o nome e a região do contentor para referência futura (por exemplo,
digicert-logs
). - Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como o Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente
- Pesquise e selecione a política AmazonS3FullAccess.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Aceda a AWS console > IAM > Policies > Create policy > separador JSON.
Introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutDigiCertObjects", "Effect": "Allow", "Action": ["s3:PutObject"], "Resource": "arn:aws:s3:::digicert-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::digicert-logs/digicert/logs/state.json" } ] }
- Substitua
digicert-logs
se tiver introduzido um nome de contentor diferente.
- Substitua
Clique em Seguinte > Criar política.
Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.
Anexe a política criada recentemente.
Dê o nome
WriteDigicertToS3Role
à função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
Faculte os seguintes detalhes de configuração:
Definição Valor Nome digicert_audit_logs_to_s3
Runtime Python 3.13 Arquitetura x86_64 Função de execução WriteDigicertToS3Role
Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (
digicert_audit_logs_to_s3.py
):#!/usr/bin/env python3 import datetime as dt, gzip, io, json, os, time, uuid, zipfile from typing import Any, Dict, Iterable, List, Tuple from urllib import request, parse, error import boto3 from botocore.exceptions import ClientError API_BASE = "https://api.digicert.com/reports/v1" USER_AGENT = "secops-digicert-reports/1.0" s3 = boto3.client("s3") def _now() -> dt.datetime: return dt.datetime.now(dt.timezone.utc) def _http(method: str, url: str, api_key: str, body: bytes | None = None, timeout: int = 30, max_retries: int = 5) -> Tuple[int, Dict[str,str], bytes]: headers = {"X-DC-DEVKEY": api_key, "Content-Type": "application/json", "User-Agent": USER_AGENT} attempt, backoff = 0, 1.0 while True: req = request.Request(url=url, method=method, headers=headers, data=body) try: with request.urlopen(req, timeout=timeout) as resp: status, h = resp.status, {k.lower(): v for k, v in resp.headers.items()} data = resp.read() if 500 <= status <= 599 and attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue return status, h, data except error.HTTPError as e: status, h = e.code, {k.lower(): v for k, v in (e.headers or {}).items()} if status == 429 and attempt < max_retries: ra = h.get("retry-after"); delay = float(ra) if ra and ra.isdigit() else backoff attempt += 1; time.sleep(delay); backoff *= 2; continue if 500 <= status <= 599 and attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue raise except error.URLError: if attempt < max_retries: attempt += 1; time.sleep(backoff); backoff *= 2; continue raise def start_report_run(api_key: str, report_id: str, timeout: int) -> None: st, _, body = _http("POST", f"{API_BASE}/report/{report_id}/run", api_key, b"{}", timeout) if st not in (200, 201): raise RuntimeError(f"Start run failed: {st} {body[:200]!r}") def list_report_history(api_key: str, *, status_filter: str | None = None, report_type: str | None = None, limit: int = 100, sort_by: str = "report_start_date", sort_direction: str = "DESC", timeout: int = 30, offset: int = 0) -> Dict[str, Any]: qs = {"limit": str(limit), "offset": str(offset), "sort_by": sort_by, "sort_direction": sort_direction} if status_filter: qs["status"] = status_filter if report_type: qs["report_type"] = report_type st, _, body = _http("GET", f"{API_BASE}/report/history?{parse.urlencode(qs)}", api_key, timeout=timeout) if st != 200: raise RuntimeError(f"History failed: {st} {body[:200]!r}") return json.loads(body.decode("utf-8")) def find_ready_run(api_key: str, report_id: str, started_not_before: dt.datetime, timeout: int, max_wait_seconds: int, poll_interval: int) -> str: deadline = time.time() + max_wait_seconds while time.time() < deadline: hist = list_report_history(api_key, status_filter="READY", report_type="audit-logs", limit=200, timeout=timeout).get("report_history", []) for it in hist: if it.get("report_identifier") != report_id or not it.get("report_run_identifier"): continue try: rsd = dt.datetime.strptime(it.get("report_start_date",""), "%Y-%m-%d %H:%M:%S").replace(tzinfo=dt.timezone.utc) except Exception: rsd = started_not_before if rsd + dt.timedelta(seconds=60) >= started_not_before: return it["report_run_identifier"] time.sleep(poll_interval) raise TimeoutError("READY run not found in time") def get_json_rows(api_key: str, report_id: str, run_id: str, timeout: int) -> List[Dict[str, Any]]: st, h, body = _http("GET", f"{API_BASE}/report/{report_id}/{run_id}/json", api_key, timeout=timeout) if st != 200: raise RuntimeError(f"Get JSON failed: {st} {body[:200]!r}") if "application/zip" in h.get("content-type","").lower() or body[:2] == b"PK": with zipfile.ZipFile(io.BytesIO(body)) as zf: name = next((n for n in zf.namelist() if n.lower().endswith(".json")), None) if not name: raise RuntimeError("ZIP has no JSON") rows = json.loads(zf.read(name).decode("utf-8")) else: rows = json.loads(body.decode("utf-8")) if not isinstance(rows, list): raise RuntimeError("Unexpected JSON format") return rows def load_state(bucket: str, key: str) -> Dict[str, Any]: try: return json.loads(s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")) except ClientError as e: if e.response["Error"]["Code"] in ("NoSuchKey","404"): return {} raise def save_state(bucket: str, key: str, state: Dict[str, Any]) -> None: s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode("utf-8"), ContentType="application/json") def write_ndjson_gz(bucket: str, prefix: str, rows: Iterable[Dict[str, Any]], run_id: str) -> str: ts = _now().strftime("%Y/%m/%d/%H%M%S") key = f"{prefix}/{ts}-digicert-audit-{run_id[:8]}-{uuid.uuid4().hex}.json.gz" buf = io.BytesIO() with gzip.GzipFile(fileobj=buf, mode="wb") as gz: for r in rows: gz.write((json.dumps(r, separators=(',',':')) + "\n").encode("utf-8")) s3.put_object(Bucket=bucket, Key=key, Body=buf.getvalue(), ContentType="application/x-ndjson", ContentEncoding="gzip") return key def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]: api_key = os.environ["DIGICERT_API_KEY"] report_id = os.environ["DIGICERT_REPORT_ID"] bucket = os.environ["S3_BUCKET"] prefix = os.environ.get("S3_PREFIX", "digicert/logs").rstrip("/") state_key = os.environ.get("STATE_KEY", f"{prefix}/state.json") max_wait = int(os.environ.get("MAX_WAIT_SECONDS", "300")) poll_int = int(os.environ.get("POLL_INTERVAL", "10")) timeout = int(os.environ.get("REQUEST_TIMEOUT", "30")) state = load_state(bucket, state_key) if state_key else {} last_run = state.get("last_run_id") started = _now() start_report_run(api_key, report_id, timeout) run_id = find_ready_run(api_key, report_id, started, timeout, max_wait, poll_int) if last_run and last_run == run_id: return {"status":"skip", "report_run_identifier": run_id} rows = get_json_rows(api_key, report_id, run_id, timeout) key = write_ndjson_gz(bucket, prefix, rows, run_id) if state_key: save_state(bucket, state_key, {"last_run_id": run_id, "last_success_at": _now().isoformat(), "last_s3_key": key, "rows_count": len(rows)}) return {"status":"ok", "report_identifier": report_id, "report_run_identifier": run_id, "rows": len(rows), "s3_key": key}
Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.
Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores:
Chave Exemplo S3_BUCKET
digicert-logs
S3_PREFIX
digicert/logs/
STATE_KEY
digicert/logs/state.json
DIGICERT_API_KEY
xxxxxxxxxxxxxxxxxxxxxxxx
DIGICERT_REPORT_ID
88de5e19-ec57-4d70-865d-df953b062574
REQUEST_TIMEOUT
30
POLL_INTERVAL
10
MAX_WAIT_SECONDS
300
Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 15 minutos (900 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Indique os seguintes detalhes de configuração:
- Agenda recorrente: Taxa (
1 hour
). - Alvo: a sua função Lambda.
- Nome:
digicert-audit-1h
.
- Agenda recorrente: Taxa (
- Clique em Criar programação.
Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Na consola da AWS, aceda a IAM > Users e, de seguida, clique em Add users.
- Indique os seguintes detalhes de configuração:
- Utilizador: introduza um nome único (por exemplo,
secops-reader
) - Tipo de acesso: selecione Chave de acesso – Acesso programático
- Clique em Criar utilizador.
- Utilizador: introduza um nome único (por exemplo,
- Anexe a política de leitura mínima (personalizada): Utilizadores > selecione
secops-reader
> Autorizações > Adicionar autorizações > Anexar políticas diretamente > Criar política No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
Nome =
secops-reader-policy
.Clique em Criar política > procure/selecione > Seguinte > Adicionar autorizações.
Crie uma chave de acesso para
secops-reader
: Credenciais de segurança > Chaves de acesso > Criar chave de acesso Criar chave de acesso**.Transfira o CSV (estes valores são introduzidos no feed).
Configure um feed no Google SecOps para carregar registos da DigiCert
- Aceda a Definições do SIEM > Feeds.
- Clique em Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
DigiCert Audit Logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione Digicert como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://digicert-logs/digicert/logs/
- Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: predefinição de 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta a aplicar aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.