AWS Elastic MapReduce 로그 수집

다음에서 지원:

이 문서에서는 AWS Elastic MapReduce (EMR) 로그를 Google Security Operations에 수집하는 방법을 설명합니다. AWS EMR은 대량의 데이터를 신속하게 처리하는 클라우드 네이티브 빅데이터 플랫폼입니다. EMR 로그를 Google SecOps에 통합하면 클러스터 활동을 분석하고 잠재적인 보안 위협을 감지할 수 있습니다.

시작하기 전에

다음 기본 요건이 충족되었는지 확인합니다.

  • Google SecOps 인스턴스
  • AWS에 대한 액세스 권한

Amazon S3 버킷 구성

  1. 이 사용자 가이드(버킷 만들기)에 따라 Amazon S3 버킷을 만듭니다.
  2. 나중에 사용할 수 있도록 버킷 이름리전을 저장합니다.
  3. 이 사용자 가이드(IAM 사용자 만들기)에 따라 사용자를 만듭니다.
  4. 생성된 사용자를 선택합니다.
  5. 보안 사용자 인증 정보 탭을 선택합니다.
  6. 액세스 키 섹션에서 액세스 키 만들기를 클릭합니다.
  7. 사용 사례서드 파티 서비스를 선택합니다.
  8. 다음을 클릭합니다.
  9. 선택사항: 설명 태그를 추가합니다.
  10. 액세스 키 만들기를 클릭합니다.
  11. CSV 파일 다운로드를 클릭하여 나중에 사용할 수 있도록 액세스 키비밀 액세스 키를 저장합니다.
  12. 완료를 클릭합니다.
  13. 권한 탭을 선택합니다.
  14. 권한 정책 섹션에서 권한 추가를 클릭합니다.
  15. 권한 추가를 선택합니다.
  16. 정책 직접 연결을 선택합니다.
  17. AmazonS3FullAccessCloudWatchLogsFullAccess 정책을 검색하여 선택합니다.
  18. 다음을 클릭합니다.
  19. 권한 추가를 클릭합니다.

로그를 전달하도록 AWS EMR을 구성하는 방법

  1. AWS 관리 콘솔에 로그인합니다.
  2. 검색창에 EMR을 입력하고 서비스 목록에서 Amazon EMR을 선택합니다.
  3. 클러스터를 클릭합니다.
  4. 로깅을 사용 설정할 EMR 클러스터를 찾아 선택합니다.
  5. 클러스터 세부정보 페이지에서 수정을 클릭합니다.
  6. 클러스터 수정 화면에서 로깅 섹션으로 이동합니다.
  7. 로깅 사용 설정을 선택합니다.
  8. 로그가 저장될 S3 버킷을 지정합니다.
  9. s3://your-bucket-name/ 형식으로 S3 URI를 지정합니다 (이렇게 하면 모든 EMR 로그가 버킷의 루트에 저장됨).
  10. 다음 로그 유형을 선택합니다.
    • Step logs
    • Application logs
    • YARN logs
    • System logs
    • HDFS Logs (Hadoop을 사용하는 경우)
  11. 저장을 클릭합니다.

피드 설정

Google SecOps 플랫폼에서 피드를 설정하는 방법은 두 가지입니다.

  • SIEM 설정 > 피드 > 새로 추가
  • 콘텐츠 허브 > 콘텐츠 팩 > 시작하기

AWS EMR 피드를 설정하는 방법

  1. Amazon Cloud Platform 팩을 클릭합니다.
  2. AWS EMR 로그 유형을 찾습니다.
  3. 다음 필드에 값을 지정합니다.

    • 소스 유형: Amazon SQS V2
    • Queue Name: 읽어올 SQS 큐 이름
    • S3 URI: 버킷 URI입니다.
      • s3://your-log-bucket-name/
        • your-log-bucket-name을 실제 S3 버킷 이름으로 바꿉니다.
    • 소스 삭제 옵션: 수집 환경설정에 따라 삭제 옵션을 선택합니다.

    • 최대 파일 기간: 지난 일수 동안 수정된 파일을 포함합니다. 기본값은 180일입니다.

    • SQS 대기열 액세스 키 ID: 20자리 영숫자 문자열인 계정 액세스 키입니다.

    • SQS 대기열 보안 비밀 액세스 키: 40자로 된 영숫자 문자열인 계정 액세스 키입니다.

    고급 옵션

    • 피드 이름: 피드를 식별하는 미리 채워진 값입니다.
    • 애셋 네임스페이스: 피드와 연결된 네임스페이스입니다.
    • 수집 라벨: 이 피드의 모든 이벤트에 적용되는 라벨입니다.
  4. 피드 만들기를 클릭합니다.

이 제품군 내에서 다양한 로그 유형에 대해 여러 피드를 구성하는 방법에 관한 자세한 내용은 제품별 피드 구성을 참고하세요.

UDM 매핑 테이블

로그 필드 UDM 매핑 논리
app_id additional.fields[].key 'APP' 값은 파서를 통해 할당됩니다.
app_id additional.fields[].value.string_value 원시 로그의 APP 필드에서 직접 매핑됩니다.
app_name additional.fields[].key 'APPNAME' 값은 파서를 통해 할당됩니다.
app_name additional.fields[].value.string_value 원시 로그의 APPNAME 필드에서 직접 매핑됩니다.
blockid additional.fields[].key 'blockid' 값은 파서를 통해 할당됩니다.
blockid additional.fields[].value.string_value 원시 로그의 blockid 필드에서 직접 매핑됩니다.
bytes network.received_bytes 원시 로그의 bytes 필드에서 직접 매핑되며 부호 없는 정수로 변환됩니다.
cliID additional.fields[].key 'cliID' 값은 파서를 통해 할당됩니다.
cliID additional.fields[].value.string_value 원시 로그의 cliID 필드에서 직접 매핑됩니다.
cmd target.process.command_line 원시 로그의 cmd 필드에서 직접 매핑됩니다.
comp_name additional.fields[].key 'COMP' 값은 파서를 통해 할당됩니다.
comp_name additional.fields[].value.string_value 원시 로그의 COMP 필드에서 직접 매핑됩니다.
configuration_version additional.fields[].key 'configuration_version' 값은 파서를 통해 할당됩니다.
configuration_version additional.fields[].value.string_value 원시 로그의 configuration_version 필드에서 직접 매핑되며 문자열로 변환됩니다.
containerID additional.fields[].key 'containerID' 값은 파서를 통해 할당됩니다.
containerID additional.fields[].value.string_value 원시 로그의 CONTAINERID 필드에서 직접 매핑됩니다.
description security_result.description 원시 로그의 description 필드에서 직접 매핑됩니다.
dfs.FSNamesystem.* additional.fields[].key 키는 'dfs.FSNamesystem.'과 JSON 데이터의 키를 연결하여 생성됩니다.
dfs.FSNamesystem.* additional.fields[].value.string_value 값은 dfs.FSNamesystem JSON 객체의 해당 값에서 직접 매핑되고 문자열로 변환됩니다.
duration additional.fields[].key 'duration' 값은 파서를 통해 할당됩니다.
duration additional.fields[].value.string_value 원시 로그의 duration 필드에서 직접 매핑됩니다.
duration network.session_duration.seconds 원시 로그의 duration 필드에서 직접 매핑되며 정수로 변환됩니다.
environment additional.fields[].key 'environment' 값은 파서를 통해 할당됩니다.
environment additional.fields[].value.string_value 원시 로그의 environment 필드에서 직접 매핑됩니다. grok 및 문자열 조작을 사용하여 ip_port 필드에서 추출됩니다. grok 및 문자열 조작을 사용하여 ip_port 필드에서 추출하고 정수로 변환했습니다.
event_type metadata.event_type principaltarget 정보의 존재 여부에 따라 파서 로직에 의해 결정됩니다. NETWORK_CONNECTION, USER_RESOURCE_ACCESS, STATUS_UPDATE 또는 GENERIC_EVENT일 수 있습니다.
file_path target.file.full_path 원시 로그의 file_path 필드에서 직접 매핑됩니다.
host principal.hostname 원시 로그의 host 필드에서 직접 매핑됩니다.
host target.hostname 원시 로그의 host 필드에서 직접 매핑됩니다.
host_ip principal.ip 원시 로그의 host_ip 필드에서 직접 매핑됩니다.
host_port principal.port 원시 로그의 host_port 필드에서 직접 매핑되며 정수로 변환됩니다.
http_url target.url 원시 로그의 http_url 필드에서 직접 매핑됩니다.
index additional.fields[].key 'index' 값은 파서를 통해 할당됩니다.
index additional.fields[].value.string_value 원시 로그의 index 필드에서 직접 매핑됩니다.
kind metadata.product_event_type 원시 로그의 kind 필드에서 직접 매핑됩니다. 'AWS_EMR' 값은 파서를 통해 할당됩니다. 'AWS EMR' 값은 파서를 통해 할당됩니다. 'AMAZON' 값은 파서를 통해 할당됩니다.
offset additional.fields[].key 'offset' 값은 파서를 통해 할당됩니다.
offset additional.fields[].value.string_value 원시 로그의 offset 필드에서 직접 매핑됩니다.
op metadata.product_event_type 원시 로그의 op 또는 OPERATION 필드에서 직접 매핑됩니다.
proto network.application_protocol grok을 사용하여 http_url 필드에서 추출하고 대문자로 변환했습니다.
puppet_version additional.fields[].key 'puppet_version' 값은 파서를 통해 할당됩니다.
puppet_version additional.fields[].value.string_value 원시 로그의 puppet_version 필드에서 직접 매핑됩니다.
queue_name additional.fields[].key 'queue_name' 값은 파서를 통해 할당됩니다.
queue_name additional.fields[].value.string_value 원시 로그의 queue_name 필드에서 직접 매핑됩니다.
report_format additional.fields[].key 'report_format' 값은 파서를 통해 할당됩니다.
report_format additional.fields[].value.string_value 원시 로그의 report_format 필드에서 직접 매핑되며 문자열로 변환됩니다.
resource additional.fields[].key '리소스' 값은 파서를 통해 할당됩니다.
resource additional.fields[].value.string_value 원시 로그의 resource 필드에서 직접 매핑됩니다.
result security_result.action_details 원시 로그의 RESULT 필드에서 직접 매핑됩니다.
security_id additional.fields[].key 'security_id' 값은 파서를 통해 할당됩니다.
security_id additional.fields[].value.string_value 원시 로그의 security_id 필드에서 직접 매핑됩니다.
severity security_result.severity 원시 로그의 severity 필드에서 매핑됩니다. INFOINFORMATIONAL에 매핑되고 WARNMEDIUM에 매핑됩니다.
srvID additional.fields[].key 'srvID' 값은 파서를 통해 할당됩니다.
srvID additional.fields[].value.string_value 원시 로그의 srvID 필드에서 직접 매핑됩니다.
status additional.fields[].key 'status' 값은 파서를 통해 할당됩니다.
status additional.fields[].value.string_value 원시 로그의 status 필드에서 직접 매핑됩니다.
summary security_result.summary 원시 로그의 summary 필드에서 직접 매핑됩니다.
target_app target.application 원시 로그의 TARGET 필드에서 직접 매핑됩니다.
target_ip target.ip 원시 로그의 target_ip 또는 IP 필드에서 직접 매핑됩니다.
target_port target.port 원시 로그의 target_port 필드에서 직접 매핑되며 정수로 변환됩니다.
timestamp metadata.event_timestamp 원시 로그의 timestamp 필드에서 직접 매핑되며 ISO8601 타임스탬프로 파싱됩니다.
timestamp event.timestamp 원시 로그의 timestamp 필드에서 직접 매핑되며 ISO8601 타임스탬프로 파싱됩니다.
trade_date additional.fields[].key 'trade_date' 값은 파서를 통해 할당됩니다.
trade_date additional.fields[].value.string_value 원시 로그의 trade_date 필드에서 직접 매핑됩니다.
transaction_uuid additional.fields[].key 'transaction_uuid' 값은 파서를 통해 할당됩니다.
transaction_uuid additional.fields[].value.string_value 원시 로그의 transaction_uuid 필드에서 직접 매핑됩니다.
type additional.fields[].key 'type' 값은 파서를 통해 할당됩니다.
type additional.fields[].value.string_value 원시 로그의 type 필드에서 직접 매핑됩니다.
user target.user.userid 원시 로그의 USER 또는 ugi 필드에서 직접 매핑됩니다.

도움이 더 필요하신가요? 커뮤니티 회원 및 Google SecOps 전문가로부터 답변을 받으세요.