Raccogliere i log delle attività di Rippling

Supportato in:

Questo documento spiega come importare i log attività di Rippling in Google Security Operations utilizzando Amazon S3.

Prima di iniziare

Assicurati di soddisfare i seguenti prerequisiti:

  • Un'istanza Google SecOps.
  • Accesso privilegiato a Rippling (token API con accesso all'attività aziendale).
  • Accesso privilegiato ad AWS (S3, Identity and Access Management (IAM), Lambda, EventBridge).

Ottenere i prerequisiti di Rippling

  1. Accedi a Rippling Admin.
  2. Apri Ricerca > Token API.
    Percorso alternativo: Impostazioni > Impostazioni azienda > Token API.
  3. Fai clic su Genera token API.
  4. Fornisci i seguenti dettagli di configurazione:
    • Nome: fornisci un nome univoco e significativo (ad esempio, Google SecOps S3 Export).
    • Versione API: API di base (v1)
    • Ambiti/Autorizzazioni: attiva company:activity:read (obbligatorio per Attività aziendale).
  5. Fai clic su Crea e salva il valore del token in una posizione sicura. Lo utilizzerai come token di autenticazione.

Configura il bucket AWS S3 e IAM per Google SecOps

  1. Crea un bucket Amazon S3 seguendo questa guida utente: Creazione di un bucket
  2. Salva il nome e la regione del bucket per riferimento futuro (ad esempio, rippling-activity-logs).
  3. Crea un utente seguendo questa guida utente: Creazione di un utente IAM.
  4. Seleziona l'utente creato.
  5. Seleziona la scheda Credenziali di sicurezza.
  6. Fai clic su Crea chiave di accesso nella sezione Chiavi di accesso.
  7. Seleziona Servizio di terze parti come Caso d'uso.
  8. Fai clic su Avanti.
  9. (Facoltativo) Aggiungi il tag della descrizione.
  10. Fai clic su Crea chiave di accesso.
  11. Fai clic su Scarica file CSV per salvare la chiave di accesso e la chiave di accesso segreta per riferimento futuro.
  12. Fai clic su Fine.
  13. Seleziona la scheda Autorizzazioni.
  14. Fai clic su Aggiungi autorizzazioni nella sezione Criteri per le autorizzazioni.
  15. Seleziona Aggiungi autorizzazioni.
  16. Seleziona Allega direttamente i criteri.
  17. Cerca i criteri AmazonS3FullAccess.
  18. Seleziona la policy.
  19. Fai clic su Avanti.
  20. Fai clic su Aggiungi autorizzazioni.

Configura il ruolo e il criterio IAM per i caricamenti S3

  1. Nella console AWS, vai a IAM > Policy.
  2. Fai clic su Crea criterio > scheda JSON.
  3. Copia e incolla i seguenti criteri.
  4. Policy JSON (sostituisci i valori se hai inserito un bucket o un prefisso diverso):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::rippling-activity-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::rippling-activity-logs/rippling/activity/state.json"
        }
      ]
    }
    ````
    
  5. Fai clic su Avanti > Crea criterio.

  6. Vai a IAM > Ruoli > Crea ruolo > Servizio AWS > Lambda.

  7. Allega il criterio appena creato.

  8. Assegna al ruolo il nome WriteRipplingToS3Role e fai clic su Crea ruolo.

Crea la funzione Lambda

  1. Nella console AWS, vai a Lambda > Funzioni > Crea funzione.
  2. Fai clic su Crea autore da zero.
  3. Fornisci i seguenti dettagli di configurazione:

    Impostazione Valore
    Nome rippling_activity_to_s3
    Tempo di esecuzione Python 3.13
    Architettura x86_64
    Ruolo di esecuzione WriteRipplingToS3Role
  4. Dopo aver creato la funzione, apri la scheda Codice, elimina lo stub e incolla il seguente codice (rippling_activity_to_s3.py).

    #!/usr/bin/env python3
    # Lambda: Pull Rippling Company Activity logs to S3 (raw JSON, no transforms)
    
    import os, json, time, urllib.parse
    from urllib.request import Request, urlopen
    from datetime import datetime, timezone, timedelta
    import boto3
    
    API_TOKEN = os.environ["RIPPLING_API_TOKEN"]
    ACTIVITY_URL = os.environ.get("RIPPLING_ACTIVITY_URL", "https://api.rippling.com/platform/api/company_activity")
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "rippling/activity/")
    STATE_KEY = os.environ.get("STATE_KEY", "rippling/activity/state.json")
    
    LIMIT = int(os.environ.get("LIMIT", "1000"))
    MAX_PAGES = int(os.environ.get("MAX_PAGES", "10"))
    LOOKBACK_MINUTES = int(os.environ.get("LOOKBACK_MINUTES", "60"))
    END_LAG_SECONDS = int(os.environ.get("END_LAG_SECONDS", "120"))
    
    s3 = boto3.client("s3")
    
    def _headers():
        return {"Authorization": f"Bearer {API_TOKEN}", "Accept": "application/json"}
    
    def _get_state():
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            j = json.loads(obj["Body"].read())
            return {"since": j.get("since"), "next": j.get("next")}
        except Exception:
            return {"since": None, "next": None}
    
    def _put_state(since_iso, next_cursor):
        body = json.dumps({"since": since_iso, "next": next_cursor}, separators=(",", ":")).encode("utf-8")
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=body)
    
    def _get(url):
        req = Request(url, method="GET")
        for k, v in _headers().items():
            req.add_header(k, v)
        with urlopen(req, timeout=60) as r:
            return json.loads(r.read().decode("utf-8"))
    
    def _build_url(base, params):
        qs = urllib.parse.urlencode(params)
        return f"{base}?{qs}" if qs else base
    
    def _parse_iso(ts):
        if ts.endswith("Z"):
            ts = ts[:-1] + "+00:00"
        return datetime.fromisoformat(ts)
    
    def _iso_from_epoch(sec):
        return datetime.fromtimestamp(sec, tz=timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z")
    
    def _write(payload, run_ts_iso, page_index, source="company_activity"):
        day_path = _parse_iso(run_ts_iso).strftime("%Y/%m/%d")
        key = f"{S3_PREFIX.strip('/')}/{day_path}/{run_ts_iso.replace(':','').replace('-','')}-page{page_index:05d}-{source}.json"
        s3.put_object(Bucket=S3_BUCKET, Key=key, Body=json.dumps(payload, separators=(",", ":")).encode("utf-8"))
        return key
    
    def lambda_handler(event=None, context=None):
        state = _get_state()
        run_end = datetime.now(timezone.utc) - timedelta(seconds=END_LAG_SECONDS)
        end_iso = run_end.replace(microsecond=0).isoformat().replace("+00:00", "Z")
    
        since_iso = state["since"]
        next_cursor = state["next"]
    
        if since_iso is None:
            since_iso = _iso_from_epoch(time.time() - LOOKBACK_MINUTES * 60)
        else:
            try:
                since_iso = (_parse_iso(since_iso) + timedelta(seconds=1)).replace(microsecond=0).isoformat().replace("+00:00", "Z")
            except Exception:
                since_iso = _iso_from_epoch(time.time() - LOOKBACK_MINUTES * 60)
    
        run_ts_iso = end_iso
        pages = 0
        total = 0
        newest_ts = None
        pending_next = None
    
        while pages < MAX_PAGES:
            params = {"limit": str(LIMIT)}
            if next_cursor:
                params["next"] = next_cursor
            else:
                params["startDate"] = since_iso
                params["endDate"] = end_iso
    
            url = _build_url(ACTIVITY_URL, params)
            data = _get(url)
            _write(data, run_ts_iso, pages)
    
            events = data.get("events") or data.get("items") or data.get("data") or []
            total += len(events) if isinstance(events, list) else 0
    
            if isinstance(events, list):
                for ev in events:
                    t = ev.get("timestamp") or ev.get("time") or ev.get("event_time")
                    if isinstance(t, str):
                        try:
                            dt_ts = _parse_iso(t)
                            if newest_ts is None or dt_ts > newest_ts:
                                newest_ts = dt_ts
                        except Exception:
                            pass
    
            nxt = data.get("next") or data.get("next_cursor") or None
            pages += 1
    
            if nxt:
                next_cursor = nxt
                pending_next = nxt
                continue
            else:
                pending_next = None
                break
    
        new_since_iso = (newest_ts or run_end).replace(microsecond=0).isoformat().replace("+00:00", "Z")
        _put_state(new_since_iso, pending_next)
    
        return {"ok": True, "pages": pages, "events": total, "since": new_since_iso, "next": pending_next}
    
  5. Vai a Configurazione > Variabili di ambiente.

  6. Fai clic su Modifica > Aggiungi nuova variabile di ambiente.

  7. Inserisci le variabili di ambiente fornite nella tabella seguente, sostituendo i valori di esempio con i tuoi valori.

    Variabili di ambiente

    Chiave Valore di esempio
    S3_BUCKET rippling-activity-logs
    S3_PREFIX rippling/activity/
    STATE_KEY rippling/activity/state.json
    RIPPLING_API_TOKEN your-api-token
    RIPPLING_ACTIVITY_URL https://api.rippling.com/platform/api/company_activity
    LIMIT 1000
    MAX_PAGES 10
    LOOKBACK_MINUTES 60
    END_LAG_SECONDS 120
  8. Dopo aver creato la funzione, rimani sulla relativa pagina (o apri Lambda > Funzioni > la tua funzione).

  9. Seleziona la scheda Configurazione.

  10. Nel riquadro Configurazione generale, fai clic su Modifica.

  11. Modifica Timeout impostando 5 minuti (300 secondi) e fai clic su Salva.

Creare una pianificazione EventBridge

  1. Vai a Amazon EventBridge > Scheduler > Crea pianificazione.
  2. Fornisci i seguenti dettagli di configurazione:
    • Programma ricorrente: Tariffa (1 hour).
    • Target: la tua funzione Lambda rippling_activity_to_s3.
    • Nome: rippling-activity-logs-1h
  3. Fai clic su Crea pianificazione.

(Facoltativo) Crea chiavi e utenti IAM di sola lettura per Google SecOps

  1. Nella console AWS, vai a IAM > Utenti > Aggiungi utenti.
  2. Fai clic su Add users (Aggiungi utenti).
  3. Fornisci i seguenti dettagli di configurazione:
    • Utente: inserisci secops-reader.
    • Tipo di accesso: seleziona Chiave di accesso - Accesso programmatico.
  4. Fai clic su Crea utente.
  5. Collega la criterio per la lettura minima (personalizzata): Utenti > secops-reader > Autorizzazioni > Aggiungi autorizzazioni > Collega le norme direttamente > Crea norma.
  6. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::rippling-activity-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::rippling-activity-logs"
        }
      ]
    }
    
  7. Name = secops-reader-policy.

  8. Fai clic su Crea criterio > cerca/seleziona > Avanti > Aggiungi autorizzazioni.

  9. Crea la chiave di accesso per secops-reader: Credenziali di sicurezza > Chiavi di accesso.

  10. Fai clic su Crea chiave di accesso.

  11. Scarica il .CSV. Incollerai questi valori nel feed.

Configura un feed in Google SecOps per importare i log delle attività di Rippling

  1. Vai a Impostazioni SIEM > Feed.
  2. Fai clic su + Aggiungi nuovo feed.
  3. Nel campo Nome feed, inserisci un nome per il feed (ad esempio, Rippling Activity Logs).
  4. Seleziona Amazon S3 V2 come Tipo di origine.
  5. Seleziona Log delle attività di Rippling come Tipo di log.
  6. Fai clic su Avanti.
  7. Specifica i valori per i seguenti parametri di input:
    • URI S3: s3://rippling-activity-logs/rippling/activity/
    • Opzioni di eliminazione dell'origine: seleziona l'opzione di eliminazione in base alle tue preferenze.
    • Età massima del file: includi i file modificati nell'ultimo numero di giorni. Il valore predefinito è 180 giorni.
    • ID chiave di accesso: chiave di accesso utente con accesso al bucket S3.
    • Chiave di accesso segreta: chiave segreta dell'utente con accesso al bucket S3.
    • Spazio dei nomi dell'asset: rippling.activity
    • (Facoltativo) Etichette di importazione: aggiungi l'etichetta di importazione.
  8. Fai clic su Avanti.
  9. Controlla la nuova configurazione del feed nella schermata Finalizza e poi fai clic su Invia.

Hai bisogno di ulteriore assistenza? Ricevi risposte dai membri della community e dai professionisti di Google SecOps.