Coletar registros do Censys

Compatível com:

Este documento explica como ingerir registros do Censys no Google Security Operations usando o Amazon S3. A Censys oferece gerenciamento abrangente de superfície de ataque e inteligência da Internet pela API. Com essa integração, é possível coletar eventos de descoberta de host, eventos de risco e mudanças de recursos do Censys ASM e encaminhá-los para o Google SecOps para análise e monitoramento. O analisador transforma registros brutos em um formato estruturado de acordo com o UDM do Google SecOps. Ele extrai campos da mensagem de registro bruta, realiza conversões de tipo de dados e mapeia as informações extraídas para os campos correspondentes do UDM, enriquecendo os dados com contexto e rótulos adicionais.

Antes de começar

Verifique se você tem os pré-requisitos a seguir:

  • Instância do Google SecOps
  • Acesso privilegiado ao Censys ASM
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)

Coletar pré-requisitos do Censys (credenciais da API)

  1. Faça login no console do Censys ASM em app.censys.io.
  2. Acesse Integrações na parte de cima da página.
  3. Copie e salve a chave de API e o ID da organização.
  4. Anote o URL base da API: https://api.platform.censys.io

Configurar o bucket do AWS S3 e o IAM para o Google SecOps

  1. Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
  2. Salve o Nome e a Região do bucket para referência futura (por exemplo, censys-logs).
  3. Crie um usuário seguindo este guia: Como criar um usuário do IAM.
  4. Selecione o usuário criado.
  5. Selecione a guia Credenciais de segurança.
  6. Clique em Criar chave de acesso na seção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Caso de uso.
  8. Clique em Próxima.
  9. Opcional: adicione uma tag de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para uso posterior.
  12. Clique em Concluído.
  13. Selecione a guia Permissões.
  14. Clique em Adicionar permissões na seção Políticas de permissões.
  15. Selecione Adicionar permissões.
  16. Selecione Anexar políticas diretamente.
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clique em Próxima.
  19. Clique em Adicionar permissões

Configurar a política e o papel do IAM para uploads do S3

  1. No console da AWS, acesse IAM > Políticas > Criar política > guia JSON.
  2. Insira a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::censys-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::censys-logs/censys/state.json"
        }
      ]
    }
    
    • Substitua censys-logs se você tiver inserido um nome de bucket diferente.
  3. Clique em Próxima > Criar política.

  4. Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.

  5. Anexe a política recém-criada e a política gerenciada AWSLambdaBasicExecutionRole (para acesso aos registros do CloudWatch).

  6. Nomeie a função como censys-lambda-role e clique em Criar função.

Criar a função Lambda

  1. No console da AWS, acesse Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Informe os seguintes detalhes de configuração:
Configuração Valor
Nome censys-data-collector
Ambiente de execução Python 3.13
Arquitetura x86_64
Função de execução censys-lambda-role
  1. Depois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (censys-data-collector.py):

    import json
    import boto3
    import urllib3
    import gzip
    import logging
    import os
    from datetime import datetime, timedelta, timezone
    from typing import Dict, List, Any, Optional
    from urllib.parse import urlencode
    
    # Configure logging
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # AWS S3 client
    s3_client = boto3.client('s3')
    # HTTP client
    http = urllib3.PoolManager()
    
    # Environment variables
    S3_BUCKET = os.environ['S3_BUCKET']
    S3_PREFIX = os.environ['S3_PREFIX']
    STATE_KEY = os.environ['STATE_KEY']
    CENSYS_API_KEY = os.environ['CENSYS_API_KEY']
    CENSYS_ORG_ID = os.environ['CENSYS_ORG_ID']
    API_BASE = os.environ.get('API_BASE', 'https://api.platform.censys.io')
    
    class CensysCollector:
        def __init__(self):
            self.headers = {
                'Authorization': f'Bearer {CENSYS_API_KEY}',
                'X-Organization-ID': CENSYS_ORG_ID,
                'Content-Type': 'application/json'
            }
    
        def get_last_collection_time(self) -> Optional[datetime]:
            """Get the last collection timestamp from S3 state file."""
            try:
                response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
                state = json.loads(response['Body'].read().decode('utf-8'))
                return datetime.fromisoformat(state.get('last_collection_time', '2024-01-01T00:00:00Z'))
            except Exception as e:
                logger.info(f"No state file found or error reading state: {e}")
                return datetime.now(timezone.utc) - timedelta(hours=1)
    
        def save_collection_time(self, collection_time: datetime):
            """Save the current collection timestamp to S3 state file."""
            state = {'last_collection_time': collection_time.strftime('%Y-%m-%dT%H:%M:%SZ')}
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=STATE_KEY,
                Body=json.dumps(state),
                ContentType='application/json'
            )
    
        def collect_logbook_events(self, cursor: str = None) -> List[Dict[str, Any]]:
            """Collect logbook events from Censys ASM API using cursor-based pagination."""
            events = []
            url = f"{API_BASE}/v3/logbook"
    
            # Use cursor-based pagination as per Censys API documentation
            params = {}
            if cursor:
                params['cursor'] = cursor
    
            try:
                query_string = urlencode(params) if params else ''
                full_url = f"{url}?{query_string}" if query_string else url
    
                response = http.request('GET', full_url, headers=self.headers)
    
                if response.status != 200:
                    logger.error(f"API request failed with status {response.status}: {response.data}")
                    return []
    
                data = json.loads(response.data.decode('utf-8'))
                events.extend(data.get('logbook_entries', []))
    
                # Handle cursor-based pagination
                next_cursor = data.get('next_cursor')
                if next_cursor:
                    events.extend(self.collect_logbook_events(next_cursor))
    
                logger.info(f"Collected {len(events)} logbook events")
                return events
    
            except Exception as e:
                logger.error(f"Error collecting logbook events: {e}")
                return []
    
        def collect_risks_events(self) -> List[Dict[str, Any]]:
            """Collect risk events from Censys ASM API."""
            events = []
            url = f"{API_BASE}/v3/risks"
    
            try:
                response = http.request('GET', url, headers=self.headers)
    
                if response.status != 200:
                    logger.error(f"API request failed with status {response.status}: {response.data}")
                    return []
    
                data = json.loads(response.data.decode('utf-8'))
                events.extend(data.get('risks', []))
    
                logger.info(f"Collected {len(events)} risk events")
                return events
    
            except Exception as e:
                logger.error(f"Error collecting risk events: {e}")
                return []
    
        def save_events_to_s3(self, events: List[Dict[str, Any]], event_type: str):
            """Save events to S3 in compressed NDJSON format."""
            if not events:
                return
    
            timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
            filename = f"{S3_PREFIX}{event_type}_{timestamp}.json.gz"
    
            try:
                # Convert events to newline-delimited JSON
                ndjson_content = 'n'.join(json.dumps(event, separators=(',', ':')) for event in events)
    
                # Compress with gzip
                gz_bytes = gzip.compress(ndjson_content.encode('utf-8'))
    
                s3_client.put_object(
                    Bucket=S3_BUCKET,
                    Key=filename,
                    Body=gz_bytes,
                    ContentType='application/gzip',
                    ContentEncoding='gzip'
                )
    
                logger.info(f"Saved {len(events)} {event_type} events to {filename}")
    
            except Exception as e:
                logger.error(f"Error saving {event_type} events to S3: {e}")
                raise
    
    def lambda_handler(event, context):
        """AWS Lambda handler function."""
        try:
            collector = CensysCollector()
    
            # Get last collection time for cursor state management
            last_collection_time = collector.get_last_collection_time()
            current_time = datetime.now(timezone.utc)
    
            logger.info(f"Collecting events since {last_collection_time}")
    
            # Collect different types of events
            logbook_events = collector.collect_logbook_events()
            risk_events = collector.collect_risks_events()
    
            # Save events to S3
            collector.save_events_to_s3(logbook_events, 'logbook')
            collector.save_events_to_s3(risk_events, 'risks')
    
            # Update state
            collector.save_collection_time(current_time)
    
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Censys data collection completed successfully',
                    'logbook_events': len(logbook_events),
                    'risk_events': len(risk_events),
                    'collection_time': current_time.strftime('%Y-%m-%dT%H:%M:%SZ')
                })
            }
    
        except Exception as e:
            logger.error(f"Lambda execution failed: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': str(e)
                })
            }
    
  2. Acesse Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  3. Insira as seguintes variáveis de ambiente, substituindo pelos seus valores:

    Chave Valor de exemplo
    S3_BUCKET censys-logs
    S3_PREFIX censys/
    STATE_KEY censys/state.json
    CENSYS_API_KEY <your-censys-api-key>
    CENSYS_ORG_ID <your-organization-id>
    API_BASE https://api.platform.censys.io
  4. Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.

  5. Selecione a guia Configuração.

  6. No painel Configuração geral, clique em Editar.

  7. Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.

Criar uma programação do EventBridge

  1. Acesse Amazon EventBridge > Scheduler > Criar programação.
  2. Informe os seguintes detalhes de configuração:
    • Programação recorrente: Taxa (1 hour).
    • Destino: sua função Lambda censys-data-collector.
    • Nome: censys-data-collector-1h.
  3. Clique em Criar programação.

Opcional: criar um usuário e chaves do IAM somente leitura para o Google SecOps

  1. No console da AWS, acesse IAM > Usuários > Adicionar usuários.
  2. Clique em Add users.
  3. Informe os seguintes detalhes de configuração:
    • Usuário: secops-reader.
    • Tipo de acesso: Chave de acesso — Acesso programático.
  4. Clique em Criar usuário.
  5. Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política.
  6. No editor JSON, insira a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::censys-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::censys-logs"
        }
      ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.

  9. Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Faça o download do CSV (esses valores são inseridos no feed).

Configurar um feed no Google SecOps para ingerir registros do Censys

  1. Acesse Configurações do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, insira um nome para o feed (por exemplo, Censys logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione CENSYS como o Tipo de registro.
  6. Clique em Próxima.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://censys-logs/censys/
    • Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
    • Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
    • ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
    • Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
    • Namespace do recurso: o namespace do recurso.
    • Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
  8. Clique em Próxima.
  9. Revise a nova configuração do feed na tela Finalizar e clique em Enviar.

Tabela de mapeamento do UDM

Campo de registro Mapeamento do UDM Lógica
assetId read_only_udm.principal.asset.hostname Se o campo "assetId" não for um endereço IP, ele será mapeado para "principal.asset.hostname".
assetId read_only_udm.principal.asset.ip Se o campo "assetId" for um endereço IP, ele será mapeado para "principal.asset.ip".
assetId read_only_udm.principal.hostname Se o campo "assetId" não for um endereço IP, ele será mapeado para "principal.hostname".
assetId read_only_udm.principal.ip Se o campo "assetId" for um endereço IP, ele será mapeado para "principal.ip".
associatedAt read_only_udm.security_result.detection_fields.value O campo "associatedAt" é mapeado para "security_result.detection_fields.value".
autonomousSystem.asn read_only_udm.additional.fields.value.string_value O campo "autonomousSystem.asn" é convertido em uma string e mapeado para "additional.fields.value.string_value" com a chave "autonomousSystem_asn".
autonomousSystem.bgpPrefix read_only_udm.additional.fields.value.string_value O campo "autonomousSystem.bgpPrefix" é mapeado para "additional.fields.value.string_value" com a chave "autonomousSystem_bgpPrefix".
banner read_only_udm.principal.resource.attribute.labels.value O campo de banner é mapeado para principal.resource.attribute.labels.value com a chave "banner".
nuvem read_only_udm.metadata.vendor_name O campo da nuvem é mapeado para "metadata.vendor_name".
comments.refUrl read_only_udm.network.http.referral_url O campo "comments.refUrl" é mapeado para "network.http.referral_url".
data.cve read_only_udm.additional.fields.value.string_value O campo "data.cve" é mapeado para "additional.fields.value.string_value" com a chave "data_cve".
data.cvss read_only_udm.additional.fields.value.string_value O campo "data.cvss" é mapeado para "additional.fields.value.string_value" com a chave "data_cvss".
data.ipAddress read_only_udm.principal.asset.ip Se o campo "data.ipAddress" não for igual ao campo "assetId", ele será mapeado para "principal.asset.ip".
data.ipAddress read_only_udm.principal.ip Se o campo "data.ipAddress" não for igual ao campo "assetId", ele será mapeado para "principal.ip".
data.location.city read_only_udm.principal.location.city Se o campo "location.city" estiver vazio, o campo "data.location.city" será mapeado para "principal.location.city".
data.location.countryCode read_only_udm.principal.location.country_or_region Se o campo "location.country" estiver vazio, o campo "data.location.countryCode" será mapeado para "principal.location.country_or_region".
data.location.latitude read_only_udm.principal.location.region_coordinates.latitude Se os campos "location.coordinates.latitude" e "location.geoCoordinates.latitude" estiverem vazios, o campo "data.location.latitude" será convertido em um ponto flutuante e mapeado para "principal.location.region_coordinates.latitude".
data.location.longitude read_only_udm.principal.location.region_coordinates.longitude Se os campos "location.coordinates.longitude" e "location.geoCoordinates.longitude" estiverem vazios, o campo "data.location.longitude" será convertido em um ponto flutuante e mapeado para "principal.location.region_coordinates.longitude".
data.location.province read_only_udm.principal.location.state Se o campo "location.province" estiver vazio, o campo "data.location.province" será mapeado para "principal.location.state".
data.mailServers read_only_udm.additional.fields.value.list_value.values.string_value Cada elemento na matriz data.mailServers é mapeado para uma entrada additional.fields separada com a chave "Servidores de e-mail" e value.list_value.values.string_value definido como o valor do elemento.
data.names.forwardDns[].name read_only_udm.network.dns.questions.name Cada elemento na matriz data.names.forwardDns é mapeado para uma entrada network.dns.questions separada com o campo "name" definido como o campo "name" do elemento.
data.nameServers read_only_udm.additional.fields.value.list_value.values.string_value Cada elemento na matriz data.nameServers é mapeado para uma entrada additional.fields separada com a chave "Name nameServers" e value.list_value.values.string_value definido como o valor do elemento.
data.protocols[].transportProtocol read_only_udm.network.ip_protocol Se o campo data.protocols[].transportProtocol for um dos seguintes: TCP, EIGRP, ESP, ETHERIP, GRE, ICMP, IGMP, IP6IN4, PIM, UDP ou VRRP, ele será mapeado para network.ip_protocol.
data.protocols[].transportProtocol read_only_udm.principal.resource.attribute.labels.value O campo "data.protocols[].transportProtocol" é mapeado para "principal.resource.attribute.labels.value" com a chave "data_protocols {index}".
http.request.headers[].key, http.request.headers[].value.headers.0 read_only_udm.network.http.user_agent Se o campo http.request.headers[].key for "User-Agent", o campo correspondente http.request.headers[].value.headers.0 será mapeado para network.http.user_agent.
http.request.headers[].key, http.request.headers[].value.headers.0 read_only_udm.network.http.parsed_user_agent Se o campo "http.request.headers[].key" for "User-Agent", o campo "http.request.headers[].value.headers.0" correspondente será analisado como uma string de user agent e mapeado para "network.http.parsed_user_agent".
http.request.headers[].key, http.request.headers[].value.headers.0 read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value Para cada elemento na matriz "http.request.headers", o campo "key" é mapeado para "principal.resource.attribute.labels.key", e o campo "value.headers.0" é mapeado para "principal.resource.attribute.labels.value".
http.request.uri read_only_udm.principal.asset.hostname A parte do nome do host do campo http.request.uri é extraída e mapeada para principal.asset.hostname.
http.request.uri read_only_udm.principal.hostname A parte do nome do host do campo http.request.uri é extraída e mapeada para principal.hostname.
http.response.body read_only_udm.principal.resource.attribute.labels.value O campo "http.response.body" é mapeado para "principal.resource.attribute.labels.value" com a chave "http_response_body".
http.response.headers[].key, http.response.headers[].value.headers.0 read_only_udm.target.hostname Se o campo "key" de http.response.headers[] for "Server", o campo correspondente http.response.headers[].value.headers.0 será mapeado para target.hostname.
http.response.headers[].key, http.response.headers[].value.headers.0 read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value Para cada elemento na matriz "http.response.headers", o campo "key" é mapeado para "principal.resource.attribute.labels.key", e o campo "value.headers.0" é mapeado para "principal.resource.attribute.labels.value".
http.response.statusCode read_only_udm.network.http.response_code O campo "http.response.statusCode" é convertido em um número inteiro e mapeado para "network.http.response_code".
ip read_only_udm.target.asset.ip O campo "ip" é mapeado para "target.asset.ip".
ip read_only_udm.target.ip O campo "ip" é mapeado para "target.ip".
isSeed read_only_udm.additional.fields.value.string_value O campo "isSeed" é convertido em uma string e mapeado para "additional.fields.value.string_value" com a chave "isSeed".
location.city read_only_udm.principal.location.city O campo "location.city" é mapeado para "principal.location.city".
location.continent read_only_udm.additional.fields.value.string_value O campo "location.continent" é mapeado para "additional.fields.value.string_value" com a chave "location_continent".
location.coordinates.latitude read_only_udm.principal.location.region_coordinates.latitude O campo "location.coordinates.latitude" é convertido em um ponto flutuante e mapeado para "principal.location.region_coordinates.latitude".
location.coordinates.longitude read_only_udm.principal.location.region_coordinates.longitude O campo "location.coordinates.longitude" é convertido em um ponto flutuante e mapeado para "principal.location.region_coordinates.longitude".
location.country read_only_udm.principal.location.country_or_region O campo "location.country" é mapeado para "principal.location.country_or_region".
location.geoCoordinates.latitude read_only_udm.principal.location.region_coordinates.latitude Se o campo "location.coordinates.latitude" estiver vazio, o campo "location.geoCoordinates.latitude" será convertido em um ponto flutuante e mapeado para "principal.location.region_coordinates.latitude".
location.geoCoordinates.longitude read_only_udm.principal.location.region_coordinates.longitude Se o campo "location.coordinates.longitude" estiver vazio, o campo "location.geoCoordinates.longitude" será convertido em um ponto flutuante e mapeado para "principal.location.region_coordinates.longitude".
location.postalCode read_only_udm.additional.fields.value.string_value O campo "location.postalCode" é mapeado para "additional.fields.value.string_value" com a chave "Postal code".
location.province read_only_udm.principal.location.state O campo "location.province" é mapeado para "principal.location.state".
operação read_only_udm.security_result.action_details O campo "operation" é mapeado para "security_result.action_details".
perspectiveId read_only_udm.principal.group.product_object_id O campo "perspectiveId" é mapeado para "principal.group.product_object_id".
porta read_only_udm.principal.port O campo de porta é convertido em um número inteiro e mapeado para principal.port.
risks[].severity, risks[].title read_only_udm.security_result.category_details O campo risks[].severity é concatenado com o campo risks[].title e mapeado para security_result.category_details.
serviceName read_only_udm.network.application_protocol Se o campo "serviceName" for "HTTP" ou "HTTPS", ele será mapeado para "network.application_protocol".
sourceIp read_only_udm.principal.asset.ip O campo "sourceIp" é mapeado para "principal.asset.ip".
sourceIp read_only_udm.principal.ip O campo "sourceIp" é mapeado para "principal.ip".
timestamp read_only_udm.metadata.event_timestamp O campo de carimbo de data/hora é analisado como um carimbo de data/hora e mapeado para "metadata.event_timestamp".
transportFingerprint.id read_only_udm.metadata.product_log_id O campo transportFingerprint.id é convertido em uma string e mapeado para metadata.product_log_id.
transportFingerprint.raw read_only_udm.additional.fields.value.string_value O campo "transportFingerprint.raw" é mapeado para "additional.fields.value.string_value" com a chave "transportFingerprint_raw".
tipo read_only_udm.metadata.product_event_type O campo "type" é mapeado para "metadata.product_event_type".
- read_only_udm.metadata.product_name O valor "CENSYS_ASM" é atribuído a metadata.product_name.
- read_only_udm.metadata.vendor_name O valor "CENSYS" é atribuído a metadata.vendor_name.
- read_only_udm.metadata.event_type O tipo de evento é determinado com base na presença de campos específicos: NETWORK_CONNECTION se has_princ_machine_id e has_target_machine forem verdadeiros e has_network_flow for falso, NETWORK_DNS se has_network_flow for verdadeiro, STATUS_UPDATE se has_princ_machine_id for verdadeiro e GENERIC_EVENT caso contrário.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.