특정 Dataflow 작업을 선택하면 모니터링 인터페이스가 작업 그래프를 그래픽으로 표시합니다. 콘솔의 작업 그래프 페이지에서는 작업 요약, 작업 로그 및 파이프라인의 각 단계에 대한 정보도 제공합니다.
파이프라인의 작업 그래프는 파이프라인의 각 변환을 상자로 나타냅니다. 각 상자에는 변환 이름과 작업 상태 정보가 있으며 여기에는 다음이 포함됩니다.
- 실행 중: 단계가 실행 중입니다.
- 대기 중: FlexRS 작업 단계가 대기 중입니다.
- 성공: 단계가 성공적으로 완료되었습니다.
- 중지됨: 작업이 중지되어 단계가 중지되었습니다.
- 알 수 없음: 단계에서 상태를 보고하지 못했습니다.
- 실패: 단계를 완료하지 못했습니다.
기본적으로 작업 그래프 페이지에는 그래프 보기가 표시됩니다. 작업 그래프를 표로 보려면 작업 단계 뷰에서 테이블 뷰를 선택합니다. 테이블 뷰에는 동일한 정보가 다른 형식으로 포함됩니다. 테이블 뷰는 다음과 같은 시나리오에 유용합니다.
- 작업에 단계가 많아 작업 그래프를 탐색하기 어렵습니다.
- 특정 속성별로 작업 단계를 정렬하려고 합니다. 예를 들어 테이블을 실제 경과 시간으로 정렬하여 느린 단계를 식별할 수 있습니다.
기본 작업 그래프
파이프라인 코드:자바// Read the lines of the input text. p.apply("ReadLines", TextIO.read().from(options.getInputFile())) // Count the words. .apply(new CountWords()) // Write the formatted word counts to output. .apply("WriteCounts", TextIO.write().to(options.getOutput())); Python( pipeline # Read the lines of the input text. | 'ReadLines' >> beam.io.ReadFromText(args.input_file) # Count the words. | CountWords() # Write the formatted word counts to output. | 'WriteCounts' >> beam.io.WriteToText(args.output_path)) Go// Create the pipeline. p := beam.NewPipeline() s := p.Root() // Read the lines of the input text. lines := textio.Read(s, *input) // Count the words. counted := beam.ParDo(s, CountWords, lines) // Write the formatted word counts to output. textio.Write(s, *output, formatted) |
작업 그래프:
|
복합 변환
작업 그래프에서 복합 변환(중첩된 하위 변환 여러 개가 포함되어 있는 변환)을 확장할 수 있습니다. 확장 가능한 복합 변환은 그래프에서 화살표로 표시됩니다. 변환을 펼치고 하위 변환을 보려면 화살표를 클릭합니다.
파이프라인 코드:자바// The CountWords Composite Transform // inside the WordCount pipeline. public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> { @Override public PCollection<String> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } Python# The CountWords Composite Transform inside the WordCount pipeline. @beam.ptransform_fn def CountWords(pcoll): return ( pcoll # Convert lines of text into individual words. | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) # Count the number of times each word occurs. | beam.combiners.Count.PerElement() # Format each word and count into a printable string. | 'FormatCounts' >> beam.ParDo(FormatCountsFn())) Go// The CountWords Composite Transform inside the WordCount pipeline. func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) } |
작업 그래프:
|
파이프라인 코드에서 다음 코드를 사용하여 복합 변환을 호출할 수 있습니다.
result = transform.apply(input);
이런 방식으로 호출된 복합 변환은 예상되는 중첩을 생략하므로 Dataflow 모니터링 인터페이스에서 확장된 상태로 나타날 수 있습니다. 파이프라인 실행 시 안정적인 고유 이름에 대한 경고 또는 오류가 발생할 수도 있습니다.
이러한 문제를 방지하려면 다음과 같은 권장 형식을 사용하여 변환을 호출하세요.
result = input.apply(transform);
변환 이름
Dataflow에는 모니터링 작업 그래프에 표시되는 변환 이름을 가져오는 몇 가지 방법이 있습니다. 변환 이름은 Dataflow 모니터링 인터페이스, 로그 파일, 디버깅 도구를 포함하여 공개적으로 표시 가능한 장소에 사용됩니다. 사용자 이름 또는 조직 이름과 같은 개인 식별 정보를 포함하는 이름을 변환하지 마세요.
자바
- 사용자가 변환을 적용할 때 Dataflow는 할당 받은 이름을 사용할 수 있습니다.
apply
메서드에 제공하는 첫 번째 인수가 변환 이름이 됩니다. - Dataflow는 맞춤 변환을 빌드하는 경우 클래스 이름에서,
ParDo
와 같은 핵심 변환을 사용하는 경우DoFn
함수 객체 이름에서 변환 이름을 유추할 수 있습니다.
Python
- 사용자가 변환을 적용할 때 Dataflow는 할당 받은 이름을 사용할 수 있습니다. 변환의
label
인수를 지정하여 변환 이름을 설정할 수 있습니다. - Dataflow는 맞춤 변환을 빌드하는 경우 클래스 이름에서,
ParDo
와 같은 핵심 변환을 사용하는 경우DoFn
함수 객체 이름에서 변환 이름을 유추할 수 있습니다.
Go
- 사용자가 변환을 적용할 때 Dataflow는 할당 받은 이름을 사용할 수 있습니다.
Scope
를 지정하여 변환 이름을 설정하세요. DoFn
구조를 사용하는 경우 구조체 이름에서, 또는 함수 이름DoFn
을 사용하는 경우 함수 이름으로부터 Dataflow가 변환 이름을 유추할 수 있습니다.
측정항목 이해하기
이 섹션에서는 작업 그래프와 연결된 측정항목에 대해 자세히 설명합니다.
실제 경과 시간
단계를 클릭하면 단계 정보 패널에 실제 경과 시간 측정항목이 표시됩니다. 실제 경과 시간은 모든 작업자의 모든 스레드에서 다음 작업에 소요된 대략적인 총 시간을 나타냅니다.
- 단계 초기화
- 데이터 처리
- 데이터 무작위 섞기
- 단계 종료
복합 단계의 경우 실제 경과 시간은 각 구성 단계에서 소요된 시간의 합계를 나타냅니다. 이 추정값을 통해 속도가 느린 단계를 식별하고 파이프라인의 어떤 부분에서 필요한 것보다 더 많은 시간이 소요되는지 진단할 수 있습니다.
부차 입력 측정항목
부차 입력 측정항목은 부차 입력 액세스 패턴과 알고리즘이 파이프라인 성능에 어떻게 영향을 미치는지 보여줍니다. 파이프라인에서 부차 입력을 사용하면 Dataflow가 컬렉션을 디스크와 같은 영구 계층에 기록하고 변환을 이 영구 컬렉션에서 읽습니다. 이러한 읽기와 쓰기는 작업 실행 시간에 영향을 미칩니다.
Dataflow 모니터링 인터페이스는 부차 입력 컬렉션을 생성하거나 사용하는 변환을 선택할 때 부차 입력 측정항목을 표시합니다. 단계 정보 패널의 부차 입력 측정항목 섹션에서 측정항목을 볼 수 있습니다.
부차 입력을 만드는 변환
선택한 변환이 부차 입력 컬렉션을 만드는 경우 부차 입력 측정항목 섹션에는 다음 측정항목과 함께 컬렉션 이름이 표시됩니다.
- 작성 소요 시간: 부차 입력 컬렉션을 작성하는 데 소요된 시간입니다.
- 작성한 바이트: 부차 입력 컬렉션에 작성한 총 바이트 수입니다.
- 부차 입력의 읽기 소요 시간 및 바이트: 부차 입력 컬렉션을 소비하는 모든 변환(부차 입력 소비자라고도 함)에 대한 추가 측정항목이 포함되어 있는 테이블입니다.
부차 입력의 읽기 소요 시간 및 바이트 테이블에는 다음과 같은 각 부차 입력 소비자에 대한 정보가 포함됩니다.
- 부차 입력 소비자: 부차 입력 소비자의 변환 이름입니다.
- 읽기 소요 시간: 이 소비자가 부차 입력 컬렉션 읽기에 소요한 시간입니다.
- 읽은 바이트: 이 소비자가 부차 입력 컬렉션에서 읽은 바이트 수입니다.
파이프라인에 부차 입력을 만드는 복합 변환이 있는 경우, 부차 입력을 만드는 특정 하위 변환이 표시될 때까지 복합 변환을 확장합니다. 그런 다음 해당 하위 변환을 선택하여 부차 입력 측정항목 섹션을 확인합니다.
그림 4에는 부차 입력 컬렉션을 만드는 변환의 부차 입력 측정항목이 나와 있습니다.
부차 입력을 하나 이상 소비하는 변환
선택한 변환이 부차 입력을 하나 이상 소비하는 경우, 부차 입력 측정항목 섹션에 부차 입력의 읽기 소요 시간 및 바이트 테이블이 표시됩니다. 이 테이블에는 다음과 같은 각 부차 입력 컬렉션에 대한 정보가 포함됩니다.
- 부차 입력 컬렉션: 부차 입력 컬렉션 이름입니다.
- 읽기 소요 시간: 변환이 이 부차 입력 컬렉션을 읽는 데 소요한 시간입니다.
- 읽은 바이트: 변환이 이 부차 입력 컬렉션에서 읽은 바이트 수입니다.
파이프라인에 부차 입력을 읽는 복합 변환이 있는 경우, 부차 입력을 읽는 특정 하위 변환이 표시될 때까지 복합 변환을 확장합니다. 그런 다음 해당 하위 변환을 선택하여 부차 입력 측정항목 섹션을 확인합니다.
그림 5에는 부차 입력 컬렉션에서 읽는 변환의 부차 입력 측정항목이 나와 있습니다.
부차 입력 성능 문제 식별
반복은 일반적인 부차 입력 성능 문제입니다. 부차 입력 PCollection
이 너무 크면 작업자가 전체 컬렉션을 메모리에 캐시할 수 없습니다.
그 결과 작업자는 영구 부차 입력 컬렉션에서 반복적으로 읽어야 합니다.
그림 6에서 부차 입력 측정항목은 부차 입력 컬렉션에서 읽은 총 바이트 수가 컬렉션 크기(작성된 총 바이트 수)보다 훨씬 큼을 보여줍니다.
이 파이프라인의 성능을 개선하려면 부차 입력 데이터의 반복이나 다시 가져오는 것을 방지하도록 알고리즘을 다시 설계합니다. 이 예시에서 파이프라인은 컬렉션 2개의 카티전 프로덕트을 만듭니다. 이 알고리즘은 기본 컬렉션의 각 요소에 대한 전체 부차 입력 컬렉션을 반복합니다. 기본 컬렉션의 여러 요소를 함께 일괄 처리하여 파이프라인의 액세스 패턴을 개선할 수 있습니다. 이 변경으로 작업자가 부차 입력 컬렉션을 다시 읽어야 하는 횟수가 줄어듭니다.
파이프라인이 큰 부차 입력이 하나 이상 있는 ParDo
를 적용하여 조인을 수행하면 다른 일반적인 성능 문제가 발생할 수 있습니다. 이 경우 작업자는 부차 입력 컬렉션에서 읽는 데 동시 실행 시간의 많은 부분을 소비할 수 있습니다.
그림 7은 이 문제의 부차 입력 측정항목 예시를 보여줍니다.
이 파이프라인 성능을 개선하려면 부차 입력 대신 CoGroupByKey를 사용합니다.