Spark 작업 미세 조정 팁

다음 섹션에서는 Dataproc Spark 애플리케이션을 미세 조정하는 데 도움이 되는 팁을 제공합니다.

임시 클러스터 사용

Dataproc "임시" 클러스터 모델을 사용할 때는 각 작업에 대해 전용 클러스터를 만들고, 작업이 완료되면 클러스터를 삭제합니다. 임시 모델을 사용하면 스토리지와 컴퓨팅을 별개로 취급하여 Cloud Storage 또는 BigQuery에 작업 입력 및 출력 데이터를 저장하고, 컴퓨팅 및 임시 데이터 스토리지를 위해서만 클러스터를 사용할 수 있습니다.

영구 클러스터 문제

임시 단일 작업 클러스터를 사용하면 다음과 같은 공유 및 장기 실행 '영구' 클러스터 사용과 관련된 문제 및 잠재적 문제를 방지할 수 있습니다.

  • 단일 장애점: 공유 클러스터 오류 상태로 인해 모든 작업이 실패하여 전체 데이터 파이프라인이 차단될 수 있습니다. 오류를 조사하고 복구하는 데 몇 시간이 걸릴 수 있습니다. 임시 클러스터는 임시 클러스터 내 상태만 유지하므로 오류가 발생하면 신속하게 삭제하고 다시 만들 수 있습니다.
  • HDFS, MySQL, 로컬 파일 시스템에서 클러스터 상태를 유지 및 마이그레이션하기 어려움
  • SLO에 부정적인 영향을 미치는 작업 간 리소스 경합
  • 메모리 부족으로 인해 응답하지 않는 서비스 데몬
  • 디스크 용량을 초과할 수 있는 로그 및 임시 파일 누적
  • 클러스터 영역 소진으로 인한 확장 실패
  • 오래된 클러스터 이미지 버전에 대한 지원 부족

임시 클러스터 이점

긍정적 측면에서 임시 클러스터를 사용하면 다음을 수행할 수 있습니다.

  • Dataproc VM 서비스 계정이 다른 작업마다 IAM 권한을 다르게 구성합니다.
  • 각 작업에 대해 클러스터 하드웨어 및 소프트웨어 구성을 최적화하고 필요에 따라 클러스터 구성을 변경합니다.
  • 최신 보안 패치, 버그 수정, 최적화를 활용하기 위해 새 클러스터에서 이미지 버전을 업그레이드합니다.
  • 격리된 단일 작업 클러스터에서 보다 빠르게 문제를 해결합니다.
  • 공유 클러스터의 작업 간 유휴 시간이 아니라 임시 클러스터 실행 시간에 대해서만 비용을 지불하여 비용을 절약합니다.

Spark SQL 사용

Spark SQL DataFrame API는 중요한 RDD API 최적화입니다. RDD를 사용하는 코드와 상호작용하는 경우 코드로 RDD를 전달하기 전에 데이터를 DataFrame으로 읽는 것이 좋습니다. 자바 또는 Scala 코드에서 Spark SQL Dataset API를 RDD 및 DataFrames의 상위 집합으로 사용하는 것이 좋습니다.

Apache Spark 3 사용

Dataproc 2.0은 다음과 같은 기능과 성능 개선이 포함된 Spark 3을 설치합니다.

  • GPU 지원
  • 바이너리 파일 읽기 기능
  • 성능 개선
  • 동적 파티션 프루닝
  • Spark 작업을 실시간으로 최적화하는 적응형 쿼리 실행

동적 할당 사용

Apache Spark에는 클러스터 내 작업자의 Spark 실행자 수를 확장하는 동적 할당 기능이 포함됩니다. 이 기능을 사용하면 클러스터가 확장되는 경우에도 작업이 전체 Dataproc 클러스터를 사용할 수 있습니다. 이 기능은 Dataproc에서 기본적으로 사용 설정됩니다(spark.dynamicAllocation.enabledtrue로 설정됨). 자세한 내용은 Spark 동적 할당을 참조하세요.

Dataproc 자동 확장 사용

Dataproc 자동 확장은 클러스터에서 Dataproc 작업자를 동적으로 추가하고 삭제하여 Spark 작업이 작업을 빠르게 완료하는 데 필요로 하는 리소스를 가질 수 있도록 합니다.

보조 작업자만 확장하는 자동 확장 정책을 구성하는 것이 권장사항입니다.

Dataproc 향상된 유연성 모드 사용

선점형 VM 또는 자동 확장 정책이 있는 클러스터는 감소기에 Shuffle 데이터 제공을 완료하기 전에 작업자가 선점되거나 삭제될 때 FetchFailed 예외가 발생할 수 있습니다. 이 예외로 인해 태스크 재시도 및 작업 완료 시간이 길어질 수 있습니다.

권장사항: 보조 작업자에 중간 Shuffle 데이터를 저장하지 않는 Dataproc 향상된 유연성 모드를 사용하면 보조 작업자를 안전하게 선점하거나 축소할 수 있습니다.

파티션 나누기 및 셔플링 구성

Spark는 클러스터의 임시 파티션에 데이터를 저장합니다. 애플리케이션에서 DataFrame을 그룹화하거나 조인하면 그룹화 및 하위 수준 구성에 따라 데이터를 새 파티션으로 셔플합니다.

데이터 파티션 나누기는 애플리케이션 성능에 상당한 영향을 줍니다. 파티션이 너무 적으면 작업 동시 로드 및 클러스터 리소스 사용률이 제한됩니다. 파티션이 너무 많으면 추가 파티션 처리 및 셔플링으로 인해 작업 속도가 느려집니다.

파티션 구성

다음 속성은 파티션의 수와 크기를 제어합니다.

  • spark.sql.files.maxPartitionBytes: Cloud Storage에서 데이터를 읽을 때 파티션의 최대 크기입니다. 기본값은 128MB로, 이는 100TB 미만의 프로세스를 처리하는 대부분의 애플리케이션에 충분히 큽니다.

  • spark.sql.shuffle.partitions: 셔플을 수행한 후의 파티션 수입니다. 기본값은 200으로, 총 vCPU 수가 100개 미만인 클러스터에 적합합니다. 권장사항: 클러스터의 vCPU 수의 3배로 설정하세요.

  • spark.default.parallelism: join, reduceByKey, parallelize와 같이 셔플이 필요한 RDD 변환을 수행한 후 반환된 파티션 수입니다. 기본값은 클러스터의 총 vCPU 수입니다. Spark 작업에서 RDD를 사용할 때 이 값을 vCPU의 3배까지 설정할 수 있습니다.

파일 수 제한

Spark가 다수의 작은 파일을 읽을 때 성능 손실이 발생합니다. 데이터를 더 큰 파일 크기(예: 256MB~512MB 범위)로 저장합니다. 마찬가지로 출력 파일 수를 제한합니다. 셔플을 강제 적용하려면 불필요한 셔플 방지를 참조하세요.

적응형 쿼리 실행 구성(Spark 3)

적응형 쿼리 실행(Dataproc 이미지 버전 2.0에서 기본적으로 사용 설정됨)은 다음과 같은 Spark 작업 성능 개선을 제공합니다.

기본 구성 설정은 대부분의 사용 사례에 적합하지만 spark.sql.adaptive.advisoryPartitionSizeInBytesspark.sqlfiles.maxPartitionBytes(기본값: 128MB)로 설정하면 유용할 수 있습니다.

불필요한 셔플 방지

Spark를 사용하면 셔플을 수동으로 트리거하여 repartition 함수로 데이터를 재분산할 수 있습니다. 셔플은 비용이 많이 발생하므로 셔플을 다시 수행할 때는 주의해야 합니다. Spark가 자동으로 데이터의 파티션을 나누도록 하려면 파티션 구성을 적절하게 설정해야 합니다.

예외: Cloud Storage에 열로 파티션을 나눈 데이터를 작성하는 경우 특정 열에서 다시 파티션을 나누면 다수의 작은 파일 쓰기를 방지하여 쓰기 시간을 단축합니다.

df.repartition("col_name").write().partitionBy("col_name").save("gs://...")

데이터를 Parquet 또는 Avro에 저장

Spark SQL은 기본적으로 Snappy 압축 Parquet 파일에서 데이터를 읽고 씁니다. Parquet는 Spark가 애플리케이션을 실행하는 데 필요한 데이터만 읽을 수 있도록 하는 효율적인 열 형식의 파일 형식입니다. 이는 대규모 데이터 세트를 다룰 때 매우 유용합니다. Apache ORC와 같은 다른 열 형식에서도 잘 작동합니다.

열 형식이 아닌 데이터의 경우 Apache Avro는 효율적인 바이너리 행 파일 형식을 제공합니다. 일반적으로 Parquet보다 느리지만 Avro의 성능은 CSV 또는 JSON과 같은 텍스트 기반 형식보다 더 좋습니다.

디스크 크기 최적화

영구 디스크 처리량은 디스크 크기로 확장되어, 작업이 메타데이터와 셔플 데이터를 디스크에 쓰기 때문에 Spark 작업 성능에 영향을 줄 수 있습니다. 표준 영구 디스크를 사용하는 경우 디스크 크기는 작업자당 1테라바이트 이상이어야 합니다(영구 디스크 크기별 성능 참조).

Google Cloud Console에서 작업자 디스크 처리량을 모니터링하려면 다음 안내를 따르세요.

  1. 클러스터 페이지에서 클러스터 이름을 클릭합니다.
  2. VM 인스턴스 탭을 클릭합니다.
  3. 작업자의 이름을 클릭합니다.
  4. Monitoring 탭을 클릭한 다음 디스크 처리량까지 아래로 스크롤하여 작업자 처리량을 확인합니다.

디스크 고려 사항

영구 스토리지의 이점을 누리지 않는 임시 Dataproc 클러스터 로컬 SSD를 사용할 수 있습니다. 로컬 SSD는 클러스터에 물리적으로 연결되고 영구 디스크보다 높은 처리량을 제공합니다(성능 표 참조). 로컬 SSD는 375GB의 고정된 크기로 사용 가능하지만 성능 향상을 위해 여러 SSD를 추가할 수 있습니다.

로컬 SSD는 클러스터가 종료된 후에 데이터를 유지하지 않습니다. 영구 스토리지가 필요한 경우 표준 영구 디스크보다 크기에 대해 높은 처리량을 제공하는 SSD 영구 디스크를 사용할 수 있습니다. 파티션 크기가 8KB보다 작은 경우 SSD 영구 디스크도 적합합니다. 그러나 작은 파티션을 피하세요.

클러스터에 GPU 연결

Spark 3은 GPU를 지원합니다. RAPIDS 초기화 작업에 GPU를 사용하여, RAPIDS SQL Accelerator를 사용하여 Spark 작업의 속도를 높입니다. GPU를 사용하는 클러스터를 구성하는 GPU 드라이버 초기화 작업입니다.

일반적인 작업 실패 및 해결방법

메모리 부족

예:

  • '실행자 손실'
  • 'java.lang.OutOfMemoryError: GC 오버헤드 한도 초과'
  • '메모리 한도를 초과하여 YARN에 의해 종료된 컨테이너'

가능한 해결방법

셔플 가져오기 실패

예:

  • 'FetchFailedException'(Spark 오류)
  • '...에 연결할 수 없음' (Spark 오류)
  • '가져올 수 없음'(맵리듀스 오류)

일반적으로 제공할 Shuffle 데이터가 있는 작업자를 조기에 제거하면 발생합니다.

가능한 원인 및 해결방법은 다음과 같습니다.

  • 자동 확장 처리에서 선점형 작업자 VM을 회수했거나 비선점형 작업자 VM을 삭제했습니다. 해결 방법: 향상된 유연성 모드를 사용하면 보조 작업자를 안전하게 선점 또는 확장 가능하게 만들 수 있습니다.
  • OutOfMemory 오류로 인해 실행자 또는 매퍼가 비정상 종료되었습니다. 해결 방법: 실행자 또는 매퍼의 메모리를 늘립니다.
  • Spark 셔플 서비스가 과부하 상태일 수 있습니다. 해결 방법: 작업 파티션 수를 줄입니다.

UNHEALTHY 상태의 YARN 노드

예를 들면 다음과 같습니다(YARN 로그).

...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]

셔플 데이터를 저장할 디스크 공간의 부족이 원인일 수 있습니다. 로그 파일을 보고 진단합니다.

  • Google Cloud 콘솔에서 프로젝트의 클러스터 페이지를 열고 클러스터 이름을 클릭합니다.
  • 로그 보기를 클릭합니다.
  • hadoop-yarn-nodemanager로 로그를 필터링합니다.
  • 'UNHEALTHY'를 검색합니다.

가능한 해결방법

  • 사용자 캐시는 yarn-site.xml file에서 yarn.nodemanager.local-dirs 속성으로 지정된 디렉터리에 저장됩니다. 이 파일은 /etc/hadoop/conf/yarn-site.xml에 있습니다. /hadoop/yarn/nm-local-dir 경로에서 여유 공간을 확인하고 /hadoop/yarn/nm-local-dir/usercache 사용자 캐시 폴더를 삭제하여 공간을 확보할 수 있습니다.
  • 로그에서 'UNHEALTHY' 상태를 보고하면 디스크 공간이 더 큰 클러스터를 다시 만들어 처리량 한도를 늘립니다.

드라이버 메모리 부족으로 인한 작업 실패

클러스터 모드에서 작업을 실행할 때 마스터 노드의 메모리 크기가 워커 노드 메모리 크기보다 훨씬 큰 경우 작업이 실패합니다.

드라이버 로그의 예시:

'Exception in thread "main" java.lang.IllegalArgumentException:
Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster!
Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'

가능한 해결방법

  • spark:spark.driver.memoryyarn:yarn.scheduler.maximum-allocation-mb보다 작게 설정합니다.
  • 마스터 노드와 워커 노드에 동일한 머신 유형을 사용합니다.

추가 정보