높은 병렬 워크플로 권장사항

이 페이지에서는 파이프라인에서 외부 코드를 사용하는 방법, 파이프라인 실행 방법, 오류 처리 관리 등 Dataflow HPC 높은 병렬 워크플로를 빌드하고 실행할 때 따라야 할 권장사항에 대한 안내를 제공합니다.

파이프라인에 외부 코드 포함

높은 병렬 파이프라인의 주요 차별화 요소는 표준 Apache Beam SDK 언어 중 하나가 아닌 DoFn 내에서 C++ 코드를 사용한다는 것입니다. 자바 파이프라인의 경우 파이프라인에서 C++ 라이브러리를 더 쉽게 사용하려면 외부 프로시져 호출을 사용하는 것이 좋습니다. 이 섹션에서는 Java 파이프라인에서 외부(C++) 코드를 실행하는 데 사용되는 일반적인 접근 방법에 대해 설명합니다.

Apache Beam 파이프라인 정의에는 다음과 같은 몇 가지 주요 구성요소가 있습니다.

  • PCollections는 변경할 수 없는 동종 요소의 모음입니다.
  • PTransforms는 다른 PCollection을 생성하는 PCollection에 대한 변환을 정의하는 데 사용됩니다.
  • 파이프라인은 코드를 통해 PTransformsPCollections 간의 상호작용을 선언할 수 있는 구성입니다. 파이프라인은 방향성 비순환 그래프(DAG)로 표시됩니다.

표준 Apache Beam SDK 언어 중 하나가 아닌 언어의 코드를 사용하는 경우 DoFn 내에 있는 PTransform에 코드를 배치하고 표준 SDK 언어 중 하나를 사용하여 파이프라인 자체를 정의합니다. Python SDK에는 다른 코드를 더 간단하게 사용할 수 있는 유틸리티 클래스가 있으므로 Apache Beam Python SDK를 사용하여 파이프라인을 정의하는 것이 좋습니다. 하지만 다른 Apache Beam SDK를 사용할 수도 있습니다.

코드를 사용하여 전체 빌드 없이도 빠르게 실험을 수행할 수 있습니다. 프로덕션 시스템의 경우 일반적으로 자체 바이너리를 생성하므로 필요에 따라 프로세스를 자유롭게 조정할 수 있습니다.

다음 다이어그램은 파이프라인 데이터의 두 가지 용도를 나타냅니다.

  • 데이터는 프로세스를 구동하는 데 사용됩니다.
  • 데이터는 프로세스 중 수집되고 드라이버 데이터에 결합됩니다.

파이프라인 데이터의 두 단계

이 페이지에서는 소스의 기본 데이터를 구동 데이터라고 부르고, 처리 단계의 보조 데이터를 결합 데이터라고 부릅니다.

금융 사용 사례에서 구동 데이터는 수십만 거래에서 사용될 수 있습니다. 각 거래는 시장 데이터와 함께 처리해야 합니다. 이 경우 시장 데이터는 결합 데이터입니다. 미디어 사용 사례에서 구동 데이터는 처리가 필요한 이미지 파일이지만 다른 데이터 소스는 필요하지 않으므로 결합 데이터를 사용하지 않습니다.

구동 데이터의 크기 고려 사항

구동 데이터 요소의 크기가 몇 메가바이트 범위 내일 경우 소스에서 PCollection 객체를 생성하고 처리를 위해 이 객체를 Apache Beam 변환으로 전송하는 일반 Apache Beam 패러다임으로 처리해야 합니다.

구동 데이터 요소의 크기가 미디어 분야에서와 같이 수 메가바이트 또는 기가바이트 단위일 경우 구동 데이터를 Cloud Storage에 저장할 수 있습니다. 그런 다음 PCollection 객체를 시작할 때 스토리지 URI를 참조하고 사용된 데이터에 대한 URI 참조만을 사용합니다.

결합 데이터의 크기 고려 사항

결합 데이터가 몇 백 메가바이트 이하일 경우 부가 입력을 사용하여 데이터를 Apache Beam 변환으로 가져옵니다. 부가 입력은 데이터 패킷을 이를 필요로 하는 모든 작업자로 전송합니다.

결합 데이터가 기가바이트 또는 테라바이트 범위일 경우 데이터의 특성에 따라 Bigtable 또는 Cloud Storage를 사용하여 결합 데이터를 구동 데이터에 결합할 수 있습니다. Bigtable은 Bigtable에서 주로 키-값 조회로 시장 데이터에 액세스하는 금융 시나리오에 적합합니다. 시계열 데이터 작업 권장 사항을 포함한 Bigtable 스키마 설계에 대한 자세한 내용은 다음 Bigtable 문서를 참조하세요.

외부 코드 실행

다양한 방법으로 Apache Beam에서 외부 코드를 실행할 수 있습니다.

  • Dataflow 변환 내의 DoFn 객체에서 호출되는 프로세스를 만듭니다.

  • Java SDK로 JNI를 사용합니다.

  • DoFn 객체에서 직접 하위 프로세스를 만듭니다. 이 접근 방법이 가장 효율적인 것은 아니지만 강력하고 구현이 간편합니다. JNI를 사용할 경우 발생할 수 있는 문제로 인해 이 페이지에서는 하위 프로세스 호출을 사용하는 방법을 보여줍니다.

워크플로를 설계할 때는 전체 엔드 투 엔드 파이프라인을 고려하세요. 프로세스가 실행되는 과정에서 발생하는 비효율성에 비해 소스에서 싱크까지의 데이터 이동이 단일 파이프라인으로 수행되는 방식의 이점이 큽니다. 이 방법을 다른 방법과 비교할 경우 파이프라인의 엔드 투 엔드 시간과 엔드 투 엔드 비용을 살펴보세요.

호스트로 바이너리 가져오기

기본 Apache Beam 언어를 사용하면 Apache Beam SDK가 자동으로 필요한 모든 코드를 작업자로 이동합니다. 하지만 외부 코드를 호출할 경우 수동으로 코드를 이동해야 합니다.

버킷에 저장된 바이너리 파일

코드를 이동하려면 다음 단계를 따르세요. 이 예시에서는 Apache Beam Java SDK의 단계를 보여줍니다.

  1. 컴파일된 외부 코드를 버전 정보와 함께 Cloud Storage에 저장합니다.
  2. @Setup 메서드에서 동기화된 블록을 만들어 코드 파일을 로컬 리소스에서 사용할 수 있는지 확인합니다. 물리적 검사를 구현하는 대신 첫 스레드가 완료될 때 정적 변수를 사용하여 가용성을 확인할 수 있습니다.
  3. 이 파일을 사용할 수 없으면 Cloud Storage 클라이언트 라이브러리를 사용하여 Cloud Storage 버킷의 파일을 로컬 작업자로 가져옵니다. 이 작업에는 Apache Beam FileSystems 클래스를 사용하는 것이 좋습니다.
  4. 파일을 이동한 후 실행 비트가 코드 파일에 설정되어 있는지 확인합니다.
  5. 프로덕션 시스템에서 바이너리의 해시를 검사하여 파일이 올바르게 복사되었는지 확인합니다.

Apache Beam filesToStage 함수를 사용할 수도 있지만 그러면 Java 코드를 자동으로 패키징하고 이동하는 실행기 기능의 이점을 누릴 수 없습니다. 또한 하위 프로세스 호출에 절대 파일 위치가 필요하기 때문에 코드를 사용하여 클래스 경로를 결정해야 하므로 filesToStage에 의해 이동한 파일의 위치를 결정해야 합니다. 이 방법은 사용하지 않는 것이 좋습니다.

외부 바이너리 실행

외부 코드를 실행하려면 먼저 래퍼를 만들어야 합니다. 이 래퍼는 외부 코드와 동일한 언어(예: C++) 또는 셸 스크립트로 작성합니다. 래퍼를 사용하면 이 페이지의 소규모 CPU 주기용 처리 설계 섹션에 설명된 대로 파일 처리를 전달하고 최적화를 구현할 수 있습니다. 래퍼는 정교하지 않아도 됩니다. 다음 스니펫은 C++로 작성한 래퍼를 간략하게 나타냅니다.

int main(int argc, char* argv[])
{
    if(argc < 3){
        std::cerr << "Required return file and data to process" << '\n';
        return 1;
    }

    std::string returnFile = argv[1];
    std::string word = argv[2];

    std::ofstream myfile;
    myfile.open (returnFile);
    myfile << word;
    myfile.close();
    return 0;
}

이 코드는 인수 목록에서 두 개의 매개변수를 읽습니다. 첫 번째 매개변수는 데이터를 푸시할 반환 파일의 위치입니다. 두 번째 매개변수는 코드가 사용자에게 표시하는 데이터입니다. 실제 구현에서 이 코드는 'Hello, world!' 이상을 반환합니다.

래퍼 코드를 작성한 후 다음을 수행하여 외부 코드를 실행합니다.

  1. 외부 코드 바이너리로 데이터를 전송합니다.
  2. 바이너리를 실행하고, 오류를 포착하고, 오류와 결과를 로깅합니다.
  3. 로깅 정보를 처리합니다.
  4. 완료된 처리에서 데이터를 캡처합니다.

바이너리로 데이터 전송

라이브러리 실행 프로세스를 시작하려면 데이터를 C++ 코드로 전송합니다. 이 단계에서는 다른 Google Cloud 도구와 통합된 Dataflow를 활용할 수 있습니다. Bigtable과 같은 도구를 사용하면 대규모 데이터 세트를 처리하고 지연 시간이 짧고 동시성이 높은 액세스를 실현할 수 있으므로 수천 개의 코어가 데이터 세트에 동시에 액세스할 수 있습니다. 또한 Bigtable은 데이터를 사전 처리하여 데이터 형성, 보강, 필터링을 수행할 수 있습니다. 이 모든 작업은 외부 코드를 실행하기 전에 Apache Beam 변환으로 수행할 수 있습니다.

프로덕션 시스템의 경우 프로토콜 버퍼를 사용하여 입력 데이터를 캡슐화하는 방법이 권장됩니다. 입력 데이터를 바이트로 변환하고 외부 라이브러리로 전달하기 전에 base64로 인코딩할 수 있습니다. 이 데이터를 외부 라이브러리로 전달하는 두 가지 방법은 다음과 같습니다.

  • 소량 입력 데이터. 시스템의 최대 명령어 인수 길이를 초과하지 않는 소량 데이터의 경우 java.lang.ProcessBuilder로 빌드되는 프로세스의 2번 위치로 인수를 전달합니다.
  • 대량 입력 데이터. 대량 데이터 크기의 경우 이름에 UUID가 포함된 파일을 생성하여 프로세스에 필요한 데이터를 포함합니다.

C++ 코드 실행, 오류 포착, 로깅

오류 정보 포착 및 처리는 파이프라인의 중요한 부분입니다. Dataflow 실행기에서 사용하는 리소스는 일시적이므로 작업자 로그 파일을 검사하기 어렵습니다. 유용한 정보를 모두 포착하여 Dataflow 실행기 로깅으로 푸시하고 하나 이상의 Cloud Storage 버킷에 로깅 데이터를 저장하도록 하세요.

stdoutstderr를 파일로 리디렉션하여 메모리 부족 문제를 방지하는 것이 좋습니다. 예를 들어 C++ 코드를 호출하는 Dataflow 실행기에서 다음과 같은 줄을 포함할 수 있습니다.

자바

  import java.lang.ProcessBuilder.Redirect;
  ...
      processbuilder.redirectError(Redirect.appendTo(errfile));
      processbuilder.redirectOutput(Redirect.appendTo(outFile));

Python

# Requires Apache Beam 2.34 or later.
stopping_times, bad_values = (
    integers
    | beam.Map(collatz.total_stopping_time).with_exception_handling(
        use_subprocess=True))

# Write the bad values to a side channel.
bad_values | 'WriteBadValues' >> beam.io.WriteToText(
    os.path.splitext(output_path)[0] + '-bad.txt')

로깅 정보 처리

대부분의 사용 사례에서는 수백만 개의 요소를 처리해야 합니다. 성공적으로 처리가 완료되고 나면 의미가 없거나 가치가 적은 로그가 생성되므로 로그 데이터 보관에 대해 비즈니스 결정을 내려야 합니다. 예를 들어 모든 로그 데이터를 유지하는 대신 다음과 같은 대안을 고려할 수 있습니다.

  • 성공적인 요소 처리로 인해 발생한 로그에 포함된 정보가 가치가 없을 경우 보관하지 않습니다.
  • 로그 데이터를 샘플링하는 로직을 만듭니다(예: 1만 번째 로그 항목만 샘플링). 프로세스가 동일한 경우(반복되는 코드가 본질적으로 동일한 로그 데이터를 생성하는 경우) 이 방법으로 로그 데이터 유지와 처리 최적화 간에 효과적으로 균형을 유지할 수 있습니다.

오류 발생 시 로그에 덤핑되는 데이터의 양이 많을 수 있습니다. 대규모 오류 로그 데이터를 처리하는 효과적인 전략은 로그 항목의 처음 몇 줄을 읽고 해당 줄만 Cloud Logging으로 푸시하는 것입니다. 나머지 로그 파일을 Cloud Storage 버킷으로 로드할 수 있습니다. 이 접근 방식을 사용하면 나중에 오류 로그의 첫 번째 줄을 확인하고 필요한 경우 Cloud Storage에서 전체 파일을 참조할 수 있습니다.

로그 파일의 크기를 확인하는 것도 유용합니다. 파일 크기가 0일 경우 안전하게 무시하거나 파일에 데이터가 없다는 간단한 로그 메시지를 기록할 수 있습니다.

완료된 처리에서 데이터 캡처

stdout을 사용하여 계산 결과를 DoFn 함수에 다시 전달하지 않는 것이 좋습니다. C++ 코드가 호출하는 다른 코드와 사용자 자신의 코드도 stdout에 메시지를 보낼 수 있으며, 이로 인해 로깅 데이터를 포함하는 stdoutput 스트림이 오염될 수 있습니다. 그 대신 C++ 래퍼 코드를 변경하여 코드가 값을 저장하는 파일을 생성할 위치를 나타내는 매개변수를 허용하도록 하는 것이 좋습니다. 이 파일은 C++ 코드를 사용하여 객체를 자바 또는 Python 코드로 다시 전달하도록 하는 프로토콜 버퍼를 사용하는 언어 중립적인 방법으로 저장하는 것이 좋습니다. DoFn 객체는 파일에서 직접 결과를 읽고 결과 정보를 자체 output 호출에 전달할 수 있습니다.

경험을 통해 프로세스 자체를 다루는 단위 테스트 실행의 중요성이 확인되었습니다. Dataflow 파이프라인과 독립적으로 프로세스를 실행하는 단위 테스트를 구현하는 것이 중요합니다. 라이브러리가 독립형일 경우 디버깅을 훨씬 효율적으로 수행할 수 있으며 전체 파이프라인을 실행하지 않아도 됩니다.

작은 CPU 주기의 처리 설계

하위 프로세스 호출에는 오버헤드가 발생합니다. 워크로드에 따라 추가 작업을 수행하여 수행 중인 작업과 프로세스 시작 및 종료를 관리하는 오버헤드 간의 비율을 줄여야 할 수 있습니다.

미디어 사용 사례에서 구동 데이터 요소의 크기는 수 메가바이트 또는 기가바이트 단위일 수 있습니다. 따라서 각 데이터 요소의 처리에는 몇 분이 소요될 수 있습니다. 이 경우 하위 프로세스 호출 비용은 전반적인 처리 시간에서 미미한 비율을 차지합니다. 이 상황에서 가장 좋은 방법은 단일 요소에서 자체 프로세스를 시작하는 것입니다.

하지만 금융과 같은 다른 사용 사례에서는 처리에 매우 작은 단위의 CPU 시간(1만 분의 1초)이 소요됩니다. 이 경우 하위 프로세스 호출의 오버헤드가 불균형적으로 커집니다. 이 문제에 대한 해결책은 Apache Beam의 GroupByKey 변환을 사용하여 50~100개의 요소로 이루어진 배치를 만들어 프로세스에 제공하는 것입니다. 예를 들어 다음 단계를 따르세요.

  • DoFn 함수에서 키-값 쌍을 만듭니다. 금융 거래를 처리하는 경우 거래 번호를 키로 사용할 수 있습니다. 키로 사용할 고유 번호가 없는 경우, 데이터에서 체크섬을 생성하고 모듈로 함수를 사용하여 50개의 요소로 구성된 파티션을 만들 수 있습니다.
  • 키를 GroupByKey.create 함수로 보내면 50개의 요소를 포함하는 KV<key,Iterable<data>> 컬렉션이 반환되고, 이 컬렉션을 프로세스로 보낼 수 있습니다.

작업자 동시 로드 제한

Dataflow 실행기에서 기본적으로 지원하는 언어로 작업할 경우에는 작업자에서 일어나는 일을 고려할 필요가 없습니다. Dataflow에서는 다양한 프로세스가 일괄 또는 스트림 모드에서 흐름 제어 및 스레드를 감독합니다.

하지만 C++ 등의 외부 언어를 사용하는 경우 하위 프로세스를 시작하여 일상적인 범위 밖의 작업을 수행하고 있음을 명심해야 합니다. 일괄 모드에서 Dataflow 실행기는 스트리밍 모드에 비해 CPU에 더 적은 비율의 작업 스레드를 사용합니다. 특히 스트리밍 모드에서는 클래스 내에 세마포어를 생성하여 개별 작업자의 동시 로드를 직접 제어하는 것이 좋습니다.

예를 들어 미디어 처리 시 단일 작업자가 수백 개의 트랜스코딩 요소를 동시에 처리하지 않고자 할 수 있습니다. 이러한 경우에는 수행 중인 작업에 DoFn 함수에 대한 허용을 제공하는 유틸리티 클래스를 만들 수 있습니다. 이 클래스를 사용하면 파이프라인 내의 작업자 스레드를 직접 제어할 수 있습니다.

Google Cloud에서 대용량 데이터 싱크 사용

데이터가 처리되고 나면 데이터 싱크로 전송됩니다. 싱크는 그리드 처리 솔루션으로 생성된 결과의 볼륨을 처리할 수 있어야 합니다.

다음 다이어그램은 Dataflow가 그리드 워크로드를 실행할 때 Google Cloud에서 사용할 수 있는 몇 가지 싱크를 보여 줍니다.

Google Cloud에서 사용 가능한 싱크

Bigtable, BigQuery, Pub/Sub는 모두 대규모 데이터 스트림을 처리할 수 있습니다. 예를 들어 각 Bigtable 노드는 수평 확장이 용이하며 최대 1K 크기의 삽입을 초당 10,000개 처리할 수 있습니다. 따라서 100개 노드 Bigtable 클러스터는 Dataflow 그리드에서 생성되는 메시지를 초당 1,000,000개까지 받아들일 수 있습니다.

segfault 관리

파이프라인 내에서 C++ 코드를 사용할 때는 올바르게 처리되지 않는 경우 로컬에 영향을 주지 않으므로 segfault 관리 방법을 결정해야 합니다. Dataflow 실행기는 자바, Python 또는 Go에서 필요에 따라 프로세스를 만든 후 번들 형태로 프로세스에 작업을 할당합니다.

C++ 코드 호출이 JNI 또는 Cython과 같은 긴밀하게 결합된 도구와 C++ 프로세스 segfault를 사용하는 경우 호출 프로세스 및 자바 가상 머신(JVM)도 비정상 종료됩니다. 이 시나리오에서는 잘못된 데이터 포인트를 포착할 수 없습니다. 잘못된 데이터 포인트를 포착할 수 있도록 느슨한 결합을 사용하여 잘못된 데이터를 분기하고 파이프라인을 계속 유지합니다. 그러나 모든 데이터 변형에 대해 완전히 테스트된 성숙한 C++ 코드를 사용하면 Cython과 같은 메커니즘을 사용할 수 있습니다.

다음 단계