Recolha registos de operações do Zoom

Compatível com:

Este documento explica como carregar registos de operações do Zoom para o Google Security Operations através do Amazon S3. O analisador transforma os registos não processados num modelo de dados unificado (UDM). Extrai campos da mensagem de registo não processada, realiza a limpeza e a normalização de dados e mapeia as informações extraídas para os campos da UDM correspondentes, enriquecendo, em última análise, os dados para análise e correlação num sistema SIEM.

Antes de começar

Certifique-se de que tem os seguintes pré-requisitos:

  • Instância do Google SecOps
  • Acesso privilegiado ao Zoom
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)

Recolha os pré-requisitos dos registos de operações do Zoom (IDs, chaves da API, IDs da organização e tokens)

  1. Inicie sessão no Zoom App Marketplace.
  2. Aceda a Desenvolver > Criar app > OAuth servidor a servidor.
  3. Crie a app e adicione o seguinte âmbito: report:read:operation_logs:admin (ou report:read:admin).
  4. Em Credenciais da app, copie e guarde os seguintes detalhes numa localização segura:
    • ID da conta.
    • ID de cliente.
    • Segredo do cliente.

Configure o contentor do AWS S3 e o IAM para o Google SecOps

  1. Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
  2. Guarde o nome e a região do contentor para referência futura (por exemplo, zoom-operation-logs).
  3. Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
  4. Selecione o utilizador criado.
  5. Selecione o separador Credenciais de segurança.
  6. Clique em Criar chave de acesso na secção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Exemplo de utilização.
  8. Clicar em Seguinte.
  9. Opcional: adicione uma etiqueta de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
  12. Clique em Concluído.
  13. Selecione o separador Autorizações.
  14. Clique em Adicionar autorizações na secção Políticas de autorizações.
  15. Selecione Adicionar autorizações.
  16. Selecione Anexar políticas diretamente
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clicar em Seguinte.
  19. Clique em Adicionar autorizações.

Configure a política e a função de IAM para carregamentos do S3

  1. Na consola da AWS, aceda a IAM > Políticas > Criar política > separador JSON.
  2. Introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutZoomOperationLogs",
          "Effect": "Allow",
          "Action": ["s3:PutObject"],
          "Resource": "arn:aws:s3:::zoom-operation-logs/zoom/operationlogs/*"
        },
        {
          "Sid": "AllowStateReadWrite",
          "Effect": "Allow",
          "Action": ["s3:GetObject", "s3:PutObject"],
          "Resource": "arn:aws:s3:::zoom-operation-logs/zoom/operationlogs/state.json"
        }
      ]
    }
    
    • Substitua zoom-operation-logs se tiver introduzido um nome de contentor diferente.
  3. Clique em Seguinte > Criar política.

  4. Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.

  5. Anexe a política criada recentemente.

  6. Dê o nome WriteZoomOperationLogsToS3Role à função e clique em Criar função.

Crie a função Lambda

  1. Na consola da AWS, aceda a Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Faculte os seguintes detalhes de configuração:
Definição Valor
Nome zoom_operationlogs_to_s3
Runtime Python 3.13
Arquitetura x86_64
Função de execução WriteZoomOperationLogsToS3Role
  1. Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código(zoom_operationlogs_to_s3.py):

    #!/usr/bin/env python3
    import os, json, gzip, io, uuid, datetime as dt, base64, urllib.parse, urllib.request
    import boto3
    
    # ---- Environment ----
    S3_BUCKET = os.environ["S3_BUCKET"]
    S3_PREFIX = os.environ.get("S3_PREFIX", "zoom/operationlogs/")
    STATE_KEY = os.environ.get("STATE_KEY", S3_PREFIX + "state.json")
    ZOOM_ACCOUNT_ID = os.environ["ZOOM_ACCOUNT_ID"]
    ZOOM_CLIENT_ID = os.environ["ZOOM_CLIENT_ID"]
    ZOOM_CLIENT_SECRET = os.environ["ZOOM_CLIENT_SECRET"]
    PAGE_SIZE = int(os.environ.get("PAGE_SIZE", "300"))  # API default 30; max may vary
    TIMEOUT = int(os.environ.get("TIMEOUT", "30"))
    
    TOKEN_URL = "https://zoom.us/oauth/token"
    REPORT_URL = "https://api.zoom.us/v2/report/operationlogs"
    
    s3 = boto3.client("s3")
    
    # ---- Helpers ----
    
    def _http(req: urllib.request.Request):
        return urllib.request.urlopen(req, timeout=TIMEOUT)
    
    def get_token() -> str:
        params = urllib.parse.urlencode({
            "grant_type": "account_credentials",
            "account_id": ZOOM_ACCOUNT_ID,
        }).encode()
        basic = base64.b64encode(f"{ZOOM_CLIENT_ID}:{ZOOM_CLIENT_SECRET}".encode()).decode()
        req = urllib.request.Request(
            TOKEN_URL,
            data=params,
            headers={
                "Authorization": f"Basic {basic}",
                "Content-Type": "application/x-www-form-urlencoded",
                "Accept": "application/json",
                "Host": "zoom.us",
            },
            method="POST",
        )
        with _http(req) as r:
            body = json.loads(r.read())
            return body["access_token"]
    
    def get_state() -> dict:
        try:
            obj = s3.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
            return json.loads(obj["Body"].read())
        except Exception:
            # initial state: start today
            today = dt.date.today().isoformat()
            return {"cursor_date": today, "next_page_token": None}
    
    def put_state(state: dict):
        state["updated_at"] = dt.datetime.utcnow().isoformat() + "Z"
        s3.put_object(Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state).encode())
    
    def write_chunk(items: list[dict], ts: dt.datetime) -> str:
        key = f"{S3_PREFIX}{ts:%Y/%m/%d}/zoom-operationlogs-{uuid.uuid4()}.json.gz"
        buf = io.BytesIO()
        with gzip.GzipFile(fileobj=buf, mode="w") as gz:
            for rec in items:
                gz.write((json.dumps(rec) + "n").encode())
        buf.seek(0)
        s3.upload_fileobj(buf, S3_BUCKET, key)
        return key
    
    def fetch_page(token: str, from_date: str, to_date: str, next_page_token: str | None) -> dict:
        q = {
            "from": from_date,
            "to": to_date,
            "page_size": str(PAGE_SIZE),
        }
        if next_page_token:
            q["next_page_token"] = next_page_token
        url = REPORT_URL + "?" + urllib.parse.urlencode(q)
        req = urllib.request.Request(url, headers={
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        })
        with _http(req) as r:
            return json.loads(r.read())
    
    def lambda_handler(event=None, context=None):
        token = get_token()
        state = get_state()
    
        cursor_date = state.get("cursor_date")  # YYYY-MM-DD
        # API requires from/to in yyyy-mm-dd, max one month per request
        from_date = cursor_date
        to_date = cursor_date
    
        total_written = 0
        next_token = state.get("next_page_token")
    
        while True:
            page = fetch_page(token, from_date, to_date, next_token)
            items = page.get("operation_logs", []) or []
            if items:
                write_chunk(items, dt.datetime.utcnow())
                total_written += len(items)
            next_token = page.get("next_page_token")
            if not next_token:
                break
    
        # Advance to next day if we've finished this date
        today = dt.date.today().isoformat()
        if cursor_date < today:
            nxt = (dt.datetime.fromisoformat(cursor_date) + dt.timedelta(days=1)).date().isoformat()
            state["cursor_date"] = nxt
            state["next_page_token"] = None
        else:
            # stay on today; continue later with next_page_token=None
            state["next_page_token"] = None
    
        put_state(state)
        return {"ok": True, "written": total_written, "date": from_date}
    
    if __name__ == "__main__":
        print(lambda_handler())
    
  2. Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  3. Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores:

    Chave Valor de exemplo
    S3_BUCKET zoom-operation-logs
    S3_PREFIX zoom/operationlogs/
    STATE_KEY zoom/operationlogs/state.json
    ZOOM_ACCOUNT_ID <your-zoom-account-id>
    ZOOM_CLIENT_ID <your-zoom-client-id>
    ZOOM_CLIENT_SECRET <your-zoom-client-secret>
    PAGE_SIZE 300
    TIMEOUT 30
  4. Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).

  5. Selecione o separador Configuração.

  6. No painel Configuração geral, clique em Editar.

  7. Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.

Crie um horário do EventBridge

  1. Aceda a Amazon EventBridge > Scheduler.
  2. Clique em Criar programação.
  3. Indique os seguintes detalhes de configuração:
    • Agenda recorrente: Taxa (15 min).
    • Destino: a sua função Lambda zoom_operationlogs_to_s3.
    • Nome: zoom-operationlogs-schedule-15min.
  4. Clique em Criar programação.

Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps

  1. Na consola da AWS, aceda a IAM > Utilizadores > Adicionar utilizadores.
  2. Clique em Adicionar utilizadores.
  3. Indique os seguintes detalhes de configuração:
    • Utilizador: secops-reader.
    • Tipo de acesso: chave de acesso – acesso programático.
  4. Clique em Criar utilizador.
  5. Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. No editor JSON, introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::zoom-operation-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::zoom-operation-logs"
        }
      ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.

  9. Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Transfira o CSV (estes valores são introduzidos no feed).

Configure um feed no Google SecOps para carregar registos de operações do Zoom

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, introduza um nome para o feed (por exemplo, Zoom Operation Logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione Registos de operações do Zoom como o Tipo de registo.
  6. Clicar em Seguinte.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://zoom-operation-logs/zoom/operationlogs/
    • Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
    • Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
    • ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
    • Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
    • Espaço de nomes do recurso: o espaço de nomes do recurso.
    • Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
  8. Clicar em Seguinte.
  9. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Tabela de mapeamento da UDM

Campo de registo Mapeamento de UDM Lógica
ação metadata.product_event_type O campo de registo não processado "action" está mapeado para este campo da UDM.
category_type additional.fields.key O campo de registo não processado "category_type" está mapeado para este campo UDM.
category_type additional.fields.value.string_value O campo de registo não processado "category_type" está mapeado para este campo UDM.
Departamento target.user.department O campo de registo não processado "Department" (extraído do campo "operation_detail") é mapeado para este campo UDM.
Descrição target.user.role_description O campo de registo não processado "Description" (extraído do campo "operation_detail") está mapeado para este campo da UDM.
Nome a apresentar target.user.user_display_name O campo de registo não processado "Nome a apresentar" (extraído do campo "operation_detail") está mapeado para este campo UDM.
Endereço de email target.user.email_addresses O campo de registo não processado "Endereço de email" (extraído do campo "operation_detail") é mapeado para este campo UDM.
Nome próprio target.user.first_name O campo de registo não processado "Nome próprio" (extraído do campo "operation_detail") está mapeado para este campo UDM.
Cargo target.user.title O campo de registo não processado "Job Title" (extraído do campo "operation_detail") está mapeado para este campo UDM.
Apelido target.user.last_name O campo de registo não processado "Last Name" (extraído do campo "operation_detail") está mapeado para este campo UDM.
Localização target.location.name O campo de registo não processado "Location" (extraído do campo "operation_detail") está mapeado para este campo UDM.
operation_detail metadata.description O campo de registo não processado "operation_detail" está mapeado para este campo da GDU.
operador principal.user.email_addresses O campo de registo não processado "operator" é mapeado para este campo UDM se corresponder a uma regex de email.
operador principal.user.userid O campo de registo não processado "operator" é mapeado para este campo UDM se não corresponder a uma regex de email.
Nome da sala target.user.attribute.labels.value O campo de registo não processado "Room Name" (extraído do campo "operation_detail") está mapeado para este campo UDM.
Nome da Função target.user.attribute.roles.name O campo de registo não processado "Nome da função" (extraído do campo "operation_detail") está mapeado para este campo UDM.
tempo metadata.event_timestamp.seconds O campo de registo não processado "time" é analisado e mapeado para este campo do UDM.
Tipo target.user.attribute.labels.value O campo de registo não processado "Type" (extraído do campo "operation_detail") está mapeado para este campo UDM.
Função de utilizador target.user.attribute.roles.name O campo de registo não processado "Função do utilizador" (extraído do campo "operation_detail") é mapeado para este campo da UDM.
Tipo de utilizador target.user.attribute.labels.value O campo de registo não processado "Tipo de utilizador" (extraído do campo "operation_detail") está mapeado para este campo da UDM.
metadata.log_type O valor "ZOOM_OPERATION_LOGS" é atribuído a este campo UDM.
metadata.vendor_name O valor "ZOOM" é atribuído a este campo UDM.
metadata.product_name O valor "ZOOM_OPERATION_LOGS" é atribuído a este campo UDM.
metadata.event_type O valor é determinado com base na seguinte lógica:
1. Se o campo "event_type" não estiver vazio, é usado o respetivo valor.
2. Se os campos "operator", "email" ou "email2" não estiverem vazios, o valor é definido como "USER_UNCATEGORIZED".
3. Caso contrário, o valor é definido como "GENERIC_EVENT".
json_data about.user.attribute.labels.value O campo de registo não processado "json_data" (extraído do campo "operation_detail") é analisado como JSON. Os campos "assistant" e "options" de cada elemento da matriz JSON analisada são mapeados para o campo "value" da matriz "labels" no UDM.
json_data about.user.userid O campo de registo não processado "json_data" (extraído do campo "operation_detail") é analisado como JSON. O campo "userId" de cada elemento da matriz JSON analisada (exceto o primeiro) é mapeado para o campo "userid" do objeto "about.user" no UDM.
json_data target.user.attribute.labels.value O campo de registo não processado "json_data" (extraído do campo "operation_detail") é analisado como JSON. Os campos "assistant" e "options" do primeiro elemento da matriz JSON analisada são mapeados para o campo "value" da matriz "labels" no UDM.
json_data target.user.userid O campo de registo não processado "json_data" (extraído do campo "operation_detail") é analisado como JSON. O campo "userId" do primeiro elemento da matriz JSON analisada é mapeado para o campo "userid" do objeto "target.user" no UDM.
email target.user.email_addresses O campo de registo não processado "email" (extraído do campo "operation_detail") é mapeado para este campo da UDM.
email2 target.user.email_addresses O campo de registo não processado "email2" (extraído do campo "operation_detail") é mapeado para este campo da UDM.
função target.user.attribute.roles.name O campo de registo não processado "role" (extraído do campo "operation_detail") está mapeado para este campo UDM.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.