Dataproc Persistent History Server

Overview

The Dataproc Persistent History Server (PHS) provides web interfaces to view job history for jobs run on active or deleted Dataproc clusters. It is available in Dataproc image version 1.5 and later, and runs on a single-node Dataproc cluster. It provides web interfaces to the following files and data:

  • MapReduce and Spark job history files

  • Application Timeline data files created by YARN Timeline Service v2 and stored in a Bigtable instance

  • YARN aggregation logs

The Persistent History Server accesses and displays Spark and MapReduce job history files and YARN log files written to Cloud Storage during the lifetime of Dataproc job clusters.

Create a PHS cluster

To create a Dataproc Persistent History Server single-node cluster, run the following gcloud dataproc clusters create command in a local terminal or in Cloud Shell with the flags and cluster properties described below.

gcloud dataproc clusters create cluster-name \
    --region=region \
    --single-node \
    --enable-component-gateway \
    --properties=properties

Notes:

  • --enable-component-gateway: Required to enable Component Gateway web interfaces on the PHS cluster.
  • Add one or more of the following cluster properties to enable persisting and viewing job history and other logs using the Persistent History Server (a complete example command follows this list).

    Example:

    --properties=spark:spark.history.fs.logDirectory=gs://bucket-name/*/spark-job-history 
    --properties=mapred:mapreduce.jobhistory.read-only.dir-pattern=gs://bucket-name/*/mapreduce-job-history/done

    • yarn:yarn.nodemanager.remote-app-log-dir=gs://bucket-name/*/yarn-logs: Add this property to specify the Cloud Storage location where the PHS will access YARN logs written by job clusters (see Create a Dataproc job cluster). The value shown uses a "*" wildcard character to allow the PHS to match multiple directories in the specified bucket written to by different job clusters (but see Efficiency Consideration: Using Mid-Path Wildcards).
    • spark:spark.history.fs.logDirectory=gs://bucket-name/*/spark-job-history: Add this property to enable persistent Spark job history. This property specifies the location where the PHS will access Spark job history logs written by job clusters (see Create a Dataproc job cluster). The value shown uses a "*" wildcard character to allow the PHS to match multiple directories in the specified bucket written to by different job clusters (but see Efficiency Consideration: Using Mid-Path Wildcards).

      Note: In Dataproc 2.0+ clusters, the following properties must also be set to enable PHS Spark history logs (see Spark History Server Configuration Options). The spark.history.custom.executor.log.url value is a literal value that contains {{PLACEHOLDERS}} for variables that will be set by the Persistent History Server. These variables are not set by users; pass in the property value as shown.

      --properties=spark:spark.history.custom.executor.log.url.applyIncompleteApplication=false
      --properties=spark:spark.history.custom.executor.log.url={{YARN_LOG_SERVER_URL}}/{{NM_HOST}}:{{NM_PORT}}/{{CONTAINER_ID}}/{{CONTAINER_ID}}/{{USER}}/{{FILE_NAME}}
      
    • mapred:mapreduce.jobhistory.read-only.dir-pattern=gs://bucket-name/*/mapreduce-job-history/done: Add this property to enable persistent MapReduce job history. This property specifies the Cloud Storage location where the PHS will access MapReduce job history logs written by job clusters (see Create a Dataproc job cluster). The value shown uses a "*" wildcard character to allow the PHS to match multiple directories in the specified bucket written to by different job clusters (but see Efficiency Consideration: Using Mid-Path Wildcards).

    • dataproc:yarn.atsv2.bigtable.instance=projects/project-id/instance_id/bigtable-instance-id: After you Configure Yarn Timeline Service v2, add this property to use the PHS cluster to view timeline data on the YARN Application Timeline Service V2 and Tez web interfaces (see Component Gateway web interfaces).
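
Putting these together, the following is a minimal end-to-end sketch; the cluster name example-phs-cluster, the bucket example-bucket, and the region us-central1 are hypothetical placeholders, so substitute your own values:

gcloud dataproc clusters create example-phs-cluster \
    --region=us-central1 \
    --single-node \
    --enable-component-gateway \
    --properties="yarn:yarn.nodemanager.remote-app-log-dir=gs://example-bucket/*/yarn-logs,spark:spark.history.fs.logDirectory=gs://example-bucket/*/spark-job-history,mapred:mapreduce.jobhistory.read-only.dir-pattern=gs://example-bucket/*/mapreduce-job-history/done"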

Create a Dataproc job cluster

You can run the following command in a local terminal or in Cloud Shell to create a Dataproc job cluster that writes job history files to a Persistent History Server (PHS).

gcloud dataproc clusters create cluster-name \
    --region=region \
    --enable-component-gateway \
    --properties=properties \
    other args ...

Notes:

  • --enable-component-gateway: This flag is required to enable Component Gateway web interfaces on the job cluster.
  • Add one or more of the following cluster properties to set PHS-related non-default Cloud Storage locations and other job cluster properties (a complete example command follows this list).

    Example:

    --properties=spark:spark.history.fs.logDirectory=gs://bucket-name/job-cluster-1/spark-job-history 
    --properties=mapred:mapreduce.jobhistory.read-only.dir-pattern=gs://bucket-name/job-cluster-1/mapreduce-job-history/done

    • yarn:yarn.nodemanager.remote-app-log-dir: By default, aggregated YARN logs are enabled on Dataproc job clusters and written to the cluster temp bucket. Add this property to specify a different Cloud Storage location where the cluster will write aggregation logs for access by the Persistent History Server.
      yarn:yarn.nodemanager.remote-app-log-dir=gs://bucket-name/directory-name/yarn-logs
      
    • spark:spark.history.fs.logDirectory and spark:spark.eventLog.dir: By default, Spark job history files are saved in the cluster temp bucket in the /spark-job-history directory. You can add these properties to specify different Cloud Storage locations for these files. If both properties are used, they must point to directories in the same bucket.
      spark:spark.history.fs.logDirectory=gs://bucket-name/directory-name/spark-job-history,
      spark:spark.eventLog.dir=gs://bucket-name/directory-name/spark-job-history
      
    • mapred:mapreduce.jobhistory.done-dir and mapred:mapreduce.jobhistory.intermediate-done-dir: By default, MapReduce job history files are saved in the cluster temp bucket in the /mapreduce-job-history/done and /mapreduce-job-history/intermediate-done directories. The intermediate mapreduce.jobhistory.intermediate-done-dir location is temporary storage; intermediate files are moved to the mapreduce.jobhistory.done-dir location when the MapReduce job completes. You can add these properties to specify different Cloud Storage locations for these files. If both properties are used, they must point to directories in the same bucket.
      mapred:mapreduce.jobhistory.done-dir=gs://bucket-name/directory-name/mapreduce-job-history/done,
      mapred:mapreduce.jobhistory.intermediate-done-dir=gs://bucket-name/directory-name/mapreduce-job-history/intermediate-done
      
    • spark:spark.history.fs.gs.outputstream.type and spark:spark.history.fs.gs.outputstream.sync.min.interval.ms: Add these Cloud Storage connector properties to change the default behavior of how the job cluster sends data to Cloud Storage. The default spark:spark.history.fs.gs.outputstream.type is BASIC, which sends data to Cloud Storage after job completion. You can change this setting to FLUSHABLE_COMPOSITE to flush data to Cloud Storage at regular intervals while the job is running.
      spark:spark.history.fs.gs.outputstream.type=FLUSHABLE_COMPOSITE
      
      The default spark:spark.history.fs.gs.outputstream.sync.min.interval.ms, which controls the frequency at which data is transferred to Cloud Storage, is 5000 ms, and can be changed to a different millisecond time interval:
      spark:spark.history.fs.gs.outputstream.sync.min.interval.ms=intervalms
      
      Note: To set these properties, the Dataproc job cluster image version must use Cloud Storage connector version 2.2.0 or later. You can check the connector version installed on image versions from the Dataproc image version list page.
    • dataproc:yarn.atsv2.bigtable.instance: After you Configure Yarn Timeline Service v2, add this property to write YARN timeline data to the specified Bigtable instance for viewing on the PHS cluster YARN Application Timeline Service V2 and Tez web interfaces. Note: cluster creation will fail if the Bigtable instance does not exist.
      dataproc:yarn.atsv2.bigtable.instance=projects/project-id/instance_id/bigtable-instance-id
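
Putting these together, the following is a minimal job cluster sketch; the names example-job-cluster, example-bucket, job-cluster-1, and us-central1 are hypothetical placeholders, chosen so that the directories match the PHS wildcard patterns shown in the previous section:

gcloud dataproc clusters create example-job-cluster \
    --region=us-central1 \
    --enable-component-gateway \
    --properties="yarn:yarn.nodemanager.remote-app-log-dir=gs://example-bucket/job-cluster-1/yarn-logs,spark:spark.history.fs.logDirectory=gs://example-bucket/job-cluster-1/spark-job-history,spark:spark.eventLog.dir=gs://example-bucket/job-cluster-1/spark-job-history,mapred:mapreduce.jobhistory.done-dir=gs://example-bucket/job-cluster-1/mapreduce-job-history/done,mapred:mapreduce.jobhistory.intermediate-done-dir=gs://example-bucket/job-cluster-1/mapreduce-job-history/intermediate-done"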
      

Use PHS with Spark batch workloads

To use the Persistent History Server with Dataproc Serverless for Spark batch workloads:

  1. Create a PHS cluster.

  2. Select or specify the PHS cluster when you submit a Spark batch workload.
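
For example, step 2 with the gcloud CLI might look like the following hedged sketch; the project, region, and cluster names are hypothetical, and the --history-server-cluster flag takes the full resource name of the PHS cluster:

gcloud dataproc batches submit spark \
    --region=us-central1 \
    --class=org.apache.spark.examples.SparkPi \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --history-server-cluster=projects/example-project/regions/us-central1/clusters/example-phs-cluster \
    -- 1000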

Use PHS with Dataproc on Google Kubernetes Engine

To use the Persistent History Server with Dataproc on GKE:

  1. Create a PHS cluster.

  2. Select or specify the PHS cluster when you create a Dataproc on GKE virtual cluster.

Component Gateway web interfaces

In the Google Cloud console, on the Dataproc Clusters page, click the PHS cluster name to open the Cluster details page. On the Web Interfaces tab, select the Component Gateway links to open web interfaces running on the PHS cluster.

Spark History Server web interface

The following screenshot shows the Spark History Server web interface displaying links to Spark jobs run on job-cluster-1 and job-cluster-2 after setting the job clusters' spark.history.fs.logDirectory and spark.eventLog.dir properties and the PHS cluster's spark.history.fs.logDirectory property to the following locations:

Cluster        spark.history.fs.logDirectory location
job-cluster-1  gs://example-cloud-storage-bucket/job-cluster-1/spark-job-history
job-cluster-2  gs://example-cloud-storage-bucket/job-cluster-2/spark-job-history
phs-cluster    gs://example-cloud-storage-bucket/*/spark-job-history

You can list jobs by App Name in the Spark History Server web interface by entering an app name in the search box. The app name can be set in one of the following ways (listed by priority):

  1. Set inside the application code when creating the Spark context
  2. Set by the spark.app.name property when the job is submitted
  3. Set by Dataproc to the full REST resource name of the job (projects/project-id/regions/region/jobs/job-id)
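
For example, a hedged sketch of option 2; the cluster name example-job-cluster, the region, and the app name are hypothetical placeholders:

gcloud dataproc jobs submit spark \
    --cluster=example-job-cluster \
    --region=us-central1 \
    --class=org.apache.spark.examples.SparkPi \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --properties=spark.app.name=example-spark-pi \
    -- 1000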

Enter an app name or resource name term in the search box to find and list matching jobs.

Event logs

The Spark History Server web interface provides an Event Log button you can click to download Spark event logs. These logs are useful for examining the lifecycle of the Spark application.

Spark jobs

Spark applications are broken down into multiple jobs, which are further broken down into multiple stages. Each stage can have multiple tasks, which are run on executor nodes (workers).

  • Click on a Spark App ID in the web interface to open the Spark Jobs page, which provides an event timeline and summary of jobs within the application.

  • Click a job to open a Job Details page with a Directed Acyclic Graph (DAG) and summary of job stages.

  • Click on a stage or use the Stages tab to select a stage to open the Stage Details page.

    Stage Details includes a DAG visualization, an event timeline, and metrics for the tasks within the stage. You can use this page to troubleshoot issues related to straggler tasks, scheduler delays, and out of memory errors. The DAG visualizer shows the line of code from which the stage is derived, helping you track issues back to the code.

  • Click on the Executors tab for information about the Spark application's driver and executor nodes.

    Important pieces of information on this page include the number of cores and the number of tasks that were run on each executor.

Tez web interface

Tez is the default execution engine for Hive and Pig on Dataproc. Submitting a Hive job on a Dataproc job cluster launches a Tez application (see Using Apache Hive on Dataproc).

If you configured Yarn Timeline Service v2 and set the dataproc:yarn.atsv2.bigtable.instance property when you created the PHS and Dataproc job clusters, YARN writes generated Hive and Pig job timeline data to the specified Bigtable instance for retrieval and display on the Tez web interface running on the PHS server.

YARN Application Timeline V2 web interface

If you configured Yarn Timeline Service v2 and set the dataproc:yarn.atsv2.bigtable.instance property when you created the PHS and Dataproc job clusters, YARN writes generated job timeline data to the specified Bigtable instance for retrieval and display on the YARN Application Timeline Service web interface running on the PHS server. Dataproc jobs are listed under the Flow Activity tab in web interface.

Configure Yarn Timeline Service v2

To configure Yarn Timeline Service v2, set up a Bigtable instance and, if needed, check service account roles, as follows:

  1. Create a Bigtable instance.

  2. Check service account roles, if needed. The default VM service account used by Dataproc cluster VMs has the permissions needed to create and configure the Bigtable instance for the YARN Timeline Service. If you create your job or PHS cluster with a user-managed VM service account, the account must have either the Bigtable Administrator or Bigtable User role.
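
A hedged sketch of these steps with the gcloud CLI; the instance, cluster, zone, project, and service account names are hypothetical, and the flags shown may vary by gcloud version:

# Create a single-node Bigtable instance to hold YARN Timeline Service v2 data.
gcloud bigtable instances create example-atsv2-instance \
    --display-name="ATSv2 timeline data" \
    --cluster-config=id=example-atsv2-cluster,zone=us-central1-a,nodes=1

# If you use a user-managed VM service account, grant it the Bigtable User role.
gcloud projects add-iam-policy-binding example-project \
    --member=serviceAccount:example-sa@example-project.iam.gserviceaccount.com \
    --role=roles/bigtable.user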

Required table schema

Dataproc PHS support for YARN Timeline Service v2 requires a specific schema created in the Bigtable instance. Dataproc creates the required schema when a job cluster or PHS cluster is created with the dataproc:yarn.atsv2.bigtable.instance property set to point to the Bigtable instance.

The following is the required Bigtable instance schema:

Table                                 Column families
prod.timelineservice.application      c,i,m
prod.timelineservice.app_flow         m
prod.timelineservice.entity           c,i,m
prod.timelineservice.flowactivity     i
prod.timelineservice.flowrun          i
prod.timelineservice.subapplication   c,i,m
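
Assuming cbt is installed and a .cbtrc file points at your project and Bigtable instance, you can confirm that Dataproc created the schema by listing the tables (a hedged sketch):

cbt ls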

Bigtable garbage collection

You can configure age-based Bigtable Garbage Collection for ATSv2 tables:

  • Install cbt (including creating the .cbtrc file).

  • Create the ATSv2 age-based garbage collection policy:

export NUMBER_OF_DAYS=number
cbt setgcpolicy prod.timelineservice.application c maxage=${NUMBER_OF_DAYS}
cbt setgcpolicy prod.timelineservice.application i maxage=${NUMBER_OF_DAYS}
cbt setgcpolicy prod.timelineservice.application m maxage=${NUMBER_OF_DAYS}
cbt setgcpolicy prod.timelineservice.app_flow m maxage=${NUMBER_OF_DAYS}
cbt setgcpolicy prod.timelineservice.entity c maxage=${NUMBER_OF_DAYS}
cbt setgcpolicy prod.timelineservice.entity i maxage=${NUMBER_OF_DAYS}
cbt setgcpolicy prod.timelineservice.entity m maxage=${NUMBER_OF_DAYS}
cbt setgcpolicy prod.timelineservice.flowactivity i maxage=${NUMBER_OF_DAYS}
cbt setgcpolicy prod.timelineservice.flowrun i maxage=${NUMBER_OF_DAYS}
cbt setgcpolicy prod.timelineservice.subapplication c maxage=${NUMBER_OF_DAYS}
cbt setgcpolicy prod.timelineservice.subapplication i maxage=${NUMBER_OF_DAYS}
cbt setgcpolicy prod.timelineservice.subapplication m maxage=${NUMBER_OF_DAYS}

Notes:

NUMBER_OF_DAYS: The maximum number of days is 30 (for example, set NUMBER_OF_DAYS=30d).
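
You can confirm that the policy took effect by listing a table's column families, which cbt prints along with each family's current GC policy (a hedged sketch; output format can vary by cbt version):

cbt ls prod.timelineservice.application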