Recolha registos do Censys
Este documento explica como carregar registos do Censys para o Google Security Operations através do Amazon S3. A Censys oferece uma gestão abrangente da superfície de ataque e inteligência da Internet através da respetiva API. Esta integração permite-lhe recolher eventos de deteção de anfitriões, eventos de risco e alterações de recursos do Censys ASM e encaminhá-los para o Google SecOps para análise e monitorização. O analisador transforma os registos não processados num formato estruturado em conformidade com o UDM do Google SecOps. Extrai campos da mensagem de registo não processada, realiza conversões de tipos de dados e mapeia as informações extraídas para os campos UDM correspondentes, enriquecendo os dados com contexto e etiquetas adicionais.
Antes de começar
Certifique-se de que tem os seguintes pré-requisitos:
- Instância do Google SecOps
- Acesso privilegiado ao Censys ASM
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Recolha os pré-requisitos do Censys (credenciais da API)
- Inicie sessão na Censys ASM Console em
app.censys.io
. - Aceda a Integrações na parte superior da página.
- Copie e guarde a chave da API e o ID da organização.
- Tenha em atenção o URL base da API:
https://api.platform.censys.io
Configure o contentor do AWS S3 e o IAM para o Google SecOps
- Crie um contentor do Amazon S3 seguindo este manual do utilizador: Criar um contentor
- Guarde o nome e a região do contentor para referência futura (por exemplo,
censys-logs
). - Crie um utilizador seguindo este guia do utilizador: Criar um utilizador do IAM.
- Selecione o utilizador criado.
- Selecione o separador Credenciais de segurança.
- Clique em Criar chave de acesso na secção Chaves de acesso.
- Selecione Serviço de terceiros como o Exemplo de utilização.
- Clicar em Seguinte.
- Opcional: adicione uma etiqueta de descrição.
- Clique em Criar chave de acesso.
- Clique em Transferir ficheiro CSV para guardar a chave de acesso e a chave de acesso secreta para utilização posterior.
- Clique em Concluído.
- Selecione o separador Autorizações.
- Clique em Adicionar autorizações na secção Políticas de autorizações.
- Selecione Adicionar autorizações.
- Selecione Anexar políticas diretamente
- Pesquise e selecione a política AmazonS3FullAccess.
- Clicar em Seguinte.
- Clique em Adicionar autorizações.
Configure a política e a função de IAM para carregamentos do S3
- Na consola da AWS, aceda a IAM > Políticas > Criar política > separador JSON.
Introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::censys-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::censys-logs/censys/state.json" } ] }
- Substitua
censys-logs
se tiver introduzido um nome de contentor diferente.
- Substitua
Clique em Seguinte > Criar política.
Aceda a IAM > Funções > Criar função > Serviço AWS > Lambda.
Anexe a política recém-criada e a política gerida AWSLambdaBasicExecutionRole (para acesso aos registos do CloudWatch).
Dê o nome
censys-lambda-role
à função e clique em Criar função.
Crie a função Lambda
- Na consola da AWS, aceda a Lambda > Functions > Create function.
- Clique em Criar do zero.
- Faculte os seguintes detalhes de configuração:
Definição | Valor |
---|---|
Nome | censys-data-collector |
Runtime | Python 3.13 |
Arquitetura | x86_64 |
Função de execução | censys-lambda-role |
Depois de criar a função, abra o separador Código, elimine o fragmento e introduza o seguinte código (
censys-data-collector.py
):import json import boto3 import urllib3 import gzip import logging import os from datetime import datetime, timedelta, timezone from typing import Dict, List, Any, Optional from urllib.parse import urlencode # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # AWS S3 client s3_client = boto3.client('s3') # HTTP client http = urllib3.PoolManager() # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] CENSYS_API_KEY = os.environ['CENSYS_API_KEY'] CENSYS_ORG_ID = os.environ['CENSYS_ORG_ID'] API_BASE = os.environ.get('API_BASE', 'https://api.platform.censys.io') class CensysCollector: def __init__(self): self.headers = { 'Authorization': f'Bearer {CENSYS_API_KEY}', 'X-Organization-ID': CENSYS_ORG_ID, 'Content-Type': 'application/json' } def get_last_collection_time(self) -> Optional[datetime]: """Get the last collection timestamp from S3 state file.""" try: response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) state = json.loads(response['Body'].read().decode('utf-8')) return datetime.fromisoformat(state.get('last_collection_time', '2024-01-01T00:00:00Z')) except Exception as e: logger.info(f"No state file found or error reading state: {e}") return datetime.now(timezone.utc) - timedelta(hours=1) def save_collection_time(self, collection_time: datetime): """Save the current collection timestamp to S3 state file.""" state = {'last_collection_time': collection_time.strftime('%Y-%m-%dT%H:%M:%SZ')} s3_client.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state), ContentType='application/json' ) def collect_logbook_events(self, cursor: str = None) -> List[Dict[str, Any]]: """Collect logbook events from Censys ASM API using cursor-based pagination.""" events = [] url = f"{API_BASE}/v3/logbook" # Use cursor-based pagination as per Censys API documentation params = {} if cursor: params['cursor'] = cursor try: query_string = urlencode(params) if params else '' full_url = f"{url}?{query_string}" if query_string else url response = http.request('GET', full_url, headers=self.headers) if response.status != 200: logger.error(f"API request failed with status {response.status}: {response.data}") return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('logbook_entries', [])) # Handle cursor-based pagination next_cursor = data.get('next_cursor') if next_cursor: events.extend(self.collect_logbook_events(next_cursor)) logger.info(f"Collected {len(events)} logbook events") return events except Exception as e: logger.error(f"Error collecting logbook events: {e}") return [] def collect_risks_events(self) -> List[Dict[str, Any]]: """Collect risk events from Censys ASM API.""" events = [] url = f"{API_BASE}/v3/risks" try: response = http.request('GET', url, headers=self.headers) if response.status != 200: logger.error(f"API request failed with status {response.status}: {response.data}") return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('risks', [])) logger.info(f"Collected {len(events)} risk events") return events except Exception as e: logger.error(f"Error collecting risk events: {e}") return [] def save_events_to_s3(self, events: List[Dict[str, Any]], event_type: str): """Save events to S3 in compressed NDJSON format.""" if not events: return timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') filename = f"{S3_PREFIX}{event_type}_{timestamp}.json.gz" try: # Convert events to newline-delimited JSON ndjson_content = 'n'.join(json.dumps(event, separators=(',', ':')) for event in events) # Compress with gzip gz_bytes = gzip.compress(ndjson_content.encode('utf-8')) s3_client.put_object( Bucket=S3_BUCKET, Key=filename, Body=gz_bytes, ContentType='application/gzip', ContentEncoding='gzip' ) logger.info(f"Saved {len(events)} {event_type} events to {filename}") except Exception as e: logger.error(f"Error saving {event_type} events to S3: {e}") raise def lambda_handler(event, context): """AWS Lambda handler function.""" try: collector = CensysCollector() # Get last collection time for cursor state management last_collection_time = collector.get_last_collection_time() current_time = datetime.now(timezone.utc) logger.info(f"Collecting events since {last_collection_time}") # Collect different types of events logbook_events = collector.collect_logbook_events() risk_events = collector.collect_risks_events() # Save events to S3 collector.save_events_to_s3(logbook_events, 'logbook') collector.save_events_to_s3(risk_events, 'risks') # Update state collector.save_collection_time(current_time) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Censys data collection completed successfully', 'logbook_events': len(logbook_events), 'risk_events': len(risk_events), 'collection_time': current_time.strftime('%Y-%m-%dT%H:%M:%SZ') }) } except Exception as e: logger.error(f"Lambda execution failed: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({ 'error': str(e) }) }
Aceda a Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.
Introduza as seguintes variáveis de ambiente, substituindo-as pelos seus valores:
Chave Valor de exemplo S3_BUCKET
censys-logs
S3_PREFIX
censys/
STATE_KEY
censys/state.json
CENSYS_API_KEY
<your-censys-api-key>
CENSYS_ORG_ID
<your-organization-id>
API_BASE
https://api.platform.censys.io
Depois de criar a função, permaneça na respetiva página (ou abra Lambda > Functions > your-function).
Selecione o separador Configuração.
No painel Configuração geral, clique em Editar.
Altere Tempo limite para 5 minutos (300 segundos) e clique em Guardar.
Crie um horário do EventBridge
- Aceda a Amazon EventBridge > Scheduler > Create schedule.
- Indique os seguintes detalhes de configuração:
- Agenda recorrente: Taxa (
1 hour
). - Alvo: a sua função Lambda
censys-data-collector
. - Nome:
censys-data-collector-1h
.
- Agenda recorrente: Taxa (
- Clique em Criar programação.
Opcional: crie um utilizador e chaves da IAM só de leitura para o Google SecOps
- Na consola da AWS, aceda a IAM > Utilizadores > Adicionar utilizadores.
- Clique em Adicionar utilizadores.
- Indique os seguintes detalhes de configuração:
- Utilizador:
secops-reader
. - Tipo de acesso: chave de acesso – acesso programático.
- Utilizador:
- Clique em Criar utilizador.
- Anexe a política de leitura mínima (personalizada): Users > secops-reader > Permissions > Add permissions > Attach policies directly > Create policy.
No editor JSON, introduza a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::censys-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::censys-logs" } ] }
Defina o nome como
secops-reader-policy
.Aceda a Criar política > pesquise/selecione > Seguinte > Adicionar autorizações.
Aceda a Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Transfira o CSV (estes valores são introduzidos no feed).
Configure um feed no Google SecOps para carregar registos do Censys
- Aceda a Definições do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, introduza um nome para o feed (por exemplo,
Censys logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione CENSYS como o Tipo de registo.
- Clicar em Seguinte.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://censys-logs/censys/
- Opções de eliminação de origens: selecione a opção de eliminação de acordo com a sua preferência.
- Idade máxima do ficheiro: inclua ficheiros modificados no último número de dias. A predefinição é 180 dias.
- ID da chave de acesso: chave de acesso do utilizador com acesso ao contentor do S3.
- Chave de acesso secreta: chave secreta do utilizador com acesso ao contentor do S3.
- Espaço de nomes do recurso: o espaço de nomes do recurso.
- Etiquetas de carregamento: a etiqueta aplicada aos eventos deste feed.
- URI do S3:
- Clicar em Seguinte.
- Reveja a nova configuração do feed no ecrã Finalizar e, de seguida, clique em Enviar.
Tabela de mapeamento da UDM
Campo de registo | Mapeamento do UDM | Lógica |
---|---|---|
assetId | read_only_udm.principal.asset.hostname | Se o campo assetId não for um endereço IP, é mapeado para principal.asset.hostname. |
assetId | read_only_udm.principal.asset.ip | Se o campo assetId for um endereço IP, é mapeado para principal.asset.ip. |
assetId | read_only_udm.principal.hostname | Se o campo assetId não for um endereço IP, é mapeado para principal.hostname. |
assetId | read_only_udm.principal.ip | Se o campo assetId for um endereço IP, é mapeado para principal.ip. |
associatedAt | read_only_udm.security_result.detection_fields.value | O campo associatedAt está mapeado para security_result.detection_fields.value. |
autonomousSystem.asn | read_only_udm.additional.fields.value.string_value | O campo autonomousSystem.asn é convertido numa string e mapeado para additional.fields.value.string_value com a chave "autonomousSystem_asn". |
autonomousSystem.bgpPrefix | read_only_udm.additional.fields.value.string_value | O campo autonomousSystem.bgpPrefix é mapeado para additional.fields.value.string_value com a chave "autonomousSystem_bgpPrefix". |
faixa | read_only_udm.principal.resource.attribute.labels.value | O campo do banner está mapeado para principal.resource.attribute.labels.value com a chave "banner". |
nuvem | read_only_udm.metadata.vendor_name | O campo da nuvem está mapeado para metadata.vendor_name. |
comments.refUrl | read_only_udm.network.http.referral_url | O campo comments.refUrl está mapeado para network.http.referral_url. |
data.cve | read_only_udm.additional.fields.value.string_value | O campo data.cve está mapeado para additional.fields.value.string_value com a chave "data_cve". |
data.cvss | read_only_udm.additional.fields.value.string_value | O campo data.cvss é mapeado para additional.fields.value.string_value com a chave "data_cvss". |
data.ipAddress | read_only_udm.principal.asset.ip | Se o campo data.ipAddress não for igual ao campo assetId, é mapeado para principal.asset.ip. |
data.ipAddress | read_only_udm.principal.ip | Se o campo data.ipAddress não for igual ao campo assetId, é mapeado para principal.ip. |
data.location.city | read_only_udm.principal.location.city | Se o campo location.city estiver vazio, o campo data.location.city é mapeado para principal.location.city. |
data.location.countryCode | read_only_udm.principal.location.country_or_region | Se o campo location.country estiver vazio, o campo data.location.countryCode é mapeado para principal.location.country_or_region. |
data.location.latitude | read_only_udm.principal.location.region_coordinates.latitude | Se os campos location.coordinates.latitude e location.geoCoordinates.latitude estiverem vazios, o campo data.location.latitude é convertido num número de vírgula flutuante e mapeado para principal.location.region_coordinates.latitude. |
data.location.longitude | read_only_udm.principal.location.region_coordinates.longitude | Se os campos location.coordinates.longitude e location.geoCoordinates.longitude estiverem vazios, o campo data.location.longitude é convertido num número de vírgula flutuante e mapeado para principal.location.region_coordinates.longitude. |
data.location.province | read_only_udm.principal.location.state | Se o campo location.province estiver vazio, o campo data.location.province é mapeado para principal.location.state. |
data.mailServers | read_only_udm.additional.fields.value.list_value.values.string_value | Cada elemento na matriz data.mailServers é mapeado para uma entrada additional.fields separada com a chave "Servidores de correio" e value.list_value.values.string_value definida para o valor do elemento. |
data.names.forwardDns[].name | read_only_udm.network.dns.questions.name | Cada elemento na matriz data.names.forwardDns é mapeado para uma entrada network.dns.questions separada com o campo de nome definido para o campo de nome do elemento. |
data.nameServers | read_only_udm.additional.fields.value.list_value.values.string_value | Cada elemento na matriz data.nameServers é mapeado para uma entrada additional.fields separada com a chave "Name nameServers" e value.list_value.values.string_value definida para o valor do elemento. |
data.protocols[].transportProtocol | read_only_udm.network.ip_protocol | Se o campo data.protocols[].transportProtocol for um dos seguintes: TCP, EIGRP, ESP, ETHERIP, GRE, ICMP, IGMP, IP6IN4, PIM, UDP ou VRRP, é mapeado para network.ip_protocol. |
data.protocols[].transportProtocol | read_only_udm.principal.resource.attribute.labels.value | O campo data.protocols[].transportProtocol é mapeado para principal.resource.attribute.labels.value com a chave "data_protocols {index}". |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.network.http.user_agent | Se o campo http.request.headers[].key for "User-Agent", o campo http.request.headers[].value.headers.0 correspondente é mapeado para network.http.user_agent. |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.network.http.parsed_user_agent | Se o campo http.request.headers[].key for "User-Agent", o campo http.request.headers[].value.headers.0 correspondente é analisado como uma string do agente do utilizador e mapeado para network.http.parsed_user_agent. |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value | Para cada elemento na matriz http.request.headers, o campo key é mapeado para principal.resource.attribute.labels.key e o campo value.headers.0 é mapeado para principal.resource.attribute.labels.value. |
http.request.uri | read_only_udm.principal.asset.hostname | A parte do nome do anfitrião do campo http.request.uri é extraída e mapeada para principal.asset.hostname. |
http.request.uri | read_only_udm.principal.hostname | A parte do nome do anfitrião do campo http.request.uri é extraída e mapeada para principal.hostname. |
http.response.body | read_only_udm.principal.resource.attribute.labels.value | O campo http.response.body é mapeado para principal.resource.attribute.labels.value com a chave "http_response_body". |
http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.target.hostname | Se o campo http.response.headers[].key for "Server", o campo http.response.headers[].value.headers.0 correspondente é mapeado para target.hostname. |
http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value | Para cada elemento na matriz http.response.headers, o campo key é mapeado para principal.resource.attribute.labels.key e o campo value.headers.0 é mapeado para principal.resource.attribute.labels.value. |
http.response.statusCode | read_only_udm.network.http.response_code | O campo http.response.statusCode é convertido num número inteiro e mapeado para network.http.response_code. |
ip | read_only_udm.target.asset.ip | O campo ip está mapeado para target.asset.ip. |
ip | read_only_udm.target.ip | O campo ip está mapeado para target.ip. |
isSeed | read_only_udm.additional.fields.value.string_value | O campo isSeed é convertido numa string e mapeado para additional.fields.value.string_value com a chave "isSeed". |
location.city | read_only_udm.principal.location.city | O campo location.city está mapeado para principal.location.city. |
location.continent | read_only_udm.additional.fields.value.string_value | O campo location.continent é mapeado para additional.fields.value.string_value com a chave "location_continent". |
location.coordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | O campo location.coordinates.latitude é convertido num valor de vírgula flutuante e mapeado para principal.location.region_coordinates.latitude. |
location.coordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | O campo location.coordinates.longitude é convertido num número de vírgula flutuante e mapeado para principal.location.region_coordinates.longitude. |
location.country | read_only_udm.principal.location.country_or_region | O campo location.country está mapeado para principal.location.country_or_region. |
location.geoCoordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | Se o campo location.coordinates.latitude estiver vazio, o campo location.geoCoordinates.latitude é convertido num número de vírgula flutuante e mapeado para principal.location.region_coordinates.latitude. |
location.geoCoordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | Se o campo location.coordinates.longitude estiver vazio, o campo location.geoCoordinates.longitude é convertido num número de vírgula flutuante e mapeado para principal.location.region_coordinates.longitude. |
location.postalCode | read_only_udm.additional.fields.value.string_value | O campo location.postalCode é mapeado para additional.fields.value.string_value com a chave "Código postal". |
location.province | read_only_udm.principal.location.state | O campo location.province está mapeado para principal.location.state. |
operação | read_only_udm.security_result.action_details | O campo de operação está mapeado para security_result.action_details. |
perspectiveId | read_only_udm.principal.group.product_object_id | O campo perspectiveId está mapeado para principal.group.product_object_id. |
porta | read_only_udm.principal.port | O campo de porta é convertido num número inteiro e mapeado para principal.port. |
risks[].severity, risks[].title | read_only_udm.security_result.category_details | O campo risks[].severity é concatenado com o campo risks[].title e mapeado para security_result.category_details. |
serviceName | read_only_udm.network.application_protocol | Se o campo serviceName for "HTTP" ou "HTTPS", é mapeado para network.application_protocol. |
sourceIp | read_only_udm.principal.asset.ip | O campo sourceIp está mapeado para principal.asset.ip. |
sourceIp | read_only_udm.principal.ip | O campo sourceIp está mapeado para principal.ip. |
timestamp | read_only_udm.metadata.event_timestamp | O campo de data/hora é analisado como uma data/hora e mapeado para metadata.event_timestamp. |
transportFingerprint.id | read_only_udm.metadata.product_log_id | O campo transportFingerprint.id é convertido numa string e mapeado para metadata.product_log_id. |
transportFingerprint.raw | read_only_udm.additional.fields.value.string_value | O campo transportFingerprint.raw é mapeado para additional.fields.value.string_value com a chave "transportFingerprint_raw". |
escrever | read_only_udm.metadata.product_event_type | O campo type está mapeado para metadata.product_event_type. |
- | read_only_udm.metadata.product_name | O valor "CENSYS_ASM" é atribuído a metadata.product_name. |
- | read_only_udm.metadata.vendor_name | O valor "CENSYS" é atribuído a metadata.vendor_name. |
- | read_only_udm.metadata.event_type | O tipo de evento é determinado com base na presença de campos específicos: NETWORK_CONNECTION se has_princ_machine_id e has_target_machine forem verdadeiros e has_network_flow for falso, NETWORK_DNS se has_network_flow for verdadeiro, STATUS_UPDATE se has_princ_machine_id for verdadeiro e GENERIC_EVENT caso contrário. |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais da Google SecOps.