Mengumpulkan log Platform ZeroFox

Didukung di:

Dokumen ini menjelaskan cara menyerap log Platform ZeroFox ke Google Security Operations menggunakan Amazon S3.

Sebelum memulai

Pastikan Anda memenuhi prasyarat berikut:

  • Instance Google SecOps.
  • Akses istimewa ke tenant ZeroFox Platform.
  • Akses istimewa ke AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).

Mendapatkan prasyarat ZeroFox

  1. Login ke ZeroFox Platform di https://cloud.zerofox.com.
  2. Buka Data Connectors > API Data Feeds.
    • URL Langsung (setelah login): https://cloud.zerofox.com/data_connectors/api
    • Jika Anda tidak melihat item menu ini, hubungi administrator ZeroFox Anda untuk mendapatkan akses.
  3. Klik Generate Token atau Create Personal Access Token.
  4. Berikan detail konfigurasi berikut:
    • Nama: Masukkan nama deskriptif (misalnya, Google SecOps S3 Ingestion).
    • Masa berlaku: Pilih periode rotasi sesuai dengan kebijakan keamanan organisasi Anda.
    • Izin/Feed: Pilih izin baca untuk: Alerts, CTI feeds, dan jenis data lain yang ingin Anda ekspor
  5. Klik Generate.
  6. Salin dan simpan Personal Access Token yang dibuat di lokasi yang aman (Anda tidak akan dapat melihatnya lagi).
  7. Simpan ZEROFOX_BASE_URL: https://api.zerofox.com (default untuk sebagian besar tenant)

Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps

  1. Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket
  2. Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya, zerofox-platform-logs).
  3. Buat Pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
  4. Pilih Pengguna yang dibuat.
  5. Pilih tab Kredensial keamanan.
  6. Klik Create Access Key di bagian Access Keys.
  7. Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
  8. Klik Berikutnya.
  9. Opsional: Tambahkan tag deskripsi.
  10. Klik Create access key.
  11. Klik Download file .CSV untuk menyimpan Kunci Akses dan Kunci Akses Rahasia untuk referensi di masa mendatang.
  12. Klik Selesai.
  13. Pilih tab Permissions.
  14. Klik Tambahkan izin di bagian Kebijakan izin.
  15. Pilih Tambahkan izin.
  16. Pilih Lampirkan kebijakan secara langsung.
  17. Cari kebijakan AmazonS3FullAccess.
  18. Pilih kebijakan.
  19. Klik Berikutnya.
  20. Klik Add permissions.

Mengonfigurasi kebijakan dan peran IAM untuk upload S3

  1. Di konsol AWS, buka IAM > Policies,
  2. Klik Buat kebijakan > tab JSON.
  3. Salin dan tempel kebijakan berikut.
  4. Policy JSON (ganti zerofox-platform-logs jika Anda memasukkan nama bucket yang berbeda):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::zerofox-platform-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::zerofox-platform-logs/zerofox/platform/state.json"
        }
      ]
    }
    
  5. Klik Berikutnya > Buat kebijakan.

  6. Buka IAM > Roles > Create role > AWS service > Lambda.

  7. Lampirkan kebijakan yang baru dibuat.

  8. Beri nama peran ZeroFoxPlatformToS3Role, lalu klik Buat peran.

Buat fungsi Lambda

  1. Di Konsol AWS, buka Lambda > Functions > Create function.
  2. Klik Buat dari awal.
  3. Berikan detail konfigurasi berikut:

    Setelan Nilai
    Nama zerofox_platform_to_s3
    Runtime Python 3.13
    Arsitektur x86_64
    Peran eksekusi ZeroFoxPlatformToS3Role
  4. Setelah fungsi dibuat, buka tab Code, hapus stub, dan tempelkan kode berikut (zerofox_platform_to_s3.py).

    #!/usr/bin/env python3
    # Lambda: Pull ZeroFox Platform data (alerts/incidents/logs) to S3 (no transform)
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    S3_BUCKET     = os.environ["S3_BUCKET"]
    S3_PREFIX     = os.environ.get("S3_PREFIX", "zerofox/platform/")
    STATE_KEY     = os.environ.get("STATE_KEY", "zerofox/platform/state.json")
    LOOKBACK_SEC  = int(os.environ.get("LOOKBACK_SECONDS", "3600"))
    PAGE_SIZE     = int(os.environ.get("PAGE_SIZE", "200"))
    MAX_PAGES     = int(os.environ.get("MAX_PAGES", "20"))
    HTTP_TIMEOUT  = int(os.environ.get("HTTP_TIMEOUT", "60"))
    HTTP_RETRIES  = int(os.environ.get("HTTP_RETRIES", "3"))
    URL_TEMPLATE  = os.environ.get("URL_TEMPLATE", "")
    AUTH_HEADER   = os.environ.get("AUTH_HEADER", "")  # e.g. "Authorization: Bearer <token>"
    
    ZEROFOX_BASE_URL = os.environ.get("ZEROFOX_BASE_URL", "https://api.zerofox.com")
    ZEROFOX_API_TOKEN = os.environ.get("ZEROFOX_API_TOKEN", "")
    
    s3 = boto3.client("s3")
    
    def _iso(ts: float) -> str:
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(ts))
    
    def _load_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            b = obj["Body"].read()
            return json.loads(b) if b else {}
        except Exception:
            return {"last_since": _iso(time.time() - LOOKBACK_SEC)}
    
    def _save_state(st: dict) -> None:
        s3.put_object(
            Bucket=S3_BUCKET, Key=STATE_KEY,
            Body=json.dumps(st, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
    
    def _headers() -> dict:
        hdrs = {"Accept": "application/json", "Content-Type": "application/json"}
        if AUTH_HEADER:
            try:
                k, v = AUTH_HEADER.split(":", 1)
                hdrs[k.strip()] = v.strip()
            except ValueError:
                hdrs["Authorization"] = AUTH_HEADER.strip()
        elif ZEROFOX_API_TOKEN:
            hdrs["Authorization"] = f"Bearer {ZEROFOX_API_TOKEN}"
        return hdrs
    
    def _http_get(url: str) -> dict:
        attempt = 0
        while True:
            try:
                req = Request(url, method="GET")
                for k, v in _headers().items():
                    req.add_header(k, v)
                with urlopen(req, timeout=HTTP_TIMEOUT) as r:
                    body = r.read()
                    try:
                        return json.loads(body.decode("utf-8"))
                    except json.JSONDecodeError:
                        return {"raw": body.decode("utf-8", errors="replace")}
            except HTTPError as e:
                if e.code in (429, 500, 502, 503, 504) and attempt < HTTP_RETRIES:
                    retry_after = int(e.headers.get("Retry-After", 1 + attempt))
                    time.sleep(max(1, retry_after))
                    attempt += 1
                    continue
                raise
            except URLError:
                if attempt < HTTP_RETRIES:
                    time.sleep(1 + attempt)
                    attempt += 1
                    continue
                raise
    
    def _put_json(obj: dict, label: str) -> str:
        ts = time.gmtime()
        key = f"{S3_PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', ts)}-zerofox-{label}.json"
        s3.put_object(
            Bucket=S3_BUCKET, Key=key,
            Body=json.dumps(obj, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def _extract_next_token(payload: dict):
        next_token = (payload.get("next") or payload.get("next_token") or 
                      payload.get("nextPageToken") or payload.get("next_page_token"))
        if isinstance(next_token, dict):
            return next_token.get("token") or next_token.get("cursor") or next_token.get("value")
        return next_token
    
    def _extract_items(payload: dict) -> list:
        for key in ("results", "data", "alerts", "items", "logs", "events"):
            if isinstance(payload.get(key), list):
                return payload[key]
        return []
    
    def _extract_newest_timestamp(items: list, current: str) -> str:
        newest = current
        for item in items:
            timestamp = (item.get("timestamp") or item.get("created_at") or 
                        item.get("last_modified") or item.get("event_time") or 
                        item.get("log_time") or item.get("updated_at"))
            if isinstance(timestamp, str) and timestamp > newest:
                newest = timestamp
        return newest
    
    def lambda_handler(event=None, context=None):
        st = _load_state()
        since = st.get("last_since") or _iso(time.time() - LOOKBACK_SEC)
    
        # Use URL_TEMPLATE if provided, otherwise construct default alerts endpoint
        if URL_TEMPLATE:
            base_url = URL_TEMPLATE.replace("{SINCE}", urllib.parse.quote(since))
        else:
            base_url = f"{ZEROFOX_BASE_URL}/v1/alerts?since={urllib.parse.quote(since)}"
    
        page_token = ""
        pages = 0
        total_items = 0
        newest_since = since
    
        while pages < MAX_PAGES:
            # Construct URL with pagination
            if URL_TEMPLATE:
                url = (base_url
                      .replace("{PAGE_TOKEN}", urllib.parse.quote(page_token))
                      .replace("{PAGE_SIZE}", str(PAGE_SIZE)))
            else:
                url = f"{base_url}&limit={PAGE_SIZE}"
                if page_token:
                    url += f"&page_token={urllib.parse.quote(page_token)}"
    
            payload = _http_get(url)
            _put_json(payload, f"page-{pages:05d}")
    
            items = _extract_items(payload)
            total_items += len(items)
            newest_since = _extract_newest_timestamp(items, newest_since)
    
            pages += 1
            next_token = _extract_next_token(payload)
            if not next_token:
                break
            page_token = str(next_token)
    
        if newest_since and newest_since != st.get("last_since"):
            st["last_since"] = newest_since
            _save_state(st)
    
        return {"ok": True, "pages": pages, "items": total_items, "since": since, "new_since": newest_since}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  5. Buka Configuration > Environment variables.

  6. Klik Edit > Tambahkan variabel lingkungan baru.

  7. Masukkan variabel lingkungan yang diberikan dalam tabel berikut, dengan mengganti nilai contoh dengan nilai Anda.

    Variabel lingkungan

    Kunci Nilai contoh
    S3_BUCKET zerofox-platform-logs
    S3_PREFIX zerofox/platform/
    STATE_KEY zerofox/platform/state.json
    ZEROFOX_BASE_URL https://api.zerofox.com
    ZEROFOX_API_TOKEN your-zerofox-personal-access-token
    LOOKBACK_SECONDS 3600
    PAGE_SIZE 200
    MAX_PAGES 20
    HTTP_TIMEOUT 60
    HTTP_RETRIES 3
    URL_TEMPLATE (opsional) Template URL kustom dengan {SINCE}, {PAGE_TOKEN}, {PAGE_SIZE}
    AUTH_HEADER (opsional) Authorization: Bearer <token> untuk autentikasi kustom
  8. Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > your-function).

  9. Pilih tab Configuration

  10. Di panel Konfigurasi umum, klik Edit.

  11. Ubah Waktu Tunggu menjadi 5 menit (300 detik), lalu klik Simpan.

Membuat jadwal EventBridge

  1. Buka Amazon EventBridge > Scheduler > Create schedule.
  2. Berikan detail konfigurasi berikut:
    • Jadwal berulang: Tarif (1 hour).
    • Target: Fungsi Lambda Anda zerofox_platform_to_s3.
    • Name: zerofox-platform-1h.
  3. Klik Buat jadwal.

(Opsional) Buat pengguna & kunci IAM hanya baca untuk Google SecOps

  1. Buka Konsol AWS > IAM > Pengguna.
  2. Klik Add users.
  3. Berikan detail konfigurasi berikut:
    • Pengguna: Masukkan secops-reader.
    • Jenis akses: Pilih Kunci akses — Akses terprogram.
  4. Klik Buat pengguna.
  5. Lampirkan kebijakan baca minimal (kustom): Pengguna > secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan.
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::zerofox-platform-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::zerofox-platform-logs"
        }
      ]
    }
    
  7. Nama = secops-reader-policy.

  8. Klik Buat kebijakan > cari/pilih > Berikutnya > Tambahkan izin.

  9. Buat kunci akses untuk secops-reader: Kredensial keamanan > Kunci akses.

  10. Klik Create access key.

  11. Download .CSV. (Anda akan menempelkan nilai ini ke feed).

Mengonfigurasi feed di Google SecOps untuk menyerap log Platform ZeroFox

  1. Buka Setelan SIEM > Feed.
  2. Klik + Tambahkan Feed Baru.
  3. Di kolom Nama feed, masukkan nama untuk feed (misalnya, ZeroFox Platform Logs).
  4. Pilih Amazon S3 V2 sebagai Jenis sumber.
  5. Pilih ZeroFox Platform sebagai Jenis log.
  6. Klik Berikutnya.
  7. Tentukan nilai untuk parameter input berikut:
    • URI S3: s3://zerofox-platform-logs/zerofox/platform/
    • Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
    • Usia File Maksimum: Menyertakan file yang diubah dalam jumlah hari terakhir. Defaultnya adalah 180 hari.
    • ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
    • Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
    • Namespace aset: Namespace aset.
    • Label penyerapan: Label yang diterapkan ke peristiwa dari feed ini.
  8. Klik Berikutnya.
  9. Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.

Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.