Cloud Dataflow를 사용하여 규모에 따라 로그 처리

Google Cloud Platform은 광범위하고 다양한 로그 분석 작업을 처리하는 데 필요한 확장 가능한 인프라를 제공합니다. 이 솔루션에서는 Cloud Platform을 사용하여 여러 소스의 로그 항목을 처리하는 분석 파이프라인을 구축하는 방법에 대해 알아봅니다. 사용자는 유의미한 정보를 추출하고 데이터에서 지속적으로 정보를 도출하여 분석, 검토, 보고에 사용할 수 있도록 로그 데이터를 결합할 것입니다.

개요

애플리케이션이 더 복잡해지면서 로그에 캡처된 데이터를 통해 유용한 정보를 얻는 일은 더 어려워집니다. 로그의 소스가 점점 많아지므로 로그를 수집하여 유용한 정보를 쿼리하는 것이 어려울 수 있습니다. 자체 인프라를 구축하고 운영하고 유지하여 로그 데이터를 규모에 맞게 분석하려면 분산 시스템 및 저장소 운영에 관한 광범위한 전문성이 필요할 것입니다. 이러한 전용 인프라는 단 한 번의 자본 지출로 구축해 용량이 고정되는 경우가 많으므로 초기 투자 이상으로 확장하기가 어렵습니다. 이러한 한계는 데이터를 통해 의미 있고 활용 가능한 분석 정보를 만들어내는 과정을 더디게 만들므로 사업에 영향을 미칠 수 있습니다.

본 솔루션은 Cloud Platform을 사용하여 이러한 한계를 극복하는 방법을 보여줍니다. 본 솔루션에서는 일련의 샘플 마이크로 서비스를 Google Kubernetes Engine에서 실행하여 웹사이트를 하나 구현합니다. Stackdriver Logging이 이러한 서비스에서 로그를 수집하여 Google Cloud Storage 버킷에 저장합니다. 그 다음 Google Cloud Dataflow가 메타데이터를 추출하고 기본 집계를 계산하여 로그를 처리합니다. Cloud Dataflow 파이프라인은 로그 요소를 매일 처리하여 각 날짜의 로그에 기초하여 서버 응답 시간에 관한 집계 측정항목을 생성하도록 설계되어 있습니다. 마지막으로, Cloud Dataflow의 출력값이 Google BigQuery 테이블에 로드되는데, 여기서 이를 분석하여 비즈니스 인텔리전스를 제공할 수 있습니다. 또한, 본 솔루션은 지연 시간이 짧은 비동기 로그 처리를 위해 파이프라인을 스트리밍 모드에서 실행되도록 변경하는 방법도 설명합니다.

본 솔루션에서는 몇 가지 Cloud Platform 구성요소를 사용합니다.

여기에 포함된 가이드는 샘플 Cloud Dataflow 파이프라인, 샘플 웹 애플리케이션, 구성 정보, 그리고 샘플 실행 절차를 제시합니다.

앱 정보

이 샘플 배포는 쇼핑 앱 하나를 모델링합니다. 이 샘플에서 사용자는 쇼핑몰 홈페이지를 방문하여 개별 제품을 검색한 후 가까운 오프라인 매장에서 해당 제품을 찾아볼 수 있습니다. 이 앱은 세 가지 마이크로 서비스 즉 HomeService, BrowseService, LocateService로 구성되어 있습니다. 각 서비스는 공유된 네임스페이스의 API 엔드포인트를 통해 이용할 수 있습니다. 사용자는 기본 URL에 /home, /browse, /locate를 추가하여 서비스에 액세스합니다.

이 애플리케이션은 들어오는 HTTP 요청을 stdout에 로깅하도록 구성되어 있습니다.

Stackdriver Logging에 Kubernetes Engine 사용하기

이 예에서 마이크로 서비스는 Google Compute Engine 인스턴스의 그룹인 Kubernetes Engine 클러스터나 Kubernetes를 실행하는 노드에서 실행됩니다. 기본적으로 Kubernetes Engine은 각 노드를 모니터링, 상태 확인, 중앙 로깅을 포함한 다수의 서비스를 제공하도록 구성합니다. 본 솔루션은 Stackdriver Logging을 위한 이러한 기본 지원을 이용하여 각 마이크로 서비스의 로그를 Cloud Storage로 보냅니다. 정보를 파일에 로깅하는 애플리케이션의 대안으로서 Kubernetes를 이용한 클러스터 수준 로깅을 구성할 수 있는데, 여기에 대해서는 본 솔루션에서 다루지 않습니다.

각 마이크로 서비스는 클러스터의 개별 포드에서 실행됩니다. 각 포드는 노드에서 실행되고 Kubernetes Engine 서비스를 이용하여 단일 HTTP 엔드포인트로 노출됩니다.

개별 노드에서 실행되는 마이크로 서비스

클러스터의 각 노드는 로그 메시지를 캡처하는 Stackdriver Logging 에이전트를 실행합니다. 로그를 Stackdriver Logging에서 사용할 수 있게 된 후에는 스크립트가 Cloud SDK에서 제공하는 Stackdriver Logging 지원을 이용하여 자동으로 Cloud Storage 버킷에 로그를 내보냅니다. logging.sh에서 이 샘플은 다음 예와 유사한 명령어를 실행합니다.

# Create a Cloud Storage Bucket
gsutil -q mb gs://BUCKET_NAME

# Allow Stackdriver Logging access to the bucket
gsutil -q acl ch -g cloud-logs@google.com:O gs://BUCKET_NAME

# For each microservice, set up Stackdriver Logging exports
gcloud beta logging sinks create SINK_NAME \
  storage.googleapis.com/BUCKET_NAME \
  --log=”kubernetes.home_service…” --project=PROJECT_ID

로그 뷰어를 사용하여 로그가 Cloud Storage로 내보내지도록 구성할 수도 있습니다. 본 솔루션에서는 SDK를 사용하는데, 이는 여러 개의 로그를 내보낼 때 SDK가 필요하기 때문입니다.

Cloud Storage를 로그 내보내기 대상으로 사용할 경우, 개별 JSON 파일에 LogEntry 유형의 로그 항목이 1시간마다 일괄적으로 저장됩니다. 이렇게 구조화된 Cloud Logging 항목에는 각 로그 메시지가 언제 생성되었는지, 어떤 리소스나 인스턴스가 그 로그 메시지를 생성했는지, 그 심각도 수준이 어느 정도인지 등을 지정하는 추가 메타데이터가 포함됩니다. Stackdriver Logging 항목의 다음 예에서는 마이크로 서비스가 생성한 원본 로그 메시지를 structPayload.log 요소에서 확인할 수 있습니다.

{
  "metadata": {
    "projectId": "...",
    "serviceName": "compute.googleapis.com",
    "zone": "us-central1-f",
    "labels": {
      "compute.googleapis.com/resource_id": "4154944251817867710",
      "compute.googleapis.com/resource_type": "instance"
    },
    "timestamp": "2015-09-22T20:01:13Z"
  },
  "insertId": "2015-09-22|13:01:17.636360-07|10.106.196.103|1124257302",
  "log": "kubernetes.browse-service-iaus6_default_sample-browse-service",
  "structPayload": {
    "stream": "stdout",
    "log": "2015/09/22 - 20:01:13 | 404 | 176ns | 10.160.0.1:34790 | GET /browse/46"
  }
}

Cloud Dataflow 파이프라인 생성하기

Cloud Dataflow는 여러 가지 종류의 데이터 처리 작업에 사용할 수 있는 간단하면서도 강력한 시스템입니다. Dataflow SDK는 지속적으로 업데이트되는 데이터 소스의 제한되지 않은 데이터 세트 또는 무한 데이터 세트를 포함하여 그 어떤 크기의 데이터 세트도 표현할 수 있는 통합 데이터 모델을 제공하며, 본 솔루션의 로그 데이터를 작업하는 데 적합합니다. Dataflow 관리형 서비스는 일괄 작업과 스트리밍 작업을 모두 실행할 수 있습니다. 즉, 비동기 또는 동기, 실시간, 이벤트 중심 데이터 처리에 하나의 코드베이스를 사용할 수 있다는 뜻입니다.

Dataflow SDK는 PCollection이라는 특수한 컬렉션 클래스를 통해 간단한 데이터 표현 방식을 제공합니다. SDK는 PTransform 클래스를 통해 내장 및 커스텀 데이터 변환을 제공합니다. Cloud Dataflow에서는 변환이 파이프라인의 처리 로직을 나타냅니다. 변환은 데이터 결합, 값의 수학적 계산, 데이터 출력 필터링, 한 형식에서 다른 형식으로 데이터 변환 등 다양한 처리 작업에 사용될 수 있습니다. 파이프라인, PCollection, 변환, I/O 소스 및 싱크에 관한 자세한 내용은 Dataflow 프로그래밍 모델을 참조하세요.

다음 다이어그램은 Cloud Storage에 저장된 로그 데이터의 파이프라인 작업을 보여줍니다.

Cloud Dataflow 파이프라인은 몇 가지 단계가 있습니다.

이 다이어그램이 복잡해 보일지 모르지만 Cloud Dataflow를 사용하면 파이프라인을 쉽게 만들고 사용할 수 있습니다. 다음 섹션에서는 파이프라인의 각 단계별 작업에 대해 설명합니다.

데이터 수신

파이프라인은 세 가지 마이크로 서비스의 로그가 포함되어 있는 Cloud Storage 버킷의 입력을 소비하는 것에서 시작합니다. 각각의 로그 컬렉션은 String 요소의 PCollection이 되는데, 여기서 각 요소는 하나의 LogEntry 객체에 해당합니다. 다음 스니펫에서 homeLogs, browseLogs, locateLogsPCollection<String>: 유형입니다.

homeLogs = p.apply(TextIO.Read.named("homeLogsTextRead").from(options.getHomeLogSource()));
browseLogs = p.apply(TextIO.Read.named("browseLogsTextRead").from(options.getBrowseLogSource()));
locateLogs = p.apply(TextIO.Read.named("locateLogsTextRead").from(options.getLocateLogSource()));

지속적으로 업데이트되는 데이터 세트의 문제를 해결하기 위해, Dataflow SDK는 기간 설정이라는 기술을 사용합니다. 기간 설정은 데이터를 개별 요소의 타임스탬프에 따라 PCollection으로 논리적으로 세분화하는 방식으로 작동합니다. 이 경우에는 소스 유형이 TextIO이기 때문에 모든 객체가 처음에는 단일 전역 기간으로 읽히며 이것이 기본 동작입니다.

데이터를 객체로 수집하기

다음 단계에서는 Flatten 작업을 사용하여 각 마이크로 서비스 PCollection을 하나의 PCollection으로 결합합니다.

PCollection<String> allLogs = PCollectionList
  .of(homeLogs)
  .and(browseLogs)
  .and(locateLogs)
  .apply(Flatten.<String>pCollections());

이 작업은 각 소스 PCollection이 동일한 데이터 유형을 포함하고 있고 동일한 전역 기간 설정 전략을 사용하기 때문에 유용합니다. 이 솔루션에서는 각 로그의 소스와 구조가 동일하지만 이 접근법을 소스와 구조가 서로 다른 접근법으로 확대할 수 있을 것입니다.

하나의 PCollection이 만들어졌으면 이제 로그 항목에 여러 가지 단계를 수행하는 커스텀 변환을 사용하여 개별 String 요소를 처리할 수 있습니다. 그 단계는 다음과 같습니다.

변환이 문자열 메시지를 처리하여 로그 메시지를 만듭니다.

  • JSON 문자열을 하나의 Stackdriver Logging LogEntry 자바 객체로 역직렬화합니다.
  • LogEntry 메타데이터에서 타임스탬프를 추출합니다.
  • 정규 표현식을 사용하여 로그 메시지에서 timestamp, responseTime, httpStatusCode, httpMethod, source IP 주소, destination 엔드포인트와 같은 개별 필드를 추출합니다. 이러한 필드를 사용하여 타임스탬프가 지정된 하나의 LogMessage 커스텀 객체를 만듭니다.
  • LogMessage 객체를 새로운 PCollection으로 출력합니다.

다음 코드가 이러한 단계를 실행합니다.

PCollection<LogMessage> allLogMessages = allLogs
  .apply(ParDo.named("allLogsToLogMessage").of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));

일 단위로 데이터 집계하기

매일 요소를 처리하여 각 날짜의 로그에 기초하여 집계된 측정항목을 생성하는 것이 목표임을 떠올리세요. 이러한 집계를 위해서는 날짜별로 데이터를 세분화하는 윈도우 함수가 필요하며, 이는 PCollection의 모든 LogMessage에 타임스탬프가 있기 때문에 가능합니다. Cloud Dataflow가 매일 PCollection의 파티션을 나누고 나면 기간이 설정된 PCollection을 지원하는 작업이 기간 설정 체계를 반영합니다.

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
  .apply(Window.named("allLogMessageToDaily").<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));

기간이 설정된 하나의 PCollection으로 이제 단일 Cloud Dataflow 작업을 실행하여 여러 날에 걸친 세 가지 로그 소스에 관한 집계 일일 측정항목을 계산할 수 있습니다.

PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
  .apply(Combine.<String,Double,Double>perKey(new Max.MaxDoubleFn()));

PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
  .apply(Mean.<String,Double>perKey());

우선 변환은 LogMessage 객체를 입력으로 가져온 후 키가 되는 대상 엔드포인트를 응답 시간 값에 매핑하는 키-값 쌍의 PCollection을 출력합니다. 이 PCollection을 사용하여 2개의 집계 측정항목 즉, 대상별 최대 응답 시간과 대상별 평균 응답 시간을 계산할 수 있습니다. PCollection은 계속해서 일별로 파티션이 나눠지므로 각 계산의 출력값은 하루의 로그 데이터를 나타냅니다. 즉, 최종적으로 2개의 PCollection 즉, 날짜별로 대상별 최대 응답 시간을 포함한 것과 날짜별로 대상별 평균 응답 시간을 포함한 것이 출력된다는 뜻입니다.

집계 일일 측정항목 계산

BigQuery에 데이터 로드

파이프라인의 마지막 단계에서는 다운스트림 분석과 데이터 웨어하우징을 위해 결과로 출력된 PCollection을 BigQuery로 출력합니다.

먼저, 파이프라인은 모든 로그 소스에서 LogMessage 객체를 포함하는 PCollection을 BigQuery TableRow 객체의 PCollection으로 변환합니다. 이 단계는 Cloud Dataflow의 자체 지원 기능을 활용하여 BigQuery를 파이프라인을 위한 싱크로 사용하기 위해 필요합니다.

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
  .apply(ParDo.named("logMessageToTableRow").of(new LogMessageTableRowFn()));

BigQuery 테이블은 정의된 스키마를 필요로 합니다. 본 솔루션에서는 스키마가 기본값 주석을 사용하여 LogAnalyticsPipelineOptions.java에 정의되어 있습니다. 예를 들어 최대 응답 시간 테이블의 스키마는 다음과 같이 정의되어 있습니다.

@Default.String("destination:STRING,aggResponseTime:FLOAT")

집계된 응답-시간 값을 포함하는 PCollection에 관한 작업은 적절한 스키마를 적용하고 테이블이 누락된 경우 테이블을 생성하여 이 값을 TableRow 객체의 PCollection으로 변환합니다.

logsAsTableRows.apply(BigQueryIO.Write
  .named("allLogsToBigQuery")
  .to(options.getAllLogsTableName())
  .withSchema(allLogsTableSchema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

본 솔루션은 항상 기존 데이터에 새 데이터를 추가합니다. 이 파이프라인은 새 로그 데이터를 분석하기 위해 주기적으로 실행되기 때문에 이것이 좋은 방법입니다. 그러나 다른 경우에는 기존 테이블 데이터를 잘라내거나 비어 있는 테이블에만 작성하는 것도 가능합니다

데이터 쿼리

BigQuery 콘솔을 이용하면 출력 데이터에 쿼리를 실행하고 추가 분석을 위해 TableauQlikView와 같은 타사 비즈니스 인텔리전스 도구에 연결할 수 있습니다.

BigQuery 콘솔은 로그 데이터에 관해 쿼리를 실행합니다.

스트리밍 파이프라인 사용하기

이 샘플은 일괄 처리 모드나 스트리밍 모드에서 파이프라인을 실행하기 위한 지원을 포함합니다. 파이프라인을 일괄 처리에서 스트리밍으로 변경하려면 몇 가지 단계만 거치면 됩니다. 우선 Stackdriver Logging 설정이 로깅 정보를 Cloud Storage가 아니라 Cloud Pub/Sub으로 내보냅니다. 그 다음으로 Cloud Dataflow 파이프라인의 입력 소스를 Cloud Storage에서 Cloud Pub/Sub 주제 구독으로 변경합니다. 입력 소스당 하나의 구독이 필요합니다.

Cloud Pub/Sub 파이프라인은 구독을 사용합니다.

logging.sh에 SDK 명령어가 사용되는 것을 확인할 수 있습니다.

Cloud Pub/Sub 입력 데이터에서 생성되는 PCollection은 제한되지 않은 전역 기간을 사용합니다. 단, 개별 항목에는 이미 타임스탬프가 포함되어 있습니다. Stackdriver Logging LogEntry 객체에서 타임스탬프 데이터를 추출할 필요가 없고, 로그 타임스탬프를 추출하여 커스텀 LogMessage 객체를 만들면 된다는 뜻입니다.

Cloud Pub/Sub 파이프라인을 사용할 때 로그의 타임스탬프를 추출할 수 있습니다.

파이프라인의 나머지는 다운스트림 평면화, 변환, 집계, 출력 연산 등이 그대로 유지됩니다.

파이프라인 실행

이 Cloud Dataflow 파이프라인을 구성하고 빌드하고 배포하는 절차와 마이크로 서비스를 배포하고 Stackdriver Logging 내보내기를 구성하는 절차는 가이드에서 확인할 수 있습니다. Cloud Dataflow 작업을 실행할 때 Google Cloud Platform 콘솔을 사용하여 진행 상황을 모니터링하고 파이프라인의 각 단계에 관한 정보를 확인할 수 있습니다.

다음 이미지는 예시 파이프라인을 실행할 때의 콘솔 사용자 인터페이스를 보여줍니다.

GCP 콘솔은 실행 중인 Cloud Dataflow 작업을 보여줍니다.

솔루션 확장

본 솔루션에서 설명하는 파이프라인과 일련의 작업을 그 밖의 다양한 방식으로 확장할 수 있습니다. 가장 명백한 확장 방법으로는 LogMessage 데이터에 전반에서 추가로 집계를 실시하는 것이 있습니다. 예를 들어 로그 출력에 세션 또는 익명화된 사용자 정보가 포함되어 있다면 사용자 활동에 관한 집계를 생성할 수 있을 것입니다. 또한, ApproximateQuantiles 변환을 이용하여 응답 시간의 분산을 생성할 수도 있을 것입니다.

리소스 및 비용

이 가이드에서는 비용이 청구될 수 있는 다음과 같은 몇 가지 Google Cloud Platform 구성요소를 사용합니다.

  • 마이크로 서비스 배포를 위해 Kubernetes Engine
  • 로그를 수신하고 내보내기 위해 Stackdriver Logging
  • 일괄 처리 모드로 내보낸 로그를 저장하기 위해 Cloud Storage
  • 스트리밍 모드로 내보낸 로그를 스트리밍하기 위해 Cloud Pub/Sub
  • 로그 데이터를 처리하기 위해 Cloud Dataflow
  • 프로세싱 출력을 저장하고 그 출력에 관한 다양한 쿼리를 지원하기 위해 BigQuery

이 가이드를 실행하는 비용은 런타임에 따라 다릅니다. 가격 계산기를 사용하면 예상 사용량을 기준으로 예상 비용을 산출할 수 있습니다. Cloud Platform 신규 사용자는 무료 평가판을 사용할 수 있습니다.

가이드

설치 안내 및 소스 코드를 포함한 가이드의 전체 내용은 GitHub(https://github.com/GoogleCloudPlatform/processing-logs-using-dataflow)에서 확인할 수 있습니다.

다음 단계

  • 다른 Google Cloud Platform 기능을 직접 사용해 보세요. 가이드를 살펴보세요.
  • Google Cloud Platform 제품을 사용하여 엔드 투 엔드 솔루션을 구축하는 방법에 대해 알아보세요.
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...