파이프라인 배포

이 문서에서는 Dataflow가 파이프라인을 배포하고 실행하는 방법을 자세히 설명하고 최적화 및 부하 분산과 같은 고급 주제를 다룹니다. 첫 번째 파이프라인을 만들고 배포하는 방법에 대한 단계별 안내를 알아보려면 자바, Python 또는 템플릿에 대한 Dataflow의 빠른 시작을 사용하세요.

Apache Beam 파이프라인을 만들고 테스트한 후 Dataflow 관리형 서비스를 사용하여 배포하고 실행할 수 있습니다. Dataflow 서비스에서 파이프라인 코드는 Dataflow 작업이 됩니다.

Dataflow 서비스는 Compute EngineCloud Storage와 같은 Google Cloud 서비스를 완벽하게 관리하여 Dataflow 작업을 실행하고 필요한 리소스를 자동으로 떼어냅니다. Dataflow 서비스는 Dataflow Monitoring 인터페이스Dataflow 명령줄 인터페이스와 같은 도구를 통해 작업에 대한 가시성을 제공합니다.

파이프라인 코드에서 실행 매개변수를 설정하여 Dataflow 서비스가 작업을 실행하는 방식을 제어할 수 있습니다. 예를 들어 실행 매개변수는 파이프라인 단계를 실행할 위치(작업자 가상 컴퓨터, Dataflow 서비스 백엔드, 로컬)를 지정합니다.

Dataflow 서비스는 Google Cloud 리소스를 관리하는 것 외에도 분산 병렬 처리의 여러 측면을 자동으로 수행하고 최적화합니다. 예를 들면 다음과 같습니다.

  • 동시 처리 및 분산. Dataflow는 자동으로 데이터를 분할하고 작업자 코드를 Compute Engine 인스턴스에 배포하여 동시 처리합니다.
  • 최적화. Dataflow는 파이프라인 코드를 사용하여 파이프라인의 PCollection 및 변환을 나타내는 실행 그래프를 만들고, 가장 효율적인 성능과 리소스 사용에 맞게 그래프를 최적화합니다. Cloud Dataflow는 또한 데이터 집계와 같이 많은 비용이 들 수 있는 작업을 자동으로 최적화합니다.
  • 자동 조정 기능. Dataflow 서비스에는 자동 확장 및 동적 작업 재배포와 같은 리소스 할당 및 데이터 파티셔닝의 즉각적인 조정을 제공하는 여러 기능이 포함되어 있습니다. 이러한 기능은 Dataflow 서비스가 최대한 신속하고 효율적으로 작업을 실행하는 데 도움이 됩니다.

파이프라인 코드에서 Dataflow 작업으로의 파이프라인 수명주기

Dataflow 파이프라인을 실행하면 Dataflow가 Pipeline 객체를 구성하는 코드에서 모든 변환 및 관련 처리 함수(예: DoFn)를 포함하는 실행 그래프를 만듭니다. 이 단계를 그래프 생성 시간이라고 하며, 파이프라인이 실행되는 컴퓨터에서 로컬로 실행됩니다.

그래프를 구성하는 동안 Apache Beam은 파이프라인 코드의 기본 진입 포인트에서 로컬로 코드를 실행하여 소스, 싱크 또는 변환 단계 호출에서 중지하고 이러한 호출을 그래프 노드로 변환합니다. 따라서 파이프라인의 진입 포인트(자바의 main() 메서드 또는 Python 스크립트의 최상위 수준)에 있는 코드 조각은 파이프라인을 실행하는 머신에서 로컬로 실행되지만 DoFn 객체 메서드에서 선언된 같은 코드는 Dataflow 작업자에서 실행됩니다.

또한 그래프 구성 중에 Apache Beam은 Cloud Storage 버킷, BigQuery 테이블, Pub/Sub 주제 또는 구독과 같이 파이프라인에서 참조하는 모든 리소스가 실제로 존재하고 액세스할 수 있는지 검사합니다. 검사는 각 서비스에 대한 표준 API 호출을 통해 수행되므로 파이프라인을 실행하는 데 사용되는 사용자 계정이 필요한 서비스에 올바르게 연결되고 API를 호출할 수 있습니다. 파이프라인을 Dataflow 서비스에 제출하기 전에 Apache Beam은 다른 오류를 확인하여 파이프라인 그래프에 잘못된 작업이 포함되지 않도록 확인합니다.

그런 다음 실행 그래프는 JSON 형식으로 변환되고 JSON 실행 그래프는 Cloud Dataflow 서비스 엔드포인트로 전송됩니다.

참고: 파이프라인을 로컬에서 실행하는 경우에도 그래프가 생성되긴 하지만 그래프는 JSON으로 변환되지 않으며 서비스로 전송되지 않습니다. 대신 그래프는 Dataflow 프로그램을 실행한 컴퓨터에서 로컬로 실행됩니다. 자세한 내용은 로컬 실행 구성에 대한 문서를 참조하세요.

그런 다음 Dataflow 서비스는 JSON 실행 그래프의 유효성을 검사합니다. 그래프의 유효성이 검사되면 그래프는 Dataflow 서비스의 작업이 됩니다. Dataflow Monitoring 인터페이스를 사용하여 작업, 실행 그래프, 상태, 로그 정보를 볼 수 있습니다.

자바: SDK 2.x

Dataflow 서비스는 Dataflow 프로그램을 실행한 시스템에 응답을 보냅니다. 이 응답은 Dataflow 작업의 jobId를 포함하는 DataflowPipelineJob 객체에 캡슐화됩니다. Dataflow 모니터링 인터페이스Dataflow 명령줄 인터페이스를 사용하여 작업을 모니터링, 추적 및 문제 해결을 위해 jobId을 사용할 수 있습니다. 자세한 내용은 DataflowPipelineJob에 대한 API 참조를 참조하세요.

Python

Dataflow 서비스는 Dataflow 프로그램을 실행한 시스템에 응답을 보냅니다. 이 응답은 Dataflow 작업의 job_id를 포함하는 DataflowPipelineResult 객체에 캡슐화됩니다. Dataflow 모니터링 인터페이스Dataflow 명령 줄 인터페이스를 사용하여 작업을 모니터링, 추적 및 문제 해결을 위해 job_id를 사용할 수 있습니다.

자바: SDK 1.x

실행 그래프

Dataflow는 Pipeline 객체를 구성할 때 사용한 변환과 데이터를 기반으로 파이프라인을 나타내는 단계 그래프를 만듭니다. 이것이 파이프라인 실행 그래프입니다.

Apache Beam SDK에 포함되어 있는 WordCount 예시 프로그램에는 각 단어의 어커런스 횟수와 함께 텍스트 컬렉션에서 개별 단어를 읽고, 추출하고, 계산하고, 형식을 지정하고, 작성하는 일련의 변환이 포함되어 있습니다. 다음 다이어그램에는 WordCount 파이프라인의 변환이 실행 그래프로 확장되는 방법이 나와 있습니다.

WordCount 예시 프로그램의 변환을 Cloud Dataflow 서비스가 실행하는 단계의 실행 그래프로 확장
그림 1: WordCount 예시 실행 그래프

실행 그래프는 사용자가 파이프라인을 구성할 때 변환을 지정한 순서와 다른 경우가 많습니다. 그 이유는 Dataflow 서비스가 관리형 클라우드 리소스에서 실행되기 전에 실행 그래프에 다양한 최적화 및 융합을 수행하기 때문입니다. Dataflow 서비스는 파이프라인을 실행할 때 데이터 종속성을 따릅니다. 그러나 데이터 종속성이 없는 단계는 순서에 상관없이 실행될 수 있습니다.

Dataflow 모니터링 인터페이스에서 작업을 선택하면 Dataflow가 파이프라인에 대해 생성한 최적화되지 않은 실행 그래프를 확인할 수 있습니다.

동시 처리 및 분산

Dataflow 서비스는 파이프라인의 처리 논리를 자동으로 동시 처리하고 사용자가 작업 수행을 위해 할당한 작업자에게 분산합니다. Dataflow는 프로그래밍 모델의 추상화를 사용하여 동시 처리 함수를 나타냅니다. 예를 들어 Dataflow는 ParDo 변환을 통해 DoFn으로 표현되는 처리 코드를 여러 동시 실행 작업자에게 자동으로 분산할 수 있습니다.

사용자 코드 구조화

DoFn 코드를 작고 독립적인 항목으로 생각할 수 있습니다. 많은 인스턴스가 각 머신이 다른 머신에 대해 알지 못하는 상태에서 서로 다른 머신에서 실행 중일 수 있습니다. 이와 같이 퓨어 함수(숨겨진 상태 또는 외부 상태에 따라 달라지지 않고 부작용이 관찰되지 않는 확정 함수)는 DoFn의 동시 및 분산 특성에 적합한 코드입니다.

순수 함수 모델은 완전히 엄격하지는 않지만 코드가 Dataflow 서비스에서 보장하지 않는 항목에 종속되지 않는 한, 상태 정보 또는 외부 초기화 데이터는 DoFn 및 기타 함수 객체에 대해 유효할 수 있습니다. ParDo 변환을 구조화하고 DoFn을 만드는 경우 다음 가이드라인을 참고하세요.

  • Dataflow 서비스는 DoFn 인스턴스에서 입력 PCollection의 모든 요소가 정확히 한 번 처리되도록 보장합니다.
  • Dataflow 서비스는 DoFn이 호출되는 횟수를 보장하지 않습니다.
  • Dataflow 서비스는 분산된 요소가 그룹화되는 방식을 보장하지 않습니다. 즉, 어떤 요소가 함께 처리되는지를 보장하지 않습니다.
  • Dataflow 서비스는 파이프라인을 통해 생성되는 정확한 DoFn 인스턴스 수를 보장하지 않습니다.
  • Dataflow 서비스는 내결함성이 있으며 작업자 문제가 발생하는 경우 코드를 여러 번 재시도할 수 있습니다. Dataflow 서비스는 코드의 백업 사본을 만들 수 있으며, 코드가 고유하지 않은 이름을 가진 임시 파일을 사용하거나 만드는 경우 수동 부작용과 관련된 문제가 발생할 수 있습니다.
  • Dataflow 서비스는 DoFnDoFn 인스턴스별 요소 처리를 직렬화합니다. 코드에는 스레드 안전성이 엄격하게 요구되지 않지만, 여러 DoFn 인스턴스 사이에 공유되는 상태는 스레드 안전성을 가져야 합니다.

사용자 코드 빌드에 대한 자세한 내용은 프로그래밍 모델 문서의 사용자 제공 함수에 대한 요구사항을 참조하세요.

오류 및 예외 처리

데이터를 처리하는 동안 파이프라인에서 예외가 발생할 수 있습니다. 이러한 오류의 일부는 일시적(예: 외부 서비스 액세스 시의 일시적인 어려움)이지만 일부는 영구적(예: 계산 중 null 포인터 또는 손상되거나 파싱할 수 없는 입력 데이터로 인한 오류)입니다.

Dataflow는 임의 번들에서 요소를 처리하고 해당 번들의 요소에 오류가 발생하면 전체 번들을 재시도합니다. 일괄 모드에서 실행하면 실패 항목이 포함된 번들은 4번 재시도됩니다. 단일 번들이 4번 실패하면 파이프라인이 완전히 실패합니다. 스트리밍 모드에서 실행하는 경우 실패 항목이 포함된 번들은 무제한으로 재시도되므로 파이프라인이 영구적으로 중단될 수 있습니다.

융합 최적화

파이프라인 실행 그래프의 JSON 형식에 대한 유효성 검사가 완료되면 Dataflow 서비스는 최적화를 수행하도록 그래프를 수정할 수 있습니다. 이러한 최적화에는 파이프라인 실행 그래프의 여러 단계 또는 변환을 단일 단계로 융합하는 작업이 포함될 수 있습니다. 단계를 융합하면 Dataflow 서비스가 메모리와 처리 오버헤드 면에서 많은 비용이 들 수 있는 파이프라인의 모든 중간 PCollection을 구체화할 필요가 없게 됩니다.

파이프라인 구성에서 지정한 모든 변환이 서비스에서 실행되지만 다른 순서로 실행되거나 파이프라인을 가장 효율적으로 실행하도록 더 큰 융합 변환의 일부로 실행될 수 있습니다. Dataflow 서비스는 실행 그래프의 단계 간 데이터 종속성을 고려하지만 다른 단계는 순서에 관계없이 실행할 수 있습니다.

융합 예시

다음 다이어그램은 효율적인 실행을 위해 자바용 Apache Beam SDK에 포함된 WordCount 예시의 실행 그래프를 Dataflow 서비스에서 최적화하고 통합하는 방법을 보여줍니다.

Cloud Dataflow 서비스에서 WordCount 예시 프로그램의 실행 그래프가 최적화되고 단계 융합
그림 2: 실행 그래프를 최적화한 WordCount 예시

융합 방지

파이프라인에서 Dataflow 서비스가 융합 최적화를 수행하지 않도록 해야 하는 몇 가지 경우가 있습니다. Dataflow 서비스가 파이프라인에서 작업을 통합하는 최적의 방법을 잘못 추측하여 Dataflow 서비스의 사용 가능한 모든 작업자를 제한할 수있는 경우입니다.

예를 들어 융합으로 인해 Dataflow의 작업자 사용 최적화 기능이 제한될 수 있는 한 가지 경우는 '높은 팬아웃' ParDo입니다. 이러한 작업에서는 입력 컬렉션의 요소 수가 상대적으로 적음에도 ParDo에서 수백 또는 수천 배 더 많은 요소가 있는 출력을 생성한 다음 다른 ParDo가 수행될 수 있습니다 Dataflow 서비스에서 ParDo 작업을 융합할 경우, 이 단계의 동시 처리는 입력 컬렉션의 항목 수 이하로 제한되지만 중간 PCollection에는 더 많은 요소가 포함되어 있습니다.

Dataflow 서비스가 중간 PCollection을 강제로 구체화하도록 파이프라인에 작업을 추가하여 이러한 융합을 방지할 수 있습니다. 다음 작업 중 하나를 사용해 보세요.

  • GroupByKey를 삽입하고 첫 번째 ParDo 후에 그룹 해제할 수 있습니다. Dataflow 서비스는 집계에서 ParDo 작업을 융합하지 않습니다.
  • 중간 PCollection부차 입력으로 다른 ParDo에 전달할 수 있습니다. Cloud Dataflow 서비스는 항상 부차 입력을 구체화합니다.
  • Reshuffle 단계를 삽입할 수 있습니다. Reshuffle는 융합을 방지하고 데이터를 검사하며 레코드 중복 삭제를 수행합니다. Apache Beam 문서에서 지원 중단으로 표시된 경우에도 Dataflow는 재셔플을 지원합니다.

최적화 결합

집계 작업은 대규모 데이터 처리에서 중요한 개념입니다. 집계는 개념상 멀리 떨어져 있는 데이터를 모으기 때문에 상관 관계 설정에 매우 유용합니다. Dataflow 프로그래밍 모델은 집계 작업을 GroupByKey, CoGroupByKey, Combine 변환으로 나타냅니다.

Dataflow의 집계 연산은 여러 작업자에 분산될 수있는 데이터를 포함하여 전체 데이터세트에 걸쳐 데이터를 결합합니다. 이러한 집계 작업 중에는 인스턴스 전반에서 데이터를 결합하기 전에 최대한 많은 양의 데이터를 로컬에서 결합하는 것이 가장 효율적인 경우가 많습니다. GroupByKey 또는 다른 집계 변환을 적용하면 Cloud Dataflow 서비스는 기본 그룹화 작업을 수행하기 전에 부분 결합을 로컬에서 자동으로 수행합니다.

부분 또는 다중 레벨 결합을 수행할 때 Dataflow 서비스는 파이프라인이 일괄 또는 스트리밍 데이터를 사용하는지 여부에 따라 다른 결정을 내립니다. 제한된 데이터의 경우 이 서비스는 효율성을 우선시하고 최대한 많은 로컬 결합을 수행합니다. 제한되지 않은 데이터의 경우, 서비스는 짧은 지연 시간을 우선시하므로 지연 시간을 증가시킬 수 있는 부분 결합을 수행하지 않을 수 있습니다.

자동 조정 기능

Dataflow 서비스에는 실행 중인 Dataflow 작업을 더욱 동적으로 최적화할 수 있는 자동 조정 기능 몇 가지가 포함되어 있습니다. 이러한 기능에는 자동 확장동적 작업 재균등화가 포함됩니다.

자동 확장

자동 확장을 사용하면 Dataflow 서비스에서 자동으로 작업을 실행하는 데 필요한 적합한 작업자 인스턴스 수를 자동으로 선택합니다. Dataflow 서비스는 작업 특성을 고려하여 런타임 중에 더 많은 작업자 또는 더 적은 작업자를 동적으로 다시 할당할 수도 있습니다. 파이프라인의 특정 부분은 다른 부분보다 더 많은 연산을 필요로 할 수 있습니다. Dataflow 서비스는 이와 같은 작업 단계 중에 추가 작업자를 자동으로 가동하고 더 이상 필요하지 않을 때 종료할 수 있습니다.

자바: SDK 2.x

자동 확장은 Streaming Engine을 사용하는 모든 일괄 Dataflow 작업 및 스트리밍 작업에 기본적으로 사용됩니다. 파이프라인을 실행할 때 --autoscalingAlgorithm=NONE 플래그를 지정하여 자동 확장을 사용 중지할 수 있습니다. 이렇게 하면 Dataflow 서비스는 --numWorkers 옵션에 따라 작업자 수를 설정합니다. 기본값은 3입니다.

자동 확장을 사용하면 Dataflow 서비스에서 작업에 할당된 작업자 인스턴스의 수를 사용자가 직접 제어할 수 없습니다. 하지만 파이프라인을 실행할 때 --maxNumWorkers 옵션을 지정하여 사용자 수를 제한할 수 있습니다.

일괄 작업의 경우 --maxNumWorkers 플래그는 선택 사항입니다. 기본값은 1000입니다. Streaming Engine을 사용하는 스트리밍 작업의 경우 --maxNumWorkers 플래그는 선택 사항입니다. 기본값은 100입니다. Streaming Engine을 사용하지 않는 스트리밍 작업의 경우 --maxNumWorkers 플래그가 필요합니다.

Python

자동 확장은 Python 버전 0.5.1 이상의 Apache Beam SDK를 사용하여 생성된 모든 일괄 Dataflow 작업에서 기본적으로 사용됩니다. 파이프라인을 실행할 때 --autoscaling_algorithm=NONE 플래그를 지정하여 자동 확장을 사용 중지할 수 있습니다. 이렇게 하면 Dataflow 서비스는 --num_workers 옵션에 따라 작업자 수를 설정합니다. 기본값은 3입니다.

자바: SDK 1.x

Dataflow는 파이프라인의 동시 처리에 따라 확장됩니다. 파이프라인의 병렬 처리는 주어진 시간에 데이터를 가장 효율적으로 처리하는 데 필요한 스레드 수의 추정치입니다.

병렬 서비스는 외부 서비스의 대역폭이 너무 낮지 않다면 몇 분마다 계산됩니다. 병렬 처리가 증가하면 Dataflow가 확장되고 작업자가 추가됩니다. 병렬 처리가 감소하면 Dataflow가 축소되고 작업자가 제거됩니다.

다음 표에서는 자동 확장을 사용하여 작업자 수를 늘리거나 줄인 경우를 요약하여 보여줍니다. 일괄스트리밍 파이프라인:

일괄 파이프라인 스트리밍 파이프라인
확장

남은 작업이 새 작업자를 투입하는 것보다 오래 걸리고 현재 작업자의 평균 CPU 사용량이 5%를 넘으면 Dataflow가 확장될 수 있습니다.

다음과 같은 소스는 데이터 분량이 적은 데이터, 압축 파일과 같은 분할 불가능한 데이터, 데이터를 분할하지 않은 I/O 모듈에서 처리하는 데이터 등 새로운 작업자 수를 제한할 수 있습니다.

기존 파일에 쓰는 Cloud Storage 대상처럼 고정된 개수의 샤드에 쓰기 위해 구성된 싱크는 새 작업자 수를 제한할 수 있습니다.

스트리밍 파이프라인이 백로그되고 작업자가 평균적으로 CPU의 20% 이상을 사용하면 Dataflow가 확장될 수 있습니다. 작업자 당 현재 처리량을 고려할 때 약 150 초 내에 백 로그가 삭제됩니다.

축소

남은 작업 시간이 새 작업자를 늘리는 데 걸리는 시간보다 짧고 현재 작업자의 평균 CPU 사용량이 5% 를 넘으면 Dataflow가 축소될 수 있습니다.

트리밍 파이프라인 백 로그가 20 초 미만이고 작업자가 평균적으로 CPU의 80% 미만을 사용하는 경우 Dataflow가 축소될 수 있습니다. 새로운 작업자 수는 평균적으로 CPU의 75% 미만을 사용합니다.

자동 확장 없음

I/O가 데이터 처리보다 오래 걸리거나 평균적으로 CPU의 5% 미만인 경우 병렬 처리가 다시 계산되지 않습니다.

작업자가 평균적으로 CPU의 20% 미만을 사용하는 경우, 병렬 처리는 다시 계산되지 않습니다.

일괄 자동 확장

일괄 파이프라인의 경우 Dataflow는 파이프라인의 각 단계에서의 작업량과 해당 단계의 현재 처리량을 기반으로 작업자 수를 자동으로 선택합니다. Dataflow는 현재 작업자가 처리하는 데이터의 양을 결정하고 나머지 작업이 처리되는 데 걸리는 시간을 추정합니다.

파이프라인에서 사용자가 구현한 커스텀 데이터 소스를 사용하는 경우 몇 가지 메서드를 구현하여 Dataflow 서비스의 자동 확장 알고리즘에 더 많은 정보를 제공하고 성능을 개선할 수 있습니다.

자바: SDK 2.x

  • BoundedSource 서브클래스에서 getEstimatedSizeBytes 메서드를 구현합니다. Dataflow 서비스는 파이프라인에 사용할 초기 작업자 수를 계산할 때 getEstimatedSizeBytes을 사용합니다.
  • BoundedReader 서브클래스에서 getFractionConsumed 메서드를 구현합니다. Dataflow 서비스는 getFractionConsumed을 사용하여 읽기 진행 상황을 추적하고 읽기 작업에 사용할 올바른 작업자 수를 수렴합니다.

Python

  • BoundedSource 서브클래스에서 estimate_size 메서드를 구현합니다. Dataflow 서비스는 파이프라인에 사용할 초기 작업자 수를 계산할 때 estimate_size을 사용합니다.
  • RangeTracker 서브클래스에서 fraction_consumed 메서드를 구현합니다. Dataflow 서비스는 fraction_consumed을 사용하여 읽기 진행 상황을 추적하고 읽기 작업에 사용할 올바른 작업자 수를 수렴합니다.

자바: SDK 1.x

스트리밍 자동 확장

스트리밍 자동 확장을 사용하면 Dataflow 서비스가 로드 및 리소스 사용량의 변화에 응답하여 스트리밍 파이프라인을 실행하는 데 사용되는 작업자 수를 적응적으로 변경할 수 있습니다. 스트리밍 자동 확장은 무료 기능이며 스트리밍 파이프라인을 실행할 때 사용되는 리소스의 비용을 절감하도록 설계되었습니다.

자동 확장을 사용하지 않는 경우 numWorkers 또는 num_workers를 지정하여 파이프라인을 실행하는 고정된 작업자 수를 선택하게 됩니다. 시간에 따라 입력 워크로드가 달라지므로 이 숫자는 너무 높거나 낮아질 수 있습니다. 너무 많은 작업자를 프로비저닝하면 불필요한 추가 비용이 발생하고, 너무 적은 작업자를 프로비저닝하면 데이터 처리 지연 시간이 길어집니다. 자동 확장을 사용하면 필요한 경우에만 리소스가 사용됩니다.

스트리밍 파이프라인을 자동 확장하는 목적은 작업자 사용률 및 처리량을 최대화하면서 백로그를 최소화하고 부하 급증에 빠르게 대처하는 것입니다. 자동 확장을 사용하면 최대 부하에 대비한 프로비저닝과 최신 결과 중 무엇을 선택할지 고민할 필요가 없습니다. CPU 사용률 및 백로그가 증가하면 작업자가 추가되고, 감소하면 작업자가 제거됩니다. 따라서 필요한 항목에만 비용을 지불하면 되며 작업이 최대한 효율적으로 처리됩니다.

자바: SDK 2.x

제한되지 않은 커스텀 소스

파이프라인에서 제한되지 않은 커스텀 소스를 사용하는 경우 이 소스가 Dataflow 서비스에 백로그에 대해 알려야 합니다. 백로그는 아직 소스에서 처리하지 않은 입력 추정치입니다(단위: 바이트). 서비스에 백로그에 관한 정보를 제공하려면 UnboundedReader 클래스에서 다음 메서드 두 개 중 하나를 구현합니다.

  • getSplitBacklogBytes() - 현재 소스 분할에 대한 백로그입니다. 서비스는 모든 분할에 걸쳐 백로그를 집계합니다.
  • getTotalBacklogBytes() - 모든 분할에 걸친 전체 백로그입니다. 각 분할에 대한 백로그가 제공되지 않고 모든 분할에 대해서만 계산할 수 있는 경우도 있습니다. 첫 번째 분할(분할 ID '0')만 전체 백로그를 제공해야 합니다.
Apache Beam 저장소에는 UnboundedReader클래스를 구현하는 여러 가지 커스텀 소스 예시가 있습니다.
스트리밍 자동 확장 사용

Streaming Engine을 사용하는 스트리밍 작업의 경우 자동 확장이 기본적으로 사용됩니다.

Streaming Engine을 사용하지 않는 작업에 자동 확장을 사용하려면 파이프라인을 시작할 때 다음 실행 매개변수를 설정합니다.

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=N

Streaming Engine을 사용하지 않는 스트리밍 작업의 경우 최소 작업자 수는 --maxNumWorkers 값의 1/15입니다.

스트리밍 파이프라인은 --maxNumWorkers와 같은 수로 영구 디스크의 고정 풀과 함께 배포됩니다. --maxNumWorkers를 지정할 때 이 점을 고려하여 이 값이 파이프라인에 충분한 디스크 수인지 확인하세요.

사용량 및 가격 책정

Compute Engine 사용량은 평균 작업자 수를 기준으로 하며, 영구 디스크 사용량은 --maxNumWorkers의 정확한 수를 기준으로 합니다. 영구 디스크는 각 작업자에게 동일한 수의 연결된 디스크가 할당되도록 재분배됩니다.

위 예시(--maxNumWorkers=15)에서는 Compute Engine 인스턴스 1~15개 사이 및 영구 디스크 15개에 대한 요금이 청구됩니다.

Python

스트리밍 자동 확장 사용

자동 확장을 사용하려면 파이프라인을 시작할 때 다음 실행 매개변수를 설정합니다.

--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=N

Streaming Engine을 사용하지 않는 스트리밍 작업의 경우 최소 작업자 수는 --maxNumWorkers 값의 1/15입니다.

스트리밍 파이프라인은 --maxNumWorkers와 같은 수로 영구 디스크의 고정 풀과 함께 배포됩니다. --maxNumWorkers를 지정할 때 이 점을 고려하여 이 값이 파이프라인에 충분한 디스크 수인지 확인하세요.

사용량 및 가격 책정

Compute Engine 사용량은 평균 작업자 수를 기준으로 하며, 영구 디스크 사용량은 --max_num_workers의 정확한 수를 기준으로 합니다. 영구 디스크는 각 작업자에게 동일한 수의 연결된 디스크가 할당되도록 재분배됩니다.

위 예시(--max_num_workers=15)에서는 Compute Engine 인스턴스 1~15개 사이 및 영구 디스크 15개에 대한 요금이 청구됩니다.

자바: SDK 1.x

스트리밍 파이프라인 수동 확장

자동 확장을 스트리밍 모드에서 사용할 수 있게 되면 Dataflow의 Update 기능을 사용하여 스트리밍 파이프라인을 실행하는 작업자 수를 수동으로 조정할 수 있습니다.

자바: SDK 2.x

실행 중에 스트리밍 파이프라인을 확장해야 하는 경우 파이프라인을 시작할 때 다음 실행 매개변수를 설정했는지 확인합니다.

  • --maxNumWorkers를 파이프라인에 제공할 최대 작업자 수와 동일하게 설정합니다.
  • --numWorkers를 파이프라인을 실행할 때 사용할 작업자 초기 수와 동일하게 설정합니다.

파이프라인이 실행되면 파이프라인을 업데이트하고 --numWorkers 매개변수를 사용하여 새 작업자 수를 지정할 수 있습니다. 새 --numWorkers에 설정한 값은 N--maxNumWorkers 사이여야 합니다. 여기서 N--maxNumWorkers/15와 같습니다.

파이프라인을 업데이트하면 실행 중인 작업이 새 작업으로 바뀝니다. 이때 새 작업자 수가 사용되지만 이전 작업과 연관된 모든 상태 정보는 유지됩니다.

Python

실행 중에 스트리밍 파이프라인을 확장해야 하는 경우 파이프라인을 시작할 때 다음 실행 매개변수를 설정했는지 확인합니다.

  • --max_num_workers를 파이프라인에 제공할 최대 작업자 수와 동일하게 설정합니다.
  • --num_workers를 파이프라인을 실행할 때 사용할 작업자 초기 수와 동일하게 설정합니다.

파이프라인이 실행되면 파이프라인을 업데이트하고 --num_workers 매개변수를 사용하여 새 작업자 수를 지정할 수 있습니다. 새 --num_workers에 설정한 값은 N--max_num_workers 사이여야 합니다. 여기서 N--max_num_workers/15와 같습니다.

파이프라인을 업데이트하면 실행 중인 작업이 새 작업으로 바뀝니다. 이때 새 작업자 수가 사용되지만 이전 작업과 연관된 모든 상태 정보는 유지됩니다.

자바: SDK 1.x

동적 작업 재균등화

Dataflow 서비스의 동적 작업 재 균형 조정 기능을 사용하면 서비스에서 런타임 조건에 따라 작업을 동적으로 다시 파티션 할 수 있습니다. 이러한 조건에는 다음이 포함될 수 있습니다.

  • 작업 할당의 불균형
  • 작업자의 완료 시간이 예상보다 오래 걸림
  • 작업자가 예상보다 빨리 완료

Dataflow 서비스는 이러한 조건을 자동으로 감지하여 사용하지 않거나 사용량이 적은 작업자에게 작업을 동적으로 재 할당하여 작업의 전체 처리 시간을 줄일 수 있습니다.

제한사항

동적 작업 재균등화는 Cloud Dataflow 서비스가 입력 데이터 일부를 동시에 처리 중인 경우에만 발생합니다. 즉, 외부 입력 소스에서 데이터를 읽는 경우, 구체화된 중간 PCollection으로 작업하는 경우, GroupByKey와 같은 집계의 결과에 대해 작업하는 경우에만 발생합니다. 작업의 많은 단계가 융합되면 작업의 중간 PCollection이 줄어들고 동적 작업 재균등화는 소스의 구체화된 PCollection의 요소 수로 제한됩니다. 동적 작업 재균등화를 파이프라인의 특정 PCollection에 적용할 수 있도록 하려면 몇 가지 동적 동시 처리 방법으로 융합을 방지하면 됩니다.

동적 작업 재균등화는 데이터를 단일 레코드보다 더 미세하게 다시 동시 처리할 수 없습니다. 데이터 흐름이 처리 시간을 크게 지연시키는 개별 레코드를 포함하는 경우 Dataflow는 개별 작업 레코드를 세분화하여 여러 작업자에게 재배포 할 수 없으므로 작업이 지연 될 수 있습니다.

자바: SDK 2.x

파이프라인의 최종 출력에 고정된 수의 샤드를 설정(예: TextIO.Write.withNumShards를 사용하여 데이터 작성)한 경우 Cloud Dataflow는 선택한 샤드 수에 따라 동시 처리를 제한합니다.

Python

파이프라인의 최종 출력에 고정 개수의 분할을 설정한 경우(예 : beam.io.WriteToText(..., num_shards=...) ), Dataflow는 선택한 분할 수에 따라 동시 처리를 제한합니다.

자바: SDK 1.x

고정 샤드 제한은 일시적인 것으로 간주 될 수 있으며 Dataflow 서비스의 향후 릴리스에서 변경될 수 있습니다.

커스텀 데이터 소스 작업

자바: SDK 2.x

파이프라인에서 사용자가 제공한 커스텀 데이터 소스를 사용하는 경우 소스가 동적 작업 재균등화 기능을 사용하여 작업할 수 있도록 하려면 splitAtFraction 메서드를 구현해야 합니다.

Python

파이프라인에서 사용자가 제공한 커스텀 데이터 소스를 사용하는 경우 소스가 동적 작업 재균등화 기능을 사용하여 작업할 수 있도록 하려면 RangeTracker에서 try_claim, try_split, position_at_fraction, fraction_consumed메서드를 구현해야 합니다.

자세한 내용은 RangeTracker에 대한 API 참조 정보를 참조하세요.

자바: SDK 1.x

리소스 사용 및 관리

Dataflow 서비스는 작업별로 Google Cloud의 리소스를 완벽하게 관리합니다. 여기에는 Compute Engine 인스턴스(작업자 또는 VM이라고도 함) 가동과 종료, 프로젝트의 Cloud Storage 버킷에 액세스하여 I/O 및 임시 파일 스테이징 등이 포함됩니다. 하지만 파이프라인이 BigQueryPub/Sub와 같은 Google Cloud 데이터 저장소 기술과 상호 작용하는 경우, 해당 서비스의 리소스와 할당량을 관리해야합니다.

Cloud Storage의 사용자 제공 위치를 스테이징 파일 용으로 사용합니다. 이 위치는 사용자가 제어할 수 있으므로 이 위치에서 읽는 작업이 있으면 위치의 수명 주기가 유지되도록 해야 합니다. SDK에서 기본 제공되는 캐싱은 작업 시작 시간을 단축할 수 있으므로 동일한 스테이징 위치를 여러 작업 실행에 재사용할 수 있습니다.

작업

Google Cloud 프로젝트 당 최대 100 개의 동시 Dataflow 작업을 실행할 수 있습니다.

현재 Dataflow 서비스는 크기가 20MB 이하인 JSON 작업 요청만 처리할 수 있습니다. 작업 요청 크기는 특히 파이프라인의 JSON 표현과 연관됩니다. 파이프라인이 클수록 요청이 커집니다.

파이프라인의 JSON 요청 크기를 예측하려면 다음 옵션을 사용하여 파이프라인을 실행합니다.

자바: SDK 2.x

--dataflowJobFile=< path to output file >

Python

--dataflow_job_file=< path to output file >

자바: SDK 1.x

이 명령어는 작업의 JSON 표현을 파일에 작성합니다. 직렬화된 파일 크기는 요청 크기의 예상치에 가깝습니다. 실제 크기는 요청에 포함된 추가 정보로 인해 약간 더 커집니다.

자세한 내용은 '413 요청 개체가 너무 큼'/'파이프라인의 직렬화된 JSON 표현 크기가 허용 한도를 초과함' 문제해결 페이지를 참조하세요.

또한 작업의 그래프 크기가 10MB를 초과해서는 안 됩니다. 자세한 내용은 '작업 그래프가 너무 큽니다. 더 작은 작업 그래프로 다시 시도하거나, 작업을 더 작은 작업 두 개 이상으로 분할하세요.' 문제해결 페이지를 참조하세요.

작업자

Dataflow 서비스는 현재 작업당 최대 1000 Compute Engine 인스턴스를 허용합니다.. 일괄 작업의 기본 머신 유형은 n1-standard-1이고 스트리밍 작업의 경우에는 n1-standard-4입니다. 따라서 기본 머신 유형을 사용하면 Dataflow 서비스는 코어를 작업당 최대 4,000개까지 할당할 수 있습니다. 작업에 코어가 더 필요한 경우 더 큰 머신 유형을 선택할 수 있습니다.

사용 가능한 Compute Engine 머신 유형 계열과 커스텀 머신 유형을 사용할 수 있습니다. 최상의 결과를 얻으려면 n1 머신 유형을 사용하세요. f1g1 시리즈 작업자와 같은 공유 코어 머신 유형은 Dataflow의 서비스수준계약에서 지원되지 않습니다.

Dataflow는 작업자의 vCPU 수와 GB 단위 메모리를 기준으로 청구합니다. 청구되는 요금은 머신 유형과는 무관합니다. 파이프라인을 만들 때 적합한 실행 매개변수를 설정하여 파이프라인의 머신 유형을 지정할 수 있습니다.

자바: SDK 2.x

머신 유형을 변경하려면 --workerMachineType 옵션을 설정합니다.

Python

머신 유형을 변경하려면 --worker_machine_type 옵션을 설정합니다.

자바: SDK 1.x

리소스 할당량

Dataflow 서비스는 작업을 시작하는 데 필요한 Compute Engine 리소스 할당량과 최대 작업자 인스턴스까지 확장 할 수있는 Google Cloud 프로젝트를 확인합니다. 사용 가능한 리소스 할당량이 충분하지 않으면 작업을 시작할 수 없습니다.

Dataflow의 자동 확장 기능은 프로젝트의 사용 가능한 Compute Engine 할당량에 의해 제한됩니다. 작업 시작 시 할당량이 충분하지만 다른 작업이 프로젝트의 가용 할당량 중 나머지를 사용하면 첫 번째 작업은 실행되지만 완전히 확장될 수 없습니다.

하지만 Dataflow 서비스는 프로젝트의 리소스 할당량을 초과하는 작업의 할당량 증가를 관리하지 않습니다. Google Cloud Console을 사용할 수 있는 추가 리소스 할당량을 요청해야 합니다.

영구 디스크 리소스

Dataflow 서비스는 현재 스트리밍 작업을 실행할 때 15개의 작업자 인스턴스당 영구 디스크로 제한됩니다. 각 영구 디스크는 개별 Compute Engine 가상 머신에 대해 로컬입니다. 작업에 영구 디스크보다 많은 작업자가 있을 수 없습니다. 작업자와 디스크 간 1:1 비율이 최소 리소스 할당입니다.

작업자 VM에서 실행되는 작업의 경우 각 영구 디스크의 기본 크기는 일괄 모드에서 250GB 이고 스트리밍 모드에서 400GB 입니다. Streaming Engine 또는 Dataflow Shuffle을 사용하는 작업은 Dataflow 서비스 백엔드에서 실행되고 더 작은 디스크를 사용합니다.

위치

기본적으로 Cloud Dataflow 서비스는 us-central1 리전의 us-central1-f 영역에 Compute Engine 리소스를 배포합니다. --region 매개변수를 지정하여 이 설정을 재정의할 수 있습니다. 리소스에 특정 영역을 사용해야 하는 경우 파이프라인을 만들 때 --zone 매개변수를 사용합니다. 하지만 리전만 지정하고 영역은 지정하지 않는 것이 좋습니다. 이를 통해 Dataflow 서비스는 작업 생성 요청 시 사용 가능한 영역 용량에 따라 영역 내의 최적 영역을 자동으로 선택합니다. 자세한 내용은 리전별 엔드포인트 문서를 참조하세요.

스트리밍 엔진

현재 Dataflow 파이프라인 실행자는 스트리밍 파이프라인의 단계를 작업자 가상 컴퓨터에서 실행하여 작업자 CPU, 메모리, 영구 디스크 저장소를 사용합니다. Dataflow의 Streaming Engine은 파이프라인 실행을 작업자 VM 외부와 Dataflow 서비스 백엔드로 이동합니다.

스트리밍 엔진의 이점

스트리밍 엔진 모델에는 다음과 같은 이점이 있습니다.

  • 작업자 VM에서 소비되는 CPU, 메모리, 영구 디스크 저장소 리소스 감소 스트리밍 엔진은 더 작은 작업자 머신 유형(n1-standard-4 대신 n1-standard-2)에서 가장 잘 작동되며, 작은 작업자 부팅 디스크 이상의 영구 디스크를 필요로 하지 않으므로 리소스와 할당량을 적게 소모합니다.
  • 수신 데이터 볼륨의 변화에 대응하여 더 반응성 있는 자동 확장 수행. 스트리밍 엔진은 더욱 원활하고 세분화된 작업자 확장을 제공합니다.
  • 지원 용이성 개선. 서비스 업데이트를 적용하기 위해 파이프라인을 다시 배포하지 않아도 됩니다.

대부분의 작업자 리소스 감소는 작업 부하를 Dataflow 서비스로 이전하면 발생합니다. 이러한 이유로 스트리밍 엔진을 사용하면 관련 요금이 청구됩니다. 그러나 Streaming Engine을 사용하는 Dataflow 파이프라인의 총 청구액은 이 옵션을 사용하지 않는 Dataflow 파이프라인의 총 비용과 비교할 때 거의 동일합니다.

스트리밍 엔진 사용

스트리밍 엔진은 현재 다음 리전의 스트리밍 파이프라인에서 사용할 수 있습니다. 향후 다른 리전에서도 출시될 예정입니다.

  • us-west1(오리건)
  • us-central1(아이오와)
  • us-east1(사우스캐롤라이나)
  • us-east4북 버지니아
  • europe-west2(런던)
  • europe-west1(벨기에)
  • europe-west4(네덜란드)
  • europe-west3(프랑크푸르트)
  • asia-east1(타이완)
  • asia-northeast1(도쿄)

자바: SDK 2.x

스트리밍 파이프라인에 스트리밍 엔진을 사용하려면 다음 매개변수를 지정합니다.

  • --enableStreamingEngine - 자바용 Apache Beam SDK 버전 2.11.0 이상을 사용하는 경우
  • --experiments=enable_streaming_engine - 자바용 Apache Beam SDK 버전 2.10.0을 사용하는 경우

파이프라인에 Dataflow Streaming Engine을 사용하는 경우에는 --zone 매개변수를 지정하지 않습니다. 대신 --region 매개변수를 지정하고 값을 현재 스트리밍 엔진을 사용할 수 있는 리전 중 하나로 설정합니다. Dataflow가 지정한 리전에서 영역을 자동으로 선택합니다. --zone 매개변수를 지정하고 사용 가능한 리전 외부의 영역으로 설정하면 Dataflow에서 오류가 보고됩니다.

스트리밍 엔진은 더 작은 작업자 머신 유형에서 가장 잘 작동되므로 --workerMachineType=n1-standard-2로 설정하는 것이 좋습니다. 또한 스트리밍 엔진에 작업자 부팅 이미지와 로컬 로그용 공간만 있으면 되므로 --diskSizeGb=30으로 설정하면 됩니다. 이러한 값이 기본값입니다.

Python

스트리밍 파이프라인에 스트리밍 엔진을 사용하려면 다음 매개변수를 지정합니다.

--enable_streaming_engine

파이프라인에 Dataflow Streaming Engine을 사용하는 경우에는 --zone 매개변수를 지정하지 않습니다. 대신 --region 매개변수를 지정하고 값을 현재 스트리밍 엔진을 사용할 수 있는 리전 중 하나로 설정합니다. Dataflow가 지정한 리전에서 영역을 자동으로 선택합니다. --zone 매개변수를 지정하고 사용 가능한 리전 외부의 영역으로 설정하면 Dataflow에서 오류가 보고됩니다.

스트리밍 엔진은 더 작은 작업자 머신 유형에서 가장 잘 작동되므로 --machine_type=n1-standard-2로 설정하는 것이 좋습니다. 또한 스트리밍 엔진에 작업자 부팅 이미지와 로컬 로그용 공간만 있으면 되므로 --disk_size_gb=30으로 설정하면 됩니다. 이러한 값이 기본값입니다.

자바: SDK 1.x

Dataflow Shuffle

Dataflow Shuffle은 GroupByKey, CoGroupByKey, Combine과 같은 Dataflow 변환의 기본 연산입니다. Dataflow Shuffle 작업은 확장 가능하고 효율적이며 내결함성이 있는 방식으로 키를 사용해 데이터를 분할하고 그룹화합니다. 현재 Dataflow는 전체 작업자 가상 머신에서 실행되며 작업자 CPU, 메모리, 영구 디스크 저장소를 사용하는 셔플 구현을 사용합니다. 일괄 파이프라인에만 사용할 수 있는 서비스 기반 Dataflow Shuffle 기능은 셔플 작업을 작업자 VM에서 Dataflow 서비스 백엔드로 이동합니다.

Dataflow Shuffle의 장점

서비스 기반 Dataflow Shuffle에는 다음과 같은 이점이 있습니다.

  • 대부분의 파이프라인 작업 유형에 대한 일괄 파이프라인의 실행 시간 단축
  • 작업자 VM에서 소비되는 CPU, 메모리, 영구 디스크 저장소 리소스 감소
  • 자동 확장 기능 향상. VM이 더 이상 셔플 데이터를 보유하지 않기 때문에 더 일찍 축소할 수 있습니다.
  • 내결함성 향상: Cloud Dataflow Shuffle 데이터를 보유하는 비정상 VM이 있어도 전체 작업이 실패하지 않습니다. 하지만 이 기능을 사용하지 않으면 작업이 실패할 수 있습니다.

대부분의 작업자 리소스 감소는 셔플 작업을 Dataflow 서비스로 오프로드하는 데서 비롯됩니다. 따라서 Dataflow Shuffle 사용과 관련된 요금이 발생합니다. 하지만 서비스 기반 Cloud Dataflow 구현을 사용하는 Cloud Dataflow 파이프라인의 총 청구액은 이 옵션을 사용하지 않는 Cloud Dataflow 파이프라인 비용 보다 작거나 같을 수 있습니다.

대부분의 파이프라인 작업 유형에서 Dataflow Shuffle은 작업자 VM에서 실행되는 셔플 구현보다 빠르게 실행됩니다. 하지만 실행 시간은 실행할 때마다 다를 수 있습니다. 중요한 기한이 있는 파이프라인을 실행하는 경우, 기한 전에 충분한 여유 시간을 할당하는 것이 좋습니다. 또한 Shuffle에 대한 할당량을 더 많이 요청하는 것을 고려해 보세요.

디스크 고려 사항

서비스 기반 Dataflow Shuffle 기능을 사용할 때는 작업자 VM에 대용량 Persistent Disks를 연결할 필요가 없습니다. Dataflow는 25GB의 작은 부팅 디스크를 자동으로 연결합니다. 그러나 이 작은 디스크 크기로 인해 Dataflow Shuffle을 사용할 때 주의해야 할 중요한 고려 사항이 있습니다.

  • 작업자 VM은 운영체제, 바이너리, 로그, 컨테이너에 25GB의 디스크 공간 중 일부를 사용합니다. Dataflow Shuffle을 사용하면 디스크 용량이 크고 남은 디스크 용량을 초과하는 작업이 실패할 수 있습니다.
  • 작은 디스크의 성능으로 인해 많은 디스크 I/O를 사용하는 작업은 느려질 수 있습니다. 각 디스크 크기 간 성능 차이에 대한 자세한 내용은 Compute Engine 영구 디스크 성능 페이지를 참조하세요.그

이와 같은 고려 사항이 작업에 해당되면 파이프라인 옵션을 사용하여 더 큰 디스크 크기를 지정하면 됩니다.

Dataflow Shuffle 사용하기

서비스 기반 Dataflow Shuffle은 현재 다음 리전에서 사용할 수 있습니다.

  • us-west1(오리건)
  • us-central1(아이오와)
  • us-east1(사우스캐롤라이나)
  • us-east4북 버지니아
  • europe-west2(런던)
  • europe-west1(벨기에)
  • europe-west4(네덜란드)
  • europe-west3(프랑크푸르트)
  • asia-east1(타이완)
  • asia-northeast1(도쿄)

Dataflow Shuffle은 향후 추가 리전에서 사용할 수 있게 됩니다.

자바: SDK 2.x

서비스 파이프라인 Dataflow Shuffle을 일괄 파이프라인에서 사용하려면 다음 매개변수를 지정합니다.
--experiments=shuffle_mode=service

파이프라인에 Dataflow Shuffle을 사용하는 경우, --zone 매개변수를 지정하지 마세요. 대신 --region 매개변수를 지정하고 값을 현재 Shuffle을 사용할 수 있는 리전 중 하나로 설정합니다. Dataflow가 지정한 리전에서 영역을 자동 선택합니다. --zone 매개변수를 지정하고 사용 가능한 리전 외부의 영역으로 설정하면 Dataflow에서 오류가 보고됩니다.

Python

서비스 파이프라인 Dataflow Shuffle을 일괄 파이프라인에서 사용하려면 다음 매개변수를 지정합니다.
--experiments=shuffle_mode=service

파이프라인에 Dataflow Shuffle을 사용하는 경우, --zone 매개변수를 지정하지 마세요. 대신 --region 매개변수를 지정하고 값을 현재 Shuffle을 사용할 수 있는 리전 중 하나로 설정합니다. Dataflow가 지정한 리전에서 영역을 자동 선택합니다. --zone 매개변수를 지정하고 사용 가능한 리전 외부의 영역으로 설정하면 Dataflow에서 오류가 보고됩니다.

자바: SDK 1.x

Dataflow 가변형 리소스 예약

Dataflow FlexRS는 고급 예약 기술, Dataflow Shuffle 서비스, 선점형 가상 머신(VM) 인스턴스와 일반 VM의 조합을 사용하여 일괄 처리 비용을 줄입니다. Dataflow는 선점형 VM과 일반 VM을 동시에 실행하여 시스템 이벤트 발생 시 Compute Engine이 선점형 VM 인스턴스를 중지하는 경우 사용자 환경을 개선시킵니다. FlexRS를 사용하면 파이프라인이 계속해서 진행되고 Compute Engine에서 선점형 VM을 선점할 때 이전 작업이 손실되지 않습니다. FlexRS에 대한 자세한 내용은 Dataflow에서 가변형 리소스 예약 사용을 참조하세요.

Dataflow Runner v2

현재 프로덕션 Dataflow Runner는 Apache Beam 파이프라인을 실행할 때 언어별 작업자를 활용합니다. 확장성, 일반화, 효율성을 개선하기 위해 Dataflow Runner가 더 많은 서비스 기반 아키텍처로 전환되고 있습니다. 이러한 변경 사항에는 Shuffle 서비스 및 스트리밍 엔진과 함께 패키지된 보다 효율적이고 휴대할 수 있는 작업자 아키텍처가 포함됩니다.

Python 스트리밍 파이프라인을 테스트하는 데 새로운 Dataflow Runner인 Dataflow Runner v2로 이용할 수 있습니다. 모든 새 파이프라인에서 기본으로 사용 설정되기 전에 Dataflow Runner v2를 현재 작업 부하로 테스트하는 것이 좋습니다. 이 새로운 아키텍처를 활용하기 위해 파이프라인 코드를 변경할 필요는 없습니다.

Dataflow Runner v2 사용 시 이점

Python 스트리밍 파이프라인부터 Dataflow Runner v2에서만 새로운 기능을 사용할 수 있습니다. 또한 Dataflow Runner v2 아키텍처의 효율성이 향상되면 Dataflow 작업의 성능이 향상될 수 있습니다.

Dataflow Runner v2를 처음 테스트하는 동안 청구 금액이 감소할 수 있습니다. Dataflow Runner v2의 결제 모델은 아직 최종 버전이 아니므로 모든 파이프라인에서 새 실행기가 사용 설정되면 청구액이 거의 최신 수준으로 다시 증가할 수 있습니다.

Dataflow Runner v2 사용

Dataflow Runner v2는 Dataflow 리전 엔드포인트가 있는 리전에서 사용할 수 있습니다.

자바: SDK 2.x

현재 자바에서는 Dataflow Runner v2를 사용할 수 없습니다.

Python

Dataflow Runner v2에는 스트리밍 엔진이 필요합니다. 둘 다 사용 설정하려면 다음 매개변수를 지정하세요.
--experiments=use_runner_v2

Dataflow Runner v2 작업 디버깅

Dataflow Runner v2를 사용하여 작업을 디버깅하려면 표준 디버깅 단계를 따라야 합니다. 하지만 Dataflow Runner v2를 사용할 때는 다음 사항에 유의하세요.

  • Dataflow Runner v2 작업은 작업자 VM에서 두 가지 유형의 프로세스(SDK 프로세스 및 실행기 하네스 프로세스)를 실행합니다. 파이프라인 및 VM 유형에 따라 하나 이상의 SDK 프로세스가 존재할 수 있지만 VM당 실행기 하네스 프로세스는 하나만 있습니다.
  • 실행기 하네스 프로세스는 다른 모든 것을 관리하는 동안 SDK 프로세스는 사용자 코드 및 기타 언어별 함수를 실행합니다.
  • 실행기 하네스 프로세스는 Dataflow에서 작업을 요청하기 전에 모든 SDK 프로세스가 연결될 때까지 기다립니다.
  • SDK 프로세스 시작 중에 작업자 VM이 종속 항목을 다운로드하고 설치하면 작업이 지연될 수 있습니다. SDK 프로세스에 라이브러리 시작 또는 설치와 같은 문제가 있는 경우 작업자는 상태를 비정상으로 보고합니다.
  • 작업자 VM 로그(로그 뷰어 또는 Dataflow 모니터링 인터페이스를 통해 이용 가능)에는 실행자 하네스 프로세스의 로그와 SDK 프로세스의 로그가 포함됩니다.
  • 사용자 코드의 문제를 진단하려면 SDK 프로세스의 작업자 로그를 검사합니다. 실행기 하네스 로그에서 오류가 발생하면 지원팀에 문의하여 버그를 신고해 주세요.