Dataproc 향상된 유연성 모드

Dataproc 향상된 유연성 모드(EFM)는 셔플 데이터를 관리하여 실행 중인 클러스터에서 노드를 삭제할 때 발생하는 작업 진행 지연을 최소화합니다. EFM은 기본 작업자에 Spark 셔플 데이터를 작성합니다. 작업자는 감소 단계 중에 해당 원격 노드에서 가져옵니다.

EFM은 보조 작업자에 중간 셔플 데이터를 저장하지 않으므로 선점형 VM을 사용하거나 보조 작업자 그룹을 자동 확장만 하는 클러스터에서 사용하기에 적합합니다.

제한사항:

  • AppMaster 재배치를 지원하지 않는 Apache Hadoop YARN 작업은 향상된 유연성 모드에서 실패할 수 있습니다(AppMaster가 종료될 때까지 기다리는 시간 참조).
  • 향상된 유연성 모드는 다음과 같은 경우에 권장되지 않습니다
    • 클러스터에서 기본 작업자만 있는 경우.
  • 향상된 유연성 모드는 지원되지 않습니다.
    • 기본 작업자 자동 확장이 사용 설정된 경우 대부분의 경우 기본 작업자는 자동으로 마이그레이션되지 않는 셔플 데이터를 계속 저장합니다. 기본 작업자 그룹을 축소하면 EFM 이점이 사라집니다.
    • Spark 작업이 단계적 해제가 사용 설정된 클러스터에서 실행되는 경우 YARN 단계적 해제 메커니즘은 모든 관련 애플리케이션이 완료될 때까지 DECOMMISSIONING 노드를 유지하므로 단계적 해제와 EFM이 교차 목적에 따라 작동할 수 있습니다.

향상된 유연성 모드 사용

향상된 유연성 모드는 실행 엔진별로 구성되며 클러스터 생성 중에 구성해야 합니다. Spark EFM 구현은 dataproc:efm.spark.shuffle=primary-worker 클러스터 속성으로 구성됩니다.

예: Spark에 대해 기본 작업자 셔플이 있는 클러스터를 만듭니다.

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Apache Spark 예시

  1. EFM 클러스터에서 Spark 예시 jar를 사용하여 공개 셰익스피어 텍스트에서 WordCount 작업을 실행합니다.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

기본 작업자 셔플을 위한 로컬 SSD 구성

기본 작업자 및 HDFS 셔플 구현은 중간 셔플 데이터를 VM 연결 디스크에 기록하고 로컬 SSD로 제공되는 추가적인 처리량 및 IOPS 이점을 얻습니다. 리소스 할당을 돕기 위해 기본 작업자 머신을 구성할 때 vCPU 4개당 약 1개의 로컬 SSD 파티션을 목표로 지정합니다.

로컬 SSD를 연결하려면 --num-worker-local-ssds 플래그를 gcloud dataproc clusters create 명령어로 전달하세요.

보조 작업자 비율

보조 작업자는 셔플 데이터를 기본 작업자에 작성하므로 클러스터에 작업의 셔플 로드를 수용하기에 충분한 CPU, 메모리, 디스크 리소스가 있는 충분한 수의 기본 작업자가 포함되어야 합니다. 자동 확장 클러스터의 경우 기본 그룹이 확장되어 원치 않는 동작이 발생하지 않도록 하려면 minInstances를 기본 작업자 그룹의 자동 확장 정책에서 maxInstances 값으로 설정합니다.

보조-기본 작업자 비율이 10:1 정도와 같이 높으면 CPU 사용, 네트워크, 기본 작업자의 디스크 사용량을 모니터링하여 오버로드되었는지 확인합니다. 방법은 다음과 같습니다.

  1. Google Cloud Console의 VM 인스턴스 페이지로 이동합니다.

  2. 기본 작업자의 왼쪽에 있는 체크박스를 클릭합니다.

  3. 모니터링 탭을 클릭하여 기본 작업자의 CPU 사용, 디스크 IOPS, 네트워크 바이트, 기타 측정항목을 확인합니다.

기본 작업자가 오버로드된 경우 기본 작업자를 수동으로 수직 확장할 수 있습니다.

기본 작업자 그룹 크기 조절

기본 작업자 그룹을 안전하게 확장할 수 있지만 기본 작업자 그룹을 축소하면 작업 진행에 부정적인 영향을 미칠 수 있습니다. 기본 작업자 그룹을 축소하는 작업은 --graceful-decommission-timeout 플래그를 설정하여 사용 설정된 단계적 해제를 사용해야 합니다.

자동 확장된 클러스터: 자동 확장 정책이 있는 EFM 클러스터에서는 기본 작업자 그룹 확장이 사용 중지됩니다. 자동 확장 클러스터에서 기본 작업자 그룹의 크기를 조절하려면 다음 안내를 따르세요.

  1. 자동 확장을 사용 중지합니다.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. 기본 그룹을 확장합니다.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. 자동 확장을 다시 사용 설정합니다.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

기본 작업자 디스크 사용 모니터링

기본 작업자에는 클러스터 셔플 데이터에 충분한 디스크 공간이 있어야 합니다. remaining HDFS capacity 측정항목을 통해 이 기능을 간접적으로 모니터링할 수 있습니다. 로컬 디스크가 가득 차면 HDFS에 공간이 제공되지 않고 남은 용량이 줄어듭니다.

기본적으로 기본 작업자의 로컬 디스크가 용량의 90% 를 초과할 때 노드는 YARN 노드 UI에 UNHEALTHY로 표시됩니다. 디스크 용량 문제가 발생하면 HDFS에서 사용하지 않는 데이터를 삭제하거나 기본 작업자 풀을 확장할 수 있습니다.

중간 셔플 데이터는 일반적으로 작업이 끝날 때까지 삭제되지 않습니다. Spark에서 기본 작업자 셔플을 사용하는 경우 중간 셔플 데이터를 삭제하는 데 작업 완료 후 최대 30분이 걸릴 수 있습니다.

고급 구성

파티션 나누기 및 동시 로드

맵리듀스 또는 Spark 작업을 제출할 때 적정 수준의 파티션 나누기를 구성합니다. 셔플 단계의 입력 및 출력 파티션 수를 결정하는 것은 서로 다른 성능 특성 간에 절충됩니다. 작업 형태에 맞는 값으로 실험하는 것이 가장 좋습니다.

입력 파티션

맵리듀스 및 Spark 입력 파티션 나누기는 입력 데이터 세트로 결정됩니다. Cloud Storage에서 파일을 읽을 때 각 작업은 약 1개의 '블록 크기'에 해당하는 데이터를 처리합니다.

  • Spark SQL 작업의 경우 최대 파티션 크기는 spark.sql.files.maxPartitionBytes로 제어됩니다. 이를 1GB로 늘려보세요. spark.sql.files.maxPartitionBytes=1073741824

  • 맵리듀스 작업 및 Spark RDD의 경우 파티션 크기는 일반적으로 fs.gs.block.size(기본값: 128MB)로 제어됩니다. 이를 1GB로 늘려보세요. mapreduce.input.fileinputformat.split.minsizemapreduce.input.fileinputformat.split.maxsize와 같은 InputFormat 특정 속성을 설정할 수도 있습니다.

    • 맵리듀스 작업: --properties fs.gs.block.size=1073741824
    • Spark RDD: --properties spark.hadoop.fs.gs.block.size=1073741824

출력 파티션

후속 단계의 작업 수는 여러 속성에 의해 제어됩니다. 1TB 이상을 처리하는 대규모 작업의 경우 파티션당 최소 1GB를 사용하는 것이 좋습니다.

  • 맵리듀스 작업의 경우 출력 파티션 수는 mapreduce.job.reduces로 제어됩니다.

  • Spark SQL의 경우 출력 파티션 수는 spark.sql.shuffle.partitions로 제어됩니다.

  • RDD API를 사용하는 Spark 작업의 경우 출력 파티션 수를 지정하거나 spark.default.parallelism을 설정할 수 있습니다.

기본 작업자 셔플을 위한 셔플 조정

가장 중요한 속성은 --properties yarn:spark.shuffle.io.serverThreads=<num-threads>입니다. Spark 셔플 서버가 노드 관리자의 일부로 실행되므로 이 속성은 클러스터 수준 YARN 속성입니다. 기본값은 머신에 있는 코어 수의 두 배(2x)입니다(예: n1-highmem-8의 16개 스레드). '셔플 읽기 차단 시간'이 1초보다 크고 기본 작업자가 네트워크, CPU 또는 디스크 제한에 도달하지 않은 경우에는 셔플 서버 스레드 수를 늘려보세요.

더 큰 머신 유형에서는 spark.shuffle.io.numConnectionsPerPeer(기본값: 1)를 늘리는 것이 좋습니다. (예를 들어 호스트 쌍당 연결 5개로 설정).

재시도 상향 조정

앱 마스터, 작업 및 스테이지에 허용되는 최대 시도 수는 다음 속성을 설정하여 구성할 수 있습니다.

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

앱 마스터 및 작업은 단계적 해제 없이 많은 선점형 VM 또는 자동 확장을 사용하는 클러스터에서 더 자주 종료되므로 클러스터에서 앞서의 속성 값을 늘리면 도움이 될 수 있습니다(참고: Spark를 사용하는 EFM과 단계적 해제는 지원되지 않습니다).

EFM 클러스터에서 YARN 단계적 해제

YARN 단계적 해제를 사용하면 실행중인 애플리케이션에 미치는 영향을 최소화하면서 노드를 빠르게 삭제할 수 있습니다. 자동 확장 클러스터의 경우 EFM 클러스터에 연결된 AutoscalingPolicy에서 단계적 해제 제한 시간을 설정할 수 있습니다.

EFM의 단계적 해제 개선

  1. 중간 데이터는 분산 파일 시스템에 저장되므로 해당 노드에서 실행되는 모든 컨테이너가 완료되는 즉시 EFM 클러스터에서 노드를 제거할 수 있습니다. 그에 비해 애플리케이션이 완료될 때까지 표준 Dataproc 클러스터에서 노드가 삭제되지 않습니다.

  2. 노드 삭제는 노드에서 실행 중인 앱 마스터가 완료될 때까지 기다리지 않습니다. 앱 마스터 컨테이너가 종료되면 해제되지 않는 다른 노드에서 노드가 다시 예약됩니다. 작업 진행률은 손실되지 않습니다. 새 앱 마스터는 작업 내역을 읽어서 이전 앱 마스터의 상태를 빠르게 복구합니다.