Cloud Dataflow를 사용하여 대규모 로그 처리

Google Cloud Platform(GCP)은 규모가 크고 다양한 로그 분석 작업을 처리하는 데 필요한 확장 가능한 인프라를 제공합니다. 이 가이드에서는 GCP를 사용하여 여러 소스의 로그 항목을 처리하는 분석 파이프라인을 구축하는 방법을 보여줍니다. 사용자는 유의미한 정보를 추출하고 데이터에서 도출된 정보를 유지하여 분석, 검토, 보고에 사용할 수 있도록 로그 데이터를 결합합니다.

개요

애플리케이션이 더 복잡해지면서 로그에 캡처된 데이터를 통해 유용한 정보를 얻는 일은 더 어려워집니다. 로그의 소스가 점점 많아지므로 로그를 수집하여 유용한 정보를 쿼리하는 것이 어려울 수 있습니다. 자체 인프라를 구축하고 운영하고 유지하여 로그 데이터를 규모에 맞게 분석하려면 분산 시스템 및 저장소 운영에 관한 광범위한 전문성이 필요할 것입니다. 이러한 전용 인프라는 단 한 번의 자본 지출로 구축해 용량이 고정되는 경우가 많으므로 초기 투자 이상으로 확장하기가 어렵습니다. 이러한 한계는 데이터를 통해 의미 있고 활용 가능한 분석 정보를 만들어내는 과정을 더디게 만들므로 사업에 영향을 미칠 수 있습니다.

본 솔루션은 다음 다이어그램에 표시된 대로 GCP 제품을 사용하여 이러한 한계를 극복하는 방법을 보여줍니다.

이 솔루션에서는 몇 가지 GCP 구성요소를 사용합니다.

이 솔루션에서는 일련의 샘플 마이크로서비스가 Google Kubernetes Engine(GKE)에서 실행되어 웹사이트를 구현합니다. Stackdriver Logging은 이러한 서비스에서 로그를 수집한 다음 Cloud Storage 버킷에 저장합니다. 그 다음 Cloud Dataflow가 메타데이터를 추출하고 기본 집계를 계산하여 로그를 처리합니다. Cloud Dataflow 파이프라인은 로그 요소를 매일 처리하여 각 날짜의 로그를 기초로 서버 응답 시간에 관한 집계 측정항목을 생성하도록 설계되어 있습니다. 마지막으로 Cloud Dataflow의 출력이 BigQuery 테이블에 로드되며, 여기서 이를 분석하여 비즈니스 인텔리전스를 제공할 수 있습니다. 이 솔루션은 지연 시간이 짧은 비동기 로그 처리를 위해 파이프라인을 스트리밍 모드에서 실행되도록 변경하는 방법도 설명합니다.

이 가이드는 샘플 Cloud Dataflow 파이프라인, 샘플 웹 애플리케이션, 구성 정보, 샘플 실행 단계를 제공합니다.

비용

이 가이드에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud Platform 구성요소를 사용합니다.

  • 마이크로서비스 배포를 위한 GKE
  • 로그를 수신하고 내보내기 위한 Logging
  • 일괄 처리 모드로 내보낸 로그를 저장하기 위한 Cloud Storage
  • 스트리밍 모드로 내보낸 로그를 스트리밍하기 위한 Cloud Pub/Sub
  • 로그 데이터를 처리하기 위한 Cloud Dataflow
  • 처리 출력을 저장하고 해당 출력에 관한 다양한 쿼리를 지원하기 위한 BigQuery

프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용합니다. 신규 GCP 사용자는 무료 체험판을 사용할 수 있습니다.

이 가이드를 마치면 만든 리소스를 삭제하여 비용이 계속 청구되지 않게 할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

시작하기 전에

  1. Google 계정에 로그인합니다.

    아직 계정이 없으면 새 계정을 등록하세요.

  2. GCP Console의 프로젝트 선택기 페이지에서 GCP 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기 페이지로 이동

  3. Google Cloud Platform 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  4. BigQuery, Cloud Storage, Cloud Pub/Sub, Cloud Dataflow, GKE and Logging필요한 API를 사용 설정합니다.

    API를 사용 설정합니다.

  5. Stackdriver 작업공간을 만듭니다. 작업공간에 대한 자세한 내용은 작업공간 관리를 참조하세요.

    Stackdriver로 이동

환경 설정

이 가이드에서는 Cloud Shell을 사용하여 명령어를 입력합니다. Cloud Shell은 GCP Console의 명령줄에 대한 액세스 권한을 부여하며 GCP에서 개발하는 데 필요한 Cloud SDK 및 기타 도구를 포함합니다. Cloud Shell은 GCP Console 하단에 창으로 표시됩니다. 초기화되는 데 몇 분 정도 걸릴 수 있지만 창은 즉시 표시됩니다.

Cloud Shell을 사용하여 환경을 설정하고 이 가이드에서 사용된 git 저장소를 복제하려면 다음 단계를 따르세요.

  1. GCP Console에서 Cloud Shell을 엽니다.

    Cloud Shell 열기

  2. 방금 만든 프로젝트에서 작업하고 있는지 확인합니다. [YOUR_PROJECT_ID]를 새로 만든 GCP 프로젝트로 바꿉니다.

    gcloud config set project [YOUR_PROJECT_ID]
    
  3. 기본 컴퓨팅 영역을 설정합니다. 이 가이드에서는 us-east1입니다. 프로덕션 환경에 배포하는 경우, 선택한 리전에 배포합니다.

    export REGION=us-east1
    gcloud config set compute/region $REGION
    

샘플 저장소 클론

  • 이 가이드에서 볼 스크립트와 애플리케이션 로직이 포함된 저장소를 클론합니다.

    git clone https://github.com/GoogleCloudPlatform/processing-logs-using-dataflow.git
    cd processing-logs-using-dataflow/services
    

환경 변수 구성

# name your bucket
export PROJECT_ID=[YOUR_PROJECT_ID]
# name your GKE cluster
export CLUSTER_NAME=cluster-processing-logs-using-dataflow

# name the bucket for this tutorial
export BUCKET_NAME=${PROJECT_ID}-processing-logs-using-dataflow

# name the logging sink for this tutorial
export SINK_NAME=sink-processing-logs-using-dataflow

# name the logging sink for this tutorial
export DATASET_NAME=processing_logs_using_dataflow

새 Google Kubernetes Engine 클러스터에 샘플 애플리케이션 배포

# create the cluster and deploy sample services
./cluster.sh $PROJECT_ID $CLUSTER_NAME up

샘플 애플리케이션 배포 정보

이 샘플 배포는 쇼핑 앱 하나를 모델링합니다. 이 샘플에서 사용자는 쇼핑몰 홈페이지를 방문하여 개별 제품을 검색한 후 가까운 오프라인 매장에서 해당 제품을 찾아볼 수 있습니다. 이 앱은 세 가지 마이크로 서비스 즉 HomeService, BrowseService, LocateService로 구성되어 있습니다. 각 서비스는 공유된 네임스페이스의 API 엔드포인트를 통해 이용할 수 있습니다. 사용자는 기본 URL에 /home, /browse, /locate를 추가하여 서비스에 액세스합니다.

이 애플리케이션은 들어오는 HTTP 요청을 stdout에 로깅하도록 구성되어 있습니다.

Stackdriver Logging에 Google Kubernetes Engine 사용하기

이 예에서 마이크로서비스는 Compute Engine 인스턴스의 그룹인 Kubernetes Engine 클러스터나 Kubernetes를 실행하는 노드에서 실행됩니다. 기본적으로 GKE는 모니터링, 상태 확인, 중앙 로깅을 포함한 다수의 서비스를 제공하도록 각 노드를 구성합니다. 이 솔루션은 Logging에 대한 이러한 기본 지원을 사용하여 각 마이크로서비스의 로그를 Cloud Storage로 보냅니다. 정보를 파일에 로깅하는 애플리케이션의 대안으로 Kubernetes를 이용한 클러스터 수준 로깅을 구성할 수 있는데, 이 솔루션에서는 다루지 않습니다.

각 마이크로서비스는 클러스터의 개별 pod에서 실행됩니다. 각 pod는 노드에서 실행되고 GKE 서비스를 사용하여 단일 HTTP 엔드포인트로 노출됩니다.

개별 노드에서 실행되는 마이크로서비스입니다.

클러스터의 각 노드는 로그 메시지를 캡처하는 Stackdriver Logging 에이전트를 실행합니다. Logging에서 로그를 사용할 수 있게 되면 스크립트는 Cloud SDK에서 제공하는 Logging 지원을 사용하여 Cloud Storage 버킷으로 로그를 내보냅니다.

로그 뷰어를 사용하여 Cloud Storage로 내보내도록 로그를 구성할 수도 있습니다. 이 솔루션에서는 Cloud SDK를 사용하는데, 여러 로그를 내보낼 때 필요하기 때문입니다.

Cloud Storage를 로그 내보내기 대상으로 사용할 경우, 개별 JSON 파일에 LogEntry 유형의 로그 항목이 1시간마다 일괄적으로 저장됩니다. 이렇게 구조화된 Logging 항목에는 각 로그 메시지가 어떤 리소스나 인스턴스에 의해 언제 생성됐는지, 심각도 수준이 어느 정도인지 등을 지정하는 추가 메타데이터가 포함됩니다. 다음 Logging 항목 예의 structPayload.log 요소에서 마이크로서비스가 생성한 원래 로그 메시지를 볼 수 있습니다.

 {
    "insertId": "ugjuig3j77zdi",
    "labels": {
        "compute.googleapis.com/resource_name": "fluentd-gcp-v3.2.0-9q4tr",
        "container.googleapis.com/namespace_name": "default",
        "container.googleapis.com/pod_name": "browse-service-rm7v9",
        "container.googleapis.com/stream": "stdout"
    },
    "logName": "projects/processing-logs-at-scale/logs/browse-service",
    "receiveTimestamp": "2019-03-09T00:33:30.489218596Z",
    "resource": {
        "labels": {
            "cluster_name": "cluster-processing-logs-using-dataflow",
            "container_name": "browse-service",
            "instance_id": "640697565266753757",
            "namespace_id": "default",
            "pod_id": "browse-service-rm7v9",
            "project_id": "processing-logs-at-scale",
            "zone": "us-east1-d"
        },
        "type": "container"
    },
    "severity": "INFO",
    "textPayload": "[GIN] 2019/03/09 - 00:33:23 | 200 |     190.726µs |      10.142.0.6 | GET      /browse/product/1\n",
    "timestamp": "2019-03-09T00:33:23.743466177Z"
 }

로깅 설정

클러스터가 실행되고 서비스가 배포되면 애플리케이션에 대한 로깅을 구성할 수 있습니다.

Stackdriver Logging 내보내기 싱크를 구성하기 위해서는 kubectl을 사용하여 서비스 이름을 얻으므로 먼저 클러스터에 대한 자격 증명을 가져옵니다.

gcloud container clusters get-credentials  $CLUSTER_NAME --region $REGION

코드 저장소에서 services/logging.sh는 일괄 처리 또는 스트리밍 모드에 필요한 구성요소를 설정합니다. 스크립트는 다음 매개변수를 허용합니다.

logging.sh [YOUR_PROJECT_ID] [BUCKET_NAME] [streaming|batch] [up|down]

이 가이드에서는 일괄 로깅을 시작합니다.

./logging.sh $PROJECT_ID $BUCKET_NAME batch up

아래 단계는 일괄 처리 모드의 명령어 실행을 보여줍니다.

  1. Cloud Storage 버킷을 만듭니다.

    gsutil -q mb gs://[BUCKET_NAME]

  2. 버킷에 Stackdriver Logging이 액세스하도록 허용합니다.

    gsutil -q acl ch -g cloud-logs@google.com:O gs://[BUCKET_NAME]

  3. 마이크로서비스마다 싱크를 사용하여 Stackdriver 내보내기를 설정합니다.

    gcloud logging sinks create [SINK_NAME] \ storage.googleapis.com/[BUCKET_NAME] \ --log-filter="kubernetes.home_service..." --project=[YOUR_PROJECT_ID]

대상 권한 업데이트

대상(이 경우에는 Cloud Storage 버킷)의 권한은 싱크를 만들 때 수정되지 않습니다. 싱크에 쓰기 권한을 부여하도록 Cloud Storage 버킷의 권한 설정을 변경해야 합니다.

Cloud Storage 버킷의 권한을 업데이트하는 방법은 다음과 같습니다.

  1. 싱크의 작성자 ID를 확인합니다.

    1. 로그 뷰어 페이지로 이동합니다.

      로그 뷰어 페이지로 이동

    2. 왼쪽 메뉴에서 내보내기를 선택해 싱크의 작성자 ID를 포함한 싱크 요약을 확인합니다.

    3. 중요: 3개의 싱크 각각에는 Cloud Storage 버킷에 대한 권한이 부여되어야 하는 별도의 서비스 계정 이메일이 있습니다.

  2. GCP Console에서 스토리지 > 브라우저를 클릭합니다.

    브라우저로 이동

  3. 버킷 이름을 클릭해 세부정보 뷰를 엽니다.

  4. 권한을 선택하고 구성원 추가를 클릭합니다.

  5. 역할Storage Object Creator로 설정하고 싱크의 작성자 ID를 입력합니다.

자세한 내용은 대상 권한을 참조하세요.

다음 명령어를 사용하여 로그 객체 경로를 확인할 수 있습니다.

gsutil ls gs://processing-logs-at-scale-processing-logs-using-dataflow/ | grep service

출력에 3개 항목이 모두 포함되면 데이터 파이프라인을 실행하는 단계를 계속 진행할 수 있습니다.

 gs://processing-logs-at-scale-processing-logs-using-dataflow/browse-service/
 gs://processing-logs-at-scale-processing-logs-using-dataflow/home-service/
 gs://processing-logs-at-scale-processing-logs-using-dataflow/locate-service/

BigQuery 데이터세트 만들기

bq mk $DATASET_NAME

애플리케이션 서비스에 일부 부하 생성

Apache HTTP 서버 유틸리티 설치

Apache HTTP 서버 벤치마킹 도구(ab)를 사용하여 서비스에 부하를 생성합니다.

sudo apt-get update

sudo apt-get install -y apache2-utils

load.sh 셸 스크립트는 HomeService, BrowseService, LocateService에서 응답을 요청하여 마이크로서비스에 부하를 생성합니다.

부하 집합 하나는 홈 서비스에 대한 하나의 요청과 각각 검색 및 찾기 서비스에 대한 20개의 요청으로 구성됩니다.

아래 옵션은 1,000개의 부하 집합(동시 실행이 3개의 동시 요청으로 설정됨)을 생성합니다.

cd ../services
./load.sh 1000 3

충분한 로그가 만들어지도록 몇 분 동안 실행합니다.

Cloud Dataflow 파이프라인 시작

충분한 양의 트래픽이 서비스에 도달하도록 허용한 후 Dataflow 파이프라인을 시작할 수 있습니다.

이 가이드에서는 Cloud Dataflow 파이프라인을 일괄 처리 모드로 실행합니다. pipeline.sh 셸 스크립트는 수동으로 파이프라인을 시작합니다.

cd ../dataflow
./pipeline.sh $PROJECT_ID $DATASET_NAME $BUCKET_NAME run

Cloud Dataflow 파이프라인 이해

Cloud Dataflow는 여러 가지 데이터 처리 작업에 사용할 수 있습니다. Cloud Dataflow SDK는 지속적으로 업데이트되는 데이터 소스의 제한되지 않은 데이터세트 또는 무한 데이터세트를 포함하여 그 어떤 크기의 데이터세트도 표현할 수 있는 통합 데이터 모델을 제공하며, 본 솔루션의 로그 데이터를 작업하는 데 적합합니다. Cloud Dataflow 관리형 서비스는 일괄 작업과 스트리밍 작업을 모두 실행할 수 있습니다. 즉, 비동기 또는 동기, 실시간, 이벤트 중심 데이터 처리에 하나의 코드베이스를 사용할 수 있다는 뜻입니다.

Cloud Dataflow SDK는 PCollection이라는 특수한 컬렉션 클래스를 통해 간단한 데이터 표현 방식을 제공합니다. SDK는 PTransform 클래스를 통해 내장 및 커스텀 데이터 변환을 제공합니다. Cloud Dataflow에서는 변환이 파이프라인의 처리 로직을 나타냅니다. 변환은 데이터 결합, 값의 수학적 계산, 데이터 출력 필터링, 한 형식에서 다른 형식으로 데이터 변환 등 다양한 처리 작업에 사용될 수 있습니다. 파이프라인, PCollection, 변환, I/O 소스 및 싱크에 관한 자세한 내용은 Dataflow 프로그래밍 모델을 참조하세요.

다음 다이어그램은 Cloud Storage에 저장된 로그 데이터의 파이프라인 작업을 보여줍니다.

파이프라인의 작업 단계입니다.

이 다이어그램이 복잡해 보일지 모르지만 Cloud Dataflow를 사용하면 파이프라인을 쉽게 만들고 사용할 수 있습니다. 다음 섹션에서는 파이프라인의 각 단계별 작업에 대해 설명합니다.

데이터 수신

파이프라인은 세 가지 마이크로 서비스의 로그가 포함되어 있는 Cloud Storage 버킷의 입력을 소비하는 것에서 시작합니다. 각각의 로그 컬렉션은 String 요소의 PCollection이 되는데, 여기서 각 요소는 하나의 LogEntry 객체에 해당합니다. 다음 스니펫에서 homeLogs, browseLogs, locateLogsPCollection<String>: 유형입니다.

homeLogs = p.apply("homeLogsTextRead", TextIO.read().from(options.getHomeLogSource()));
browseLogs = p.apply("browseLogsTextRead", TextIO.read().from(options.getBrowseLogSource()));
locateLogs = p.apply("locateLogsTextRead", TextIO.read().from(options.getLocateLogSource()));

지속적으로 업데이트되는 데이터세트의 문제를 해결하기 위해, Dataflow SDK는 기간 설정이라는 기술을 사용합니다. 기간 설정은 데이터를 개별 요소의 타임스탬프에 따라 PCollection으로 논리적으로 세분화하는 방식으로 작동합니다. 이 경우에는 소스 유형이 TextIO이기 때문에 모든 객체가 처음에는 단일 전역 기간으로 읽히며 이것이 기본 동작입니다.

데이터를 객체로 수집하기

다음 단계에서는 Flatten 작업을 사용하여 개별 마이크로서비스 PCollection을 하나의 PCollection으로 결합합니다.

PCollection<String> allLogs = PCollectionList
  .of(homeLogs)
  .and(browseLogs)
  .and(locateLogs)
  .apply(Flatten.<String>pCollections());

이 작업은 각 소스 PCollection이 동일한 데이터 유형을 포함하고 있고 동일한 전역 기간 설정 전략을 사용하기 때문에 유용합니다. 이 솔루션에서는 각 로그의 소스와 구조가 동일하지만 이 접근법을 소스와 구조가 서로 다른 접근법으로 확대할 수 있을 것입니다.

하나의 PCollection이 만들어졌으면 이제 로그 항목에 여러 가지 단계를 수행하는 커스텀 변환을 사용하여 개별 String 요소를 처리할 수 있습니다. 다음 다이어그램은 해당 단계를 보여줍니다.

변환이 문자열 메시지를 처리하여 로그 메시지를 만듭니다.

  • JSON 문자열을 하나의 Stackdriver Logging LogEntry 자바 객체로 역직렬화합니다.
  • LogEntry 메타데이터에서 타임스탬프를 추출합니다.
  • 정규 표현식을 사용하여 로그 메시지에서 timestamp, responseTime, httpStatusCode, httpMethod, source IP 주소, destination 엔드포인트와 같은 개별 필드를 추출합니다. 이러한 필드를 사용하여 타임스탬프가 지정된 하나의 LogMessage 커스텀 객체를 만듭니다.
  • LogMessage 객체를 새로운 PCollection으로 출력합니다.

다음 코드가 이러한 단계를 실행합니다.

PCollection<LogMessage> allLogMessages = allLogs
  .apply("allLogsToLogMessage", ParDo.of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));

일 단위로 데이터 집계하기

매일 요소를 처리하여 각 날짜의 로그에 기초하여 집계된 측정항목을 생성하는 것이 목표임을 떠올리세요. 이러한 집계를 위해서는 날짜별로 데이터를 세분화하는 윈도우 함수가 필요하며, 이는 PCollection의 모든 LogMessage에 타임스탬프가 있기 때문에 가능합니다. Cloud Dataflow가 매일 PCollection의 파티션을 나누고 나면 기간이 설정된 PCollection을 지원하는 작업이 기간 설정 체계를 반영합니다.

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
  .apply("allLogMessageToDaily", Window.<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));

기간이 설정된 하나의 PCollection으로 이제 단일 Cloud Dataflow 작업을 실행하여 여러 날에 걸친 세 가지 로그 소스에 관한 집계 일일 측정항목을 계산할 수 있습니다.

PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
  .apply(Max.<String>doublesPerKey());
 // .apply(Combine.<String,Double,Double>perKey(new Max.doublesPerKey()));

PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
  .apply(Mean.<String,Double>perKey());

우선 변환은 다음 다이어그램에서 설명한 대로 LogMessage 객체를 입력으로 가져온 후 키가 되는 대상 엔드포인트를 응답 시간 값에 매핑하는 키-값 쌍의 PCollection을 출력합니다.

집계 일일 측정항목을 계산합니다.

이 PCollection을 사용하여 2개의 집계 측정항목 즉, 대상별 최대 응답 시간과 대상별 평균 응답 시간을 계산할 수 있습니다. PCollection은 계속해서 일별로 파티션이 나눠지므로 각 계산의 출력값은 하루의 로그 데이터를 나타냅니다. 즉, 최종적으로 2개의 PCollection 즉, 날짜별로 대상별 최대 응답 시간을 포함한 것과 날짜별로 대상별 평균 응답 시간을 포함한 것이 출력된다는 뜻입니다.

BigQuery에 데이터 로드

파이프라인의 마지막 단계에서는 다운스트림 분석과 데이터 웨어하우징을 위해 결과로 출력된 PCollection을 BigQuery로 출력합니다.

먼저, 파이프라인은 모든 로그 소스에서 LogMessage 객체를 포함하는 PCollection을 BigQuery TableRow 객체의 PCollection으로 변환합니다. 이 단계는 Cloud Dataflow의 자체 지원 기능을 활용하여 BigQuery를 파이프라인을 위한 싱크로 사용하기 위해 필요합니다.

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
  .apply("logMessageToTableRow", ParDo.of(new LogMessageTableRowFn()));

BigQuery 테이블은 정의된 스키마를 필요로 합니다. 본 솔루션에서는 스키마가 기본값 주석을 사용하여 LogAnalyticsPipelineOptions.java에 정의되어 있습니다. 예를 들어 최대 응답 시간 테이블의 스키마는 다음과 같이 정의되어 있습니다.

@Default.String("destination:STRING,aggResponseTime:FLOAT")

집계된 응답-시간 값을 포함하는 PCollection에 관한 작업은 적절한 스키마를 적용하고 테이블이 누락된 경우 테이블을 생성하여 이 값을 TableRow 객체의 PCollection으로 변환합니다.

logsAsTableRows.apply("allLogsToBigQuery", BigQueryIO.writeTableRows()
  .to(options.getAllLogsTableName())
  .withSchema(allLogsTableSchema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

본 솔루션은 항상 기존 데이터에 새 데이터를 추가합니다. 이 파이프라인은 새 로그 데이터를 분석하기 위해 주기적으로 실행되기 때문에 이것이 좋은 방법입니다. 그러나 다른 경우에는 기존 테이블 데이터를 잘라내거나 비어 있는 테이블에만 작성하는 것도 가능합니다

BigQuery에서 데이터 쿼리

BigQuery 콘솔을 이용하면 출력 데이터에 쿼리를 실행하고 추가 분석을 위해 TableauQlikView와 같은 타사 비즈니스 인텔리전스 도구에 연결할 수 있습니다.

  1. GCP Console에서 BigQuery를 엽니다.

    BigQuery 열기

  2. 프로젝트 processing-logs-at-scale을 클릭한 다음 데이터세트 processing_logs_using_dataflow를 클릭합니다.

  3. all_logs_table을 선택한 다음 데이터 창에서 미리보기를 선택하여 모든 로그 테이블의 데이터 샘플을 확인합니다.

  4. 쿼리 편집기에서 다음 쿼리를 입력합니다.

    SELECT *
    FROM `processing_logs_using_dataflow.max_response_time_table`
    ORDER BY aggResponseTime DESC
    LIMIT 100;
    
  5. 쿼리를 실행하려면 실행을 클릭합니다.

    BigQuery 콘솔은 로그 데이터에 대해 쿼리를 실행합니다.

스트리밍 파이프라인 사용하기

이 샘플은 일괄 처리 모드나 스트리밍 모드에서 파이프라인을 실행하기 위한 지원을 포함합니다. 파이프라인을 일괄 처리에서 스트리밍으로 변경하려면 몇 가지 단계만 거치면 됩니다. 우선 Stackdriver Logging 설정이 로깅 정보를 Cloud Storage가 아니라 Cloud Pub/Sub으로 내보냅니다. 그 다음으로 Cloud Dataflow 파이프라인의 입력 소스를 Cloud Storage에서 Cloud Pub/Sub 주제 구독으로 변경합니다. 입력 소스당 하나의 구독이 필요합니다.

Cloud Pub/Sub 파이프라인은 구독을 사용합니다.

logging.sh에 SDK 명령어가 사용되는 것을 확인할 수 있습니다.

Cloud Pub/Sub 입력 데이터에서 생성되는 PCollection은 제한되지 않은 전역 기간을 사용합니다. 단, 개별 항목에는 이미 타임스탬프가 포함되어 있습니다. Stackdriver Logging LogEntry 객체에서 타임스탬프 데이터를 추출할 필요가 없고, 로그 타임스탬프를 추출하여 커스텀 LogMessage 객체를 만들면 된다는 뜻입니다.

Cloud Pub/Sub 파이프라인을 사용할 때 로그의 타임스탬프를 추출할 수 있습니다.

파이프라인의 나머지는 다운스트림 평면화, 변환, 집계, 출력 연산 등이 그대로 유지됩니다.

파이프라인 모니터링

Cloud Dataflow 작업을 실행할 때 Google Cloud Platform Console을 사용하여 진행 상황을 모니터링하고 파이프라인의 각 단계에 관한 정보를 확인할 수 있습니다.

다음 이미지는 예시 파이프라인을 실행할 때의 GCP Console을 보여줍니다.

GCP Console은 실행 중인 Cloud Dataflow 작업을 보여줍니다.

삭제

프로젝트 삭제

  1. GCP Console에서 리소스 관리 페이지로 이동합니다.

    리소스 관리 페이지로 이동

  2. 프로젝트 목록에서 삭제할 프로젝트를 선택하고 삭제 를 클릭합니다.
  3. 대화상자에서 프로젝트 ID를 입력한 후 종료를 클릭하여 프로젝트를 삭제합니다.

모든 구성요소 삭제

특정 환경 변수가 설정 중에 사용된 값으로 여전히 설정되어 있는지 확인합니다.

  1. BigQuery 데이터세트를 삭제합니다.

    bq rm $DATASET_NAME
    
  2. Cloud Logging 내보내기를 비활성화합니다. 이 단계는 내보내기와 지정된 Cloud Storage 버킷을 삭제합니다.

    cd ../services
    ./logging.sh $PROJECT_ID $BUCKET_NAME batch down
    
  3. 샘플 웹 애플리케이션 실행에 사용된 Compute Engine 클러스터를 삭제합니다.

    /cluster.sh $PROJECT_ID $CLUSTER_NAME down
    

솔루션 확장

본 솔루션에서 설명하는 파이프라인과 일련의 작업을 그 밖의 다양한 방식으로 확장할 수 있습니다. 가장 명백한 확장 방법으로는 LogMessage 데이터에 전반에서 추가로 집계를 실시하는 것이 있습니다. 예를 들어 로그 출력에 세션 또는 익명화된 사용자 정보가 포함되어 있다면 사용자 활동에 관한 집계를 생성할 수 있을 것입니다. 또한 ApproximateQuantiles 변환을 사용하여 응답 시간 분산을 생성할 수도 있습니다.

다음 단계

  • 다른 Google Cloud Platform 기능 직접 사용하기. 가이드를 살펴보세요.
  • Google Cloud Platform 제품을 사용하여 엔드 투 엔드 솔루션 빌드 방법 알아보기