Dataproc 향상된 유연성 모드

Dataproc 향상된 유연성 모드(EFM)는 셔플 데이터를 관리하여 실행 중인 클러스터에서 노드를 삭제할 때 발생하는 작업 진행 지연을 최소화합니다. EFM은 사용자가 선택할 수 있는 두 가지 모드 중 하나로 셔플 데이터를 오프로드합니다.

  1. 기본 작업자 셔플. 매퍼가 기본 작업자에게 데이터를 씁니다. 작업자는 감소 단계 중에 해당 원격 노드에서 가져옵니다. 이 모드는 Spark 작업에만 사용할 수 있으며, 사용이 권장됩니다.

  2. HCFS(Hadoop 호환 파일 시스템) 셔플. 매퍼가 HCFS 구현(기본적으로 HDFS)에 데이터를 씁니다. 기본 작업자 모드와 마찬가지로 기본 작업자만 HDFS 및 HCFS 구현에 참여합니다(HCFS 셔플이 Cloud Storage 커넥터를 사용하는 경우 데이터가 클러스터 외부에 저장됨). 이 모드는 소량의 데이터 작업에 도움이 될 수 있지만 확장 제한으로 인해 대규모 작업에는 사용하지 않는 것이 좋습니다.

두 EFM 모드 모두 보조 작업자에 중간 셔플 데이터를 저장하지 않으므로 EFM은 선점형 VM을 사용하거나 보조 작업자 그룹을 자동 확장만 하는 클러스터에 적합합니다.

제한사항:

  • AppMaster 재배치를 지원하지 않는 Apache Hadoop YARN 작업은 향상된 유연성 모드에서 실패할 수 있습니다(AppMaster가 종료될 때까지 기다리는 시간 참조).
  • 향상된 유연성 모드는 다음과 같은 경우에 권장되지 않습니다
    • 클러스터에서 기본 작업자만 있는 경우
    • 작업 완료 후 최대 30분이 걸릴 수 있으므로 스트리밍 작업 시 중간 셔플 데이터를 정리합니다.
  • 향상된 유연성 모드는 지원되지 않습니다.
    • 기본 작업자 자동 확장이 사용 설정된 경우 대부분의 경우 기본 작업자는 자동으로 마이그레이션되지 않는 셔플 데이터를 계속 저장합니다. 기본 작업자 그룹을 축소하면 EFM 이점이 사라집니다.
    • Spark 작업이 단계적 해제가 사용 설정된 클러스터에서 실행되는 경우 YARN 단계적 해제 메커니즘은 모든 관련 애플리케이션이 완료될 때까지 DECOMMISSIONING 노드를 유지하므로 단계적 해제와 EFM이 교차 목적에 따라 작동할 수 있습니다.

향상된 유연성 모드 사용

향상된 유연성 모드는 실행 엔진별로 구성되며 클러스터 생성 중에 구성해야 합니다.

  • Spark EFM 구현은 dataproc:efm.spark.shuffle 클러스터 속성으로 구성됩니다. 유효한 속성 값:

    • primary-worker: 기본 작업자 셔플용(권장)
    • hcfs: HCFS 기반 셔플용 이 모드는 지원 중단되었으며 이미지 버전 1.5를 실행하는 클러스터에서만 사용할 수 있습니다. 새 워크플로에는 권장되지 않습니다.
  • Hadoop 맵리듀스 구현은 dataproc:efm.mapreduce.shuffle 클러스터 속성으로 구성됩니다. 유효한 속성 값:

    • hcfs

예: Spark용 기본 작업자 셔플 및 맵리듀스용 HCFS 셔플을 사용하여 클러스터를 만듭니다.

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=primary-worker \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --worker-machine-type=n1-highmem-8 \
    --num-workers=25 \
    --num-worker-local-ssds=2 \
    --secondary-worker-type=preemptible \
    --secondary-worker-boot-disk-size=500GB \
    --num-secondary-workers=25

Apache Spark 예시

  1. EFM 클러스터에서 Spark 예시 jar를 사용하여 공개 셰익스피어 텍스트에서 WordCount 작업을 실행합니다.
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

Apache Hadoop MapReduce 예시

  1. EFM 클러스터에서 맵리듀스 예시 jar를 사용하여 나중에 Cloud Storage에서 terasort 작업에 대한 입력 데이터를 생성하려면 작은 teragen 작업을 실행합니다.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    

  2. 데이터에서 테라 정렬 작업을 실행합니다.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

기본 작업자 셔플을 위한 로컬 SSD 구성

기본 작업자 및 HDFS 셔플 구현은 중간 셔플 데이터를 VM 연결 디스크에 기록하고 로컬 SSD로 제공되는 추가적인 처리량 및 IOPS 이점을 얻습니다. 리소스 할당을 돕기 위해 기본 작업자 머신을 구성할 때 vCPU 4개당 약 1개의 로컬 SSD 파티션을 목표로 지정합니다.

로컬 SSD를 연결하려면 --num-worker-local-ssds 플래그를 gcloud Dataproc clusters create 명령어로 전달하세요.

일반적으로 보조 작업자에 로컬 SSD가 필요하지 않습니다. --num-secondary-worker-local-ssds 플래그를 사용하여 클러스터의 보조 작업자에 로컬 SSD를 추가하는 것은 보조 작업자가 로컬에서 셔플 데이터를 쓰지 않기 때문에 중요도가 낮은 경우가 많습니다. 하지만 로컬 SSD는 로컬 디스크 성능을 개선하기 때문에 로컬 디스크 사용으로 인해 작업이 I/O 바운드로 예상되는 경우(작업이 스크래치 공간으로 상당한 로컬 디스크를 사용하거나 파티션이 메모리에 비해 너무 커서 디스크에 빠질 수 있는 경우) 로컬 SSD를 보조 작업자에 추가할 수 있습니다.

보조 작업자 비율

보조 작업자는 셔플 데이터를 기본 작업자에 작성하므로 클러스터에 작업의 셔플 로드를 수용하기에 충분한 CPU, 메모리, 디스크 리소스가 있는 충분한 수의 기본 작업자가 포함되어야 합니다. 자동 확장 클러스터의 경우 기본 그룹이 확장되어 원치 않는 동작이 발생하지 않도록 하려면 minInstances를 기본 작업자 그룹의 자동 확장 정책에서 maxInstances 값으로 설정합니다.

보조-기본 작업자 비율이 10:1 정도와 같이 높으면 CPU 사용, 네트워크, 기본 작업자의 디스크 사용량을 모니터링하여 오버로드되었는지 확인합니다. 그러려면 다음 안내를 따르세요.

  1. Google Cloud Console의 VM 인스턴스 페이지로 이동합니다.

  2. 기본 작업자의 왼쪽에 있는 체크박스를 클릭합니다.

  3. 모니터링 탭을 클릭하여 기본 작업자의 CPU 사용, 디스크 IOPS, 네트워크 바이트, 기타 측정항목을 확인합니다.

기본 작업자가 오버로드된 경우 기본 작업자를 수동으로 수직 확장할 수 있습니다.

기본 작업자 그룹 크기 조절

기본 작업자 그룹을 안전하게 확장할 수 있지만 기본 작업자 그룹을 축소하면 작업 진행에 부정적인 영향을 미칠 수 있습니다. 기본 작업자 그룹을 축소하는 작업은 --graceful-decommission-timeout 플래그를 설정하여 사용 설정된 단계적 해제를 사용해야 합니다.

자동 확장된 클러스터: 자동 확장 정책이 있는 EFM 클러스터에서는 기본 작업자 그룹 확장이 사용 중지됩니다. 자동 확장 클러스터에서 기본 작업자 그룹의 크기를 조절하려면 다음 안내를 따르세요.

  1. 자동 확장을 사용 중지합니다.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --disable-autoscaling
    

  2. 기본 그룹을 확장합니다.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --num-workers=num-primary-workers \
        --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
    

  3. 자동 확장을 다시 사용 설정합니다.

    gcloud dataproc clusters update \
        --cluster=cluster-name \
        --region=region \
        --autoscaling-policy=autoscaling-policy
    

기본 작업자 디스크 사용 모니터링

기본 작업자에는 클러스터 셔플 데이터에 충분한 디스크 공간이 있어야 합니다. remaining HDFS capacity 측정항목을 통해 이 기능을 간접적으로 모니터링할 수 있습니다. 로컬 디스크가 가득 차면 HDFS에 공간이 제공되지 않고 남은 용량이 줄어듭니다.

기본적으로 기본 작업자의 로컬 디스크가 용량의 90% 를 초과할 때 노드는 YARN 노드 UI에 UNHEALTHY로 표시됩니다. 디스크 용량 문제가 발생하면 HDFS에서 사용하지 않는 데이터를 삭제하거나 기본 작업자 풀을 확장할 수 있습니다.

고급 구성

파티션 나누기 및 동시 로드

맵리듀스 또는 Spark 작업을 제출할 때 적정 수준의 파티션 나누기를 구성합니다. 셔플 단계의 입력 및 출력 파티션 수를 결정하는 것은 서로 다른 성능 특성 간에 절충됩니다. 작업 형태에 맞는 값으로 실험하는 것이 가장 좋습니다.

입력 파티션

맵리듀스 및 Spark 입력 파티션 나누기는 입력 데이터 세트로 결정됩니다. Cloud Storage에서 파일을 읽을 때 각 작업은 약 1개의 '블록 크기'에 해당하는 데이터를 처리합니다.

  • Spark SQL 작업의 경우 최대 파티션 크기는 spark.sql.files.maxPartitionBytes로 제어됩니다. 이를 1GB로 늘려보세요. spark.sql.files.maxPartitionBytes=1073741824

  • 맵리듀스 작업 및 Spark RDD의 경우 파티션 크기는 일반적으로 fs.gs.block.size(기본값: 128MB)로 제어됩니다. 이를 1GB로 늘려보세요. mapreduce.input.fileinputformat.split.minsizemapreduce.input.fileinputformat.split.maxsize와 같은 InputFormat 특정 속성을 설정할 수도 있습니다.

    • 맵리듀스 작업: --properties fs.gs.block.size=1073741824
    • Spark RDD: --properties spark.hadoop.fs.gs.block.size=1073741824

출력 파티션

후속 단계의 작업 수는 여러 속성에 의해 제어됩니다. 1TB 이상을 처리하는 대규모 작업의 경우 파티션당 최소 1GB를 사용하는 것이 좋습니다.

  • 맵리듀스 작업의 경우 출력 파티션 수는 mapreduce.job.reduces로 제어됩니다.

  • Spark SQL의 경우 출력 파티션 수는 spark.sql.shuffle.partitions로 제어됩니다.

  • RDD API를 사용하는 Spark 작업의 경우 출력 파티션 수를 지정하거나 spark.default.parallelism을 설정할 수 있습니다.

기본 작업자 셔플을 위한 셔플 조정

가장 중요한 속성은 --properties yarn:spark.shuffle.io.serverThreads=<num-threads>입니다. Spark 셔플 서버가 노드 관리자의 일부로 실행되므로 이 속성은 클러스터 수준 YARN 속성입니다. 기본값은 머신에 있는 코어 수의 두 배(2x)입니다(예: n1-highmem-8의 16개 스레드). '셔플 읽기 차단 시간'이 1초보다 크고 기본 작업자가 네트워크, CPU 또는 디스크 제한에 도달하지 않은 경우에는 셔플 서버 스레드 수를 늘려보세요.

더 큰 머신 유형에서는 spark.shuffle.io.numConnectionsPerPeer(기본값: 1)를 늘리는 것이 좋습니다. (예를 들어 호스트 쌍당 연결 5개로 설정).

재시도 상향 조정

앱 마스터, 작업 및 스테이지에 허용되는 최대 시도 수는 다음 속성을 설정하여 구성할 수 있습니다.

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

앱 마스터 및 작업은 단계적 해제 없이 많은 선점형 VM 또는 자동 확장을 사용하는 클러스터에서 더 자주 종료되므로 클러스터에서 위 속성 값을 늘리면 도움이 될 수 있습니다(참고: Spark를 사용하는 EFM과 단계적 해제는 지원되지 않습니다).

HCFS 셔플을 위한 HDFS 구성

큰 셔플의 성능을 향상시키기 위해서는 dfs.namenode.fslock.fair=false를 설정하여 NameNode의 잠금 경합을 줄일 수 있습니다. 이로 인해 개별 요청에 리소스가 부족할 위험이 있지만 클러스터 전체 처리량은 향상될 수 있습니다. NameNode 성능을 더 향상시키기 위해서는 --num-master-local-ssds를 설정하여 마스터 노드에 로컬 SSD를 연결할 수 있습니다. 또한 DataNode 성능 향상을 위해 --num-worker-local-ssds를 설정하여 기본 작업자에 로컬 SSD를 추가할 수도 있습니다.

HCFS 셔플을 위한 기타 Hadoop 호환 파일 시스템

기본적으로 EFM HCFS 셔플 데이터가 HDFS에 기록되지만 모든 Hadoop 호환 파일 시스템(HCFS)을 사용할 수 있습니다. 예를 들어 Cloud Storage 또는 다른 클러스터의 HDFS에 셔플을 쓰기로 결정할 수 있습니다. 파일 시스템을 지정하려면 클러스터에 작업을 제출할 때 fs.defaultFS가 대상 파일 시스템을 가리키도록 할 수 있습니다.

EFM 클러스터에서 YARN 단계적 해제

YARN 단계적 해제를 사용하면 실행중인 애플리케이션에 미치는 영향을 최소화하면서 노드를 빠르게 삭제할 수 있습니다. 자동 확장 클러스터의 경우 EFM 클러스터에 연결된 AutoscalingPolicy에서 단계적 해제 제한 시간을 설정할 수 있습니다.

맵리듀스 EFM의 단계적 해제 개선

  1. 중간 데이터는 분산 파일 시스템에 저장되므로 해당 노드에서 실행되는 모든 컨테이너가 완료되는 즉시 EFM 클러스터에서 노드를 제거할 수 있습니다. 그에 비해 애플리케이션이 완료될 때까지 표준 Dataproc 클러스터에서 노드가 삭제되지 않습니다.

  2. 노드 삭제는 노드에서 실행 중인 앱 마스터가 완료될 때까지 기다리지 않습니다. 앱 마스터 컨테이너가 종료되면 해제되지 않는 다른 노드에서 노드가 다시 예약됩니다. 작업 진행률은 손실되지 않습니다. 새 앱 마스터는 작업 내역을 읽어서 이전 앱 마스터의 상태를 빠르게 복구합니다.

맵리듀스로 EFM 클러스터에서 단계적 해제 사용

  1. 동일한 수의 기본 및 보조 작업자로 EFM 클러스터를 만듭니다.

    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    

  2. 클러스터에서 맵리듀스 예시 jar를 사용하여 pi 값을 계산하는 맵리듀스 작업을 실행합니다.

    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    

  3. 작업이 실행되는 동안 단계적 해제를 사용하여 클러스터를 축소합니다.

    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    
    작업이 완료되기 전에 클러스터에서 노드가 신속하게 삭제되고 작업 진행 손실이 최소화됩니다. 다음과 같은 이유로 작업 진행이 일시적으로 중지될 수 있습니다.

    • 앱 마스터 장애 조치. 작업 진행률이 0%로 떨어진 후 감소 이전 값으로 즉시 증가하는 경우는 앱 마스터가 종료되고 새 앱 마스터가 상태를 복구했기 때문일 수 있습니다. 이는 장애 조치가 빠르게 발생하므로 작업 진행률에 크게 영향을 주지 않습니다.
    • VM 선점. HDFS는 부분적이 아닌 완전한 맵 작업 출력만을 보존하므로, 맵 작업을 수행하는 동안 VM이 선점되면 작업 진행이 일시적으로 중지될 수 있습니다.

노드 제거 속도를 높이려면 이전의 gcloud 명령어에서 --graceful-decommission-timeout 플래그를 제외하여 단계적 해제 없이 클러스터를 축소할 수 있습니다. 완료된 맵 작업의 작업 진행률은 유지되지만 부분적으로 완료된 맵 작업 출력은 손실됩니다(맵 작업은 다시 실행됨).