Dataproc 增强的灵活性模式

删除 Dataproc 节点后,Dataproc 增强的灵活性模式 (EFM) 会在 HDFS 中保留有状态节点数据,例如 mapreduce shuffle 数据。

由于 HDFS 仅在主工作器上运行,因此此功能特别适合使用抢占式虚拟机的集群或仅自动扩缩抢占式工作器组。

限制:

  • 增强的灵活性模式目前仅支持 1.4 映像集群
  • 不支持 AppMaster 重新定位的 Apache Hadoop RNRN 作业在增强的灵活性模式中可能遭遇更多失败(请参阅何时等待 AppMasters 完成)。
  • 不建议使用增强的灵活性模式:
    • 仅有主要工作器的集群
    • 主要工作器自动扩缩功能已启用,除非 EFM 配置为指向其他集群的 HDFS
      注意:如果作业在很大程度上依赖于 HDFS,则建议自动扩缩 HDFS。

使用增强的灵活性模式

创建一个启用了增强的灵活性模式的集群:

gcloud dataproc clusters create cluster-name \
    --region=region \
    --properties=dataproc:efm.spark.shuffle=hcfs \
    --properties=dataproc:efm.mapreduce.shuffle=hcfs \
    --image-version=1.4

Apache Hadoop MapReduce 示例

  1. 从增强的灵性性模式集群上的 mapreduce 示例 jar 中,运行一个小型 Teragen 作业,以在 Cloud Storage 中生成输入数据供后续的 terasort 作业使用。
    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- teragen 1000 Cloud Storage output URI (for example, gs://terasort/input)
    
  2. 对数据运行 terasort 作业。
    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- terasort gs://terasort/input gs://terasort/output
    

Apache Spark 示例

  1. 在增强的灵活性模式集群上使用 Spark 示例 jar 对公共莎士比亚文字运行 WordCount 作业。
    gcloud dataproc jobs submit spark \
        --cluster=cluster-name \
        --region=region \
        --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
        --class=org.apache.spark.examples.JavaWordCount \
        -- gs://apache-beam-samples/shakespeare/macbeth.txt
    

增强的灵活性模式集群上的 YARN 安全停用

YARN 安全停用可用于快速移除节点,而对运行的应用产生最小的影响。对于自动扩缩集群,可以在附加到增强的灵性性模式集群的 AutoscalingPolicy 中设置安全停用超时

对安全停用功能的增强的灵活性模式增强功能

  1. 由于中间数据存储在分布式文件系统中,因此在这些节点上运行的所有容器完成运行后,可以立即从增强的灵性性模式集群中移除节点。相比之下,在应用完成之前,系统不会移除标准 Dataproc 集群上的节点。

  2. 节点移除不会等待在节点上运行的应用主实例结束运行。 应用主容器被终止后,将在另一个未停用的节点上重新安排。作业进度不会丢失:新应用主实例通过读取作业历史记录快速恢复之前的应用主实例的状态。

使用增强的灵活性模式集群上的安全停用功能

例如:

  1. 创建一个增强的灵活性模式集群,其中包含同等数量的主要和辅助工作器。
    gcloud dataproc clusters create cluster-name \
        --properties=dataproc:efm.spark.shuffle=hcfs \
        --properties=dataproc:efm.mapreduce.shuffle=hcfs \
        --image-version=1.4 \
        --region=region \
        --num-workers=5 \
        --num-secondary-workers=5
    
  2. 在集群上的 mapreduce 示例 jar 中,运行用于计算 pi 值的 mapreduce 作业。
    gcloud dataproc jobs submit hadoop \
        --cluster=cluster-name \
        --region=region \
        --jar=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
        -- pi 1000 10000000
    
  3. 当作业运行时,使用安全停用功能来缩减集群。
    gcloud dataproc clusters update cluster-name \
        --region=region \
        --num-secondary-workers=0 \
        --graceful-decommission-timeout=1h
    

在作业完成前,系统会快速从集群中移除节点,同时最大限度地减少作业进度损失。作业进度的临时下降可能原因如下:

  • 应用主实例故障转移。如果作业进度下降到 0%,然后立即跳至下降前的值,则可能是因为应用主实例被终止,新应用主实例恢复了其状态。这不会大幅影响作业进度,因为故障转移很快就会发生。
  • 虚拟机抢占由于 HDFS 仅保留完整而非部分的映射任务输出,因此在处理映射任务时虚拟机会被抢占。

高级配置

配置 HDFS

要提升大型重排的性能,您可以在创建集群时增加 dfs.namenode.handler.countdfs.datanode.handler.count 的值,从而增加名称节点和数据节点的服务线程数。默认情况下,每个名称节点和数据节点都有 10 个服务线程。 要进一步改善名称节点的性能,您可以通过将 --num-master-local-ssds 标志传递到 clusters create 命令以将本地 SSD 挂接到主节点。您还可以使用 --num-worker-local-ssds 标志将本地 SSD 挂接到数据节点以提高 HDFS 性能。

分区和并行性

提交 MapReduce 或 Spark 作业时,请务必配置适当的分区级别。给定 shuffle 阶段的输入和输出分区数量会权衡不同的性能特征。最好使用适合您的工作形状的值进行实验。

对于 MapReduce 作业,输出分区数由 mapreduce.job.reduces 控制。对于 Spark SQL,输出分区数由 spark.sql.shuffle.partitions 控制。对于使用 RDD API 的 Spark 作业,可以指定分区数。MapReduce 和 Spark 输入分区由输入数据集隐式设置。

增加重试次数

应用主实例、任务和阶段允许的最大尝试次数可通过设置以下属性进行配置:

yarn:yarn.resourcemanager.am.max-attempts
mapred:mapreduce.map.maxattempts
mapred:mapreduce.reduce.maxattempts
spark:spark.task.maxFailures
spark:spark.stage.maxConsecutiveAttempts

由于应用主实例和任务在使用许多抢占式虚拟机的集群中更频繁地结束,或者在没有安全停用的情况下自动扩缩,因此建议在这些集群中增加上述值。

抢占式工作器比率

由于抢占式工作器将自己的 shuffle 数据写入 HDFS,请务必确保您的集群包含足够多的主要工作器来处理作业的 shuffle 数据。对于自动扩缩集群,您可以通过在 AutoscalingPolicy 中的 workerConfigsecondaryWorkerConfig 中分配权重来控制集群中的抢占式工作器的目标比例。

其他与 Hadoop 兼容的文件系统

默认情况下,系统会向 HDFS 写入 shuffle 数据,但您可以使用任何与 Hadoop 兼容的文件系统 (HCFS)。例如,您可以决定将 Shuffle 数据写入 Cloud Storage 或其他集群的 HDFS。要指定文件系统,您可以在向集群提交作业时将 fs.defaultFS 指向目标文件系统。