收集 Censys 記錄
本文說明如何使用 Amazon S3,將 Censys 記錄擷取至 Google Security Operations。Censys 透過 API 提供全面的攻擊面管理和網際網路情報。整合後,您就能從 Censys ASM 收集主機探索事件、風險事件和資產變更,並轉送至 Google SecOps 進行分析和監控。剖析器會將原始記錄轉換為符合 Google SecOps UDM 的結構化格式。這項服務會從原始記錄訊息中擷取欄位、執行資料類型轉換,並將擷取的資訊對應至相應的 UDM 欄位,以額外的內容和標籤擴充資料。
事前準備
請確認您已完成下列事前準備事項:
- Google SecOps 執行個體
- Censys ASM 的特殊存取權
- AWS (S3、IAM、Lambda、EventBridge) 的具備權限存取權
收集 Censys 先決條件 (API 憑證)
- 前往
app.censys.io
登入 Censys ASM 控制台。 - 前往頁面頂端的「整合」。
- 複製並儲存「API 金鑰」和「機構 ID」。
- 請記下 API 基礎網址:
https://api.platform.censys.io
為 Google SecOps 設定 AWS S3 值區和 IAM
- 按照這份使用者指南建立 Amazon S3 值區:建立值區
- 儲存 bucket 的「名稱」和「區域」,以供日後參考 (例如
censys-logs
)。 - 按照這份使用者指南建立使用者:建立 IAM 使用者。
- 選取建立的「使用者」。
- 選取「安全憑證」分頁標籤。
- 在「Access Keys」部分中,按一下「Create Access Key」。
- 選取「第三方服務」做為「用途」。
- 點選「下一步」。
- 選用:新增說明標記。
- 按一下「建立存取金鑰」。
- 按一下「下載 CSV 檔案」,儲存「存取金鑰」和「私密存取金鑰」以供日後使用。
- 按一下 [完成]。
- 選取 [權限] 分頁標籤。
- 在「Permissions policies」(權限政策) 區段中,按一下「Add permissions」(新增權限)。
- 選取「新增權限」。
- 選取「直接附加政策」
- 搜尋並選取 AmazonS3FullAccess 政策。
- 點選「下一步」。
- 按一下「Add permissions」。
設定 S3 上傳的身分與存取權管理政策和角色
- 在 AWS 控制台中,依序前往「IAM」>「Policies」>「Create policy」>「JSON」分頁標籤。
輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::censys-logs/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::censys-logs/censys/state.json" } ] }
- 如果您輸入的值區名稱不同,請替換
censys-logs
。
- 如果您輸入的值區名稱不同,請替換
依序點選「Next」>「Create policy」。
依序前往「IAM」>「Roles」>「Create role」>「AWS service」>「Lambda」。
附加新建立的政策和 AWSLambdaBasicExecutionRole 受管理政策 (適用於 CloudWatch Logs 存取權)。
為角色命名
censys-lambda-role
,然後按一下「建立角色」。
建立 Lambda 函式
- 在 AWS 控制台中,依序前往「Lambda」>「Functions」>「Create function」。
- 按一下「從頭開始撰寫」。
- 請提供下列設定詳細資料:
設定 | 值 |
---|---|
名稱 | censys-data-collector |
執行階段 | Python 3.13 |
架構 | x86_64 |
執行角色 | censys-lambda-role |
建立函式後,開啟「程式碼」分頁,刪除存根並輸入下列程式碼 (
censys-data-collector.py
):import json import boto3 import urllib3 import gzip import logging import os from datetime import datetime, timedelta, timezone from typing import Dict, List, Any, Optional from urllib.parse import urlencode # Configure logging logger = logging.getLogger() logger.setLevel(logging.INFO) # AWS S3 client s3_client = boto3.client('s3') # HTTP client http = urllib3.PoolManager() # Environment variables S3_BUCKET = os.environ['S3_BUCKET'] S3_PREFIX = os.environ['S3_PREFIX'] STATE_KEY = os.environ['STATE_KEY'] CENSYS_API_KEY = os.environ['CENSYS_API_KEY'] CENSYS_ORG_ID = os.environ['CENSYS_ORG_ID'] API_BASE = os.environ.get('API_BASE', 'https://api.platform.censys.io') class CensysCollector: def __init__(self): self.headers = { 'Authorization': f'Bearer {CENSYS_API_KEY}', 'X-Organization-ID': CENSYS_ORG_ID, 'Content-Type': 'application/json' } def get_last_collection_time(self) -> Optional[datetime]: """Get the last collection timestamp from S3 state file.""" try: response = s3_client.get_object(Bucket=S3_BUCKET, Key=STATE_KEY) state = json.loads(response['Body'].read().decode('utf-8')) return datetime.fromisoformat(state.get('last_collection_time', '2024-01-01T00:00:00Z')) except Exception as e: logger.info(f"No state file found or error reading state: {e}") return datetime.now(timezone.utc) - timedelta(hours=1) def save_collection_time(self, collection_time: datetime): """Save the current collection timestamp to S3 state file.""" state = {'last_collection_time': collection_time.strftime('%Y-%m-%dT%H:%M:%SZ')} s3_client.put_object( Bucket=S3_BUCKET, Key=STATE_KEY, Body=json.dumps(state), ContentType='application/json' ) def collect_logbook_events(self, cursor: str = None) -> List[Dict[str, Any]]: """Collect logbook events from Censys ASM API using cursor-based pagination.""" events = [] url = f"{API_BASE}/v3/logbook" # Use cursor-based pagination as per Censys API documentation params = {} if cursor: params['cursor'] = cursor try: query_string = urlencode(params) if params else '' full_url = f"{url}?{query_string}" if query_string else url response = http.request('GET', full_url, headers=self.headers) if response.status != 200: logger.error(f"API request failed with status {response.status}: {response.data}") return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('logbook_entries', [])) # Handle cursor-based pagination next_cursor = data.get('next_cursor') if next_cursor: events.extend(self.collect_logbook_events(next_cursor)) logger.info(f"Collected {len(events)} logbook events") return events except Exception as e: logger.error(f"Error collecting logbook events: {e}") return [] def collect_risks_events(self) -> List[Dict[str, Any]]: """Collect risk events from Censys ASM API.""" events = [] url = f"{API_BASE}/v3/risks" try: response = http.request('GET', url, headers=self.headers) if response.status != 200: logger.error(f"API request failed with status {response.status}: {response.data}") return [] data = json.loads(response.data.decode('utf-8')) events.extend(data.get('risks', [])) logger.info(f"Collected {len(events)} risk events") return events except Exception as e: logger.error(f"Error collecting risk events: {e}") return [] def save_events_to_s3(self, events: List[Dict[str, Any]], event_type: str): """Save events to S3 in compressed NDJSON format.""" if not events: return timestamp = datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S') filename = f"{S3_PREFIX}{event_type}_{timestamp}.json.gz" try: # Convert events to newline-delimited JSON ndjson_content = 'n'.join(json.dumps(event, separators=(',', ':')) for event in events) # Compress with gzip gz_bytes = gzip.compress(ndjson_content.encode('utf-8')) s3_client.put_object( Bucket=S3_BUCKET, Key=filename, Body=gz_bytes, ContentType='application/gzip', ContentEncoding='gzip' ) logger.info(f"Saved {len(events)} {event_type} events to {filename}") except Exception as e: logger.error(f"Error saving {event_type} events to S3: {e}") raise def lambda_handler(event, context): """AWS Lambda handler function.""" try: collector = CensysCollector() # Get last collection time for cursor state management last_collection_time = collector.get_last_collection_time() current_time = datetime.now(timezone.utc) logger.info(f"Collecting events since {last_collection_time}") # Collect different types of events logbook_events = collector.collect_logbook_events() risk_events = collector.collect_risks_events() # Save events to S3 collector.save_events_to_s3(logbook_events, 'logbook') collector.save_events_to_s3(risk_events, 'risks') # Update state collector.save_collection_time(current_time) return { 'statusCode': 200, 'body': json.dumps({ 'message': 'Censys data collection completed successfully', 'logbook_events': len(logbook_events), 'risk_events': len(risk_events), 'collection_time': current_time.strftime('%Y-%m-%dT%H:%M:%SZ') }) } except Exception as e: logger.error(f"Lambda execution failed: {str(e)}") return { 'statusCode': 500, 'body': json.dumps({ 'error': str(e) }) }
依序前往「Configuration」>「Environment variables」>「Edit」>「Add new environment variable」。
輸入下列環境變數,並將 換成您的值:
鍵 範例值 S3_BUCKET
censys-logs
S3_PREFIX
censys/
STATE_KEY
censys/state.json
CENSYS_API_KEY
<your-censys-api-key>
CENSYS_ORG_ID
<your-organization-id>
API_BASE
https://api.platform.censys.io
建立函式後,請留在函式頁面 (或依序開啟「Lambda」>「Functions」>「your-function」)。
選取「設定」分頁標籤。
在「一般設定」面板中,按一下「編輯」。
將「Timeout」(逾時間隔) 變更為「5 minutes (300 seconds)」(5 分鐘 (300 秒)),然後按一下「Save」(儲存)。
建立 EventBridge 排程
- 依序前往「Amazon EventBridge」>「Scheduler」>「Create schedule」。
- 提供下列設定詳細資料:
- 週期性時間表:費率 (
1 hour
)。 - 目標:您的 Lambda 函式
censys-data-collector
。 - 名稱:
censys-data-collector-1h
。
- 週期性時間表:費率 (
- 按一下「建立時間表」。
選用:為 Google SecOps 建立唯讀 IAM 使用者和金鑰
- 在 AWS 控制台中,依序前往「IAM」>「Users」>「Add users」。
- 點選 [Add users] (新增使用者)。
- 提供下列設定詳細資料:
- 使用者:
secops-reader
。 - 存取類型:存取金鑰 - 程式輔助存取。
- 使用者:
- 按一下「建立使用者」。
- 附加最低讀取權限政策 (自訂):依序選取「使用者」>「secops-reader」>「權限」>「新增權限」>「直接附加政策」>「建立政策」。
在 JSON 編輯器中輸入下列政策:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::censys-logs/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::censys-logs" } ] }
將名稱設為
secops-reader-policy
。依序前往「建立政策」> 搜尋/選取 >「下一步」>「新增權限」。
依序前往「安全憑證」>「存取金鑰」>「建立存取金鑰」。
下載 CSV (這些值會輸入至動態饋給)。
在 Google SecOps 中設定資訊提供,擷取 Censys 記錄
- 依序前往「SIEM 設定」>「動態饋給」。
- 按一下「+ 新增動態消息」。
- 在「動態饋給名稱」欄位中輸入動態饋給名稱 (例如
Censys logs
)。 - 選取「Amazon S3 V2」做為「來源類型」。
- 選取「CENSYS」做為「記錄類型」。
- 點選「下一步」。
- 指定下列輸入參數的值:
- S3 URI:
s3://censys-logs/censys/
- 來源刪除選項:根據偏好設定選取刪除選項。
- 檔案存在時間上限:包含在過去天數內修改的檔案。預設值為 180 天。
- 存取金鑰 ID:具有 S3 值區存取權的使用者存取金鑰。
- 存取密鑰:具有 S3 bucket 存取權的使用者私密金鑰。
- 資產命名空間:資產命名空間。
- 擷取標籤:套用至這個動態饋給事件的標籤。
- S3 URI:
- 點選「下一步」。
- 在「完成」畫面中檢查新的動態饋給設定,然後按一下「提交」。
UDM 對應表
記錄欄位 | UDM 對應 | 邏輯 |
---|---|---|
assetId | read_only_udm.principal.asset.hostname | 如果 assetId 欄位不是 IP 位址,則會對應至 principal.asset.hostname。 |
assetId | read_only_udm.principal.asset.ip | 如果 assetId 欄位是 IP 位址,則會對應至 principal.asset.ip。 |
assetId | read_only_udm.principal.hostname | 如果 assetId 欄位不是 IP 位址,則會對應至 principal.hostname。 |
assetId | read_only_udm.principal.ip | 如果 assetId 欄位是 IP 位址,則會對應至 principal.ip。 |
associatedAt | read_only_udm.security_result.detection_fields.value | associatedAt 欄位會對應至 security_result.detection_fields.value。 |
autonomousSystem.asn | read_only_udm.additional.fields.value.string_value | autonomousSystem.asn 欄位會轉換為字串,並對應至 additional.fields.value.string_value,鍵為「autonomousSystem_asn」。 |
autonomousSystem.bgpPrefix | read_only_udm.additional.fields.value.string_value | autonomousSystem.bgpPrefix 欄位會對應至 additional.fields.value.string_value,並使用「autonomousSystem_bgpPrefix」鍵。 |
橫幅 | read_only_udm.principal.resource.attribute.labels.value | 橫幅欄位會對應至 principal.resource.attribute.labels.value,並以「banner」做為鍵。 |
雲朵 | read_only_udm.metadata.vendor_name | 雲端欄位會對應至 metadata.vendor_name。 |
comments.refUrl | read_only_udm.network.http.referral_url | comments.refUrl 欄位會對應至 network.http.referral_url。 |
data.cve | read_only_udm.additional.fields.value.string_value | data.cve 欄位會對應至 additional.fields.value.string_value,並使用「data_cve」做為鍵。 |
data.cvss | read_only_udm.additional.fields.value.string_value | data.cvss 欄位會對應至 additional.fields.value.string_value,並使用「data_cvss」鍵。 |
data.ipAddress | read_only_udm.principal.asset.ip | 如果 data.ipAddress 欄位不等於 assetId 欄位,則會對應至 principal.asset.ip。 |
data.ipAddress | read_only_udm.principal.ip | 如果 data.ipAddress 欄位不等於 assetId 欄位,則會對應至 principal.ip。 |
data.location.city | read_only_udm.principal.location.city | 如果 location.city 欄位空白,data.location.city 欄位會對應至 principal.location.city。 |
data.location.countryCode | read_only_udm.principal.location.country_or_region | 如果 location.country 欄位為空白,data.location.countryCode 欄位會對應至 principal.location.country_or_region。 |
data.location.latitude | read_only_udm.principal.location.region_coordinates.latitude | 如果 location.coordinates.latitude 和 location.geoCoordinates.latitude 欄位空白,data.location.latitude 欄位會轉換為浮點數,並對應至 principal.location.region_coordinates.latitude。 |
data.location.longitude | read_only_udm.principal.location.region_coordinates.longitude | 如果 location.coordinates.longitude 和 location.geoCoordinates.longitude 欄位為空白,data.location.longitude 欄位會轉換為浮點數,並對應至 principal.location.region_coordinates.longitude。 |
data.location.province | read_only_udm.principal.location.state | 如果 location.province 欄位為空白,data.location.province 欄位會對應至 principal.location.state。 |
data.mailServers | read_only_udm.additional.fields.value.list_value.values.string_value | data.mailServers 陣列中的每個元素都會對應至個別的 additional.fields 項目,其中鍵為「Mail Servers」,且 value.list_value.values.string_value 設為元素值。 |
data.names.forwardDns[].name | read_only_udm.network.dns.questions.name | data.names.forwardDns 陣列中的每個元素都會對應至個別的 network.dns.questions 項目,且名稱欄位會設為元素的名稱欄位。 |
data.nameServers | read_only_udm.additional.fields.value.list_value.values.string_value | data.nameServers 陣列中的每個元素都會對應至個別的 additional.fields 項目,且索引鍵為「Name nameServers」,值為 list_value.values.string_value 設為元素值。 |
data.protocols[].transportProtocol | read_only_udm.network.ip_protocol | 如果 data.protocols[].transportProtocol 欄位是 TCP、EIGRP、ESP、ETHERIP、GRE、ICMP、IGMP、IP6IN4、PIM、UDP 或 VRRP 其中之一,則會對應至 network.ip_protocol。 |
data.protocols[].transportProtocol | read_only_udm.principal.resource.attribute.labels.value | data.protocols[].transportProtocol 欄位會對應至 principal.resource.attribute.labels.value,並以「data_protocols {index}」做為鍵。 |
http.request.headers[].key、http.request.headers[].value.headers.0 | read_only_udm.network.http.user_agent | 如果 http.request.headers[].key 欄位為「User-Agent」,對應的 http.request.headers[].value.headers.0 欄位會對應至 network.http.user_agent。 |
http.request.headers[].key、http.request.headers[].value.headers.0 | read_only_udm.network.http.parsed_user_agent | 如果 http.request.headers[].key 欄位為「User-Agent」,系統會將對應的 http.request.headers[].value.headers.0 欄位剖析為使用者代理程式字串,並對應至 network.http.parsed_user_agent。 |
http.request.headers[].key、http.request.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key、read_only_udm.principal.resource.attribute.labels.value | 針對 http.request.headers 陣列中的每個元素,鍵欄位會對應至 principal.resource.attribute.labels.key 和 value.headers.0 欄位,而值欄位則會對應至 principal.resource.attribute.labels.value。 |
http.request.uri | read_only_udm.principal.asset.hostname | 系統會擷取 http.request.uri 欄位的主機名稱部分,並對應至 principal.asset.hostname。 |
http.request.uri | read_only_udm.principal.hostname | 系統會擷取 http.request.uri 欄位的主機名稱部分,並對應至 principal.hostname。 |
http.response.body | read_only_udm.principal.resource.attribute.labels.value | http.response.body 欄位會對應至 principal.resource.attribute.labels.value,且鍵為「http_response_body」。 |
http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.target.hostname | 如果 http.response.headers[].key 欄位為「Server」,對應的 http.response.headers[].value.headers.0 欄位會對應至 target.hostname。 |
http.response.headers[].key, http.response.headers[].value.headers.0 | read_only_udm.principal.resource.attribute.labels.key、read_only_udm.principal.resource.attribute.labels.value | 針對 http.response.headers 陣列中的每個元素,鍵欄位會對應至 principal.resource.attribute.labels.key,而 value.headers.0 欄位會對應至 principal.resource.attribute.labels.value。 |
http.response.statusCode | read_only_udm.network.http.response_code | http.response.statusCode 欄位會轉換為整數,並對應至 network.http.response_code。 |
ip | read_only_udm.target.asset.ip | ip 欄位會對應至 target.asset.ip。 |
ip | read_only_udm.target.ip | ip 欄位會對應至 target.ip。 |
isSeed | read_only_udm.additional.fields.value.string_value | isSeed 欄位會轉換為字串,並對應至 additional.fields.value.string_value,索引鍵為「isSeed」。 |
location.city | read_only_udm.principal.location.city | location.city 欄位會對應至 principal.location.city。 |
location.continent | read_only_udm.additional.fields.value.string_value | location.continent 欄位會對應至 additional.fields.value.string_value,並以「location_continent」做為鍵。 |
location.coordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | location.coordinates.latitude 欄位會轉換為浮點數,並對應至 principal.location.region_coordinates.latitude。 |
location.coordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | location.coordinates.longitude 欄位會轉換為浮點數,並對應至 principal.location.region_coordinates.longitude。 |
location.country | read_only_udm.principal.location.country_or_region | location.country 欄位會對應至 principal.location.country_or_region。 |
location.geoCoordinates.latitude | read_only_udm.principal.location.region_coordinates.latitude | 如果 location.coordinates.latitude 欄位為空白,系統會將 location.geoCoordinates.latitude 欄位轉換為浮點數,並對應至 principal.location.region_coordinates.latitude。 |
location.geoCoordinates.longitude | read_only_udm.principal.location.region_coordinates.longitude | 如果 location.coordinates.longitude 欄位空白,系統會將 location.geoCoordinates.longitude 欄位轉換為浮點數,並對應至 principal.location.region_coordinates.longitude。 |
location.postalCode | read_only_udm.additional.fields.value.string_value | location.postalCode 欄位會對應至 additional.fields.value.string_value,索引鍵為「Postal code」。 |
location.province | read_only_udm.principal.location.state | location.province 欄位會對應至 principal.location.state。 |
作業 | read_only_udm.security_result.action_details | 作業欄位會對應至 security_result.action_details。 |
perspectiveId | read_only_udm.principal.group.product_object_id | perspectiveId 欄位會對應至 principal.group.product_object_id。 |
通訊埠 | read_only_udm.principal.port | 通訊埠欄位會轉換為整數,並對應至 principal.port。 |
risks[].severity、risks[].title | read_only_udm.security_result.category_details | 風險的 []severity 欄位會與風險的 []title 欄位串連,並對應至 security_result.category_details。 |
serviceName | read_only_udm.network.application_protocol | 如果 serviceName 欄位為「HTTP」或「HTTPS」,則會對應至 network.application_protocol。 |
sourceIp | read_only_udm.principal.asset.ip | sourceIp 欄位會對應至 principal.asset.ip。 |
sourceIp | read_only_udm.principal.ip | sourceIp 欄位會對應至 principal.ip。 |
時間戳記 | read_only_udm.metadata.event_timestamp | 時間戳記欄位會剖析為時間戳記,並對應至 metadata.event_timestamp。 |
transportFingerprint.id | read_only_udm.metadata.product_log_id | transportFingerprint.id 欄位會轉換為字串,並對應至 metadata.product_log_id。 |
transportFingerprint.raw | read_only_udm.additional.fields.value.string_value | transportFingerprint.raw 欄位會對應至 additional.fields.value.string_value,並使用「transportFingerprint_raw」鍵。 |
類型 | read_only_udm.metadata.product_event_type | 類型欄位會對應至 metadata.product_event_type。 |
- | read_only_udm.metadata.product_name | 「CENSYS_ASM」值會指派給 metadata.product_name。 |
- | read_only_udm.metadata.vendor_name | 值「CENSYS」會指派給 metadata.vendor_name。 |
- | read_only_udm.metadata.event_type | 系統會根據特定欄位是否存在來判斷事件類型:如果 has_princ_machine_id 和 has_target_machine 為 true,且 has_network_flow 為 false,則為 NETWORK_CONNECTION;如果 has_network_flow 為 true,則為 NETWORK_DNS;如果 has_princ_machine_id 為 true,則為 STATUS_UPDATE;否則為 GENERIC_EVENT。 |
還有其他問題嗎?向社群成員和 Google SecOps 專業人員尋求答案。