Coletar registros do Censys
Este documento explica como ingerir registros do Censys no Google Security Operations usando o Amazon S3. A Censys oferece gerenciamento abrangente de superfície de ataque e inteligência da Internet pela API. Com essa integração, é possível coletar eventos de descoberta de host, eventos de risco e mudanças de recursos do Censys ASM e encaminhá-los para o Google SecOps para análise e monitoramento. O analisador transforma registros brutos em um formato estruturado de acordo com o UDM do Google SecOps. Ele extrai campos da mensagem de registro bruta, realiza conversões de tipo de dados e mapeia as informações extraídas para os campos correspondentes do UDM, enriquecendo os dados com contexto e rótulos adicionais.
Antes de começar
Verifique se você tem os pré-requisitos a seguir:
- Instância do Google SecOps
- Acesso privilegiado ao Censys ASM
- Acesso privilegiado à AWS (S3, IAM, Lambda, EventBridge)
Coletar pré-requisitos do Censys (credenciais da API)
- Faça login no console do Censys ASM em
app.censys.io
. - Acesse Integrações na parte de cima da página.
- Copie e salve a chave de API e o ID da organização.
- Anote o URL base da API:
https://api.platform.censys.io
Configurar o bucket do AWS S3 e o IAM para o Google SecOps
- Crie um bucket do Amazon S3 seguindo este guia do usuário: Como criar um bucket
- Salve o Nome e a Região do bucket para referência futura (por exemplo,
censys-logs
). - Crie um usuário seguindo este guia: Como criar um usuário do IAM.
- Selecione o usuário criado.
- Selecione a guia Credenciais de segurança.
- Clique em Criar chave de acesso na seção Chaves de acesso.
- Selecione Serviço de terceiros como o Caso de uso.
- Clique em Próxima.
- Opcional: adicione uma tag de descrição.
- Clique em Criar chave de acesso.
- Clique em Fazer o download do arquivo CSV para salvar a chave de acesso e a chave de acesso secreta para uso posterior.
- Clique em Concluído.
- Selecione a guia Permissões.
- Clique em Adicionar permissões na seção Políticas de permissões.
- Selecione Adicionar permissões.
- Selecione Anexar políticas diretamente.
- Pesquise e selecione a política AmazonS3FullAccess.
- Clique em Próxima.
- Clique em Adicionar permissões
Configurar a política e o papel do IAM para uploads do S3
- No console da AWS, acesse IAM > Políticas > Criar política > guia JSON.
Insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::censys-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::censys-logs/censys/state.json" } ] }
- Substitua
censys-logs
se você tiver inserido um nome de bucket diferente.
- Substitua
Clique em Próxima > Criar política.
Acesse IAM > Funções > Criar função > Serviço da AWS > Lambda.
Anexe a política recém-criada e a política gerenciada AWSLambdaBasicExecutionRole (para acesso aos registros do CloudWatch).
Nomeie a função como
censys-lambda-role
e clique em Criar função.
Criar a função Lambda
- No console da AWS, acesse Lambda > Functions > Create function.
- Clique em Criar do zero.
- Informe os seguintes detalhes de configuração:
Configuração | Valor |
---|---|
Nome | censys-data-collector |
Ambiente de execução | Python 3.13 |
Arquitetura | x86_64 |
Função de execução | censys-lambda-role |
Depois que a função for criada, abra a guia Código, exclua o stub e insira o seguinte código (
censys-data-collector.py
):import json import boto3 import urllib3 import gzip import logging import os from datetime import datetime, timedelta, timezone from typing import Dict, List, Any, Optional from urllib.parse import urlencode # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # AWS S3 client s3_client = boto3.client('s3') # HTTP client http = urllib3.PoolManager() # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] CENSYS_API_KEY = os.environ['CENSYS_API_KEY'] CENSYS_ORG_ID = os.environ['CENSYS_ORG_ID'] API_BASE = os.environ.get('API_BASE', 'https://api.platform.censys.io') class CensysCollector: def __init__(self): self.headers = { 'Authorization': f'Bearer {CENSYS_API_KEY}', 'X-Organization-ID': CENSYS_ORG_ID, 'Content-Type': 'application/json' } def get_last_collection_time(self) -> Optional[datetime]: """Get the last collection timestamp from S3 state file.""" try: response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) state = json.loads(response['Body'].read().decode('utf-8')) return datetime.fromisoformat(state.get('last_collection_time', '2024-01-01T00:00:00Z')) except Exception as e: logger.info(f"No state file found or error reading state: {e}") return datetime.now(timezone.utc) - timedelta(hours=1) def save_collection_time(self, collection_time: datetime): """Save the current collection timestamp to S3 state file.""" state = {'last_collection_time': collection_time.strftime('%Y-%m-%dT%H:%M:%SZ')} s3_client.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state), ContentType='application/json' ) def collect_logbook_events(self, cursor: str = None) -> List[Dict[str, Any]]: """Collect logbook events from Censys ASM API using cursor-based pagination.""" events = [] url = f"{API_BASE}/v3/logbook" # Use cursor-based pagination as per Censys API documentation params = {} if cursor: params['cursor'] = cursor try: query_string = urlencode(params) if params else '' full_url = f"{url}?{query_string}" if query_string else url response = http.request('GET', full_url, headers=self.headers) if response.status != 200: logger.error(f"API request failed with status {response.status}: {response.data}") return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('logbook_entries', [])) # Handle cursor-based pagination next_cursor = data.get('next_cursor') if next_cursor: events.extend(self.collect_logbook_events(next_cursor)) logger.info(f"Collected {len(events)} logbook events") return events except Exception as e: logger.error(f"Error collecting logbook events: {e}") return [] def collect_risks_events(self) -> List[Dict[str, Any]]: """Collect risk events from Censys ASM API.""" events = [] url = f"{API_BASE}/v3/risks" try: response = http.request('GET', url, headers=self.headers) if response.status != 200: logger.error(f"API request failed with status {response.status}: {response.data}") return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('risks', [])) logger.info(f"Collected {len(events)} risk events") return events except Exception as e: logger.error(f"Error collecting risk events: {e}") return [] def save_events_to_s3(self, events: List[Dict[str, Any]], event_type: str): """Save events to S3 in compressed NDJSON format.""" if not events: return timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') filename = f"{S3_PREFIX}{event_type}_{timestamp}.json.gz" try: # Convert events to newline-delimited JSON ndjson_content = 'n'.join(json.dumps(event, separators=(',', ':')) for event in events) # Compress with gzip gz_bytes = gzip.compress(ndjson_content.encode('utf-8')) s3_client.put_object( Bucket=S3_BUCKET, Key=filename, Body=gz_bytes, ContentType='application/gzip', ContentEncoding='gzip' ) logger.info(f"Saved {len(events)} {event_type} events to {filename}") except Exception as e: logger.error(f"Error saving {event_type} events to S3: {e}") raise def lambda_handler(event, context): """AWS Lambda handler function.""" try: collector = CensysCollector() # Get last collection time for cursor state management last_collection_time = collector.get_last_collection_time() current_time = datetime.now(timezone.utc) logger.info(f"Collecting events since {last_collection_time}") # Collect different types of events logbook_events = collector.collect_logbook_events() risk_events = collector.collect_risks_events() # Save events to S3 collector.save_events_to_s3(logbook_events, 'logbook') collector.save_events_to_s3(risk_events, 'risks') # Update state collector.save_collection_time(current_time) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Censys data collection completed successfully', 'logbook_events': len(logbook_events), 'risk_events': len(risk_events), 'collection_time': current_time.strftime('%Y-%m-%dT%H:%M:%SZ') }) } except Exception as e: logger.error(f"Lambda execution failed: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({ 'error': str(e) }) }
Acesse Configuração > Variáveis de ambiente > Editar > Adicionar nova variável de ambiente.
Insira as seguintes variáveis de ambiente, substituindo pelos seus valores:
Chave Valor de exemplo S3_BUCKET
censys-logs
S3_PREFIX
censys/
STATE_KEY
censys/state.json
CENSYS_API_KEY
<your-censys-api-key>
CENSYS_ORG_ID
<your-organization-id>
API_BASE
https://api.platform.censys.io
Depois que a função for criada, permaneça na página dela ou abra Lambda > Functions > sua-função.
Selecione a guia Configuração.
No painel Configuração geral, clique em Editar.
Mude Tempo limite para 5 minutos (300 segundos) e clique em Salvar.
Criar uma programação do EventBridge
- Acesse Amazon EventBridge > Scheduler > Criar programação.
- Informe os seguintes detalhes de configuração:
- Programação recorrente: Taxa (
1 hour
). - Destino: sua função Lambda
censys-data-collector
. - Nome:
censys-data-collector-1h
.
- Programação recorrente: Taxa (
- Clique em Criar programação.
Opcional: criar um usuário e chaves do IAM somente leitura para o Google SecOps
- No console da AWS, acesse IAM > Usuários > Adicionar usuários.
- Clique em Add users.
- Informe os seguintes detalhes de configuração:
- Usuário:
secops-reader
. - Tipo de acesso: Chave de acesso — Acesso programático.
- Usuário:
- Clique em Criar usuário.
- Anexe a política de leitura mínima (personalizada): Usuários > secops-reader > Permissões > Adicionar permissões > Anexar políticas diretamente > Criar política.
No editor JSON, insira a seguinte política:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::censys-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::censys-logs" } ] }
Defina o nome como
secops-reader-policy
.Acesse Criar política > pesquise/selecione > Próxima > Adicionar permissões.
Acesse Credenciais de segurança > Chaves de acesso > Criar chave de acesso.
Faça o download do CSV (esses valores são inseridos no feed).
Configurar um feed no Google SecOps para ingerir registros do Censys
- Acesse Configurações do SIEM > Feeds.
- Clique em + Adicionar novo feed.
- No campo Nome do feed, insira um nome para o feed (por exemplo,
Censys logs
). - Selecione Amazon S3 V2 como o Tipo de origem.
- Selecione CENSYS como o Tipo de registro.
- Clique em Próxima.
- Especifique valores para os seguintes parâmetros de entrada:
- URI do S3:
s3://censys-logs/censys/
- Opções de exclusão de fontes: selecione a opção de exclusão de acordo com sua preferência.
- Idade máxima do arquivo: inclui arquivos modificados no último número de dias. O padrão é de 180 dias.
- ID da chave de acesso: chave de acesso do usuário com acesso ao bucket do S3.
- Chave de acesso secreta: chave secreta do usuário com acesso ao bucket do S3.
- Namespace do recurso: o namespace do recurso.
- Rótulos de ingestão: o rótulo aplicado aos eventos deste feed.
- URI do S3:
- Clique em Próxima.
- Revise a nova configuração do feed na tela Finalizar e clique em Enviar.
Tabela de mapeamento do UDM
Campo de registro | Mapeamento do UDM | Lógica |
---|---|---|
assetId | read_only_udm.principal.asset.hostname | Se o campo "assetId" não for um endereço IP, ele será mapeado para "principal.asset.hostname". |
assetId | read_only_udm.principal.asset.ip | Se o campo "assetId" for um endereço IP, ele será mapeado para "principal.asset.ip". |
assetId | read_only_udm.principal.hostname | Se o campo "assetId" não for um endereço IP, ele será mapeado para "principal.hostname". |
assetId | read_only_udm.principal.ip | Se o campo "assetId" for um endereço IP, ele será mapeado para "principal.ip". |
associatedAt | read_only_udm.security_result.detection_fields.value | O campo "associatedAt" é mapeado para "security_result.detection_fields.value". |
autonomousSystem.asn | read_only_udm.additional.fields.value.string_value | O campo "autonomousSystem.asn" é convertido em uma string e mapeado para "additional.fields.value.string_value" com a chave "autonomousSystem_asn". |
autonomousSystem.bgpPrefix | read_only_udm.additional.fields.value.string_value | O campo "autonomousSystem.bgpPrefix" é mapeado para "additional.fields.value.string_value" com a chave "autonomousSystem_bgpPrefix". |
banner | read_only_udm.principal.resource.attribute.labels.value | O campo de banner é mapeado para principal.resource.attribute.labels.value com a chave "banner". |
nuvem | read_only_udm.metadata.vendor_name | O campo da nuvem é mapeado para "metadata.vendor_name". |
comments.refUrl | read_only_udm.network.http.referral_url | O campo "comments.refUrl" é mapeado para "network.http.referral_url". |
data.cve | read_only_udm.additional.fields.value.string_value | O campo "data.cve" é mapeado para "additional.fields.value.string_value" com a chave "data_cve". |
data.cvss | read_only_udm.additional.fields.value.string_value | O campo "data.cvss" é mapeado para "additional.fields.value.string_value" com a chave "data_cvss". |
data.ipAddress | read_only_udm.principal.asset.ip | Se o campo "data.ipAddress" não for igual ao campo "assetId", ele será mapeado para "principal.asset.ip". |
data.ipAddress | read_only_udm.principal.ip | Se o campo "data.ipAddress" não for igual ao campo "assetId", ele será mapeado para "principal.ip". |
data.location.city | read_only_udm.principal.location.city | Se o campo "location.city" estiver vazio, o campo "data.location.city" será mapeado para "principal.location.city". |
data.location.countryCode | read_only_udm.principal.location.country_or_region | Se o campo "location.country" estiver vazio, o campo "data.location.countryCode" será mapeado para "principal.location.country_or_region". |
data.location.latitude | read_only_udm.principal.location.region_coordinates.latitude | Se os campos "location.coordinates.latitude" e "location.geoCoordinates.latitude" estiverem vazios, o campo "data.location.latitude" será convertido em um ponto flutuante e mapeado para "principal.location.region_coordinates.latitude". |
data.location.longitude | read_only_udm.principal.location.region_coordinates.longitude | Se os campos "location.coordinates.longitude" e "location.geoCoordinates.longitude" estiverem vazios, o campo "data.location.longitude" será convertido em um ponto flutuante e mapeado para "principal.location.region_coordinates.longitude". |
data.location.province | read_only_udm.principal.location.state | Se o campo "location.province" estiver vazio, o campo "data.location.province" será mapeado para "principal.location.state". |
data.mailServers | read_only_udm.additional.fields.value.list_value.values.string_value | Cada elemento na matriz data.mailServers é mapeado para uma entrada additional.fields separada com a chave "Servidores de e-mail" e value.list_value.values.string_value definido como o valor do elemento. |
data.names.forwardDns[].name | read_only_udm.network.dns.questions.name | Cada elemento na matriz data.names.forwardDns é mapeado para uma entrada network.dns.questions separada com o campo "name" definido como o campo "name" do elemento. |
data.nameServers | read_only_udm.additional.fields.value.list_value.values.string_value | Cada elemento na matriz data.nameServers é mapeado para uma entrada additional.fields separada com a chave "Name nameServers" e value.list_value.values.string_value definido como o valor do elemento. |
data.protocols[].transportProtocol | read_only_udm.network.ip_protocol | Se o campo data.protocols[].transportProtocol for um dos seguintes: TCP, EIGRP, ESP, ETHERIP, GRE, ICMP, IGMP, IP6IN4, PIM, UDP ou VRRP, ele será mapeado para network.ip_protocol. |
data.protocols[].transportProtocol | read_only_udm.principal.resource.attribute.labels.value | O campo "data.protocols[].transportProtocol" é mapeado para "principal.resource.attribute.labels.value" com a chave "data_protocols {index}". |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.network.http.user_agent | Se o campo http.request.headers[].key for "User-Agent", o campo correspondente http.request.headers[].value.headers.0 será mapeado para network.http.user_agent. |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.network.http.parsed_user_agent | Se o campo "http.request.headers[].key" for "User-Agent", o campo "http.request.headers[].value.headers.0" correspondente será analisado como uma string de user agent e mapeado para "network.http.parsed_user_agent". |
http.request.headers[].key, http.request.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value | Para cada elemento na matriz "http.request.headers", o campo "key" é mapeado para "principal.resource.attribute.labels.key", e o campo "value.headers.0" é mapeado para "principal.resource.attribute.labels.value". |
http.request.uri | read_only_udm.principal.asset.hostname | A parte do nome do host do campo http.request.uri é extraída e mapeada para principal.asset.hostname. |
http.request.uri | read_only_udm.principal.hostname | A parte do nome do host do campo http.request.uri é extraída e mapeada para principal.hostname. |
http.response.body | read_only_udm.principal.resource.attribute.labels.value | O campo "http.response.body" é mapeado para "principal.resource.attribute.labels.value" com a chave "http_response_body". |
http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.target.hostname | Se o campo "key" de http.response.headers[] for "Server", o campo correspondente http.response.headers[].value.headers.0 será mapeado para target.hostname. |
http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key, read_only_udm.principal.resource.attribute.labels.value | Para cada elemento na matriz "http.response.headers", o campo "key" é mapeado para "principal.resource.attribute.labels.key", e o campo "value.headers.0" é mapeado para "principal.resource.attribute.labels.value". |
http.response.statusCode | read_only_udm.network.http.response_code | O campo "http.response.statusCode" é convertido em um número inteiro e mapeado para "network.http.response_code". |
ip | read_only_udm.target.asset.ip | O campo "ip" é mapeado para "target.asset.ip". |
ip | read_only_udm.target.ip | O campo "ip" é mapeado para "target.ip". |
isSeed | read_only_udm.additional.fields.value.string_value | O campo "isSeed" é convertido em uma string e mapeado para "additional.fields.value.string_value" com a chave "isSeed". |
location.city | read_only_udm.principal.location.city | O campo "location.city" é mapeado para "principal.location.city". |
location.continent | read_only_udm.additional.fields.value.string_value | O campo "location.continent" é mapeado para "additional.fields.value.string_value" com a chave "location_continent". |
location.coordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | O campo "location.coordinates.latitude" é convertido em um ponto flutuante e mapeado para "principal.location.region_coordinates.latitude". |
location.coordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | O campo "location.coordinates.longitude" é convertido em um ponto flutuante e mapeado para "principal.location.region_coordinates.longitude". |
location.country | read_only_udm.principal.location.country_or_region | O campo "location.country" é mapeado para "principal.location.country_or_region". |
location.geoCoordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | Se o campo "location.coordinates.latitude" estiver vazio, o campo "location.geoCoordinates.latitude" será convertido em um ponto flutuante e mapeado para "principal.location.region_coordinates.latitude". |
location.geoCoordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | Se o campo "location.coordinates.longitude" estiver vazio, o campo "location.geoCoordinates.longitude" será convertido em um ponto flutuante e mapeado para "principal.location.region_coordinates.longitude". |
location.postalCode | read_only_udm.additional.fields.value.string_value | O campo "location.postalCode" é mapeado para "additional.fields.value.string_value" com a chave "Postal code". |
location.province | read_only_udm.principal.location.state | O campo "location.province" é mapeado para "principal.location.state". |
operação | read_only_udm.security_result.action_details | O campo "operation" é mapeado para "security_result.action_details". |
perspectiveId | read_only_udm.principal.group.product_object_id | O campo "perspectiveId" é mapeado para "principal.group.product_object_id". |
porta | read_only_udm.principal.port | O campo de porta é convertido em um número inteiro e mapeado para principal.port. |
risks[].severity, risks[].title | read_only_udm.security_result.category_details | O campo risks[].severity é concatenado com o campo risks[].title e mapeado para security_result.category_details. |
serviceName | read_only_udm.network.application_protocol | Se o campo "serviceName" for "HTTP" ou "HTTPS", ele será mapeado para "network.application_protocol". |
sourceIp | read_only_udm.principal.asset.ip | O campo "sourceIp" é mapeado para "principal.asset.ip". |
sourceIp | read_only_udm.principal.ip | O campo "sourceIp" é mapeado para "principal.ip". |
timestamp | read_only_udm.metadata.event_timestamp | O campo de carimbo de data/hora é analisado como um carimbo de data/hora e mapeado para "metadata.event_timestamp". |
transportFingerprint.id | read_only_udm.metadata.product_log_id | O campo transportFingerprint.id é convertido em uma string e mapeado para metadata.product_log_id. |
transportFingerprint.raw | read_only_udm.additional.fields.value.string_value | O campo "transportFingerprint.raw" é mapeado para "additional.fields.value.string_value" com a chave "transportFingerprint_raw". |
tipo | read_only_udm.metadata.product_event_type | O campo "type" é mapeado para "metadata.product_event_type". |
- | read_only_udm.metadata.product_name | O valor "CENSYS_ASM" é atribuído a metadata.product_name. |
- | read_only_udm.metadata.vendor_name | O valor "CENSYS" é atribuído a metadata.vendor_name. |
- | read_only_udm.metadata.event_type | O tipo de evento é determinado com base na presença de campos específicos: NETWORK_CONNECTION se has_princ_machine_id e has_target_machine forem verdadeiros e has_network_flow for falso, NETWORK_DNS se has_network_flow for verdadeiro, STATUS_UPDATE se has_princ_machine_id for verdadeiro e GENERIC_EVENT caso contrário. |
Precisa de mais ajuda? Receba respostas de membros da comunidade e profissionais do Google SecOps.