파이프라인 메시지 로깅

Apache Beam SDK에서 기본 제공되는 로깅 인프라를 사용하여 파이프라인 실행 중에 정보를 로깅할 수 있습니다. Google Cloud Console을 사용하여 파이프라인 실행 중과 실행 후에 로깅 정보를 모니터링 할 수 있습니다.

파이프라인에 로그 메시지 추가

자바: SDK 2.x

자바용 Apache Beam SDK는 오픈소스 SLF4J(Simple Logging Facade for Java) 라이브러리를 통한 작업자 메시지 로깅을 권장합니다. 자바용 Apache Beam SDK가 필요한 로깅 인프라를 구현하므로, 자바 코드에서는 SLF4J API만 가져옵니다. 그런 다음 파이프라인 코드 내에서 메시지 로깅을 사용 설정하도록 로거를 인스턴스화합니다.

기존 코드 또는 라이브러리의 경우, 자바용 Apache Beam SDK는 로깅 인프라를 추가로 설정합니다. 이러한 설정은 작업자에서 실행 시 다음 자바용 로깅 라이브러리를 통해 생성되는 로그 메시지를 캡처하기 위해 수행됩니다.

Python

Python 용 Apache Beam SDK는 파이프라인 작업자가 로그 메시지를 출력하게 하는 logging.라이브러리 패키지를 제공합니다. 라이브러리 함수를 사용하려면 라이브러리를 가져와야 합니다.

import logging

자바: SDK 1.x

작업자 로그 메시지 코드 예

자바: SDK 2.x

Apache Beam WordCount 예는 처리된 텍스트 줄에서 'love' 단어가 발견되면 로그 메시지를 출력하도록 수정될 수 있습니다. 추가된 코드는 아래에서 굵게 표시되었습니다(주변 코드는 이해를 돕기 위해 포함됨).

 package org.apache.beam.examples;
 // Import SLF4J packages.
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 ...
 public class WordCount {
   ...
   static class ExtractWordsFn extends DoFn<String, String> {
     // Instantiate Logger.
     // Suggestion: As shown, specify the class name of the containing class
     // (WordCount).
     private static final Logger LOG = LoggerFactory.getLogger(WordCount.class);
     ...
     @ProcessElement
     public void processElement(ProcessContext c) {
       ...
       // Output each word encountered into the output PCollection.
       for (String word : words) {
         if (!word.isEmpty()) {
           c.output(word);
         }
         // Log INFO messages when the word "love" is found.
         if(word.toLowerCase().equals("love")) {
           LOG.info("Found " + word.toLowerCase());
         }
       }
     }
   }
 ... // Remaining WordCount example code ...

Python

Apache Beam wordcount.py 예는 처리된 텍스트 줄에서 'love' 단어가 발견되면 로그 메시지를 출력하도록 수정될 수 있습니다.

# import Python logging module.
import logging

class ExtractWordsFn(beam.DoFn):
  def process(self, element):
    words = re.findall(r'[A-Za-z\']+', element)
    for word in words:
      yield word

      if word.lower() == 'love':
        # Log using the root logger at info or higher levels
        logging.info('Found : %s', word.lower())

# Remaining WordCount example code ...

자바: SDK 1.x

자바: SDK 2.x

수정된 WordCount 파이프라인이 로컬 파일에 보낸 출력(--output=./local-wordcounts)이 있는 기본 DirectRunner를 사용하여 로컬로 실행되는 경우, Console 출력에 추가된 로그 메시지가 포함됩니다.

INFO: Executing pipeline using the DirectRunner.
...
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
Feb 11, 2015 1:13:22 PM org.apache.beam.examples.WordCount$ExtractWordsFn processElement
INFO: Found love
...
INFO: Pipeline execution complete.

기본적으로 INFO 이상으로 표시된 로그 줄만 클라우드 로깅으로 보냅니다. 이 동작을 변경하고 싶다면 파이프라인 작업자 로그 수준 설정을 참조하세요.

Python

수정된 WordCount 파이프라인이 로컬 파일에 보낸 출력(--output=./local-wordcounts)이 있는 기본 DirectRunner를 사용하여 로컬로 실행되는 경우, Console 출력에 추가된 로그 메시지가 포함됩니다.

INFO:root:Found : love
INFO:root:Found : love
INFO:root:Found : love

기본적으로 INFO 이상으로 표시된 로그 줄만 Cloud Logging으로 보냅니다.

자바: SDK 1.x

로깅 한도 및 제한

작업자 로그 메시지는 작업자당 30초마다 15,000개의 메시지로 제한됩니다. 이 한도에 도달하면 단일 작업자 로그 메시지에 로깅이 제한됨이라는 메시지가 표시됩니다.

Throttling logger worker. It used up its 30s quota for logs in only 12.345s
30초 간격이 지나면 더 이상 메시지가 로깅되지 않습니다. 이 한도는 Apache Beam SDK 및 사용자 코드에서 생성된 로그 메시지에 의해 공유됩니다.

파이프라인 로그 모니터링

Dataflow 서비스에서 파이프라인을 실행하면 Dataflow 모니터링 인터페이스를 사용하여 파이프라인에서 방출한 로그를 볼 수 있습니다.

Dataflow 작업자 로그 예시

수정된 WordCount 파이프라인은 다음 옵션으로 클라우드에서 실행될 수 있습니다.

자바: SDK 2.x

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--tempLocation=gs://<bucket-name>/temp
--stagingLocation=gs://<bucket-name>/binaries

Python

--project=WordCountExample
--output=gs://<bucket-name>/counts
--runner=DataflowRunner
--staging_location=gs://<bucket-name>/binaries

자바: SDK 1.x

작업 요약 및 상태 보기

WordCount 클라우드 파이프라인은 차단 실행을 사용하므로, 파이프라인 실행 중에 콘솔 메시지가 출력됩니다. 작업이 시작되면 Cloud Console 페이지에 대한 링크가 콘솔로 출력되고, 이어서 파이프라인 작업 ID가 출력됩니다.

INFO: To access the Dataflow monitoring console, please navigate to
https://console.developers.google.com/dataflow/job/2017-04-13_13_58_10-6217777367720337669
Submitted job: 2017-04-13_13_58_10-6217777367720337669

콘솔 URL은 제출된 작업의 요약 페이지가 있는 Dataflow 모니터링 인터페이스로 연결됩니다. 해당 화면의 왼쪽에는 동적 실행 그래프, 오른쪽에는 요약 정보가 표시됩니다.

로그 버튼을 누르면 하단 로그 창이 열리고 기본적으로 작업 상태를 전체적으로 보고하는 작업 로그 메시지가 표시됩니다. 최소 심각도 선택기를 사용하여 작업 진행 상태와 상태 메시지를 필터링할 수 있습니다.

그래프에서 파이프라인 단계를 선택하면 뷰는 사용자 코드 및 파이프라인 단계에서 실행되는 생성 코드를 통해 생성된 단계 로그로 변경됩니다.

작업 로그로 돌아가려면 그래프 바깥쪽을 클릭하거나 오른쪽 측면 패널에 있는 닫기 버튼을 사용하여 단계를 선택 취소합니다.

로그 보기

Dataflow 모니터링 인터페이스의 단계 로그에는 가장 최신 로그 메시지만 표시됩니다. 로그 창의 오른쪽에 있는 Google Cloud 작업 제품군 링크를 클릭하면 Cloud Logging의 파이프라인 단계에 대한 모든 단계 로그를 볼 수 있습니다.

Logging에는 파이프라인의 다른 인프라 로그도 포함됩니다. 작업 로그에서 외부 링크 버튼을 클릭하면 다른 로그 유형을 선택하기 위한 메뉴가 있는 Cloud Logging으로 이동합니다.

다음은 모니터링→로그 페이지에서 볼 수 있는 여러 가지 로그 유형의 요약입니다.

  • job-message 로그에는 Dataflow의 다양한 구성요소가 생성하는 작업 수준 메시지가 포함됩니다. 대표적인 예시는 자동 확장 구성, 작업자 시작 또는 종료 시간, 작업 단계 진행 상황과 작업 오류입니다. 사용자 코드 비정상 종료 때문에 발생했으며 작업자 로그에 존재하는 작업자 수준 오류도 job-message 로그를 작성합니다.
  • worker 로그는 Dataflow 작업자가 생성합니다. 작업자는 대부분의 파이프라인 작업을 수행합니다(예: 데이터에 ParDo 적용). Worker 로그에는 개발자 코드와 Dataflow에서 로깅한 메시지가 포함됩니다.
  • worker-startup 로그는 대부분의 Dataflow 작업에 포함되며 시작 프로세스와 관련된 메시지를 캡처할 수 있습니다. 시작 프로세스에는 Cloud Storage에서 작업의 jar를 다운로드하는 것과 이후의 작업자 시작이 포함됩니다. 작업자를 시작하는 데 문제가 있으면 이 로그를 먼저 보는 것이 좋습니다.
  • shuffler 로그에는 병렬 파이프라인 작업 결과를 통합하는 작업자의 메시지가 포함됩니다.
  • dockerkubelet 로그에는 Dataflow 작업자에서 사용되는 이들 공개 기술과 관련된 메시지가 포함됩니다.

파이프라인 작업자 로그 수준 설정

자바: SDK 2.x

자바용 Apache Beam SDK를 통해 작업자에 생성되는 기본 SLF4J 로깅 수준은 INFO입니다. INFO 이상(INFO , WARN, ERROR)의 모든 로그 메시지가 방출됩니다. 더 낮은 SLF4J 로깅 수준(TRACE 또는 DEBUG)을 지원하도록 다른 기본 로그 수준을 설정하거나 코드의 여러 클래스 패키지에 대해 서로 다른 로그 수준을 설정할 수 있습니다.

명령줄 또는 프로그래매틱 방식으로 작업자 로그 수준을 설정할 수 있도록 파이프라인 옵션 두 개가 제공됩니다.

  • --defaultWorkerLogLevel=<level>: 이 옵션을 사용하면 모든 로거를 지정된 기본 수준으로 설정합니다. 예를 들어 다음 명령줄 옵션은 기본 Dataflow INFO 로그 수준을 재정의하고 이를 DEBUG:
    --defaultWorkerLogLevel=DEBUG로 설정합니다.
  • --workerLogLevelOverrides={"<package or class>":"<level>"}: 이 옵션을 사용하면 지정된 패키지 또는 클래스의 로깅 수준을 설정합니다. 예를 들어 com.google.cloud.dataflow 패키지의 기본 파이프라인 로그 수준을 재정의하고 이를 TRACE:
    --workerLogLevelOverrides={"com.google.cloud.dataflow":"TRACE"}
    로 설정하거나, com.google.cloud.Foo 클래스의 기본 파이프라인 로깅 수준을 재정의하고 이를 DEBUG:
    --workerLogLevelOverrides={"com.google.cloud.Foo":"DEBUG"}
    로 설정합니다. JSON 맵
    (--workerLogLevelOverrides={"<package/class>":"<level>","<package/class>":"<level>",...})을 제공하면 여러 재정의를 수행할 수도 있습니다.

다음 예시는 명령줄에서 재정의할 수 있는 기본값과 함께 파이프라인 로깅 옵션을 프로그래매틱 방식으로 설정합니다.

 PipelineOptions options = ...
 DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
 // Overrides the default log level on the worker to emit logs at TRACE or higher.
 loggingOptions.setDefaultWorkerLogLevel(Level.TRACE);
 // Overrides the Foo class and "com.google.cloud.dataflow" package to emit logs at WARN or higher.
 loggingOptions.setWorkerLogLevelOverrides(
     WorkerLogLevelOverride.forClass(Foo.class, Level.WARN),
     WorkerLogLevelOverride.forPackage(Package.getPackage("com.google.cloud.dataflow"), Level.WARN));

Python

이 기능은 아직 Python용 Apache Beam SDK에서 사용할 수 없습니다.

자바: SDK 1.x

실행된 BigQuery 작업 로그 보기

Dataflow 파이프라인에서 BigQuery를 사용하면 데이터 로드, 데이터 내보내기 등의 다양한 작업을 대신 수행하기 위해 BigQuery 작업이 실행됩니다. Dataflow 모니터링 UI에는 로그 패널에서 사용할 수 있는 이러한 BigQuery 작업에 대한 추가 정보가 문제해결 및 모니터링을 위해 존재합니다.

로그 패널에 표시된 BigQuery 작업 정보는 BigQuery 시스템 테이블에 저장되고 로드되므로 기본 BigQuery 테이블을 쿼리할 때 청구 비용이 발생합니다.

프로젝트 설정

BigQuery 작업 정보를 열람하려면 파이프라인에서 Apache Beam 2.24.0 이상을 사용해야 합니다. 하지만 그 때까지는 기본 브랜치에서 빌드된 Apache Beam SDK의 개발 버전을 사용해야 합니다.

자바: SDK 2.x

  1. 다음 프로필을 프로젝트의 pom.xml 파일에 추가합니다.

    <profiles>
      <!-- Additional profiles listed here. -->
      <profile>
        <id>snapshot</id>
        <repositories>
          <repository>
            <id>apache.snapshots</id>
            <url>https://repository.apache.org/content/repositories/snapshots</url>
          </repository>
        </repositories>
      </profile>
    </profiles>
    
  2. 프로젝트를 테스트하거나 실행할 때 프로필 옵션을 pom.xml에 나열된 id 값으로 설정하고 beam.version 속성을 2.24.0-SNAPSHOT 이상으로 설정합니다. 예를 들면 다음과 같습니다.

    mvn test -Psnapshot -Dbeam.version=2.24.0-SNAPSHOT
    

    스냅샷 값에 대한 자세한 내용은 스냅샷 색인을 참조하세요.

Python

  1. GitHub에 로그인합니다.

  2. 성공적으로 완료된 Apache Beam Python SDK 빌드를 보려면 결과 목록으로 이동합니다.

  3. 기본(마스터) 분기에서 빌드된 최근 완료된 작업을 클릭합니다.

  4. 측면 패널에서 Google Cloud Storage 버킷의 파일 목록을 클릭합니다.

  5. 기본 패널에서 Google Cloud Storage 버킷의 파일 목록을 펼칩니다.

  6. 파일 목록에서 Python 프로젝트를 실행하는 로컬 머신이나 위치로 ZIP 파일을 다운로드합니다.

    Cloud Storage 버킷 이름은 beam-wheels-staging이므로 다운로드 URL을 구성할 때 이 이름을 포함해야 합니다. 예를 들면 다음과 같습니다.

    gsutil cp gs://beam-wheels-staging/master/02bf081d0e86f16395af415cebee2812620aff4b-207975627/apache-beam-2.25.0.dev0.zip <var>SAVE_TO_LOCATION</var>
    
  7. 다운로드한 ZIP 파일을 설치합니다.

    pip install apache-beam-2.25.0.dev0.zip
    
  8. Apache Beam 파이프라인을 실행할 때 --sdk_location 플래그를 전달하고 SDK ZIP 파일을 참조합니다.

    --sdk_location=apache-beam-2.25.0.dev0.zip
    

자바: SDK 1.x

BigQuery 작업 세부정보 보기

BigQuery 작업을 나열하려면 BigQuery 작업 탭을 열고 BigQuery 작업의 위치를 선택합니다. 다음으로 BigQuery 작업 로드를 클릭하고 대화상자를 확인합니다. 쿼리가 완료되면 작업 목록이 표시됩니다.

BigQuery 작업 정보 테이블의 BigQuery 작업 로드 버튼

작업 ID, 유형, 기간 등 각 작업에 대한 기본 정보가 제공됩니다.

현재 파이프라인 작업 실행 중에 실행된 BigQuery 작업을 보여주는 테이블입니다.

특정 작업에 대한 자세한 내용을 보려면 추가 정보 열에서 명령줄을 클릭하세요.

명령줄의 모달 창에서 bq jobs describe 명령어를 복사하여 로컬 또는 Cloud Shell에서 실행합니다.

gcloud alpha bq jobs describe BIGQUERY_JOB_ID

bq jobs describe 명령어는 JobStatistics를 출력합니다. 이 기능은 속도가 느리거나 BigQuery 작업 진단 시 유용한 추가 세부정보를 제공합니다.

또는 BigQueryIO를 SQL 쿼리와 함께 사용하면 쿼리 작업이 발급됩니다. 작업에서 사용하는 SQL 쿼리를 보려면 추가 정보 열에서 쿼리 보기를 클릭합니다.