Delinea SSO ログを収集する

以下でサポートされています。

このドキュメントでは、Amazon S3 を使用して Delinea(旧 Centrify)シングル サインオン(SSO)ログを Google Security Operations に取り込む方法について説明します。パーサーはログを抽出し、JSON 形式と syslog 形式の両方を処理します。Key-Value ペア、タイムスタンプ、その他の関連フィールドを解析して UDM モデルにマッピングします。ログイン失敗、ユーザー エージェント、重大度レベル、認証メカニズム、さまざまなイベントタイプを処理する特定のロジックがあります。エラー イベントの宛先メールアドレスでは、NormalizedUser よりも FailUserName が優先されます。

始める前に

次の前提条件を満たしていることを確認してください。

  • Google SecOps インスタンス。
  • Delinea(Centrify)SSO テナントへの特権アクセス。
  • AWS(S3、Identity and Access Management(IAM)、Lambda、EventBridge)への特権アクセス。

Delinea(Centrify)SSO の前提条件(ID、API キー、組織 ID、トークン)を収集する

  1. Delinea 管理ポータルにログインします。
  2. [アプリ] > [アプリを追加] に移動します。
  3. [OAuth2 Client] を検索して、[追加] をクリックします。
  4. [ウェブアプリを追加] ダイアログで [はい] をクリックします。
  5. [ウェブアプリを追加] ダイアログで [閉じる] をクリックします。
  6. [アプリケーション構成] ページで、次の項目を構成します。
    • [全般] タブ:
      • Application ID: 一意の識別子を入力します(例: secops-oauth-client)。
      • アプリケーション名: わかりやすい名前を入力します(例: SecOps Data Export)。
      • Application Description(アプリケーションの説明): 説明を入力します(例: OAuth client for exporting audit events to SecOps
    • [信頼] タブ:
      • Application is Confidential: このオプションをオンにします。
      • クライアント ID のタイプ: [Confidential] を選択します。
      • 発行されたクライアント ID: この値をコピーして保存します。
      • 発行されたクライアント シークレット: この値をコピーして保存します。
    • [トークン] タブ:
      • Auth methods: [Client Creds] を選択します。
      • トークンタイプ: [Jwt RS256] を選択します。
    • [Scope] タブ:
      • 説明に「SIEM Integration Access」と入力して、スコープ siem を追加します。
      • スコープ redrock/query を追加し、説明を Query API Access にします。
  7. [保存] をクリックして、OAuth クライアントを作成します。
  8. [Core Services] > [Users] > [Add User] に移動します。
  9. サービス ユーザーを構成します。
    • ログイン名: ステップ 6 のクライアント ID を入力します。
    • メールアドレス: 有効なメールアドレスを入力します(必須項目)。
    • 表示名: わかりやすい名前を入力します(例: SecOps Service User)。
    • [Password] と [Confirm Password]: 手順 6 で取得したクライアント シークレットを入力します。
    • ステータス: [OAuth の機密クライアントである] を選択します。
  10. [Create User] をクリックします。
  11. [アクセス > ロール] に移動し、監査イベントのクエリに必要な権限を持つロールにサービスユーザーを割り当てます。
  12. 次の詳細をコピーして安全な場所に保存します。
    • テナント URL: Centrify テナントの URL(例: https://yourtenant.my.centrify.com
    • クライアント ID: 手順 6 で取得したクライアント ID
    • クライアント シークレット: ステップ 6 で取得した値
    • OAuth Application ID: アプリケーション構成から取得

Google SecOps 用に AWS S3 バケットと IAM を構成する

  1. バケットの作成のユーザーガイドに沿って、Amazon S3 バケットを作成します。
  2. 後で参照できるように、バケットの名前リージョンを保存します(例: delinea-centrify-logs-bucket)。
  3. IAM ユーザーの作成のユーザーガイドに沿って、ユーザーを作成します。
  4. 作成したユーザーを選択します。
  5. [セキュリティ認証情報] タブを選択します。
  6. [アクセスキー] セクションで [アクセスキーを作成] をクリックします。
  7. [ユースケース] として [サードパーティ サービス] を選択します。
  8. [次へ] をクリックします。
  9. 省略可: 説明タグを追加します。
  10. [アクセスキーを作成] をクリックします。
  11. [.csv ファイルをダウンロード] をクリックし、[アクセスキー] と [シークレット アクセスキー] を保存して、今後の参照に備えます。
  12. [完了] をクリックします。
  13. [権限] タブを選択します。
  14. [権限ポリシー] セクションの [権限を追加] をクリックします。
  15. [権限を追加] を選択します。
  16. [ポリシーを直接アタッチする] を選択します。
  17. AmazonS3FullAccess ポリシーを検索します。
  18. ポリシーを選択します。
  19. [次へ] をクリックします。
  20. [権限を追加] をクリックします。

S3 アップロードの IAM ポリシーとロールを構成する

  1. AWS コンソールで、[IAM] > [ポリシー] に移動します。
  2. [ポリシーを作成> [JSON] タブ] をクリックします。
  3. 次のポリシーをコピーして貼り付けます。
  4. ポリシー JSON(別のバケット名を入力した場合は delinea-centrify-logs-bucket を置き換えます):

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "AllowPutObjects",
          "Effect": "Allow",
          "Action": "s3:PutObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Sid": "AllowGetStateObject",
          "Effect": "Allow",
          "Action": "s3:GetObject",
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/centrify-sso-logs/state.json"
        }
      ]
    }
    
  5. [次へ] > [ポリシーを作成] をクリックします。

  6. [IAM]> [ロール] に移動します。

  7. [ロールを作成> AWS サービス > Lambda] をクリックします。

  8. 新しく作成したポリシーと、マネージド ポリシー AWSLambdaBasicExecutionRole(CloudWatch ロギング用)を関連付けます。

  9. ロールに「CentrifySSOLogExportRole」という名前を付けて、[ロールを作成] をクリックします。

Lambda 関数を作成する

  1. AWS コンソールで、[Lambda] > [Functions] > [Create function] に移動します。
  2. [Author from scratch] をクリックします。
  3. 次の構成情報を提供してください。

    設定
    名前 CentrifySSOLogExport
    ランタイム Python 3.13
    アーキテクチャ x86_64
    実行ロール CentrifySSOLogExportRole
  4. 関数を作成したら、[コード] タブを開き、スタブを削除して次のコード(CentrifySSOLogExport.py)を貼り付けます。

    import json
    import boto3
    import requests
    import base64
    from datetime import datetime, timedelta
    import os
    from typing import Dict, List, Optional
    
    def lambda_handler(event, context):
        """
        Lambda function to fetch Delinea Centrify SSO audit events and store them in S3
        """
    
        # Environment variables
        S3_BUCKET = os.environ['S3_BUCKET']
        S3_PREFIX = os.environ['S3_PREFIX']
        STATE_KEY = os.environ['STATE_KEY']
    
        # Centrify API credentials
        TENANT_URL = os.environ['TENANT_URL']
        CLIENT_ID = os.environ['CLIENT_ID']
        CLIENT_SECRET = os.environ['CLIENT_SECRET']
        OAUTH_APP_ID = os.environ['OAUTH_APP_ID']
    
        # Optional parameters
        PAGE_SIZE = int(os.environ.get('PAGE_SIZE', '1000'))
        MAX_PAGES = int(os.environ.get('MAX_PAGES', '10'))
    
        s3_client = boto3.client('s3')
    
        try:
            # Get last execution state
            last_timestamp = get_last_state(s3_client, S3_BUCKET, STATE_KEY)
    
            # Get OAuth access token
            access_token = get_oauth_token(TENANT_URL, CLIENT_ID, CLIENT_SECRET, OAUTH_APP_ID)
    
            # Fetch audit events
            events = fetch_audit_events(TENANT_URL, access_token, last_timestamp, PAGE_SIZE, MAX_PAGES)
    
            if events:
                # Store events in S3
                current_timestamp = datetime.utcnow()
                filename = f"{S3_PREFIX}centrify-sso-events-{current_timestamp.strftime('%Y%m%d_%H%M%S')}.json"
    
                store_events_to_s3(s3_client, S3_BUCKET, filename, events)
    
                # Update state with latest timestamp
                latest_timestamp = get_latest_event_timestamp(events)
                update_state(s3_client, S3_BUCKET, STATE_KEY, latest_timestamp)
    
                print(f"Successfully processed {len(events)} events and stored to {filename}")
            else:
                print("No new events found")
    
            return {
                'statusCode': 200,
                'body': json.dumps(f'Successfully processed {len(events) if events else 0} events')
            }
    
        except Exception as e:
            print(f"Error processing Centrify SSO logs: {str(e)}")
            return {
                'statusCode': 500,
                'body': json.dumps(f'Error: {str(e)}')
            }
    
    def get_oauth_token(tenant_url: str, client_id: str, client_secret: str, oauth_app_id: str) -> str:
        """
        Get OAuth access token using client credentials flow
        """
    
        # Create basic auth token
        credentials = f"{client_id}:{client_secret}"
        basic_auth = base64.b64encode(credentials.encode()).decode()
    
        token_url = f"{tenant_url}/oauth2/token/{oauth_app_id}"
    
        headers = {
            'Authorization': f'Basic {basic_auth}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/x-www-form-urlencoded'
        }
    
        data = {
            'grant_type': 'client_credentials',
            'scope': 'siem redrock/query'
        }
    
        response = requests.post(token_url, headers=headers, data=data)
        response.raise_for_status()
    
        token_data = response.json()
        return token_data['access_token']
    
    def fetch_audit_events(tenant_url: str, access_token: str, last_timestamp: str, page_size: int, max_pages: int) -> List[Dict]:
        """
        Fetch audit events from Centrify using the Redrock/query API
        """
    
        query_url = f"{tenant_url}/Redrock/query"
    
        headers = {
            'Authorization': f'Bearer {access_token}',
            'X-CENTRIFY-NATIVE-CLIENT': 'True',
            'Content-Type': 'application/json'
        }
    
        # Build SQL query with timestamp filter
        if last_timestamp:
            sql_query = f"Select * from Event where WhenOccurred > '{last_timestamp}' ORDER BY WhenOccurred ASC"
        else:
            # First run - get events from last 24 hours
            sql_query = "Select * from Event where WhenOccurred > datefunc('now', '-1') ORDER BY WhenOccurred ASC"
    
        payload = {
            "Script": sql_query,
            "args": {
                "PageSize": page_size,
                "Limit": page_size * max_pages,
                "Caching": -1
            }
        }
    
        response = requests.post(query_url, headers=headers, json=payload)
        response.raise_for_status()
    
        response_data = response.json()
    
        if not response_data.get('success', False):
            raise Exception(f"API query failed: {response_data.get('Message', 'Unknown error')}")
    
        # Parse the response
        result = response_data.get('Result', {})
        columns = {col['Name']: i for i, col in enumerate(result.get('Columns', []))}
        raw_results = result.get('Results', [])
    
        events = []
        for raw_event in raw_results:
            event = {}
            row_data = raw_event.get('Row', {})
    
            # Map column names to values
            for col_name, col_index in columns.items():
                if col_name in row_data and row_data[col_name] is not None:
                    event[col_name] = row_data[col_name]
    
            # Add metadata
            event['_source'] = 'centrify_sso'
            event['_collected_at'] = datetime.utcnow().isoformat() + 'Z'
    
            events.append(event)
    
        return events
    
    def get_last_state(s3_client, bucket: str, state_key: str) -> Optional[str]:
        """
        Get the last processed timestamp from S3 state file
        """
        try:
            response = s3_client.get_object(Bucket=bucket, Key=state_key)
            state_data = json.loads(response['Body'].read().decode('utf-8'))
            return state_data.get('last_timestamp')
        except s3_client.exceptions.NoSuchKey:
            print("No previous state found, starting from 24 hours ago")
            return None
        except Exception as e:
            print(f"Error reading state: {e}")
            return None
    
    def update_state(s3_client, bucket: str, state_key: str, timestamp: str):
        """
        Update the state file with the latest processed timestamp
        """
        state_data = {
            'last_timestamp': timestamp,
            'updated_at': datetime.utcnow().isoformat() + 'Z'
        }
    
        s3_client.put_object(
            Bucket=bucket,
            Key=state_key,
            Body=json.dumps(state_data),
            ContentType='application/json'
        )
    
    def store_events_to_s3(s3_client, bucket: str, key: str, events: List[Dict]):
        """
        Store events as JSONL (one JSON object per line) in S3
        """
        # Convert to JSONL format (one JSON object per line)
        jsonl_content = 'n'.join(json.dumps(event, default=str) for event in events)
    
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=jsonl_content,
            ContentType='application/x-ndjson'
        )
    
    def get_latest_event_timestamp(events: List[Dict]) -> str:
        """
        Get the latest timestamp from the events for state tracking
        """
        if not events:
            return datetime.utcnow().isoformat() + 'Z'
    
        latest = None
        for event in events:
            when_occurred = event.get('WhenOccurred')
            if when_occurred:
                if latest is None or when_occurred > latest:
                    latest = when_occurred
    
        return latest or datetime.utcnow().isoformat() + 'Z'
    
  5. [構成] > [環境変数] に移動します。

  6. [編集>新しい環境変数を追加] をクリックします。

  7. 次の表に示す環境変数を入力し、サンプル値を自分の値に置き換えます。

    環境変数

    キー 値の例
    S3_BUCKET delinea-centrify-logs-bucket
    S3_PREFIX centrify-sso-logs/
    STATE_KEY centrify-sso-logs/state.json
    TENANT_URL https://yourtenant.my.centrify.com
    CLIENT_ID your-client-id
    CLIENT_SECRET your-client-secret
    OAUTH_APP_ID your-oauth-application-id
    OAUTH_SCOPE siem
    PAGE_SIZE 1000
    MAX_PAGES 10
  8. 関数が作成されたら、そのページにとどまるか、[Lambda] > [関数] > [your-function] を開きます。

  9. [CONFIGURATION] タブを選択します。

  10. [全般設定] パネルで、[編集] をクリックします。

  11. [Timeout] を [5 minutes (300 seconds)] に変更し、[Save] をクリックします。

EventBridge スケジュールを作成する

  1. [Amazon EventBridge] > [Scheduler] > [スケジュールの作成] に移動します。
  2. 次の構成の詳細を入力します。
    • 定期的なスケジュール: レート1 hour)。
    • ターゲット: Lambda 関数 CentrifySSOLogExport
    • 名前: CentrifySSOLogExport-1h
  3. [スケジュールを作成] をクリックします。

(省略可)Google SecOps 用の読み取り専用の IAM ユーザーと鍵を作成する

  1. AWS コンソールで、[IAM] > [ユーザー] に移動します。
  2. [ユーザーを追加] をクリックします。
  3. 次の構成の詳細を入力します。
    • ユーザー: 「secops-reader」と入力します。
    • アクセスの種類: [アクセスキー - プログラムによるアクセス] を選択します。
  4. [ユーザーを作成] をクリックします。
  5. 最小限の読み取りポリシー(カスタム)を適用します: [ユーザー] > [secops-reader] > [権限]
  6. [権限を追加> ポリシーを直接アタッチする] をクリックします。
  7. [ポリシーを作成] を選択します。
  8. JSON:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": ["s3:GetObject"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket/*"
        },
        {
          "Effect": "Allow",
          "Action": ["s3:ListBucket"],
          "Resource": "arn:aws:s3:::delinea-centrify-logs-bucket"
        }
      ]
    }
    
  9. 名前 = secops-reader-policy

  10. [ポリシーを作成> 検索/選択 > 次へ] をクリックします。

  11. [権限を追加] をクリックします。

  12. secops-reader のアクセスキーを作成します。[セキュリティ認証情報] > [アクセスキー] に移動します。

  13. [アクセスキーを作成] をクリックします。

  14. .CSV をダウンロードします。(これらの値はフィードに貼り付けます)。

Delinea(Centrify)SSO ログを取り込むように Google SecOps でフィードを構成する

  1. [SIEM 設定] > [フィード] に移動します。
  2. [+ 新しいフィードを追加] をクリックします。
  3. [フィード名] フィールドに、フィードの名前を入力します(例: Delinea Centrify SSO logs)。
  4. [ソースタイプ] として [Amazon S3 V2] を選択します。
  5. [ログタイプ] として [Centrify] を選択します。
  6. [次へ] をクリックします。
  7. 次の入力パラメータの値を指定します。
    • S3 URI: s3://delinea-centrify-logs-bucket/centrify-sso-logs/
    • Source deletion options: 必要に応じて削除オプションを選択します。
    • ファイルの最大経過日数: 指定した日数以内に変更されたファイルを含めます。デフォルトは 180 日です。
    • アクセスキー ID: S3 バケットにアクセスできるユーザー アクセスキー。
    • シークレット アクセスキー: S3 バケットにアクセスできるユーザーのシークレット キー。
    • Asset namespace: アセットの名前空間
    • Ingestion labels: このフィードのイベントに適用されるラベル。
  8. [次へ] をクリックします。
  9. [Finalize] 画面で新しいフィードの設定を確認し、[送信] をクリックします。

UDM マッピング テーブル

ログフィールド UDM マッピング ロジック
AccountID security_result.detection_fields.value 未加工ログの AccountID の値は、key:Account ID を持つ security_result.detection_fields オブジェクトに割り当てられます。
ApplicationName target.application 未加工ログの ApplicationName の値が target.application フィールドに割り当てられます。
AuthorityFQDN target.asset.network_domain 未加工ログの AuthorityFQDN の値が target.asset.network_domain フィールドに割り当てられます。
AuthorityID target.asset.asset_id 未加工ログの AuthorityID の値が target.asset.asset_id フィールドに割り当てられ、「AuthorityID:」という接頭辞が付加されます。
AzDeploymentId security_result.detection_fields.value 未加工ログの AzDeploymentId の値は、key:AzDeploymentId を持つ security_result.detection_fields オブジェクトに割り当てられます。
AzRoleId additional.fields.value.string_value 未加工ログの AzRoleId の値は、key:AzRole Id を持つ additional.fields オブジェクトに割り当てられます。
AzRoleName target.user.attribute.roles.name 未加工ログの AzRoleName の値が target.user.attribute.roles.name フィールドに割り当てられます。
ComputerFQDN principal.asset.network_domain 未加工ログの ComputerFQDN の値が principal.asset.network_domain フィールドに割り当てられます。
ComputerID principal.asset.asset_id 未加工ログの ComputerID の値が principal.asset.asset_id フィールドに割り当てられ、「ComputerId:」という接頭辞が付加されます。
ComputerName about.hostname 未加工ログの ComputerName の値が about.hostname フィールドに割り当てられます。
CredentialId security_result.detection_fields.value 未加工ログの CredentialId の値は、key:Credential Id を持つ security_result.detection_fields オブジェクトに割り当てられます。
DirectoryServiceName security_result.detection_fields.value 未加工ログの DirectoryServiceName の値は、key:Directory Service Name を持つ security_result.detection_fields オブジェクトに割り当てられます。
DirectoryServiceNameLocalized security_result.detection_fields.value 未加工ログの DirectoryServiceNameLocalized の値は、key:Directory Service Name Localized を持つ security_result.detection_fields オブジェクトに割り当てられます。
DirectoryServiceUuid security_result.detection_fields.value 未加工ログの DirectoryServiceUuid の値は、key:Directory Service Uuid を持つ security_result.detection_fields オブジェクトに割り当てられます。
EventMessage security_result.summary 未加工ログの EventMessage の値が security_result.summary フィールドに割り当てられます。
EventType metadata.product_event_type 未加工ログの EventType の値が metadata.product_event_type フィールドに割り当てられます。また、metadata.event_type の決定にも使用されます。
FailReason security_result.summary 未加工ログの FailReason の値は、存在する場合は security_result.summary フィールドに割り当てられます。
FailUserName target.user.email_addresses 未加工ログの FailUserName の値は、存在する場合は target.user.email_addresses フィールドに割り当てられます。
FromIPAddress principal.ip 未加工ログの FromIPAddress の値が principal.ip フィールドに割り当てられます。
ID security_result.detection_fields.value 未加工ログの ID の値は、key:ID を持つ security_result.detection_fields オブジェクトに割り当てられます。
InternalTrackingID metadata.product_log_id 未加工ログの InternalTrackingID の値が metadata.product_log_id フィールドに割り当てられます。
JumpType additional.fields.value.string_value 未加工ログの JumpType の値は、key:Jump Type を持つ additional.fields オブジェクトに割り当てられます。
NormalizedUser target.user.email_addresses 未加工ログの NormalizedUser の値が target.user.email_addresses フィールドに割り当てられます。
OperationMode additional.fields.value.string_value 未加工ログの OperationMode の値は、key:Operation Mode を持つ additional.fields オブジェクトに割り当てられます。
ProxyId security_result.detection_fields.value 未加工ログの ProxyId の値は、key:Proxy Id を持つ security_result.detection_fields オブジェクトに割り当てられます。
RequestUserAgent network.http.user_agent 未加工ログの RequestUserAgent の値が network.http.user_agent フィールドに割り当てられます。
SessionGuid network.session_id 未加工ログの SessionGuid の値が network.session_id フィールドに割り当てられます。
Tenant additional.fields.value.string_value 未加工ログの Tenant の値は、key:Tenant を持つ additional.fields オブジェクトに割り当てられます。
ThreadType additional.fields.value.string_value 未加工ログの ThreadType の値は、key:Thread Type を持つ additional.fields オブジェクトに割り当てられます。
UserType principal.user.attribute.roles.name 未加工ログの UserType の値が principal.user.attribute.roles.name フィールドに割り当てられます。
WhenOccurred metadata.event_timestamp 未加工ログの WhenOccurred の値が解析され、metadata.event_timestamp フィールドに割り当てられます。このフィールドには、最上位の timestamp フィールドも入力されます。ハードコードされた値「SSO」。EventType フィールドによって決定されます。EventType が存在しない場合や、特定の条件に一致しない場合は、デフォルトで STATUS_UPDATE になります。USER_LOGINUSER_CREATIONUSER_RESOURCE_ACCESSUSER_LOGOUT、または USER_CHANGE_PASSWORD のいずれかです。ハードコードされた値「CENTRIFY_SSO」。ハードコードされた値「SSO」。ハードコードされた値「Centrify」。message フィールドにセッション ID が含まれている場合は、抽出されて使用されます。それ以外の場合は、デフォルトで「1」になります。syslog ヘッダーから取得された host フィールドから抽出されます(利用可能な場合)。syslog ヘッダーから取得された pid フィールドから抽出されます(利用可能な場合)。UserGuid が存在する場合は、その値が使用されます。それ以外の場合、message フィールドにユーザー ID が含まれている場合は、抽出されて使用されます。Level が「Info」の場合は「ALLOW」、FailReason が存在する場合は「BLOCK」に設定されます。FailReason が存在する場合は、「AUTH_VIOLATION」に設定されます。Level フィールドによって決定されます。Level が「Info」の場合は「INFORMATIONAL」、Level が「Warning」の場合は「MEDIUM」、Level が「Error」の場合は「ERROR」に設定します。

さらにサポートが必要な場合 コミュニティ メンバーや Google SecOps のプロフェッショナルから回答を得ることができます。