병렬 처리

파이프라인은 머신 클러스터에서 실행됩니다. 실행해야 할 작업을 분할한 다음 클러스터에 분산된 여러 실행자에서 작업을 동시에 실행하여 높은 처리량을 달성합니다. 일반적으로 분할(파티션이라고도 함) 수가 클수록 파이프라인을 더 빠르게 실행할 수 있습니다. 파이프라인의 병렬 처리 수준은 파이프라인의 소스와 셔플 단계에 따라 결정됩니다.

소스

각 파이프라인 실행이 시작되면 파이프라인의 모든 소스는 읽어야 하는 데이터와 해당 데이터를 분할로 나눌 수 있는 방법을 계산합니다. 예를 들어 Cloud Storage에서 읽고, Wrangler 변환을 실행한 후 Cloud Storage에 다시 쓰는 기본 파이프라인을 생각해 보세요.

Cloud Storage 소스, Wrangler 변환, Cloud Storage 싱크를 보여주는 기본 파이프라인

파이프라인이 시작되면 Cloud Storage 소스는 입력 파일을 검사하고 파일 크기를 기준으로 분할합니다. 예를 들어 1GB의 단일 파일을 크기가 각각 10MB인 100개의 분할로 나눌 수 있습니다. 각 실행자는 해당 분할에 대한 데이터를 읽고 Wrangler 변환을 실행한 후 출력을 부분 파일에 씁니다.

Cloud Storage의 데이터를 병렬 Wrangler로 파티션하여 부분 파일 변환

파이프라인이 느리게 실행되는 경우 가장 먼저 확인해야 할 사항 중 하나는 소스가 병렬 처리를 최대한 활용할 만큼 충분한 분할을 생성하고 있는지 여부입니다. 예를 들어 일부 압축 유형은 일반 텍스트 파일을 분할할 수 없게 만듭니다. 압축이 풀린 파일을 읽는 경우, 압축이 풀리지 않은 파일이나 분할이 가능한 BZIP으로 압축된 파일을 읽을 때보다 파이프라인이 훨씬 느리게 실행되는 것을 확인할 수 있습니다. 마찬가지로 데이터베이스 소스를 사용하면서 단일 분할만 사용하도록 구성한 경우 더 많은 분할을 사용하도록 구성한 경우보다 훨씬 느리게 실행됩니다.

셔플

특정 유형의 플러그인을 사용하면 클러스터 전반에서 데이터가 셔플됩니다. 이는 한 실행자에 의해 처리되는 레코드를 다른 실행자에게 전송하여 계산을 수행해야 할 때 발생합니다. 셔플은 많은 I/O가 포함되므로 비용이 많이 드는 작업입니다. 데이터를 셔플을 유발하는 플러그인은 모두 Pipeline Studio의 애널리틱스 섹션에 표시됩니다. 여기에는 그룹화, 중복 삭제, 고유, 조인과 같은 플러그인이 포함됩니다. 예를 들어 위 예시에서 그룹화 단계가 파이프라인에 추가되었다고 가정해 보겠습니다.

또한 읽고 있는 데이터가 식료품점에서의 구매를 나타낸다고 가정해 보겠습니다. 각 레코드에는 item 필드와 num_purchased 필드가 포함됩니다. 그룹 기준 단계에서 item 필드의 레코드를 그룹화하고 num_purchased 필드의 합계를 계산하도록 파이프라인을 구성합니다.

파이프라인이 실행되면 앞에서 설명한 대로 입력 파일이 분할됩니다. 그런 다음 각 레코드는 클러스터 전체에서 셔플되어 동일한 항목이 있는 모든 레코드가 동일한 실행자에 속하게 됩니다.

위 예에서 볼 수 있듯이 사과 구매 기록은 원래 여러 실행자에 걸쳐 분산되어 있었습니다. 집계를 수행하려면 이러한 모든 레코드가 클러스터를 통해 동일한 실행자에게 전송되어야 했습니다.

셔플이 필요한 대부분의 플러그인에서는 데이터를 셔플할 때 사용할 파티션 수를 지정할 수 있습니다. 이는 셔플된 데이터를 처리하는 데 사용되는 실행자 수를 제어합니다.

위 예에서 파티션 수가 2로 설정되면 각 실행자는 하나가 아닌 두 항목에 대한 집계를 계산합니다.

이 단계를 거치면 파이프라인의 병렬 처리를 줄일 수 있습니다. 예를 들어 다음과 같은 파이프라인의 논리적 뷰를 살펴보겠습니다.

소스는 500개의 파티션으로 데이터를 분할하지만 그룹화에서 200개의 파티션을 사용하여 셔플하는 경우 그룹화 후 최대 병렬 처리 수가 500에서 200으로 감소합니다. Cloud Storage에 500개의 서로 다른 부분 파일이 작성되는 대신 200개만 작성됩니다.

파티션 선택

파티션 수가 너무 적으면 클러스터의 전체 용량을 사용하여 최대한 많은 작업을 병렬화할 수 없습니다. 파티션을 너무 높게 설정하면 불필요한 오버헤드가 증가합니다. 일반적으로 파티션을 너무 적게 사용하는 것보다 너무 많이 사용하는 것이 좋습니다. 파이프라인을 실행하는 데 몇 분이 걸리고 몇 분을 줄이려고 하면 추가 오버헤드가 발생할 수 있습니다. 파이프라인을 실행하는 데 몇 시간이 걸리는 경우 일반적으로 오버헤드는 걱정할 필요가 없습니다.

사용할 파티션 수를 결정하는 유용하지만 지나치게 단순한 방법은 max(cluster CPUs, input records / 500,000)로 설정하는 것입니다. 즉, 입력 레코드 수를 500,000으로 나눕니다. 이 숫자가 클러스터 CPU 수보다 크면 파티션 수로 사용합니다. 그렇지 않으면 클러스터 CPU 수를 사용합니다. 예를 들어 클러스터에 CPU가 100개 있고 셔플 단계에 입력 레코드가 1억 개 있을 것으로 예상되는 경우 200개의 파티션을 사용합니다.

더 완전한 답변은 각 파티션의 중간 셔플 데이터가 실행자의 메모리에 딱 들어 맞아 디스크에 분산할 필요가 없을 때 셔플의 성능이 가장 좋다는 것입니다. Spark는 셔플 데이터를 보관하기 위해 실행자 메모리의 30% 미만을 예약합니다. 정확한 숫자는 (총 메모리 - 300MB) * 30%입니다. 각 실행자가 2GB의 메모리를 사용하도록 설정되어 있다고 가정하면, 각 파티션은 (2GB - 300MB) * 30% = 약 500MB 이하의 레코드를 보유해야 합니다. 각 레코드가 1KB 크기로 압축된다고 가정하면 파티션당 (500MB/파티션) / (1KB/레코드) = 500,000개의 레코드가 된다는 의미입니다. 실행자가 더 많은 메모리를 사용하거나 레코드가 더 작으면 이 숫자를 적절하게 조정할 수 있습니다.

데이터 편향

위 예시에서는 다양한 품목에 대한 구매가 균등하게 분배되었습니다. 즉, 사과, 바나나, 당근, 계란을 각각 세 번씩 구매했습니다. 균등하게 분포된 키에서 셔플하는 것이 가장 성능이 우수한 셔플 유형이지만 많은 데이터 세트에는 이 속성이 없습니다. 위 예시에서 식료품 구매를 계속하면 웨딩 카드보다 계란 구매가 더 많을 것으로 예상할 수 있습니다. 다른 키보다 훨씬 더 일반적인 셔플 키가 몇 개 있는 경우 편향된 데이터를 다루고 있는 것입니다. 편향된 데이터는 소수의 실행자가 불균형한 양의 작업을 실행하므로 편향되지 않은 데이터보다 성능이 현저히 떨어질 수 있습니다. 이로 인해 파티션의 작은 하위 집합이 다른 모든 파티션보다 훨씬 커집니다.

이 예시에서 달걀 구매 횟수는 카드 구매 횟수보다 5배 더 많으므로 달걀 집계는 계산하는 데 약 5배 더 오래 걸립니다. 레코드가 2개가 아닌 10개만 처리되는 경우에는 큰 문제가 아니지만 10억 개가 아닌 50억 개의 레코드를 처리할 때는 큰 차이가 납니다. 데이터 편향이 있는 경우 셔플에 사용되는 파티션 수는 파이프라인 성능에 큰 영향을 미치지 않습니다.

시간 경과에 따른 출력 레코드의 그래프를 검사하여 데이터 편향을 파악할 수 있습니다. 단계가 파이프라인 실행을 시작할 때 훨씬 빠른 속도로 레코드를 출력하다가 갑자기 속도가 느려진다면 데이터가 편향된 것일 수 있습니다.

시간 경과에 따른 클러스터 메모리 사용량을 검사하여 데이터 편향을 파악할 수도 있습니다. 클러스터가 한동안 최대 용량을 사용하지만 갑자기 일정 기간 동안 메모리 사용량이 줄어든다면 데이터 편향이 발생하고 있다는 신호이기도 합니다.

편향된 데이터는 조인이 실행될 때 성능에 가장 큰 영향을 미칩니다. 편향된 조인의 성능을 개선하는 데 사용할 수 있는 몇 가지 기술이 있습니다. 자세한 내용은 JOIN 작업의 병렬 처리를 참고하세요.

실행을 위한 적응형 조정

실행을 적응적으로 조정하려면 정확한 파티션 수가 아닌 사용할 파티션 범위를 지정하세요. 파이프라인 구성으로 설정하더라도 적응형 실행이 사용 설정되면 정확한 파티션 번호는 무시됩니다.

임시 Dataproc 클러스터를 사용하는 경우 Cloud Data Fusion은 적절한 구성을 자동으로 설정하지만 정적 Dataproc 또는 Hadoop 클러스터의 경우 다음 두 가지 구성 파라미터를 설정할 수 있습니다.

  • spark.default.parallelism: 클러스터에서 사용 가능한 총 vCore 수로 설정합니다. 이렇게 하면 클러스터의 부하가 걸리지 않고 파티션 수의 하한값을 정의할 수 있습니다.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: 클러스터에서 사용할 수 있는 vCore 수의 32배로 설정합니다. 이는 파티션 수의 상한을 정의합니다.
  • Spark.sql.adaptive.enabled: 최적화를 사용 설정하려면 이 값을 true로 설정합니다. Dataproc에서는 자동으로 설정하지만 일반 Hadoop 클러스터를 사용하는 경우 이 설정이 사용 설정되어 있는지 확인해야 합니다 .

이러한 파라미터는 특정 파이프라인의 엔진 구성 또는 정적 Dataproc 클러스터의 클러스터 속성에서 설정할 수 있습니다.

다음 단계