Mengumpulkan log audit tingkat grup Snyk

Didukung di:

Dokumen ini menjelaskan cara menyerap log audit tingkat grup Snyk ke Google Security Operations menggunakan Amazon S3. Parser pertama-tama membersihkan kolom yang tidak diperlukan dari log mentah. Kemudian, informasi yang relevan seperti detail pengguna, jenis peristiwa, dan stempel waktu akan diekstrak, diubah, dan dipetakan ke skema UDM Google SecOps untuk representasi log keamanan standar.

Sebelum memulai

Pastikan Anda memenuhi prasyarat berikut:

  • Instance Google SecOps
  • Akses istimewa ke Snyk (Admin Grup) dan token API dengan akses ke Grup
  • Akses istimewa ke AWS (S3, IAM, Lambda, EventBridge)

Mengumpulkan prasyarat log audit tingkat Grup Snyk (ID, kunci API, ID org, token)

  1. Di Snyk, klik avatar Anda > Account settings > API token.
    • Klik Cabut & buat ulang (atau Buat), lalu salin token.
    • Simpan token ini sebagai variabel lingkungan SNYK_API_TOKEN.
  2. Di Snyk, beralihlah ke Grup Anda (pengalih kiri atas).
    • Buka Setelan grup. Salin <GROUP_ID> dari URL: https://app.snyk.io/group/<GROUP_ID>/settings.
    • Atau gunakan REST API: GET https://api.snyk.io/rest/groups?version=2021-06-04 dan pilih id.
  3. Pastikan pengguna token memiliki izin Lihat Log Audit (group.audit.read).

Mengonfigurasi bucket AWS S3 dan IAM untuk Google SecOps

  1. Buat bucket Amazon S3 dengan mengikuti panduan pengguna ini: Membuat bucket
  2. Simpan Name dan Region bucket untuk referensi di masa mendatang (misalnya, snyk-audit).
  3. Buat pengguna dengan mengikuti panduan pengguna ini: Membuat pengguna IAM.
  4. Pilih Pengguna yang dibuat.
  5. Pilih tab Kredensial keamanan.
  6. Klik Create Access Key di bagian Access Keys.
  7. Pilih Layanan pihak ketiga sebagai Kasus penggunaan.
  8. Klik Berikutnya.
  9. Opsional: tambahkan tag deskripsi.
  10. Klik Create access key.
  11. Klik Download CSV file untuk menyimpan Access Key dan Secret Access Key untuk digunakan nanti.
  12. Klik Selesai.
  13. Pilih tab Izin.
  14. Klik Tambahkan izin di bagian Kebijakan izin.
  15. Pilih Tambahkan izin.
  16. Pilih Lampirkan kebijakan secara langsung
  17. Telusuri dan pilih kebijakan AmazonS3FullAccess.
  18. Klik Berikutnya.
  19. Klik Add permissions.

Mengonfigurasi kebijakan dan peran IAM untuk upload S3

  1. Di konsol AWS, buka IAM > Policies > Create policy > JSON tab.
  2. Masukkan kebijakan berikut:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutSnykAuditObjects",
          "Effect": "Allow",
          "Action": [
            "s3:PutObject",
            "s3:GetObject"
          ],
          "Resource": "arn:aws:s3:::snyk-audit/*"
        }
      ]
    }
    
  3. Klik Berikutnya > Buat kebijakan.

  4. Buka IAM > Roles > Create role > AWS service > Lambda.

  5. Lampirkan kebijakan yang baru dibuat.

  6. Beri nama peran WriteSnykAuditToS3Role, lalu klik Buat peran.

Buat fungsi Lambda

  1. Di Konsol AWS, buka Lambda > Functions > Create function.
  2. Klik Buat dari awal.
  3. Berikan detail konfigurasi berikut:
Setelan Nilai
Nama snyk_group_audit_to_s3
Runtime Python 3.13
Arsitektur x86_64
Peran eksekusi WriteSnykAuditToS3Role
  1. Setelah fungsi dibuat, buka tab Code, hapus stub, dan masukkan kode berikut (snyk_group_audit_to_s3.py):

    # snyk_group_audit_to_s3.py
    #!/usr/bin/env python3
    # Lambda: Pull Snyk Group-level Audit Logs (REST) to S3 (no transform)
    
    import os
    import json
    import time
    import urllib.parse
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError
    import boto3
    
    BASE = os.environ.get("SNYK_API_BASE", "https://api.snyk.io").rstrip("/")
    GROUP_ID = os.environ["SNYK_GROUP_ID"].strip()
    API_TOKEN = os.environ["SNYK_API_TOKEN"].strip()
    BUCKET = os.environ["S3_BUCKET"].strip()
    PREFIX = os.environ.get("S3_PREFIX", "snyk/audit/").strip()
    SIZE = int(os.environ.get("SIZE", "100"))  # max 100 per docs
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "20"))
    STATE_KEY = os.environ.get("STATE_KEY", "snyk/audit/state.json")
    API_VERSION = os.environ.get("SNYK_API_VERSION", "2021-06-04").strip()  # required by REST API
    LOOKBACK_SECONDS = int(os.environ.get("LOOKBACK_SECONDS", "3600"))  # used only when no cursor
    
    # Optional filters
    EVENTS_CSV = os.environ.get("EVENTS", "").strip()            # e.g. "group.create,org.user.invited"
    EXCLUDE_EVENTS_CSV = os.environ.get("EXCLUDE_EVENTS", "").strip()
    
    s3 = boto3.client("s3")
    
    HDRS = {
        # REST authentication requires "token" scheme and vnd.api+json Accept
        "Authorization": f"token {API_TOKEN}",
        "Accept": "application/vnd.api+json",
    }
    
    def _get_state() -> str | None:
        try:
            obj = s3.get_object(Bucket=BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read()).get("cursor")
        except Exception:
            return None
    
    def _put_state(cursor: str):
        s3.put_object(Bucket=BUCKET, Key=STATE_KEY, Body=json.dumps({"cursor": cursor}).encode("utf-8"))
    
    def _write(payload: dict) -> str:
        ts = time.strftime("%Y/%m/%d/%H%M%S", time.gmtime())
        key = f"{PREFIX.rstrip('/')}/{ts}-snyk-group-audit.json"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"),
            ContentType="application/json",
        )
        return key
    
    def _parse_next_cursor_from_links(links: dict | None) -> str | None:
        if not links:
            return None
        nxt = links.get("next")
        if not nxt:
            return None
        try:
            q = urllib.parse.urlparse(nxt).query
            params = urllib.parse.parse_qs(q)
            cur = params.get("cursor")
            return cur[0] if cur else None
        except Exception:
            return None
    
    def _http_get(url: str) -> dict:
        req = Request(url, method="GET", headers=HDRS)
        try:
            with urlopen(req, timeout=60) as r:
                return json.loads(r.read().decode("utf-8"))
        except HTTPError as e:
            # Back off on rate limit or transient server errors; single retry
            if e.code in (429, 500, 502, 503, 504):
                delay = int(e.headers.get("Retry-After", "1"))
                time.sleep(max(1, delay))
                with urlopen(req, timeout=60) as r2:
                    return json.loads(r2.read().decode("utf-8"))
            raise
    
    def _as_list(csv_str: str) -> list[str]:
        return [x.strip() for x in csv_str.split(",") if x.strip()]
    
    def fetch_page(cursor: str | None, first_run_from_iso: str | None):
        base_path = f"/rest/groups/{GROUP_ID}/audit_logs/search"
        params: dict[str, object] = {
            "version": API_VERSION,
            "size": SIZE,
        }
        if cursor:
            params["cursor"] = cursor
        elif first_run_from_iso:
            params["from"] = first_run_from_iso  # RFC3339
    
        events = _as_list(EVENTS_CSV)
        exclude_events = _as_list(EXCLUDE_EVENTS_CSV)
        if events and exclude_events:
            # API does not allow both at the same time; prefer explicit include
            exclude_events = []
        if events:
            params["events"] = events  # will be encoded as repeated params
        if exclude_events:
            params["exclude_events"] = exclude_events
    
        url = f"{BASE}{base_path}?{urllib.parse.urlencode(params, doseq=True)}"
        return _http_get(url)
    
    def lambda_handler(event=None, context=None):
        cursor = _get_state()
        pages = 0
        total = 0
        last_cursor = cursor
    
        # Only for the very first run (no saved cursor), constrain the time window
        first_run_from_iso = None
        if not cursor and LOOKBACK_SECONDS > 0:
            first_run_from_iso = time.strftime(
                "%Y-%m-%dT%H:%M:%SZ", time.gmtime(time.time() - LOOKBACK_SECONDS)
            )
    
        while pages < MAX_PAGES:
            payload = fetch_page(cursor, first_run_from_iso)
            _write(payload)
    
            # items are nested under data.items per Snyk docs
            data_obj = payload.get("data") or {}
            items = data_obj.get("items") or []
            if isinstance(items, list):
                total += len(items)
    
            cursor = _parse_next_cursor_from_links(payload.get("links"))
            pages += 1
            if not cursor:
                break
    
            # after first page, disable from-filter
            first_run_from_iso = None
    
        if cursor and cursor != last_cursor:
            _put_state(cursor)
    
        return {"ok": True, "pages": pages, "events": total, "next_cursor": cursor}
    
    if __name__ == "__main__":
        print(lambda_handler())
    

Menambahkan variabel lingkungan

  1. Buka Configuration > Environment variables.
  2. Klik Edit > Tambahkan variabel lingkungan baru.
  3. Masukkan variabel lingkungan berikut, ganti dengan nilai Anda:

    Kunci Contoh
    S3_BUCKET snyk-audit
    S3_PREFIX snyk/audit/
    STATE_KEY snyk/audit/state.json
    SNYK_GROUP_ID <your_group_id>
    SNYK_API_TOKEN xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx
    SNYK_API_BASE https://api.snyk.io (opsional)
    SNYK_API_VERSION 2021-06-04
    SIZE 100
    MAX_PAGES 20
    LOOKBACK_SECONDS 3600
    EVENTS (opsional) group.create,org.user.add
    EXCLUDE_EVENTS (opsional) api.access
  4. Setelah fungsi dibuat, tetap buka halamannya (atau buka Lambda > Functions > your-function).

  5. Pilih tab Configuration

  6. Di panel General configuration, klik Edit.

  7. Ubah Waktu tunggu menjadi 5 menit (300 detik), lalu klik Simpan.

Membuat jadwal EventBridge

  1. Buka Amazon EventBridge > Scheduler > Create schedule.
  2. Berikan detail konfigurasi berikut:
    • Jadwal berulang: Tarif (1 hour).
    • Target: Fungsi Lambda Anda.
    • Name: snyk-group-audit-1h.
  3. Klik Buat jadwal.

Opsional: Buat pengguna & kunci IAM hanya baca untuk Google SecOps

  1. Di Konsol AWS, buka IAM > Pengguna > Tambahkan pengguna.
  2. Klik Add users.
  3. Berikan detail konfigurasi berikut:
    • Pengguna: secops-reader.
    • Jenis akses: Kunci akses — Akses terprogram.
  4. Klik Buat pengguna.
  5. Lampirkan kebijakan baca minimal (kustom): Pengguna > secops-reader > Izin > Tambahkan izin > Lampirkan kebijakan secara langsung > Buat kebijakan.
  6. Di editor JSON, masukkan kebijakan berikut:

    {
      "Version": "2012-10-17",
      "Statement": [
        { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::snyk-audit/*" },
        { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::snyk-audit" }
      ]
    }
    
  7. Tetapkan nama ke secops-reader-policy.

  8. Buka Buat kebijakan > cari/pilih > Berikutnya > Tambahkan izin.

  9. Buka Kredensial keamanan > Kunci akses > Buat kunci akses.

  10. Download CSV (nilai ini dimasukkan ke dalam feed).

Mengonfigurasi feed di Google SecOps untuk menyerap Log audit tingkat Grup Snyk

  1. Buka Setelan SIEM > Feed.
  2. Klik + Tambahkan Feed Baru.
  3. Di kolom Nama feed, masukkan nama untuk feed (misalnya, Snyk Group Audit Logs).
  4. Pilih Amazon S3 V2 sebagai Jenis sumber.
  5. Pilih Snyk Group level audit Logs sebagai Log type.
  6. Klik Berikutnya.
  7. Tentukan nilai untuk parameter input berikut:
    • URI S3: s3://snyk-audit/snyk/audit/
    • Opsi penghapusan sumber: Pilih opsi penghapusan sesuai preferensi Anda.
    • Usia File Maksimum: Menyertakan file yang diubah dalam jumlah hari terakhir. Defaultnya adalah 180 hari.
    • ID Kunci Akses: Kunci akses pengguna dengan akses ke bucket S3.
    • Kunci Akses Rahasia: Kunci rahasia pengguna dengan akses ke bucket S3.
    • Namespace aset: snyk.group_audit
    • Label penyerapan: Tambahkan jika diinginkan.
  8. Klik Berikutnya.
  9. Tinjau konfigurasi feed baru Anda di layar Selesaikan, lalu klik Kirim.

Tabel Pemetaan UDM

Kolom Log Pemetaan UDM Logika
content.url principal.url Dipetakan langsung dari kolom content.url dalam log mentah.
dibuat metadata.event_timestamp Diuraikan dari kolom created dalam log mentah menggunakan format ISO8601.
event metadata.product_event_type Dipetakan langsung dari kolom event dalam log mentah.
groupId principal.user.group_identifiers Dipetakan langsung dari kolom groupId dalam log mentah.
orgId principal.user.attribute.labels.key Ditetapkan ke "orgId".
orgId principal.user.attribute.labels.value Dipetakan langsung dari kolom orgId dalam log mentah.
userId principal.user.userid Dipetakan langsung dari kolom userId dalam log mentah.
T/A metadata.event_type Di-hardcode ke "USER_UNCATEGORIZED" dalam kode parser.
T/A metadata.log_type Di-hardcode ke "SNYK_SDLC" dalam kode parser.
T/A metadata.product_name Di-hardcode ke "SNYK SDLC" dalam kode parser.
T/A metadata.vendor_name Di-hardcode ke "SNYK_SDLC" dalam kode parser.

Perlu bantuan lain? Dapatkan jawaban dari anggota Komunitas dan profesional Google SecOps.