파이프라인 수명주기

이 페이지에서는 파이프라인 코드부터 Dataflow 작업까지 파이프라인 수명 주기를 간략하게 설명합니다.

이 페이지에서는 다음 개념을 설명합니다.

  • 실행 그래프의 정의와 Apache Beam 파이프라인이 Dataflow 작업이 되는 방법
  • Dataflow가 오류를 처리하는 방법
  • Dataflow가 파이프라인의 처리 논리를 자동으로 동시 처리하여 작업을 수행하는 작업자에게 배포하는 방법
  • Dataflow에서 수행할 수 있는 작업 최적화

실행 그래프

Dataflow 파이프라인을 실행하면 Dataflow는 모든 변환 및 관련 처리 함수(예: DoFn 객체) 등 Pipeline 객체를 구성하는 코드로부터 실행 그래프를 만듭니다. 이것이 파이프라인 실행 그래프이며 이 단계를 그래프 생성 시간이라고 합니다.

그래프를 구성하는 동안 Apache Beam은 파이프라인 코드의 기본 진입 포인트에서 로컬로 코드를 실행하여 소스, 싱크 또는 변환 단계 호출에서 중지하고 이러한 호출을 그래프 노드로 변환합니다. 따라서 파이프라인의 진입점(Java 및 Go의 main 메서드 또는 Python 스크립트의 최상위 수준)에 있는 코드 조각은 파이프라인을 실행하는 머신에서 로컬로 실행됩니다. 그러나 DoFn 객체 메서드에서 선언된 동일한 코드는 Dataflow 작업자에서 실행됩니다.

예를 들어 Apache Beam SDK에 포함되어 있는 WordCount 샘플에는 각 단어의 어커런스 횟수와 함께 텍스트 컬렉션에서 개별 단어를 읽고, 추출하고, 계산하고, 형식을 지정하고, 작성하는 일련의 변환이 포함되어 있습니다. 다음 다이어그램에는 WordCount 파이프라인의 변환이 실행 그래프로 확장되는 방법이 나와 있습니다.

WordCount 예시 프로그램의 변환을 Dataflow 서비스가 실행하는 단계의 실행 그래프로 확장

그림 1: WordCount 예시 실행 그래프

실행 그래프는 사용자가 파이프라인을 구성할 때 변환을 지정한 순서와 다른 경우가 많습니다. 그 차이가 생기는 이유는 Dataflow 서비스가 관리형 클라우드 리소스에서 실행되기 전에 실행 그래프에 다양한 최적화 및 융합을 수행하기 때문입니다. Dataflow 서비스는 파이프라인을 실행할 때 데이터 종속 항목을 고려합니다. 그러나 데이터 종속 항목이 없는 단계는 순서에 관계없이 실행될 수 있습니다.

Dataflow가 파이프라인에 대해 생성한 최적화되지 않은 실행 그래프를 보려면 Dataflow 모니터링 인터페이스에서 작업을 선택합니다. 작업 보기에 대한 자세한 내용은 Dataflow 모니터링 인터페이스 사용을 참조하세요.

그래프 구성 중에 Apache Beam은 Cloud Storage 버킷, BigQuery 테이블, Pub/Sub 주제 또는 구독과 같이 파이프라인에서 참조하는 모든 리소스가 실제로 존재하고 액세스할 수 있는지 검사합니다. 검사는 각 서비스에 대한 표준 API 호출을 통해 수행되므로 파이프라인을 실행하는 데 사용되는 사용자 계정이 필요한 서비스에 올바르게 연결되고 서비스의 API를 호출할 수 있습니다. 파이프라인을 Dataflow 서비스에 제출하기 전에 Apache Beam은 다른 오류를 확인하여 파이프라인 그래프에 잘못된 작업이 포함되지 않도록 확인합니다.

그런 다음 실행 그래프는 JSON 형식으로 변환되고 JSON 실행 그래프는 Cloud Dataflow 서비스 엔드포인트로 전송됩니다.

그런 다음 Dataflow 서비스는 JSON 실행 그래프의 유효성을 검사합니다. 그래프의 유효성이 검사되면 그래프는 Dataflow 서비스의 작업이 됩니다. Dataflow 모니터링 인터페이스를 사용하여 작업, 실행 그래프, 상태, 로그 정보를 볼 수 있습니다.

Java

Dataflow 서비스는 Dataflow 프로그램을 실행한 머신에 응답을 보냅니다. 이 응답은 Dataflow 작업의 jobId를 포함하는 DataflowPipelineJob 객체에 캡슐화됩니다. jobId를 사용하면 Dataflow 모니터링 인터페이스Dataflow 명령줄 인터페이스를 사용해 작업을 모니터링, 추적, 문제해결할 수 있습니다. 자세한 내용은 DataflowPipelineJob에 대한 API 참조를 참고하세요.

Python

Dataflow 서비스는 Dataflow 프로그램을 실행한 머신에 응답을 보냅니다. 이 응답은 Dataflow 작업의 job_id를 포함하는 DataflowPipelineResult 객체에 캡슐화됩니다. job_id를 사용하면 Dataflow 모니터링 인터페이스Dataflow 명령줄 인터페이스를 사용해 작업을 모니터링, 추적, 문제해결할 수 있습니다.

Go

Dataflow 서비스는 Dataflow 프로그램을 실행한 머신에 응답을 보냅니다. 이 응답은 Dataflow 작업의 jobID를 포함하는 dataflowPipelineResult 객체에 캡슐화됩니다. jobID를 사용하면 Dataflow 모니터링 인터페이스Dataflow 명령줄 인터페이스를 사용해 작업을 모니터링, 추적, 문제해결할 수 있습니다.

파이프라인을 로컬에서 실행하는 경우에도 그래프가 생성되긴 하지만 그래프는 JSON으로 변환되지 않으며 서비스로 전송되지 않습니다. 대신 그래프는 Dataflow 프로그램을 실행한 머신에서 로컬로 실행됩니다. 자세한 내용은 로컬 실행을 위한 PipelineOptions 구성을 참조하세요.

오류 및 예외 처리

데이터를 처리하는 동안 파이프라인에서 예외가 발생할 수 있습니다. 이러한 오류 중 일부는 일시적(예: 외부 서비스 액세스 시의 일시적인 어려움)이지만 다른 오류(예: 계산 중 null 포인터 또는 손상되거나 파싱할 수 없는 입력 데이터로 인한 오류)는 영구적입니다.

Dataflow는 임의 번들에서 요소를 처리하고, 해당 번들의 요소에 대한 오류가 발생하는 경우 전체 번들을 다시 시도합니다. 일괄 모드에서 실행하면 실패 항목이 포함된 번들은 4번 재시도됩니다. 단일 번들이 4번 실패하면 파이프라인이 완전히 실패합니다. 스트리밍 모드에서 실행하는 경우 실패 항목이 포함된 번들은 무제한으로 재시도되므로 파이프라인이 영구 중단될 수 있습니다.

일괄 모드에서 처리 중인 경우 파이프라인 작업이 완전히 실패(4번 재시도 후 주어진 번들이 실패하는 경우 발생)하기 전에 많은 수의 개별 실패가 표시될 수 있습니다. 예를 들어, 파이프라인이 번들 100개를 처리하려고 하면 단일 번들이 4번 실패 조건에 도달할 때까지 수백 개의 개별 오류가 발생할 수 있습니다.

작업자에 패키지를 설치하지 못하는 등의 시작 작업자 오류는 일시적입니다. 이 시나리오에서는 재시도가 무한히 발생하며 파이프라인이 영구적으로 중단될 수 있습니다.

동시 처리 및 분산

Dataflow 서비스는 파이프라인의 처리 논리를 자동으로 동시 처리하고 사용자가 작업 수행을 위해 할당한 작업자에게 분산합니다. Dataflow는 프로그래밍 모델의 추상화를 사용하여 병렬 처리 함수를 나타냅니다. 예를 들어 파이프라인의 ParDo 변환은 Dataflow가 DoFn 객체로 표현되는 처리 코드를 여러 동시 실행 작업자에게 자동으로 분산하도록 만듭니다.

작업 동시 로드에는 두 가지 유형이 있습니다.

  • 수평 동시 로드는 파이프라인 데이터가 분할되고 여러 작업자에서 동시에 처리될 때 발생합니다. Dataflow 런타임 환경은 분산 작업자 풀을 기반으로 합니다. 풀에 작업자가 더 많이 포함되어 있으면 파이프라인의 잠재적 동시 로드가 향상되지만, 구성 비용도 높아집니다. 이론적으로 수평 동시 로드에는 상한이 없습니다. 하지만 Dataflow는 Fleet 전체 리소스 사용을 최적화하기 위해 작업자 풀을 4,000 작업자로 제한합니다.

  • 수직 동시 로드는 파이프라인 데이터가 분할되고 동일한 작업자의 여러 CPU 코어에서 처리될 때 발생합니다. 각 작업자는 Compute Engine VM을 기반으로 합니다. VM은 모든 CPU 코어를 포화시키기 위해 여러 프로세스를 실행할 수 있습니다. 코어가 더 많은 VM은 잠재적인 수직 동시 로드가 더 높지만, 이 구성은 비용이 증가합니다. 코어 수가 많을수록 메모리 사용량이 증가하는 경우가 많으므로 일반적으로 코어 수는 메모리 크기와 함께 확장됩니다. 컴퓨터 아키텍처의 물리적 한계를 고려할 때 수직 동시 로드의 상한은 수평 동시 로드의 상한보다 훨씬 낮습니다.

관리형 동시 로드

기본적으로 Dataflow는 작업 동시 로드를 자동으로 관리합니다. Dataflow는 CPU 및 메모리 사용량과 같은 작업의 런타임 통계를 모니터링하여 작업을 확장하는 방법을 결정합니다. 작업 설정에 따라 Dataflow는 수평 자동 확장이라고 하는 수평형 또는 수직 확장이라고 하는 작업을 수평으로 확장할 수 있습니다. 동시 로드를 위해 자동 확장을 통해 작업 비용과 작업 성능을 최적화합니다.

작업 성능을 개선하기 위해 Dataflow는 내부적으로 파이프라인을 최적화합니다. 일반적인 최적화는 융합 최적화결합 최적화입니다. Dataflow는 파이프라인 단계를 융합함으로써 분산 시스템에서의 단계 조정 및 각 개별 단계를 개별적으로 실행하는 데 필요한 불필요한 비용을 없애줍니다.

동시 로드에 영향을 미치는 요소

다음 요소는 Dataflow 작업에서 동시 로드가 얼마나 잘 작동하는지에 영향을 줍니다.

입력 소스

입력 소스가 동시 로드를 허용하지 않으면 입력 소스 수집 단계에서 Dataflow 작업에서 병목 현상이 발생할 수 있습니다. 예를 들어 단일 압축 텍스트 파일에서 데이터를 수집하면 Dataflow가 입력 데이터를 병렬 처리할 수 없습니다. 대부분의 압축 형식은 수집 중에 샤드로 임의 분할될 수 없기 때문에 Dataflow는 파일 시작 부분부터 순차적으로 데이터를 읽어야 합니다. 파이프라인의 전체 처리량은 파이프라인의 비동시 부분으로 인해 느려집니다. 이 문제를 해결하는 방법은 더 확장 가능한 입력 소스를 사용하는 것입니다.

경우에 따라 단계 융합으로 동시 로드가 줄어듭니다. 입력 소스가 동시 로드를 허용하지 않는 경우 Dataflow가 데이터 수집 단계를 후속 단계와 융합하고 이 융합된 단계를 단일 스레드에 할당하면 전체 파이프라인이 더 느리게 실행될 수 있습니다.

이 시나리오를 방지하려면 입력 소스 수집 단계 뒤에 Reshuffle 단계를 삽입하세요. 자세한 내용은 이 문서의 융합 방지 섹션을 참조하세요.

기본 팬아웃 및 데이터 형태

단일 변환 단계의 기본 팬아웃으로 인해 병목 현상이 발생할 수 있고 동시 로드를 제한할 수 있습니다. 예를 들어 '높은 팬아웃' ParDo 변환은 융합으로 인해 Dataflow의 작업자 사용 최적화 기능이 제한될 수 있습니다. 이러한 작업에서는 입력 컬렉션의 요소 수가 상대적으로 적음에도 ParDo에서 수백 또는 수천 배 더 많은 요소가 있는 출력을 생성한 다음 다른 ParDo가 수행될 수 있습니다 Dataflow 서비스에서 ParDo 작업을 융합할 경우, 이 단계의 동시 처리는 입력 컬렉션의 항목 수 이하로 제한되지만 중간 PCollection에는 더 많은 요소가 포함되어 있습니다.

가능한 해결책은 이 문서의 융합 방지 섹션을 참조하세요.

데이터 형태

입력 데이터든 중간 데이터든 데이터의 형태에 따라 동시 로드가 제한될 수 있습니다. 예를 들어 도시와 같은 자연 키의 GroupByKey 단계 다음에 map 또는 Combine 단계가 이어지면 Dataflow가 두 단계를 융합합니다. 키 공간이 협소한 경우(예를 들어, 5개 도시이고, 한 키는 아주 핫한 대도시라고 할 경우) GroupByKey 단계의 출력 중 대부분의 항목이 하나의 프로세스에 분산됩니다. 이 프로세스로 인해 병목 현상이 발생하고 작업 속도가 느려집니다.

이 예시에서는 자연 키를 사용하는 대신 GroupByKey 단계 결과를 더 큰 인위적 키 공간으로 재배포할 수 있습니다. GroupByKey 단계와 map 또는 Combine 단계 사이에 Reshuffle 단계를 삽입합니다. Reshuffle 단계에서 hash 함수 등을 사용하여 인위적인 키 공간을 만들어 데이터 형태로 인한 제한된 동시 로드 문제를 해결합니다.

자세한 내용은 이 문서의 융합 방지 섹션을 참조하세요.

출력 싱크

싱크는 파일 또는 데이터베이스와 같은 외부 데이터 스토리지 시스템에 쓰는 변환입니다. 실제로 싱크는 표준 DoFn 객체로 모델링 및 구현되며 PCollection을 외부 시스템에 구체화하는 데 사용됩니다. 이 경우 PCollection에는 최종 파이프라인 결과가 포함됩니다. 싱크 API를 호출하는 스레드는 병렬로 실행되어 외부 시스템에 데이터를 쓸 수 있습니다. 기본적으로 스레드 간 조정은 발생하지 않습니다. 쓰기 요청 및 제어 흐름을 버퍼링하는 중간 레이어가 없으면 외부 시스템에 과부하가 발생하여 쓰기 처리량이 줄어들 수 있습니다. 동시 로드를 추가하여 리소스를 확장하면 파이프라인이 더 느려질 수 있습니다.

이 문제의 솔루션은 쓰기 단계에서 동시 로드를 줄이는 것입니다. 쓰기 단계 직전에 GroupByKey 단계를 추가할 수 있습니다. GroupByKey 단계는 출력 데이터를 더 작은 배치 집합으로 그룹화하여 총 RPC 호출 및 외부 시스템에 대한 연결을 줄입니다. 예를 들어 GroupByKey를 사용하여 데이터 포인트 100만 개 중 50개의 해시 공간을 만듭니다.

이 방식의 단점은 동시 로드에 하드코딩된 제한이 필요하다는 것입니다. 또 다른 옵션은 데이터를 쓸 때 싱크에서 지수 백오프를 구현하는 것입니다. 이 옵션은 최소 수준의 클라이언트 제한을 제공할 수 있습니다.

동시 로드 모니터링

동시 로드를 모니터링하려면 Google Cloud 콘솔을 사용하여 감지된 낙오 항목을 볼 수 있습니다. 자세한 내용은 일괄 작업의 낙오 항목 문제 해결스트리밍 작업의 낙오 항목 문제 해결을 참조하세요.

융합 최적화

파이프라인 실행 그래프의 JSON 형식에 대한 유효성 검사가 완료되면 Dataflow 서비스는 최적화를 수행하도록 그래프를 수정할 수 있습니다. 최적화에는 파이프라인 실행 그래프의 여러 단계 또는 변환을 단일 단계로 융합하는 작업이 포함될 수 있습니다. 단계를 융합하면 Dataflow 서비스가 메모리와 처리 오버헤드 면에서 많은 비용이 들 수 있는 파이프라인의 모든 중간 PCollection을 구체화할 필요가 없게 됩니다.

파이프라인 구성에서 지정한 모든 변환이 서비스에서 실행되지만 파이프라인의 가장 효율적인 실행을 보장하기 위해 변환은 다른 순서로 실행되거나 더 큰 융합된 변환의 일부로 실행될 수 있습니다. Dataflow 서비스는 실행 그래프의 단계 간 데이터 종속성을 고려하지만 다른 단계는 순서에 관계없이 실행할 수 있습니다.

융합 예시

다음 다이어그램은 효율적인 실행을 위해 Java용 Apache Beam SDK에 포함된 WordCount 예시의 실행 그래프를 Dataflow 서비스에서 최적화하고 통합하는 방법을 보여줍니다.

Dataflow 서비스에서 WordCount 예시 프로그램의 실행 그래프가 최적화되고 단계 융합

그림 2: 실행 그래프를 최적화한 WordCount 예

융합 방지

경우에 따라 Dataflow는 파이프라인에서 작업을 융합하는 최적의 방법을 잘못 추측하여 사용 가능한 모든 작업자를 사용하는 Dataflow의 기능이 제한될 수 있습니다. 이러한 경우 Dataflow가 융합 최적화를 수행하지 않도록 방지할 수 있습니다.

Dataflow 서비스가 중간 PCollection을 강제로 구체화하도록 파이프라인에 작업을 추가하여 단계 융합을 방지할 수 있습니다. 다음 작업 중 하나를 사용해 보세요.

  • GroupByKey를 삽입하고 첫 번째 ParDo 이후 그룹 해제합니다. Dataflow 서비스는 집계에서 ParDo 작업을 융합하지 않습니다.
  • 중간 PCollection부차 입력으로 다른 ParDo에 전달합니다. Dataflow 서비스는 항상 부차 입력을 구체화합니다.
  • Reshuffle 단계를 삽입합니다. Reshuffle은 융합을 방지하고 데이터를 검사하고 레코드의 중복 삭제를 수행합니다. Apache Beam 문서에서 지원 중단된 것으로 표시되더라도 Dataflow는 Reshuffle을 지원합니다.

융합 모니터링

gcloud CLI 또는 API를 사용하여 Google Cloud 콘솔에서 최적화된 그래프와 융합된 스테이지에 액세스할 수 있습니다.

콘솔

콘솔에서 그래프의 융합된 스테이지와 단계를 보려면 Dataflow 작업의 실행 세부정보 탭에서 단계 워크플로 그래프 뷰를 엽니다.

스테이지에 융합된 구성요소 단계를 보려면 그래프에서 융합된 단계를 클릭합니다. 스테이지 정보 창의 구성요소 단계 행에 융합된 스테이지가 표시됩니다. 단일 복합 변환의 일부가 여러 스테이지로 융합되는 경우가 있습니다.

gcloud

gcloud CLI를 사용하여 최적화된 그래프와 융합된 스테이지에 액세스하려면 다음 gcloud 명령어를 실행합니다.

  gcloud dataflow jobs describe --full JOB_ID --format json

JOB_ID를 Dataflow 작업의 ID로 바꿉니다.

관련 비트를 추출하려면 gcloud 명령어의 출력을 jq에 파이핑합니다.

gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

출력 응답 파일에서 융합된 스테이지에 대한 설명을 보려면 ComponentTransform 배열 내에서 ExecutionStageSummary 객체를 확인합니다.

API

API를 사용하여 최적화된 그래프와 융합된 스테이지에 액세스하려면 project.locations.jobs.get를 호출합니다.

출력 응답 파일에서 융합된 스테이지에 대한 설명을 보려면 ComponentTransform 배열 내에서 ExecutionStageSummary 객체를 확인합니다.

최적화 결합

집계 작업은 대규모 데이터 처리에서 중요한 개념입니다. 집계는 개념상 멀리 떨어져 있는 데이터를 모으기 때문에 상관 관계 설정에 매우 유용합니다. Dataflow 프로그래밍 모델은 집계 작업을 GroupByKey, CoGroupByKey, Combine 변환으로 나타냅니다.

Dataflow의 집계 연산은 여러 작업자에 분산될 수있는 데이터를 포함하여 전체 데이터 세트에 걸쳐 데이터를 결합합니다. 이러한 집계 작업 중에는 인스턴스 전반에서 데이터를 결합하기 전에 최대한 많은 양의 데이터를 로컬에서 결합하는 것이 가장 효율적인 경우가 많습니다. GroupByKey 또는 다른 집계 변환을 적용하면 Cloud Dataflow 서비스는 기본 그룹화 작업을 수행하기 전에 부분 결합을 로컬에서 자동으로 수행합니다.

부분 또는 다중 레벨 결합을 수행할 때 Dataflow 서비스는 파이프라인이 일괄 또는 스트리밍 데이터를 사용하는지 여부에 따라 다른 결정을 내립니다. 제한된 데이터의 경우 이 서비스는 효율성을 우선시하고 최대한 많은 로컬 결합을 수행합니다. 제한되지 않은 데이터의 경우, 서비스는 짧은 지연 시간을 우선시하므로 지연 시간을 증가시킬 수 있으므로 부분 결합을 수행하지 않을 수 있습니다.