CrowdStrike IDP Services のログを収集する
このドキュメントでは、Amazon S3 を使用して CrowdStrike Identity Protection(IDP)サービスのログを Google Security Operations に取り込む方法について説明します。このインテグレーションでは、CrowdStrike Unified Alerts API を使用して Identity Protection イベントを収集し、組み込みの CS_IDP パーサーで処理するために NDJSON 形式で保存します。
始める前に
次の前提条件を満たしていることを確認してください。
- Google SecOps インスタンス。
- CrowdStrike Falcon Console への特権アクセスと API キーの管理。
- AWS(S3、Identity and Access Management(IAM)、Lambda、EventBridge)への特権アクセス。
CrowdStrike Identity Protection の前提条件を取得する
- CrowdStrike Falcon Console にログインします。
- [サポートとリソース] > [API クライアントとキー] に移動します。
- [新しい API クライアントを追加] をクリックします。
- 次の構成の詳細を入力します。
- お客様名: 「
Google SecOps IDP Integration
」と入力します。 - 説明: 「
API client for Google SecOps integration
」と入力します。 - スコープ: アラート: 読み取り(
alerts:read
)スコープを選択します(これには Identity Protection アラートが含まれます)。
- お客様名: 「
- [追加] をクリックします。
- 次の詳細をコピーして安全な場所に保存します。
- クライアント ID
- クライアント シークレット(1 回のみ表示されます)
- ベース URL(例: US-1 の場合は
api.crowdstrike.com
、US-2 の場合はapi.us-2.crowdstrike.com
、EU-1 の場合はapi.eu-1.crowdstrike.com
)
Google SecOps 用に AWS S3 バケットと IAM を構成する
- バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
- 後で参照できるように、バケットの名前とリージョンを保存します(例:
crowdstrike-idp-logs-bucket
)。 - IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
- 作成したユーザーを選択します。
- [セキュリティ認証情報] タブを選択します。
- [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
- [ユースケース] として [サードパーティ サービス] を選択します。
- [次へ] をクリックします。
- 省略可: 説明タグを追加します。
- [アクセスキーを作成] をクリックします。
- [.csv ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
- [完了] をクリックします。
- [権限] タブを選択します。
- [権限ポリシー] セクションの [権限を追加] をクリックします。
- [権限を追加] を選択します。
- [ポリシーを直接アタッチする] を選択します。
- AmazonS3FullAccess ポリシーを検索します。
- ポリシーを選択します。
- [次へ] をクリックします。
- [権限を追加] をクリックします。
S3 アップロードの IAM ポリシーとロールを構成する
- AWS コンソールで、[IAM] > [ポリシー] に移動します。
- [ポリシーを作成> [JSON] タブ] をクリックします。
- 次のポリシーをコピーして貼り付けます。
ポリシー JSON(別のバケット名を入力した場合は
crowdstrike-idp-logs-bucket
を置き換えます):{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowPutObjects", "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/*" }, { "Sid": "AllowGetStateObject", "Effect": "Allow", "Action": "s3:GetObject", "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/crowdstrike-idp/state.json" } ] }
[次へ] > [ポリシーを作成] をクリックします。
[IAM] > [ロール] > [ロールの作成] > [AWS サービス] > [Lambda] に移動します。
新しく作成したポリシーを関連付けます。
ロールに「
CrowdStrike-IDP-Lambda-Role
」という名前を付けて、[ロールを作成] をクリックします。
Lambda 関数を作成する
- AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
- [Author from scratch] をクリックします。
次の構成情報を提供してください。
設定 値 名前 CrowdStrike-IDP-Collector
ランタイム Python 3.13 アーキテクチャ x86_64 実行ロール CrowdStrike-IDP-Lambda-Role
関数が作成されたら、[コード] タブを開き、スタブを削除して次のコードを貼り付けます。
import json import boto3 import urllib3 import os from datetime import datetime, timezone from urllib.parse import urlencode HTTP = urllib3.PoolManager() def lambda_handler(event, context): """ Fetch CrowdStrike Identity Protection alerts (Unified Alerts API) and store RAW JSON (NDJSON) to S3 for the CS_IDP parser. No transformation is performed. """ # Environment variables s3_bucket = os.environ['S3_BUCKET'] s3_prefix = os.environ['S3_PREFIX'] state_key = os.environ['STATE_KEY'] client_id = os.environ['CROWDSTRIKE_CLIENT_ID'] client_secret = os.environ['CROWDSTRIKE_CLIENT_SECRET'] api_base = os.environ['API_BASE'] s3 = boto3.client('s3') token = get_token(client_id, client_secret, api_base) last_ts = get_last_timestamp(s3, s3_bucket, state_key) # FQL filter for Identity Protection alerts only, newer than checkpoint fql_filter = f"product:'idp'+updated_timestamp:>'{last_ts}'" sort = 'updated_timestamp.asc' # Step 1: Get list of alert IDs all_ids = [] per_page = int(os.environ.get('ALERTS_LIMIT', '1000')) # up to 10000 per SDK docs offset = 0 while True: page_ids = query_alert_ids(api_base, token, fql_filter, sort, per_page, offset) if not page_ids: break all_ids.extend(page_ids) if len(page_ids) < per_page: break offset += per_page if not all_ids: return {'statusCode': 200, 'body': 'No new Identity Protection alerts.'} # Step 2: Get alert details in batches (max 1000 IDs per request) details = [] max_batch = 1000 for i in range(0, len(all_ids), max_batch): batch = all_ids[i:i+max_batch] details.extend(fetch_alert_details(api_base, token, batch)) if details: details.sort(key=lambda d: d.get('updated_timestamp', d.get('created_timestamp', ''))) latest = details[-1].get('updated_timestamp') or details[-1].get('created_timestamp') key = f"{s3_prefix}cs_idp_{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}.json" body = '\n'.join(json.dumps(d, separators=(',', ':')) for d in details) s3.put_object( Bucket=s3_bucket, Key=key, Body=body.encode('utf-8'), ContentType='application/x-ndjson' ) update_state(s3, s3_bucket, state_key, latest) return {'statusCode': 200, 'body': f'Wrote {len(details)} alerts to S3.'} def get_token(client_id, client_secret, api_base): """Get OAuth2 token from CrowdStrike API""" url = f"https://{api_base}/oauth2/token" data = f"client_id={client_id}&client_secret={client_secret}&grant_type=client_credentials" headers = {'Content-Type': 'application/x-www-form-urlencoded'} r = HTTP.request('POST', url, body=data, headers=headers) if r.status != 200: raise Exception(f'Auth failed: {r.status} {r.data}') return json.loads(r.data.decode('utf-8'))['access_token'] def query_alert_ids(api_base, token, fql_filter, sort, limit, offset): """Query alert IDs using filters""" url = f"https://{api_base}/alerts/queries/alerts/v2" params = {'filter': fql_filter, 'sort': sort, 'limit': str(limit), 'offset': str(offset)} qs = urlencode(params) r = HTTP.request('GET', f"{url}?{qs}", headers={'Authorization': f'Bearer {token}'}) if r.status != 200: raise Exception(f'Query alerts failed: {r.status} {r.data}') resp = json.loads(r.data.decode('utf-8')) return resp.get('resources', []) def fetch_alert_details(api_base, token, composite_ids): """Fetch detailed alert data by composite IDs""" url = f"https://{api_base}/alerts/entities/alerts/v2" body = {'composite_ids': composite_ids} headers = {'Authorization': f'Bearer {token}', 'Content-Type': 'application/json'} r = HTTP.request('POST', url, body=json.dumps(body).encode('utf-8'), headers=headers) if r.status != 200: raise Exception(f'Fetch alert details failed: {r.status} {r.data}') resp = json.loads(r.data.decode('utf-8')) return resp.get('resources', []) def get_last_timestamp(s3, bucket, key, default='2023-01-01T00:00:00Z'): """Get last processed timestamp from S3 state file""" try: obj = s3.get_object(Bucket=bucket, Key=key) state = json.loads(obj['Body'].read().decode('utf-8')) return state.get('last_timestamp', default) except s3.exceptions.NoSuchKey: return default def update_state(s3, bucket, key, ts): """Update last processed timestamp in S3 state file""" state = {'last_timestamp': ts, 'updated': datetime.now(timezone.utc).isoformat()} s3.put_object(Bucket=bucket, Key=key, Body=json.dumps(state).encode('utf-8'), ContentType='application/json')
[構成] > [環境変数] に移動します。
[編集>新しい環境変数を追加] をクリックします。
次の表に示す環境変数を入力し、サンプル値を独自の値に置き換えます。
環境変数
キー 値の例 S3_BUCKET
crowdstrike-idp-logs-bucket
S3_PREFIX
crowdstrike-idp/
STATE_KEY
crowdstrike-idp/state.json
CROWDSTRIKE_CLIENT_ID
<your-client-id>
CROWDSTRIKE_CLIENT_SECRET
<your-client-secret>
API_BASE
api.crowdstrike.com
(US-1)、api.us-2.crowdstrike.com
(US-2)、api.eu-1.crowdstrike.com
(EU-1)ALERTS_LIMIT
1000
(省略可、ページあたり最大 10, 000)関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。
[CONFIGURATION] タブを選択します。
[全般設定] パネルで、[編集] をクリックします。
[Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。
EventBridge スケジュールを作成する
- [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
- 次の構成の詳細を入力します。
- 定期的なスケジュール: レート(
15 minutes
)。 - ターゲット: Lambda 関数
CrowdStrike-IDP-Collector
。 - 名前:
CrowdStrike-IDP-Collector-15m
- 定期的なスケジュール: レート(
- [スケジュールを作成] をクリックします。
(省略可)Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する
- AWS コンソール > IAM > ユーザーに移動します。
- [ユーザーを追加] をクリックします。
- 次の構成の詳細を入力します。
- ユーザー: 「
secops-reader
」と入力します。 - アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
- ユーザー: 「
- [ユーザーを作成] をクリックします。
- 最小限の読み取りポリシー(カスタム)を関連付ける: [ユーザー] > [secops-reader] > [権限] > [権限を追加] > [ポリシーを直接関連付ける] > [ポリシーを作成]。
JSON:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:GetObject"], "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket/*" }, { "Effect": "Allow", "Action": ["s3:ListBucket"], "Resource": "arn:aws:s3:::crowdstrike-idp-logs-bucket" } ] }
名前 =
secops-reader-policy
。[ポリシーを作成> 検索/選択> 次へ> 権限を追加] をクリックします。
secops-reader
のアクセスキーを作成します。[セキュリティ認証情報] > [アクセスキー] に移動します。[アクセスキーを作成] をクリックします。
.CSV
をダウンロードします。(これらの値はフィードに貼り付けます)。
CrowdStrike Identity Protection Services のログを取り込むように Google SecOps でフィードを構成する
- [SIEM 設定] > [フィード] に移動します。
- [+ 新しいフィードを追加] をクリックします。
- [フィード名] フィールドに、フィードの名前を入力します(例:
CrowdStrike Identity Protection Services logs
)。 - [ソースタイプ] として [Amazon S3 V2] を選択します。
- [ログタイプ] として [Crowdstrike Identity Protection Services] を選択します。
- [次へ] をクリックします。
- 次の入力パラメータの値を指定します。
- S3 URI:
s3://crowdstrike-idp-logs-bucket/crowdstrike-idp/
- Source deletion options: 必要に応じて削除オプションを選択します。
- ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
- アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
- シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
- アセットの名前空間: アセットの名前空間。
- Ingestion labels: このフィードのイベントに適用されるラベル。
- S3 URI:
- [次へ] をクリックします。
- [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。
さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。