Dataproc 的 Persistent History Server

概览

Dataproc 持久性历史记录服务器 (PHS) 提供网页界面,可用于查看在有效或已删除的 Dataproc 集群上运行的作业的历史记录。它适用于 Dataproc 映像版本 1.5 及更高版本,并在单节点 Dataproc 集群上运行。它为以下文件和数据提供网页界面

  • MapReduce 和 Spark 作业历史记录文件

  • YARN 时间轴服务 v2 创建并存储在 Bigtable 实例中的应用时间轴数据文件。

  • YARN 聚合日志

在 Dataproc 作业集群生命周期内,持久性历史记录服务器访问并显示 Spark 和 MapReduce 作业历史记录文件以及写入 Cloud Storage 的 YARN 日志文件。

创建 PHS 集群

您可以在本地终端或 Cloud Shell 中使用以下标志和集群属性运行以下 gcloud dataproc clusters create 命令,以创建 Dataproc 永久性历史记录服务器单节点集群。

gcloud dataproc clusters create cluster-name \
    --region=region \
    --single-node \
    --enable-component-gateway \
    --properties=properties

备注:

  • --enable-component-gateway:在 PHS 集群上启用组件网关网页界面时需要。
  • 添加以下一个或多个集群属性,以使用 Persistent History Server 保留和查看作业记录和其他日志。

    示例:

    --properties=spark:spark.history.fs.logDirectory=gs://bucket-name/*/spark-job-history 
    --properties=mapred:mapreduce.jobhistory.read-only.dir-pattern=gs://bucket-name/*/mapreduce-job-history/done

    • yarn:yarn.nodemanager.remote-app-log-dir=gs://bucket-name/*/yarn-logs:添加此属性以指定 PHS 将访问作业集群写入的 YARN 日志的 Cloud Storage 位置(请参阅创建作业集群)。显示的值使用“*”通配符,以允许 PHS 匹配由不同作业集群写入的指定存储分区中的多个目录(但请参阅效率注意事项:使用中路径通配符)。
    • spark:spark.history.fs.logDirectory=gs://bucket-name/*/spark-job-history:添加此属性可启用永久性 Spark 作业历史记录。此属性指定 PHS 将访问作业集群写入的 Spark 作业历史记录日志的位置(请参阅创建作业集群)。显示的值使用“*”通配符,以允许 PHS 匹配由不同作业集群写入的指定存储分区中的多个目录(但请参阅效率注意事项:使用中路径通配符)。

      注意:在 Dataproc 2.0+ 集群中,还必须设置以下属性以启用 PHS Spark 历史记录日志(请参阅 Spark 历史记录服务器配置选项)。spark.history.custom.executor.log.url 值是一个字面量值,其中包含将由持久性历史记录服务器设置的变量的 {{PLACEHOLDERS}}。这些变量并非由用户设置;请按如下所示传入属性值。

      --properties=spark:spark.history.custom.executor.log.url.applyIncompleteApplication=false
      --properties=spark:spark.history.custom.executor.log.url={{YARN_LOG_SERVER_URL}}/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}
      
    • mapred:mapreduce.jobhistory.read-only.dir-pattern=gs://bucket-name/*/mapreduce-job-history/done:添加此属性可启用永久性 MapReduce 作业历史记录。此属性指定 PHS 将访问作业集群写入的 MapReduce 作业历史记录日志的 Cloud Storage 位置(请参阅创建作业集群)。显示的值使用“*”通配符,以允许 PHS 匹配由不同作业集群写入的指定存储分区中的多个目录(但请参阅效率注意事项:使用中路径通配符)。

    • dataproc:yarn.atsv2.bigtable.instance=projects/project-id/instance_id/bigtable-instance-id配置 Yarn 时间轴服务 v2 后,添加此属性即可使用 PHS 集群查看 YARN 应用时间轴服务 V2Tez 网页界面上的时间轴数据(请参阅组件网关网页界面)。

创建 Dataproc 作业集群

您可以在本地终端或 Cloud Shell 中运行以下命令,创建将作业历史记录文件写入持久性历史记录服务器 (PHS) 的 Dataproc 作业集群。

gcloud dataproc clusters create cluster-name \
    --region=region \
    --enable-component-gateway \
    --properties=properties
    other args ...

备注:

  • --enable-component-gateway:在作业集群上启用组件网关网页界面时需要使用此标志。
  • 添加以下一个或多个集群属性,以设置与 PHS 相关的非默认 Cloud Storage 位置和其他作业集群属性。

    示例:

    --properties=spark:spark.history.fs.logDirectory=gs://bucket-name/job-cluster-1/spark-job-history 
    --properties=mapred:mapreduce.jobhistory.read-only.dir-pattern=gs://bucket-name/job-cluster-1/mapreduce-job-history/done
    • yarn:yarn.nodemanager.remote-app-log-dir:默认情况下,系统会在 Dataproc 作业集群上启用聚合 YARN 日志,并将其写入集群临时存储分区。添加此属性可指定其他 Cloud Storage 位置,集群将在其中写入汇总日志以供 Persistent History Server 访问。
      yarn:yarn.nodemanager.remote-app-log-dir=gs://bucket-name/directory-name/yarn-logs
      
    • spark:spark.history.fs.logDirectoryspark:spark.eventLog.dir:默认情况下,Spark 作业历史记录文件保存在 /spark-job-history 目录下的 temp bucket 集群中。您可以添加这些属性,为这些文件指定不同的 Cloud Storage 位置。如果同时使用这两个属性,它们必须指向同一存储分区中的目录。
      spark:spark.history.fs.logDirectory=gs://bucket-name/directory-name/spark-job-history,
      spark:spark.eventLog.dir=gs://bucket-name/directory-name/spark-job-history
      
    • mapred:mapreduce.jobhistory.done-dirmapred:mapreduce.jobhistory.intermediate-done-dir:默认情况下,MapReduce 作业记录文件保存在 /mapreduce-job-history/done/mapreduce-job-history/intermediate-done 目录中的集群 temp bucket 中。中间 mapreduce.jobhistory.intermediate-done-dir 位置是临时存储空间;MapMap 作业完成后,中间文件会移至 mapreduce.jobhistory.done-dir 位置。您可以添加这些属性,为这些文件指定不同的 Cloud Storage 位置。如果同时使用这两个属性,它们必须指向同一存储分区中的目录。
      mapred:mapreduce.jobhistory.done-dir=gs://bucket-name/directory-name/mapreduce-job-history/done,
      mapred:mapreduce.jobhistory.intermediate-done-dir=gs://bucket-name/directory-name/mapreduce-job-history/intermediate-done
      
    • spark:spark.history.fs.gs.outputstream.typespark:spark.history.fs.gs.outputstream.sync.min.interval.ms:添加这些 Cloud Storage 连接器属性可更改作业集群将数据发送到 Cloud Storage 的默认行为。默认的 spark:spark.history.fs.gs.outputstream.typeBASIC,它会在作业完成后将数据发送到 Cloud Storage。您可以将此设置更改为 FLUSHABLE_COMPOSITE 以更改刷新行为,以便在作业运行时定期将数据复制到 Cloud Storage。
      spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE
      
      默认 spark:spark.history.fs.gs.outputstream.sync.min.interval.ms(用于控制数据传输到 Cloud Storage 的频率)是 5000ms,可以更改为其他 ms 时间间隔:
      spark:spark.history.fs.gs.outputstream.sync.min.interval.ms=intervalms
      
      注意:如需设置这些属性,Dataproc 作业集群映像版本必须使用 Cloud Storage 连接器 2.2.0 或更高版本。您可以通过 Dataproc 映像版本列表页面查看映像版本上安装的连接器版本。
    • dataproc:yarn.atsv2.bigtable.instance配置 Yarn 时间轴服务 v2 后,请添加此属性,以将 YARN 时间轴数据写入指定的 Bigtable 实例,以便在 PHS 集群 YARN 应用时间轴服务 V2Tez 网页界面上查看。注意:如果 Bigtable 实例不存在,则无法创建集群。
      dataproc:yarn.atsv2.bigtable.instance=projects/project-id/instance_id/bigtable-instance-id
      

将 PHS 与 Spark 批处理工作负载搭配使用

如需将永久性历史记录服务器与 Dataproc 无服务器用于处理 Spark 批处理工作负载,请执行以下操作:

  1. 创建 PHS 集群

  2. 提交 Spark 批处理工作负载时,请选择或指定 PHS 集群。

在 Google Kubernetes Engine 上将 PHS 与 Dataproc 搭配使用

如需将永久性历史记录服务器与 Dataproc on GKE 搭配使用,请执行以下操作:

  1. 创建 PHS 集群

  2. 创建 Dataproc on GKE 虚拟集群时,请选择或指定 PHS 集群。

组件网关网页界面

在控制台的 Dataproc 集群页面中,点击 PHS 集群名称以打开集群详情页面。在网页界面标签页下,选择“组件网关”链接,以打开 PHS 集群上运行的网页界面。

Spark 历史记录服务器网页界面

以下屏幕截图显示了 Spark 历史记录服务器网页界面,其中显示了在设置作业集群(spark.history.fs.logDirectoryspark:spark.eventLog.dir 以及 PHS 集群的 spark.history.fs.logDirectory 位置后,在 job-cluster-1 和 job-cluster-2 上运行的 Spark 作业的链接):

job-cluster-1 gs://example-cloud-storage-bucket/job-cluster-1/spark-job-history
job-cluster-2 gs://example-cloud-storage-bucket/job-cluster-2/spark-job-history
phs-cluster gs://example-cloud-storage-bucket/*/spark-job-history

您可以在 Spark 历史记录服务器网页界面中按应用名称列出作业,方法是在搜索框中输入应用名称。可以通过以下方式之一设置应用名称(按优先级列出):

  1. 创建 Spark 上下文时在应用代码内设置
  2. 提交作业时由 spark.app.name 属性设置
  3. 由 Dataproc 设置为作业的完整 REST 资源名称 (projects/project-id/regions/region/jobs/job-id)

用户可以在搜索框中输入应用或资源名称字词,以查找和列出招聘信息。

事件日志

Spark 历史记录服务器网页界面提供了一个事件日志按钮,您可以点击该按钮以下载 Spark 事件日志。这些日志适用于检查 Spark 应用的生命周期。

Spark 作业

Spark 应用分为多个作业,这些作业进一步分为多个阶段。每个阶段可能有多个任务,这些任务在执行程序节点(工作器)上运行。

  • 点击网页界面中的 Spark 应用 ID 以打开“Spark 作业”页面,该页面提供了应用内作业的事件时间轴和摘要。

  • 点击某个作业即可打开“作业详情”页面,其中包含有向无环图 (DAG) 以及作业阶段的摘要。

  • 点击某个阶段或使用“阶段”标签页选择某个阶段即可打开“阶段详情”页面。

    “阶段详情”包括阶段内任务的 DAG 可视化、事件时间轴和指标。您可以使用此页面排查与绞杀任务、调度程序延迟时间和内存不足错误相关的问题。DAG 可视化工具会显示从中派生阶段的代码行,从而帮助您追踪发生问题的代码。

  • 点击“执行程序”标签页,了解 Spark 应用的驱动程序和执行程序节点。

    本页面上的重要信息包括核心数和每个执行程序上运行的任务数。

Tez 网页界面

Tez 是 Dataproc 上的 Hive 和 Pig 的默认执行引擎。在 Dataproc 作业集群上提交 Hive 作业会启动 Teez 应用(请参阅在 Dataproc 上使用 Apache Hive)。

如果您在创建 PHS 和 Dataproc 作业集群时配置了 Yarn 时间轴服务 v2 并设置 dataproc:yarn.atsv2.bigtable.instance 属性,则 YARN 会将生成的 Hive 和 Pig 作业时间轴数据写入指定的 Bigtable 实例,以便检索并在 PHS 服务器上运行的 Tez 网页界面上显示。

YARN 应用时间轴 V2 网页界面

如果您在创建 PHS 和 Dataproc 作业集群时配置了 Yarn 时间轴服务 v2 并设置 dataproc:yarn.atsv2.bigtable.instance 属性,则 YARN 会将生成的作业时间轴数据写入指定的 Bigtable 实例,以供检索并在 PHS 服务器上运行的 YARN 应用时间轴服务网页界面上显示。Dataproc 作业列在网页界面的流活动标签页下。

配置 Yarn 时间轴服务 v2

如需配置 Yarn 时间轴服务 v2,请设置 Bigtable 实例,并根据需要检查服务帐号角色,如下所示:

  1. 创建 Bigtable 实例

  2. 检查服务帐号角色(如果需要)。Dataproc 集群虚拟机使用的默认虚拟机服务帐号具有为 YARN 时间轴服务创建和配置 Bigtable 实例所需的权限。如果您使用用户管理的虚拟机服务帐号创建作业或 PHS 集群,则该帐号必须具有 Bigtable AdministratorBigtable User 角色

所需的表架构

YARN Timeline 服务 v2 的 Dataproc PHS 支持需要在 Bigtable 实例中创建特定架构。创建作业集群或 PHS 集群时,Dataproc 会创建所需的架构,并将 dataproc:yarn.atsv2.bigtable.instance 属性设置为指向 Bigtable 实例。

以下是所需的 Bigtable 实例架构:

列族
prod.timelineservice.application c、i、m
prod.timelineservice.app_flow 分钟
prod.timelineservice.entity c、i、m
prod.timelineservice.flowactivity i
prod.timelineservice.flowrun i
prod.timelineservice.subapplication c、i、m

Bigtable 垃圾回收

您可以为 ATSv2 表配置基于年龄的大垃圾回收垃圾回收

  • 安装 cbt(包括创建 .cbrtc file)。

  • 创建 ATSv2 基于年龄的垃圾回收政策:

export NUMBER_OF_DAYS = number \
cbt setgcpolicy prod.timelineservice.application c maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.application i maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.application m maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.app_flow m maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.entity c maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.entity i maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.entity m maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.flowactivity i maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.flowrun i maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.subapplication c maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.subapplication i maxage=${NUMBER_OF_DAYS} \
cbt setgcpolicy prod.timelineservice.subapplication m maxage=${NUMBER_OF_DAYS}

备注:

NUMBER_OF_DAYS:最长天数为 30d