CSV カスタム IOC ファイルを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して CSV カスタム IOC ファイルを Google Security Operations に取り込む方法について説明します。次に、これらのフィールドを UDM にマッピングし、IP、ドメイン、ハッシュなどのさまざまなデータ型を処理して、脅威の詳細、エンティティ情報、重大度レベルで出力を拡充します。

始める前に

  • Google SecOps インスタンス
  • AWS(S3、IAM、Lambda、EventBridge)への特権アクセス
  • 1 つ以上の CSV IOC フィード URL(HTTPS)または CSV を提供する内部エンドポイントへのアクセス

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: csv-ioc)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] として [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [CSV ファイルをダウンロード] をクリックして、[アクセスキー] と [シークレット アクセスキー] を保存し、後で使用できるようにします。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションで、[権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索して選択します。
  18. [次へ] をクリックします。
  19. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソール > IAM > ポリシー > ポリシーの作成 > [JSON] タブ に移動します。
  2. 次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutCsvIocObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::csv-ioc/*"
        }
      ]
    }
    
    • 別のバケット名を入力した場合は、csv-ioc を置き換えます。
  3. [次へ] > [ポリシーを作成] をクリックします。

  4. [IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。

  5. 新しく作成したポリシーを関連付けます。

  6. ロールに「WriteCsvIocToS3Role」という名前を付けて、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 csv_custom_ioc_to_s3
    ランタイム Python 3.13
    アーキテクチャ x86_64
    実行ロール WriteCsvIocToS3Role
  4. 関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(csv_custom_ioc_to_s3.py)を入力します。

    #!/usr/bin/env python3
    # Lambda: Pull CSV IOC feeds over HTTPS and write raw CSV to S3 (no transform)
    # - Multiple URLs (comma-separated)
    # - Optional auth header
    # - Retries for 429/5xx
    # - Unique filenames per page
    # - Sets ContentType=text/csv
    
    import os, time, json
    from urllib.request import Request, urlopen
    from urllib.error import HTTPError, URLError
    import boto3
    
    BUCKET = os.environ["S3_BUCKET"]
    PREFIX = os.environ.get("S3_PREFIX", "csv-ioc/").strip("/")
    IOC_URLS = [u.strip() for u in os.environ.get("IOC_URLS", "").split(",") if u.strip()]
    AUTH_HEADER = os.environ.get("AUTH_HEADER", "")  # e.g., "Authorization: Bearer <token>" OR just "Bearer <token>"
    TIMEOUT = int(os.environ.get("TIMEOUT", "60"))
    
    s3 = boto3.client("s3")
    
    def _build_request(url: str) -> Request:
        if not url.lower().startswith("https://"):
            raise ValueError("Only HTTPS URLs are allowed in IOC_URLS")
        req = Request(url, method="GET")
        # Auth header: either "Header-Name: value" or just "Bearer token" -> becomes Authorization
        if AUTH_HEADER:
            if ":" in AUTH_HEADER:
                k, v = AUTH_HEADER.split(":", 1)
                req.add_header(k.strip(), v.strip())
            else:
                req.add_header("Authorization", AUTH_HEADER.strip())
        req.add_header("Accept", "text/csv, */*")
        return req
    
    def _http_bytes(req: Request, timeout: int = TIMEOUT, max_retries: int = 5) -> bytes:
        attempt, backoff = 0, 1.0
        while True:
            try:
                with urlopen(req, timeout=timeout) as r:
                    return r.read()
            except HTTPError as e:
                if (e.code == 429 or 500 <= e.code <= 599) and attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
            except URLError:
                if attempt < max_retries:
                    time.sleep(backoff); attempt += 1; backoff *= 2; continue
                raise
    
    def _safe_name(url: str) -> str:
        # Create a short, filesystem-safe token for the URL
        return url.replace("://", "_").replace("/", "_").replace("?", "_").replace("&", "_")[:100]
    
    def _put_csv(blob: bytes, url: str, run_ts: int, idx: int) -> str:
        key = f"{PREFIX}/{time.strftime('%Y/%m/%d/%H%M%S', time.gmtime(run_ts))}-url{idx:03d}-{_safe_name(url)}.csv"
        s3.put_object(
            Bucket=BUCKET,
            Key=key,
            Body=blob,
            ContentType="text/csv",
        )
        return key
    
    def lambda_handler(event=None, context=None):
        assert IOC_URLS, "IOC_URLS must contain at least one HTTPS URL"
        run_ts = int(time.time())
        written = []
        for i, url in enumerate(IOC_URLS):
            req = _build_request(url)
            data = _http_bytes(req)
            key = _put_csv(data, url, run_ts, i)
            written.append({"url": url, "s3_key": key, "bytes": len(data)})
        return {"ok": True, "written": written}
    
    if __name__ == "__main__":
        print(json.dumps(lambda_handler(), indent=2))
    
  5. [構成> 環境変数 > 編集 > 新しい環境変数を追加] に移動します。

  6. 次の環境変数を入力し、実際の値に置き換えます。

    キー
    S3_BUCKET csv-ioc
    S3_PREFIX csv-ioc/
    IOC_URLS https://ioc.example.com/feed.csv,https://another.example.org/iocs.csv
    AUTH_HEADER Authorization: Bearer <token>
    TIMEOUT 60
  7. 関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。

  8. [CONFIGURATION] タブを選択します。

  9. [全般設定] パネルで、[編集] をクリックします。

  10. [Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。

EventBridge スケジュールを作成する

  1. Amazon EventBridge > Scheduler > スケジュールの作成に移動します。
  2. 次の構成の詳細を入力します。
    • 定期的なスケジュール: レート1 hour)。
    • ターゲット: Lambda 関数。
    • 名前: csv-custom-ioc-1h
  3. [スケジュールを作成] をクリックします。

省略可: Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する

  1. AWS コンソールで、[IAM] > [Users] に移動し、[Add users] をクリックします。
  2. 次の構成の詳細を入力します。
    • ユーザー: 一意の名前を入力します(例: secops-reader)。
    • アクセスタイプ: [Access key - Programmatic access] を選択します。
    • [ユーザーを作成] をクリックします。
  3. 最小限の読み取りポリシー(カスタム)を適用する: [ユーザー] > secops-reader を選択 > [権限] > [権限を追加] > [ポリシーを直接適用] > [ポリシーを作成]
  4. JSON エディタで次のポリシーを入力します。

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::<your-bucket>/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::<your-bucket>"
        }
      ]
    }
    
  5. 名前を secops-reader-policy に設定します。

  6. [ポリシーの作成> 検索/選択> 次へ> 権限を追加] に移動します。

  7. [セキュリティ認証情報] > [アクセスキー] > [アクセスキーを作成] に移動します。

  8. CSV をダウンロードします(これらの値はフィードに入力されます)。

CSV カスタム IOC ファイルを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [Add New Feed] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: CSV Custom IOC)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] として [CSV カスタム IOC] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。
    • S3 URI: s3://csv-ioc/csv-ioc/
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • 最大ファイル経過時間: デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • アセットの名前空間: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  8. [次へ] をクリックします。
  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
asn entity.metadata.threat.detection_fields.asn_label.value 「asn」フィールドから直接マッピングされます。
category entity.metadata.threat.category_details 「category」フィールドから直接マッピングされます。
classification entity.metadata.threat.category_details 「classification - 」に追加され、「entity.metadata.threat.category_details」フィールドにマッピングされます。
column2 entity.entity.hostname [category] が「.?ip」または「.?proxy」と一致し、[not_ip] が true の場合、「entity.entity.hostname」にマッピングされます。
column2 entity.entity.ip [category] が「.?ip」または「.?proxy」と一致し、[not_ip] が false の場合、「entity.entity.ip」に統合されます。
confidence entity.metadata.threat.confidence_score 浮動小数点数に変換され、「entity.metadata.threat.confidence_score」フィールドにマッピングされます。
country entity.entity.location.country_or_region 「country」フィールドから直接マッピングされます。
date_first entity.metadata.threat.first_discovered_time ISO8601 として解析され、「entity.metadata.threat.first_discovered_time」フィールドにマッピングされます。
date_last entity.metadata.threat.last_updated_time ISO8601 として解析され、「entity.metadata.threat.last_updated_time」フィールドにマッピングされます。
detail entity.metadata.threat.summary 「detail」フィールドから直接マッピングされます。
detail2 entity.metadata.threat.description 「detail2」フィールドから直接マッピングされます。
domain entity.entity.hostname 「domain」フィールドから直接マッピングされます。
email entity.entity.user.email_addresses 「entity.entity.user.email_addresses」フィールドに統合されます。
id entity.metadata.product_entity_id 「id - 」に追加され、「entity.metadata.product_entity_id」フィールドにマッピングされます。
import_session_id entity.metadata.threat.detection_fields.import_session_id_label.value 「import_session_id」フィールドから直接マッピングされます。
itype entity.metadata.threat.detection_fields.itype_label.value 「itype」フィールドから直接マッピングされます。
lat entity.entity.location.region_latitude 浮動小数点数に変換され、「entity.entity.location.region_latitude」フィールドにマッピングされます。
lon entity.entity.location.region_longitude 浮動小数点数に変換され、「entity.entity.location.region_longitude」フィールドにマッピングされます。
maltype entity.metadata.threat.detection_fields.maltype_label.value 「maltype」フィールドから直接マッピングされます。
md5 entity.entity.file.md5 「md5」フィールドから直接マッピングされます。
media entity.metadata.threat.detection_fields.media_label.value 「media」フィールドから直接マッピングされます。
media_type entity.metadata.threat.detection_fields.media_type_label.value 「media_type」フィールドから直接マッピングされます。
org entity.metadata.threat.detection_fields.org_label.value 「org」フィールドから直接マッピングされます。
resource_uri entity.entity.url [itype] が「(ip
resource_uri entity.metadata.threat.url_back_to_product [itype] が「(ip
score entity.metadata.threat.confidence_details 「score」フィールドから直接マッピングされます。
severity entity.metadata.threat.severity 大文字に変換され、「LOW」、「MEDIUM」、「HIGH」、「CRITICAL」と一致する場合は、「entity.metadata.threat.severity」フィールドにマッピングされます。
source entity.metadata.threat.detection_fields.source_label.value 「source」フィールドから直接マッピングされます。
source_feed_id entity.metadata.threat.detection_fields.source_feed_id_label.value 「source_feed_id」フィールドから直接マッピングされます。
srcip entity.entity.ip [srcip] が空ではなく、[value] と等しくない場合は、「entity.entity.ip」に統合されます。
state entity.metadata.threat.detection_fields.state_label.value 「state」フィールドから直接マッピングされます。
trusted_circle_ids entity.metadata.threat.detection_fields.trusted_circle_ids_label.value 「trusted_circle_ids」フィールドから直接マッピングされます。
update_id entity.metadata.threat.detection_fields.update_id_label.value 「update_id」フィールドから直接マッピングされます。
value entity.entity.file.full_path [category] が「.*?file」と一致する場合、「entity.entity.file.full_path」にマッピングされます。
value entity.entity.file.md5 [category] が「.*?md5」と一致し、[value] が 32 文字の 16 進文字列の場合、「entity.entity.file.md5」にマッピングされます。
value entity.entity.file.sha1 ([category] が「.?md5」と一致し、[value] が 40 文字の 16 進文字列である)場合、または([category] が「.?sha1」と一致し、[value] が 40 文字の 16 進文字列である)場合、「entity.entity.file.sha1」にマッピングされます。
value entity.entity.file.sha256 ([category] が「.?md5」と一致し、[value] が 16 進文字列で、[file_type] が「md5」でない)場合、または([category] が「.?sha256」と一致し、[value] が 16 進文字列である)場合、「entity.entity.file.sha256」にマッピングされます。
value entity.entity.hostname ([category] が「.?domain」と一致する) または ([category] が「.?ip」または「.*?proxy」と一致し、[not_ip] が true の場合) は、「entity.entity.hostname」にマッピングされます。
value entity.entity.url ([category] が「.*?url」と一致する) または ([category] が「url」と一致し、[resource_uri] が空でない) 場合、「entity.entity.url」にマッピングされます。
なし entity.metadata.collected_timestamp イベントのタイムスタンプが入力されます。
なし entity.metadata.interval.end_time 253402300799 秒の定数値に設定します。
なし entity.metadata.interval.start_time イベントのタイムスタンプが入力されます。
なし entity.metadata.vendor_name 「カスタム IOC」の定数値に設定します。

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。