파이프라인 메시지 로깅

Dataflow SDK에서 기본 제공되는 로깅 인프라를 사용하여 파이프라인 실행 중에 정보를 로깅할 수 있습니다 Google Cloud Console을 사용하여 파이프라인 실행 중과 실행 후에 로깅 정보를 모니터링 할 수 있습니다.

파이프라인에 로그 메시지 추가

자바: SDK 2.x

자바용 Apache Beam SDK는 오픈소스 SLF4J(Simple Logging Facade for Java) 라이브러리를 통한 작업자 메시지 로깅을 권장합니다. 자바용 Apache Beam SDK가 필요한 로깅 인프라를 구현하므로, 자바 코드에서는 SLF4J API만 가져옵니다. 그런 다음 파이프라인 코드 내에서 메시지 로깅을 사용 설정하도록 로거를 인스턴스화합니다.

기존 코드 또는 라이브러리의 경우, 자바용 Apache Beam SDK는 로깅 인프라를 추가로 설정합니다. 이러한 설정은 작업자에서 실행 시 다음 자바용 로깅 라이브러리를 통해 생성되는 로그 메시지를 캡처하기 위해 수행됩니다.

Python

Python 용 Apache Beam SDK는 파이프라인 작업자가 로그 메시지를 출력하게 하는 logging.라이브러리 패키지를 제공합니다. 라이브러리 함수를 사용하려면 라이브러리를 가져와야 합니다.

import logging

자바: SDK 1.x

작업자 로그 메시지 코드 예시

자바: SDK 2.x

Apache Beam WordCount 예시는 처리된 텍스트 줄에서 'love'라는 단어가 발견되면 로그 메시지를 출력하도록 수정될 수 있습니다. 추가된 코드는 아래에서 굵게 표시되었습니다(주변 코드는 이해를 돕기 위해 포함됨).

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

Apache Beam wordcount.py 예시는 처리된 텍스트 줄에서 'love' 단어가 발견되면 로그 메시지를 출력하도록 수정될 수 있습니다.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

자바: SDK 1.x

자바: SDK 2.x

수정된 WordCount 파이프라인이 로컬 파일에 보낸 출력(--output=./local-wordcounts)이 있는 기본 DirectRunner를 사용하여 로컬로 실행되는 경우, Console 출력에 추가된 로그 메시지가 포함됩니다.

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

기본적으로 INFO 이상으로 표시된 로그 줄만 클라우드 로깅으로 보냅니다. 이 동작을 변경하고 싶다면 파이프라인 작업자 로그 수준 설정을 참조하세요.

Python

수정된 WordCount 파이프라인이 로컬 파일에 보낸 출력(--output=./local-wordcounts)이 있는 기본 DirectRunner를 사용하여 로컬로 실행되는 경우, Console 출력에 추가된 로그 메시지가 포함됩니다.

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

기본적으로 INFO 이상으로 표시된 로그 줄만 Cloud Logging으로 보냅니다.

자바: SDK 1.x

파이프라인 로그 모니터링

Dataflow 서비스에서 파이프라인을 실행하면 Dataflow 모니터링 인터페이스를 사용하여 파이프라인에서 방출한 로그를 볼 수 있습니다.

Cloud Dataflow 작업자 로그 예시

수정된 WordCount 파이프라인은 다음 옵션으로 클라우드에서 실행될 수 있습니다.

자바: SDK 2.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

자바: SDK 1.x

작업 요약 및 상태 보기

WordCount 클라우드 파이프라인은 차단 실행을 사용하므로, 파이프라인 실행 중에 콘솔 메시지가 출력됩니다. 작업이 시작되면 Cloud Console 페이지에 대한 링크가 콘솔로 출력되고, 이어서 파이프라인 작업 ID가 출력됩니다.

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

콘솔 URL은 제출된 작업의 요약 페이지가 있는 Dataflow 모니터링 인터페이스로 연결됩니다. 해당 화면의 왼쪽에는 동적 실행 그래프, 오른쪽에는 요약 정보가 표시됩니다.

로그 버튼을 누르면 하단 로그 창이 열리고 기본적으로 작업 상태를 전체적으로 보고하는 작업 로그 메시지가 표시됩니다. 최소 심각도 선택기를 사용하여 작업 진행 상태와 상태 메시지를 필터링할 수 있습니다.

그래프에서 파이프라인 단계를 선택하면 뷰는 사용자 코드 및 파이프라인 단계에서 실행되는 생성 코드를 통해 생성된 단계 로그로 변경됩니다.

작업 로그로 돌아가려면 그래프 바깥쪽을 클릭하거나 오른쪽 측면 패널에 있는 닫기 버튼을 사용하여 단계를 선택 취소합니다.

로그 보기

Dataflow 모니터링 인터페이스의 단계 로그에는 가장 최신 로그 메시지만 표시됩니다. 로그 창의 오른쪽에 있는 Google Cloud 작업 제품군 링크를 클릭하면 Cloud Logging의 파이프라인 단계에 대한 모든 단계 로그를 볼 수 있습니다.

Logging에는 파이프라인의 다른 인프라 로그도 포함됩니다. 작업 로그에서 외부 링크 버튼을 클릭하면 다른 로그 유형을 선택하기 위한 메뉴가 있는 Cloud Logging으로 이동합니다.

다음은 모니터링→로그 페이지에서 볼 수 있는 여러 가지 로그 유형의 요약입니다.

  • job-message 로그에는 Dataflow의 다양한 구성요소가 생성하는 작업 수준 메시지가 포함됩니다. 대표적인 예시는 자동 확장 구성, 작업자 시작 또는 종료 시간, 작업 단계 진행 상황과 작업 오류입니다. 사용자 코드 비정상 종료 때문에 발생했으며 작업자 로그에 존재하는 작업자 수준 오류도 job-message 로그를 작성합니다.
  • worker 로그는 Dataflow 작업자가 생성합니다. 작업자는 대부분의 파이프라인 작업을 수행합니다(예: 데이터에 ParDo 적용). Worker 로그에는 개발자 코드와 Dataflow에서 로깅한 메시지가 포함됩니다.
  • worker-startup 로그는 대부분의 Dataflow 작업에 포함되며 시작 프로세스와 관련된 메시지를 캡처할 수 있습니다. 시작 프로세스에는 Cloud Storage에서 작업의 jar를 다운로드하는 것과 이후의 작업자 시작이 포함됩니다. 작업자를 시작하는 데 문제가 있으면 이 로그를 먼저 보는 것이 좋습니다.
  • shuffler 로그에는 병렬 파이프라인 작업 결과를 통합하는 작업자의 메시지가 포함됩니다.
  • dockerkubelet 로그에는 Dataflow 작업자에서 사용되는 이들 공개 기술과 관련된 메시지가 포함됩니다.

파이프라인 작업자 로그 수준 설정

자바: SDK 2.x

자바용 Apache Beam SDK를 통해 작업자에 생성되는 기본 SLF4J 로깅 수준은 INFO입니다. INFO 이상(INFO , WARN, ERROR)의 모든 로그 메시지가 방출됩니다. 더 낮은 SLF4J 로깅 수준(TRACE 또는 DEBUG)을 지원하도록 다른 기본 로그 수준을 설정하거나 코드의 여러 클래스 패키지에 대해 서로 다른 로그 수준을 설정할 수 있습니다.

명령줄 또는 프로그래매틱 방식으로 작업자 로그 수준을 설정할 수 있도록 파이프라인 옵션 두 개가 제공됩니다.

  • --defaultWorkerLogLevel=<level>: 이 옵션을 사용하면 모든 로거를 지정된 기본 수준으로 설정합니다. 예를 들어 다음 명령줄 옵션은 기본 Dataflow INFO 로그 수준을 재정의하고 이를 DEBUG:
    --defaultWorkerLogLevel=DEBUG로 설정합니다.
  • --workerLogLevelOverrides={"<package or class>":"<level>"}: 이 옵션을 사용하면 지정된 패키지 또는 클래스의 로깅 수준을 설정합니다. 예를 들어 com.google.cloud.dataflow 패키지의 기본 파이프라인 로그 수준을 재정의하고 이를 TRACE:
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    로 설정하거나, com.google.cloud.Foo 클래스의 기본 파이프라인 로깅 수준을 재정의하고 이를 DEBUG:
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    로 설정합니다. JSON 맵
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...})을 제공하면 여러 재정의를 수행할 수도 있습니다.

다음 예시는 명령줄에서 재정의할 수 있는 기본값과 함께 파이프라인 로깅 옵션을 프로그래매틱 방식으로 설정합니다.

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

Python

이 기능은 아직 Python용 Apache Beam SDK에서 사용할 수 없습니다.

자바: SDK 1.x