Recolha registos do Censys

Compatível com:

Este documento explica como carregar registos do Censys para o Google Security Operations através do Amazon S3. A Censys oferece uma gestão abrangente da superfície de ataque e inteligência da Internet através da respetiva API. Esta integração permite-lhe recolher eventos de deteção de anfitriões, eventos de risco e alterações de recursos do Censys ASM e encaminhá-los para o Google SecOps para análise e monitorização. O analisador transforma os registos não processados num formato estruturado em conformidade com o UDM do Google SecOps. Extrai campos da mensagem de registo não processada, realiza conversões de tipos de dados e mapeia as informações extraídas para os campos UDM correspondentes, enriquecendo os dados com contexto e etiquetas adicionais.

Antes de começar

Certifique-se de que tem os seguintes pré-requisitos:

  • Instância do Google SecOps
  • Acesso privilegiado ao Censys ASM
  • Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)

Recolha os pré-requisitos do Censys (credenciais da API)

  1. Inicie sessão na Censys ASM Console em app.censys.io.
  2. Aceda a Integrações na parte superior da página.
  3. Copie e guarde a chave da API e o ID da organização.
  4. Tenha em atenção o URL base da API: https://api.platform.censys.io

Configure o contentor do AWS S3 e o IAM para o Google SecOps

  1. Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
  2. Guarde o nome e a região do contentor para referência futura (por exemplo, censys-logs).
  3. Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
  4. Selecione o utilizador criado.
  5. Selecione o separador Credenciais de segurança.
  6. Clique em Criar chave de acesso na secção Chaves de acesso.
  7. Selecione Serviço de terceiros como o Exemplo de utilização.
  8. Clicar em Seguinte.
  9. Opcional: adicione uma etiqueta de descrição.
  10. Clique em Criar chave de acesso.
  11. Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
  12. Clique em Concluído.
  13. Selecione o separador Autorizações.
  14. Clique em Adicionar autorizações na secção Políticas de autorizações.
  15. Selecione Adicionar autorizações.
  16. Selecione Anexar políticas diretamente
  17. Pesquise e selecione a política AmazonS3FullAccess.
  18. Clicar em Seguinte.
  19. Clique em Adicionar autorizações.

Configure a política e a função de IAM para carregamentos do S3

  1. Na consola da AWS, aceda a IAM > Políticas > Criar política > separador JSON.
  2. Introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::censys-logs/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::censys-logs/censys/state.json"
        }
      ]
    }
    
    • Substitua censys-logs se tiver introduzido um nome de contentor diferente.
  3. Clique em Seguinte > Criar política.

  4. Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.

  5. Anexe a política recém-criada e a política gerida AWSLambdaBasicExecutionRole (para acesso aos registos do CloudWatch).

  6. Dê o nome censys-lambda-role à função e clique em Criar função.

Crie a função Lambda

  1. Na consola da AWS, aceda a Lambda > Functions > Create function.
  2. Clique em Criar do zero.
  3. Faculte os seguintes detalhes de configuração:
Definição Valor
Nome censys-data-collector
Runtime Python 3.13
Arquitetura x86_64
Função de execução censys-lambda-role
  1. Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (censys-data-collector.py):

    import json
    import boto3
    import urllib3
    import gzip
    import logging
    import os
    from datetime import datetime, timedelta, timezone
    from typing import Dict, List, Any, Optional
    from urllib.parse import urlencode
    
    # Configure logging
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # AWS S3 client
    s3_client = boto3.client('s3')
    # HTTP client
    http = urllib3.PoolManager()
    
    # Environment variables
    S3_BUCKET = os.environ['S3_BUCKET']
    S3_PREFIX = os.environ['S3_PREFIX']
    STATE_KEY = os.environ['STATE_KEY']
    CENSYS_API_KEY = os.environ['CENSYS_API_KEY']
    CENSYS_ORG_ID = os.environ['CENSYS_ORG_ID']
    API_BASE = os.environ.get('API_BASE', 'https://api.platform.censys.io')
    
    class CensysCollector:
        def __init__(self):
            self.headers = {
                'Authorization': f'Bearer {CENSYS_API_KEY}',
                'X-Organization-ID': CENSYS_ORG_ID,
                'Content-Type': 'application/json'
            }
    
        def get_last_collection_time(self) -> Optional[datetime]:
            """Get the last collection timestamp from S3 state file."""
            try:
                response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY)
                state = json.loads(response['Body'].read().decode('utf-8'))
                return datetime.fromisoformat(state.get('last_collection_time', '2024-01-01T00:00:00Z'))
            except Exception as e:
                logger.info(f"No state file found or error reading state: {e}")
                return datetime.now(timezone.utc) - timedelta(hours=1)
    
        def save_collection_time(self, collection_time: datetime):
            """Save the current collection timestamp to S3 state file."""
            state = {'last_collection_time': collection_time.strftime('%Y-%m-%dT%H:%M:%SZ')}
            s3_client.put_object(
                Bucket=S3_BUCKET,
                Key=STATE_KEY,
                Body=json.dumps(state),
                ContentType='application/json'
            )
    
        def collect_logbook_events(self, cursor: str = None) -> List[Dict[str, Any]]:
            """Collect logbook events from Censys ASM API using cursor-based pagination."""
            events = []
            url = f"{API_BASE}/v3/logbook"
    
            # Use cursor-based pagination as per Censys API documentation
            params = {}
            if cursor:
                params['cursor'] = cursor
    
            try:
                query_string = urlencode(params) if params else ''
                full_url = f"{url}?{query_string}" if query_string else url
    
                response = http.request('GET', full_url, headers=self.headers)
    
                if response.status != 200:
                    logger.error(f"API request failed with status {response.status}: {response.data}")
                    return []
    
                data = json.loads(response.data.decode('utf-8'))
                events.extend(data.get('logbook_entries', []))
    
                # Handle cursor-based pagination
                next_cursor = data.get('next_cursor')
                if next_cursor:
                    events.extend(self.collect_logbook_events(next_cursor))
    
                logger.info(f"Collected {len(events)} logbook events")
                return events
    
            except Exception as e:
                logger.error(f"Error collecting logbook events: {e}")
                return []
    
        def collect_risks_events(self) -> List[Dict[str, Any]]:
            """Collect risk events from Censys ASM API."""
            events = []
            url = f"{API_BASE}/v3/risks"
    
            try:
                response = http.request('GET', url, headers=self.headers)
    
                if response.status != 200:
                    logger.error(f"API request failed with status {response.status}: {response.data}")
                    return []
    
                data = json.loads(response.data.decode('utf-8'))
                events.extend(data.get('risks', []))
    
                logger.info(f"Collected {len(events)} risk events")
                return events
    
            except Exception as e:
                logger.error(f"Error collecting risk events: {e}")
                return []
    
        def save_events_to_s3(self, events: List[Dict[str, Any]], event_type: str):
            """Save events to S3 in compressed NDJSON format."""
            if not events:
                return
    
            timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')
            filename = f"{S3_PREFIX}{event_type}_{timestamp}.json.gz"
    
            try:
                # Convert events to newline-delimited JSON
                ndjson_content = 'n'.join(json.dumps(event, separators=(',', ':')) for event in events)
    
                # Compress with gzip
                gz_bytes = gzip.compress(ndjson_content.encode('utf-8'))
    
                s3_client.put_object(
                    Bucket=S3_BUCKET,
                    Key=filename,
                    Body=gz_bytes,
                    ContentType='application/gzip',
                    ContentEncoding='gzip'
                )
    
                logger.info(f"Saved {len(events)} {event_type} events to {filename}")
    
            except Exception as e:
                logger.error(f"Error saving {event_type} events to S3: {e}")
                raise
    
    def lambda_handler(event, context):
        """AWS Lambda handler function."""
        try:
            collector = CensysCollector()
    
            # Get last collection time for cursor state management
            last_collection_time = collector.get_last_collection_time()
            current_time = datetime.now(timezone.utc)
    
            logger.info(f"Collecting events since {last_collection_time}")
    
            # Collect different types of events
            logbook_events = collector.collect_logbook_events()
            risk_events = collector.collect_risks_events()
    
            # Save events to S3
            collector.save_events_to_s3(logbook_events, 'logbook')
            collector.save_events_to_s3(risk_events, 'risks')
    
            # Update state
            collector.save_collection_time(current_time)
    
            return {
                'statusCode': 200,
                'body': json.dumps({
                    'message': 'Censys data collection completed successfully',
                    'logbook_events': len(logbook_events),
                    'risk_events': len(risk_events),
                    'collection_time': current_time.strftime('%Y-%m-%dT%H:%M:%SZ')
                })
            }
    
        except Exception as e:
            logger.error(f"Lambda execution failed: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps({
                    'error': str(e)
                })
            }
    
  2. Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.

  3. Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores:

    Chave Valor de exemplo
    S3_BUCKET censys-logs
    S3_PREFIX censys/
    STATE_KEY censys/state.json
    CENSYS_API_KEY <your-censys-api-key>
    CENSYS_ORG_ID <your-organization-id>
    API_BASE https://api.platform.censys.io
  4. Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).

  5. Selecione o separador Configuração.

  6. No painel Configuração geral, clique em Editar.

  7. Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.

Crie um horário do EventBridge

  1. Aceda a Amazon EventBridge > Scheduler > Create schedule.
  2. Indique os seguintes detalhes de configuração:
    • Agenda recorrente: Taxa (1 hour).
    • Alvo: a sua função Lambda censys-data-collector.
    • Nome: censys-data-collector-1h.
  3. Clique em Criar programação.

Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps

  1. Na consola da AWS, aceda a IAM > Utilizadores > Adicionar utilizadores.
  2. Clique em Adicionar utilizadores.
  3. Indique os seguintes detalhes de configuração:
    • Utilizador: secops-reader.
    • Tipo de acesso: chave de acesso – acesso programático.
  4. Clique em Criar utilizador.
  5. Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
  6. No editor JSON, introduza a seguinte política:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::censys-logs/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::censys-logs"
        }
      ]
    }
    
  7. Defina o nome como secops-reader-policy.

  8. Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.

  9. Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.

  10. Transfira o CSV (estes valores são introduzidos no feed).

Configure um feed no Google SecOps para carregar registos do Censys

  1. Aceda a Definições do SIEM > Feeds.
  2. Clique em + Adicionar novo feed.
  3. No campo Nome do feed, introduza um nome para o feed (por exemplo, Censys logs).
  4. Selecione Amazon S3 V2 como o Tipo de origem.
  5. Selecione CENSYS como o Tipo de registo.
  6. Clicar em Seguinte.
  7. Especifique valores para os seguintes parâmetros de entrada:
    • URI do S3: s3://censys-logs/censys/
    • Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
    • Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
    • ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
    • Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
    • Espaço de nomes do recurso: o espaço de nomes do recurso.
    • Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
  8. Clicar em Seguinte.
  9. Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.

Tabela de mapeamento da UDM

Campo de registo Mapeamento do UDM Lógica
assetId read_only_udm.principal.asset.hostname Se o campo assetId não for um endereço IP, é mapeado para principal.asset.hostname.
assetId read_only_udm.principal.asset.ip Se o campo assetId for um endereço IP, é mapeado para principal.asset.ip.
assetId read_only_udm.principal.hostname Se o campo assetId não for um endereço IP, é mapeado para principal.hostname.
assetId read_only_udm.principal.ip Se o campo assetId for um endereço IP, é mapeado para principal.ip.
associatedAt read_only_udm.security_result.detection_fields.value O campo associatedAt está mapeado para security_result.detection_fields.value.
autonomousSystem.asn read_only_udm.additional.fields.value.string_value O campo autonomousSystem.asn é convertido numa string e mapeado para additional.fields.value.string_value com a chave "autonomousSystem_asn".
autonomousSystem.bgpPrefix read_only_udm.additional.fields.value.string_value O campo autonomousSystem.bgpPrefix é mapeado para additional.fields.value.string_value com a chave "autonomousSystem_bgpPrefix".
faixa read_only_udm.principal.resource.attribute.labels.value O campo do banner está mapeado para principal.resource.attribute.labels.value com a chave "banner".
nuvem read_only_udm.metadata.vendor_name O campo da nuvem está mapeado para metadata.vendor_name.
comments.refUrl read_only_udm.network.http.referral_url O campo comments.refUrl está mapeado para network.http.referral_url.
data.cve read_only_udm.additional.fields.value.string_value O campo data.cve está mapeado para additional.fields.value.string_value com a chave "data_cve".
data.cvss read_only_udm.additional.fields.value.string_value O campo data.cvss é mapeado para additional.fields.value.string_value com a chave "data_cvss".
data.ipAddress read_only_udm.principal.asset.ip Se o campo data.ipAddress não for igual ao campo assetId, é mapeado para principal.asset.ip.
data.ipAddress read_only_udm.principal.ip Se o campo data.ipAddress não for igual ao campo assetId, é mapeado para principal.ip.
data.location.city read_only_udm.principal.location.city Se o campo location.city estiver vazio, o campo data.location.city é mapeado para principal.location.city.
data.location.countryCode read_only_udm.principal.location.country_or_region Se o campo location.country estiver vazio, o campo data.location.countryCode é mapeado para principal.location.country_or_region.
data.location.latitude read_only_udm.principal.location.region_coordinates.latitude Se os campos location.coordinates.latitude e location.geoCoordinates.latitude estiverem vazios, o campo data.location.latitude é convertido num número de vírgula flutuante e mapeado para principal.location.region_coordinates.latitude.
data.location.longitude read_only_udm.principal.location.region_coordinates.longitude Se os campos location.coordinates.longitude e location.geoCoordinates.longitude estiverem vazios, o campo data.location.longitude é convertido num número de vírgula flutuante e mapeado para principal.location.region_coordinates.longitude.
data.location.province read_only_udm.principal.location.state Se o campo location.province estiver vazio, o campo data.location.province é mapeado para principal.location.state.
data.mailServers read_only_udm.additional.fields.value.list_value.values.string_value Cada elemento na matriz data.mailServers é mapeado para uma entrada additional.fields separada com a chave "Servidores de correio" e value.list_value.values.string_value definida para o valor do elemento.
data.names.forwardDns[].name read_only_udm.network.dns.questions.name Cada elemento na matriz data.names.forwardDns é mapeado para uma entrada network.dns.questions separada com o campo de nome definido para o campo de nome do elemento.
data.nameServers read_only_udm.additional.fields.value.list_value.values.string_value Cada elemento na matriz data.nameServers é mapeado para uma entrada additional.fields separada com a chave "Name nameServers" e value.list_value.values.string_value definida para o valor do elemento.
data.protocols[].transportProtocol read_only_udm.network.ip_protocol Se o campo data.protocols[].transportProtocol for um dos seguintes: TCP, EIGRP, ESP, ETHERIP, GRE, ICMP, IGMP, IP6IN4, PIM, UDP ou VRRP, é mapeado para network.ip_protocol.
data.protocols[].transportProtocol read_only_udm.principal.resource.attribute.labels.value O campo data.protocols[].transportProtocol é mapeado para principal.resource.attribute.labels.value com a chave "data_protocols {index}".
http.request.headers[].key, http.request.headers[].value.headers.0 read_only_udm.network.http.user_agent Se o campo http.request.headers[].key for "User-Agent", o campo http.request.headers[].value.headers.0 correspondente é mapeado para network.http.user_agent.
http.request.headers[].key, http.request.headers[].value.headers.0 read_only_udm.network.http.parsed_user_agent Se o campo http.request.headers[].key for "User-Agent", o campo http.request.headers[].value.headers.0 correspondente é analisado como uma string do agente do utilizador e mapeado para network.http.parsed_user_agent.
http.request.headers[].key, http.request.headers[].value.headers.0 read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value Para cada elemento na matriz http.request.headers, o campo key é mapeado para principal.resource.attribute.labels.key e o campo value.headers.0 é mapeado para principal.resource.attribute.labels.value.
http.request.uri read_only_udm.principal.asset.hostname A parte do nome do anfitrião do campo http.request.uri é extraída e mapeada para principal.asset.hostname.
http.request.uri read_only_udm.principal.hostname A parte do nome do anfitrião do campo http.request.uri é extraída e mapeada para principal.hostname.
http.response.body read_only_udm.principal.resource.attribute.labels.value O campo http.response.body é mapeado para principal.resource.attribute.labels.value com a chave "http_response_body".
http.response.headers[].key, http.response.headers[].value.headers.0 read_only_udm.target.hostname Se o campo http.response.headers[].key for "Server", o campo http.response.headers[].value.headers.0 correspondente é mapeado para target.hostname.
http.response.headers[].key, http.response.headers[].value.headers.0 read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value Para cada elemento na matriz http.response.headers, o campo key é mapeado para principal.resource.attribute.labels.key e o campo value.headers.0 é mapeado para principal.resource.attribute.labels.value.
http.response.statusCode read_only_udm.network.http.response_code O campo http.response.statusCode é convertido num número inteiro e mapeado para network.http.response_code.
ip read_only_udm.target.asset.ip O campo ip está mapeado para target.asset.ip.
ip read_only_udm.target.ip O campo ip está mapeado para target.ip.
isSeed read_only_udm.additional.fields.value.string_value O campo isSeed é convertido numa string e mapeado para additional.fields.value.string_value com a chave "isSeed".
location.city read_only_udm.principal.location.city O campo location.city está mapeado para principal.location.city.
location.continent read_only_udm.additional.fields.value.string_value O campo location.continent é mapeado para additional.fields.value.string_value com a chave "location_continent".
location.coordinates.latitude read_only_udm.principal.location.region_coordinates.latitude O campo location.coordinates.latitude é convertido num valor de vírgula flutuante e mapeado para principal.location.region_coordinates.latitude.
location.coordinates.longitude read_only_udm.principal.location.region_coordinates.longitude O campo location.coordinates.longitude é convertido num número de vírgula flutuante e mapeado para principal.location.region_coordinates.longitude.
location.country read_only_udm.principal.location.country_or_region O campo location.country está mapeado para principal.location.country_or_region.
location.geoCoordinates.latitude read_only_udm.principal.location.region_coordinates.latitude Se o campo location.coordinates.latitude estiver vazio, o campo location.geoCoordinates.latitude é convertido num número de vírgula flutuante e mapeado para principal.location.region_coordinates.latitude.
location.geoCoordinates.longitude read_only_udm.principal.location.region_coordinates.longitude Se o campo location.coordinates.longitude estiver vazio, o campo location.geoCoordinates.longitude é convertido num número de vírgula flutuante e mapeado para principal.location.region_coordinates.longitude.
location.postalCode read_only_udm.additional.fields.value.string_value O campo location.postalCode é mapeado para additional.fields.value.string_value com a chave "Código postal".
location.province read_only_udm.principal.location.state O campo location.province está mapeado para principal.location.state.
operação read_only_udm.security_result.action_details O campo de operação está mapeado para security_result.action_details.
perspectiveId read_only_udm.principal.group.product_object_id O campo perspectiveId está mapeado para principal.group.product_object_id.
porta read_only_udm.principal.port O campo de porta é convertido num número inteiro e mapeado para principal.port.
risks[].severity, risks[].title read_only_udm.security_result.category_details O campo risks[].severity é concatenado com o campo risks[].title e mapeado para security_result.category_details.
serviceName read_only_udm.network.application_protocol Se o campo serviceName for "HTTP" ou "HTTPS", é mapeado para network.application_protocol.
sourceIp read_only_udm.principal.asset.ip O campo sourceIp está mapeado para principal.asset.ip.
sourceIp read_only_udm.principal.ip O campo sourceIp está mapeado para principal.ip.
timestamp read_only_udm.metadata.event_timestamp O campo de data/hora é analisado como uma data/hora e mapeado para metadata.event_timestamp.
transportFingerprint.id read_only_udm.metadata.product_log_id O campo transportFingerprint.id é convertido numa string e mapeado para metadata.product_log_id.
transportFingerprint.raw read_only_udm.additional.fields.value.string_value O campo transportFingerprint.raw é mapeado para additional.fields.value.string_value com a chave "transportFingerprint_raw".
escrever read_only_udm.metadata.product_event_type O campo type está mapeado para metadata.product_event_type.
- read_only_udm.metadata.product_name O valor "CENSYS_ASM" é atribuído a metadata.product_name.
- read_only_udm.metadata.vendor_name O valor "CENSYS" é atribuído a metadata.vendor_name.
- read_only_udm.metadata.event_type O tipo de evento é determinado com base na presença de campos específicos: NETWORK_CONNECTION se has_princ_machine_id e has_target_machine forem verdadeiros e has_network_flow for falso, NETWORK_DNS se has_network_flow for verdadeiro, STATUS_UPDATE se has_princ_machine_id for verdadeiro e GENERIC_EVENT caso contrário.

Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.