Dataproc optional Hudi component

You can install additional components like Hudi when you create a Dataproc cluster using the Optional components feature. This page describes how you can optionally install the Hudi component on a Dataproc cluster.

When installed on a Dataproc cluster, the Apache Hudi component installs Hudi libraries and configures Spark and Hive in the cluster to work with Hudi.

Compatible Dataproc image versions

You can install the Hudi component on Dataproc clusters created with the following Dataproc image versions:

When you create a Dataproc cluster with the Hudi component, the following Spark and Hive properties are configured to work with Hudi.

Config file                          Property                         Default value
/etc/spark/conf/spark-defaults.conf  spark.serializer                 org.apache.spark.serializer.KryoSerializer
/etc/spark/conf/spark-defaults.conf  spark.sql.catalog.spark_catalog  org.apache.spark.sql.hudi.catalog.HoodieCatalog
/etc/spark/conf/spark-defaults.conf  spark.sql.extensions             org.apache.spark.sql.hudi.HoodieSparkSessionExtension
/etc/spark/conf/spark-defaults.conf  spark.driver.extraClassPath      /usr/lib/hudi/lib/hudi-sparkSPARK_VERSION-bundle_SCALA_VERSION-HUDI_VERSION.jar
/etc/spark/conf/spark-defaults.conf  spark.executor.extraClassPath    /usr/lib/hudi/lib/hudi-sparkSPARK_VERSION-bundle_SCALA_VERSION-HUDI_VERSION.jar
/etc/hive/conf/hive-site.xml         hive.aux.jars.path               file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-VERSION.jar
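
To confirm on a cluster node that these defaults are in place, the Spark properties file can be checked programmatically. The following is a minimal sketch and not part of the Hudi component: the helper names parse_spark_defaults and missing_hudi_properties are hypothetical, and the parser assumes simple "key value" or "key=value" lines. Only the three properties whose values do not depend on component versions are checked.

```python
"""Sketch: check that Hudi-related Spark defaults are present."""

import re

# Version-independent properties taken from the table above.
EXPECTED_HUDI_PROPERTIES = {
    'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
    'spark.sql.catalog.spark_catalog':
        'org.apache.spark.sql.hudi.catalog.HoodieCatalog',
    'spark.sql.extensions':
        'org.apache.spark.sql.hudi.HoodieSparkSessionExtension',
}


def parse_spark_defaults(text):
  """Parses 'key value' or 'key=value' lines; skips blanks and comments."""
  properties = {}
  for line in text.splitlines():
    line = line.strip()
    if not line or line.startswith('#'):
      continue
    # Split once, at the first run of whitespace and/or '=' characters.
    parts = re.split(r'[\s=]+', line, maxsplit=1)
    if len(parts) == 2:
      properties[parts[0]] = parts[1].strip()
  return properties


def missing_hudi_properties(properties):
  """Returns the expected keys that are absent or have another value."""
  return sorted(
      key for key, value in EXPECTED_HUDI_PROPERTIES.items()
      if properties.get(key) != value
  )


# Stand-in for the file contents; on a cluster node, read the text from
# /etc/spark/conf/spark-defaults.conf instead.
sample_text = """
# Illustrative excerpt only.
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension
"""
print(missing_hudi_properties(parse_spark_defaults(sample_text)))
```

With the illustrative excerpt, the only property reported as missing is spark.sql.catalog.spark_catalog, because the excerpt omits it.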

Install the component

Install the Hudi component when you create a Dataproc cluster.

The Dataproc image release version pages list the Hudi component version included in each Dataproc image release.

Console

  1. Enable the component.
    • In the Google Cloud console, open the Dataproc Create a cluster page. The Set up cluster panel is selected.
    • In the Components section:
      • Under Optional components, select the Hudi component.

gcloud command

To create a Dataproc cluster that includes the Hudi component, use the gcloud dataproc clusters create command with the --optional-components flag.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

Replace the following:

  • CLUSTER_NAME: Required. The new cluster name.
  • REGION: Required. The cluster region.
  • DATAPROC_VERSION: Optional. You can use this flag to specify a non-default Dataproc image version (see Default Dataproc image version).
  • PROPERTIES: Optional. You can use this flag to set Hudi component properties, which are specified with the hudi: file prefix (example: --properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE).
    • Hudi component version property: You can optionally specify the dataproc:hudi.version property. Note: The Hudi component version is set by Dataproc to be compatible with the Dataproc cluster image version. If you set this property, cluster creation can fail if the specified version is not compatible with the cluster image.
    • Spark and Hive properties: Dataproc sets Hudi-related Spark and Hive properties when the cluster is created. You do not need to set them when creating the cluster or submitting jobs.
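
When several Hudi properties are set at once, the --properties value can be assembled from a mapping. The sketch below is illustrative only: build_hudi_properties_flag is a hypothetical helper, and it assumes the default comma-separated PROPERTY=VALUE form of the flag, so it rejects keys or values that contain a comma instead of trying to escape them.

```python
def build_hudi_properties_flag(hudi_properties):
  """Returns a --properties value with each key prefixed by 'hudi:'."""
  pairs = []
  for key, value in hudi_properties.items():
    value = str(value)
    # Commas separate entries in the flag value, so they are not allowed
    # inside a key or value in this simple sketch.
    if ',' in key or ',' in value:
      raise ValueError(f'Comma not supported in property: {key}={value}')
    pairs.append(f'hudi:{key}={value}')
  return ','.join(pairs)


flag_value = build_hudi_properties_flag(
    {'hoodie.datasource.write.table.type': 'COPY_ON_WRITE'}
)
print(f'--properties={flag_value}')
# Prints: --properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE
```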

REST API

The Hudi component can be installed through the Dataproc API using SoftwareConfig.Component as part of a clusters.create request.
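
As a rough illustration of such a request, the following sketch builds a clusters.create request body that lists HUDI under the software configuration's optional components. The function name build_create_cluster_body and the sample project and cluster names are hypothetical; the field names follow the Cluster and SoftwareConfig resources of the Dataproc REST API, and sending the request (authentication, endpoint, region) is left out.

```python
import json


def build_create_cluster_body(project_id, cluster_name, image_version=None):
  """Builds a clusters.create body that enables the Hudi component."""
  software_config = {'optionalComponents': ['HUDI']}
  if image_version:
    # Optional: pin a non-default Dataproc image version.
    software_config['imageVersion'] = image_version
  return {
      'projectId': project_id,
      'clusterName': cluster_name,
      'config': {'softwareConfig': software_config},
  }


# Placeholder values for illustration only.
body = build_create_cluster_body('my-project', 'my-hudi-cluster')
print(json.dumps(body, indent=2))
```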

Submit a job to read and write Hudi tables

After creating a cluster with the Hudi component, you can submit Spark and Hive jobs that read and write Hudi tables.

gcloud CLI example:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    JOB_FILE \
    -- JOB_ARGS

Sample PySpark job

The following PySpark file creates, reads, and writes a Hudi table.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  """Returns the distinct commit times of the Hudi table, newest first."""
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


if __name__ == '__main__':
  main()
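
The sample's main function picks the previous commit with commits_df.collect()[1], which assumes the table already has at least two commits. The sketch below shows the same selection on a plain list of commit-time strings, with a guard for the single-commit case. The helper name previous_commit_time and the sample timestamps are hypothetical, and the strings are assumed to be fixed-width so that string order matches time order, as the sample's ORDER BY clause also assumes.

```python
def previous_commit_time(commit_times):
  """Returns the second-newest commit time, or None if there is none."""
  # Deduplicate, then sort newest first, mirroring the sample's
  # SELECT DISTINCT ... ORDER BY ... DESC query.
  ordered = sorted(set(commit_times), reverse=True)
  return ordered[1] if len(ordered) > 1 else None


# Made-up commit times for illustration.
commit_times = ['20240102093000000', '20240101120000000']
print(previous_commit_time(commit_times))
# Prints: 20240101120000000
```

Returning None lets the caller skip the time-travel read instead of failing with an IndexError when only one commit exists.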

The following gcloud CLI command submits the sample PySpark file to Dataproc.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME
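
The two values after the -- separator reach the sample script as sys.argv[1] (the table name) and sys.argv[2] (the table URI). To make that mapping explicit, the following sketch assembles the same submission as an argument list. It is an assumption-laden illustration: build_submit_command and the sample names are hypothetical, the region is passed explicitly, and the command is only printed, not run.

```python
import shlex


def build_submit_command(cluster, region, bucket, table_name,
                         script='pyspark_hudi_example.py'):
  """Builds the gcloud argument list that submits the sample job."""
  return [
      'gcloud', 'dataproc', 'jobs', 'submit', 'pyspark',
      f'gs://{bucket}/{script}',
      f'--cluster={cluster}',
      f'--region={region}',
      '--',                           # Everything after this goes to the job.
      table_name,                     # Becomes sys.argv[1] in the script.
      f'gs://{bucket}/{table_name}',  # Becomes sys.argv[2] in the script.
  ]


# Placeholder values for illustration only.
command = build_submit_command(
    'my-cluster', 'us-central1', 'my-bucket', 'my_hudi_table')
print(shlex.join(command))
# To submit the job, pass the list to subprocess.run(command, check=True).
```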

Use the Hudi CLI

The Hudi CLI is located at /usr/lib/hudi/cli/hudi-cli.sh on the Dataproc cluster master node. You can use the Hudi CLI to view Hudi table schemas, commits, and statistics, and to manually perform administrative operations, such as scheduling compactions (see Using hudi-cli).

To start the Hudi CLI and connect to a Hudi table:

  1. SSH into the master node.
  2. Run /usr/lib/hudi/cli/hudi-cli.sh. The command prompt changes to hudi->.
  3. Run connect --path gs://my-bucket/my-hudi-table.
  4. Run commands, such as desc, which describes the table schema, or commits show, which shows the commit history.
  5. To stop the CLI session, run exit.

For more information