CSV カスタム IOC ファイルを収集する
このドキュメントでは、Amazon S3 を使用して CSV カスタム IOC ファイルを Google Security Operations に取り込む方法について説明します。次に、これらのフィールドを UDM にマッピングし、IP、ドメイン、ハッシュなどのさまざまなデータ型を処理して、脅威の詳細、エンティティ情報、重大度レベルで出力を拡充します。
始める前に
- Google SecOps インスタンス
- AWS(S3、IAM、Lambda、EventBridge)への特権アクセス
- 1 つ以上の CSV IOC フィード URL(HTTPS)または CSV を提供する内部エンドポイントへのアクセス
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
csv-ioc
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションで、[権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索して選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソール > IAM > ポリシー > ポリシーの作成 > [JSON] タブ に移動します。
次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutCsvIocObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::csv-ioc/*" } ] }
- 別のバケット名を入力した場合は、
csv-ioc
を置き換えます。
- 別のバケット名を入力した場合は、
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーを関連付けます。
ロールに「
WriteCsvIocToS3Role
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
設定 値 名前 csv_custom_ioc_to_s3
ランタイム Python 3.13 アーキテクチャ x86_64 実行ロール WriteCsvIocToS3Role
関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(
csv_custom_ioc_to_s3.py
)を入力します。#!/usr/bin/env python3 # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform) # - Multiple URLs (comma-separated) # - Optional auth header # - Retries for 429/5xx # - Unique filenames per page # - Sets ContentType=text/csv import os, time, json from urllib.request import Request, urlopen from urllib.error import HTTPError, URLError import boto3 BUCKET = os.environ["S3_BUCKET"] PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/") IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()] AUTH_HEADER = os.environ.get("AUTH_HEADER", "") # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>" TIMEOUT = int(os.environ.get("TIMEOUT", "60")) s3 = boto3.client("s3") def _build_request(url: str) -> Request: if not url.lower().startswith("https://"): raise ValueError("Only HTTPS URLs are allowed in IOC_URLS") req = Request(url, method="GET") # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization if AUTH_HEADER: if ":" in AUTH_HEADER: k, v = AUTH_HEADER.split(":", 1) req.add_header(k.strip(), v.strip()) else: req.add_header("Authorization", AUTH_HEADER.strip()) req.add_header("Accept", "text/csv, */*") return req def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes: attempt, backoff = 0, 1.0 while True: try: with urlopen(req, timeout=timeout) as r: return r.read() except HTTPError as e: if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise except URLError: if attempt < max_retries: time.sleep(backoff); attempt += 1; backoff *= 2; continue raise def _safe_name(url: str) -> str: # Create a short, filesystem-safe token for the URL return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100] def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str: key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv" s3.put_object( Bucket=BUCKET, Key=key, Body=blob, ContentType="text/csv", ) return key def lambda_handler(event=None, context=None): assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL" run_ts = int(time.time()) written = [] for i, url in enumerate(IOC_URLS): req = _build_request(url) data = _http_bytes(req) key = _put_csv(data, url, run_ts, i) written.append({"url": url, "s3_key": key, "bytes": len(data)}) return {"ok": True, "written": written} if __name__ == "__main__": print(json.dumps(lambda_handler(), indent=2))
[構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。
次の環境変数を入力し、実際の値に置き換えます。
キー 例 S3_BUCKET
csv-ioc
S3_PREFIX
csv-ioc/
IOC_URLS
https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
AUTH_HEADER
Authorization: Bearer <token>
TIMEOUT
60
関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
1 hour
)。 - ターゲット: Lambda 関数。
- 名前:
csv-custom-ioc-1h
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソールで、[IAM] > [Users] に移動し、[Add users] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 一意の名前を入力します(例:
secops-reader
)。 - アクセスタイプ: [Access key - Programmatic access] を選択します。
- [ユーザーを作成] をクリックします。
- ユーザー: 一意の名前を入力します(例:
- 最小限の読み取りポリシー(カスタム)を適用する: [ユーザー] >
secops-reader
を選択 > [権限] > [権限を追加] > [ポリシーを直接適用] > [ポリシーを作成] JSON エディタで次のポリシーを入力します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::<your-bucket>/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::<your-bucket>" } ] }
名前を
secops-reader-policy
に設定します。[ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。
[セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。
CSV をダウンロードします(これらの値はフィードに入力されます)。
CSV カスタム IOC ファイルを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [Add New Feed] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
CSV Custom IOC
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [CSV カスタム IOC] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://csv-ioc/csv-ioc/
- Source deletion options: 必要に応じて削除オプションを選択します。
- 最大ファイル経過時間: デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
UDM マッピング テーブル
ログフィールド | UDM マッピング | ロジック |
---|---|---|
asn |
entity.metadata.threat.detection_fields.asn_label.value | 「asn」フィールドから直接マッピングされます。 |
category |
entity.metadata.threat.category_details | 「category」フィールドから直接マッピングされます。 |
classification |
entity.metadata.threat.category_details | 「classification - 」に追加され、「entity.metadata.threat.category_details」フィールドにマッピングされます。 |
column2 |
entity.entity.hostname | [category] が「.?ip」または「.?proxy」と一致し、[not_ip] が true の場合、「entity.entity.hostname」にマッピングされます。 |
column2 |
entity.entity.ip | [category] が「.?ip」または「.?proxy」と一致し、[not_ip] が false の場合、「entity.entity.ip」に統合されます。 |
confidence |
entity.metadata.threat.confidence_score | 浮動小数点数に変換され、「entity.metadata.threat.confidence_score」フィールドにマッピングされます。 |
country |
entity.entity.location.country_or_region | 「country」フィールドから直接マッピングされます。 |
date_first |
entity.metadata.threat.first_discovered_time | ISO8601 として解析され、「entity.metadata.threat.first_discovered_time」フィールドにマッピングされます。 |
date_last |
entity.metadata.threat.last_updated_time | ISO8601 として解析され、「entity.metadata.threat.last_updated_time」フィールドにマッピングされます。 |
detail |
entity.metadata.threat.summary | 「detail」フィールドから直接マッピングされます。 |
detail2 |
entity.metadata.threat.description | 「detail2」フィールドから直接マッピングされます。 |
domain |
entity.entity.hostname | 「domain」フィールドから直接マッピングされます。 |
email |
entity.entity.user.email_addresses | 「entity.entity.user.email_addresses」フィールドに統合されます。 |
id |
entity.metadata.product_entity_id | 「id - 」に追加され、「entity.metadata.product_entity_id」フィールドにマッピングされます。 |
import_session_id |
entity.metadata.threat.detection_fields.import_session_id_label.value | 「import_session_id」フィールドから直接マッピングされます。 |
itype |
entity.metadata.threat.detection_fields.itype_label.value | 「itype」フィールドから直接マッピングされます。 |
lat |
entity.entity.location.region_latitude | 浮動小数点数に変換され、「entity.entity.location.region_latitude」フィールドにマッピングされます。 |
lon |
entity.entity.location.region_longitude | 浮動小数点数に変換され、「entity.entity.location.region_longitude」フィールドにマッピングされます。 |
maltype |
entity.metadata.threat.detection_fields.maltype_label.value | 「maltype」フィールドから直接マッピングされます。 |
md5 |
entity.entity.file.md5 | 「md5」フィールドから直接マッピングされます。 |
media |
entity.metadata.threat.detection_fields.media_label.value | 「media」フィールドから直接マッピングされます。 |
media_type |
entity.metadata.threat.detection_fields.media_type_label.value | 「media_type」フィールドから直接マッピングされます。 |
org |
entity.metadata.threat.detection_fields.org_label.value | 「org」フィールドから直接マッピングされます。 |
resource_uri |
entity.entity.url | [itype] が「(ip |
resource_uri |
entity.metadata.threat.url_back_to_product | [itype] が「(ip |
score |
entity.metadata.threat.confidence_details | 「score」フィールドから直接マッピングされます。 |
severity |
entity.metadata.threat.severity | 大文字に変換され、「LOW」、「MEDIUM」、「HIGH」、「CRITICAL」と一致する場合は、「entity.metadata.threat.severity」フィールドにマッピングされます。 |
source |
entity.metadata.threat.detection_fields.source_label.value | 「source」フィールドから直接マッピングされます。 |
source_feed_id |
entity.metadata.threat.detection_fields.source_feed_id_label.value | 「source_feed_id」フィールドから直接マッピングされます。 |
srcip |
entity.entity.ip | [srcip] が空ではなく、[value] と等しくない場合は、「entity.entity.ip」に統合されます。 |
state |
entity.metadata.threat.detection_fields.state_label.value | 「state」フィールドから直接マッピングされます。 |
trusted_circle_ids |
entity.metadata.threat.detection_fields.trusted_circle_ids_label.value | 「trusted_circle_ids」フィールドから直接マッピングされます。 |
update_id |
entity.metadata.threat.detection_fields.update_id_label.value | 「update_id」フィールドから直接マッピングされます。 |
value |
entity.entity.file.full_path | [category] が「.*?file」と一致する場合、「entity.entity.file.full_path」にマッピングされます。 |
value |
entity.entity.file.md5 | [category] が「.*?md5」と一致し、[value] が 32 文字の 16 進文字列の場合、「entity.entity.file.md5」にマッピングされます。 |
value |
entity.entity.file.sha1 | ([category] が「.?md5」と一致し、[value] が 40 文字の 16 進文字列である)場合、または([category] が「.?sha1」と一致し、[value] が 40 文字の 16 進文字列である)場合、「entity.entity.file.sha1」にマッピングされます。 |
value |
entity.entity.file.sha256 | ([category] が「.?md5」と一致し、[value] が 16 進文字列で、[file_type] が「md5」でない)場合、または([category] が「.?sha256」と一致し、[value] が 16 進文字列である)場合、「entity.entity.file.sha256」にマッピングされます。 |
value |
entity.entity.hostname | ([category] が「.?domain」と一致する) または ([category] が「.?ip」または「.*?proxy」と一致し、[not_ip] が true の場合) は、「entity.entity.hostname」にマッピングされます。 |
value |
entity.entity.url | ([category] が「.*?url」と一致する) または ([category] が「url」と一致し、[resource_uri] が空でない) 場合、「entity.entity.url」にマッピングされます。 |
なし | entity.metadata.collected_timestamp | イベントのタイムスタンプが入力されます。 |
なし | entity.metadata.interval.end_time | 253402300799 秒の定数値に設定します。 |
なし | entity.metadata.interval.start_time | イベントのタイムスタンプが入力されます。 |
なし | entity.metadata.vendor_name | 「カスタム IOC」の定数値に設定します。 |
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。