Dataproc の Hudi オプション コンポーネント

Dataproc クラスタを作成する際には、オプション コンポーネント機能を使用して、Hudi などの追加コンポーネントをインストールできます。このページでは、必要に応じて、Dataproc クラスタに Hudi コンポーネントをインストールする方法について説明します。

Dataproc クラスタにインストールすると、Apache Hudi コンポーネントは Hudi ライブラリをインストールし、Hudi と連携するようにクラスタに Spark と Hive を構成します。

互換性のある Dataproc イメージ バージョン

次の Dataproc イメージ バージョンで作成された Dataproc クラスタに Hudi コンポーネントをインストールできます。

Hudi クラスタで Dataproc を作成すると、次の Spark プロパティと Hive プロパティが Hudi と連携するように構成されます。

構成ファイル プロパティ デフォルト値
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

コンポーネントをインストールする

Dataproc クラスタの作成時に Hudi コンポーネントをインストールします。

Dataproc イメージ リリースのバージョン ページには、各 Dataproc イメージ リリースに含まれる Hudi コンポーネントのバージョンが一覧表示されています。

コンソール

  1. コンポーネントを有効にします。
    • Google Cloud コンソールで、Dataproc の [クラスタの作成] ページを開きます。[クラスタの設定] パネルが選択されています。
    • [コンポーネント] セクションで次の設定を行います。
      • [オプション コンポーネント] で [Hudi] コンポーネントを選択します。

gcloud コマンド

Hudi コンポーネントを含む Dataproc クラスタを作成するには、--optional-components フラグを指定してコマンドを使用します。

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

次のように置き換えます。

  • CLUSTER_NAME: 必須。新しいクラスタ名。
  • REGION: 必須。クラスタ リージョン
  • DATAPROC_IMAGE: 省略可。このオプション フラグを使用すると、デフォルト以外の Dataproc イメージ バージョンを指定できます(デフォルトの Dataproc イメージ バージョンをご覧ください)。
  • PROPERTIES: 省略可。このオプション フラグを使用すると、hudi: ファイル接頭辞で指定される Hudi コンポーネント プロパティを設定できます。(例: properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE)。
    • Hudi コンポーネント バージョンのプロパティ: 必要に応じて dataproc:hudi.version プロパティを指定できます。注: Hudi コンポーネントのバージョンは、Dataproc クラスタ イメージ バージョンとの互換性を確保するために Dataproc によって設定されます。このプロパティを設定すると、指定したバージョンがクラスタ イメージと互換性がない場合、クラスタの作成に失敗することがあります。
    • Spark と Hive のプロパティ: Dataproc は、クラスタの作成時に Hudi 関連の Spark プロパティと Hive プロパティを設定します。クラスタの作成時やジョブの送信時に設定する必要はありません。

REST API

Hudi コンポーネントは、clusters.create リクエストの一部として SoftwareConfig.Component を使用して Dataproc API によりインストールできます。

ジョブを送信して Hudi テーブルの読み取りと書き込みを行う

Hudi コンポーネントでクラスタを作成した後に、Hudi テーブルの読み取りと書き込みを行う Spark ジョブと Hive ジョブを送信できます。

gcloud CLI の例:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

PySpark サンプルジョブ

次の PySpark ファイルは、Hudi テーブルの作成、読み取り、書き込みを行います。

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

次の gcloud CLI コマンドは、PySpark サンプル ファイルを Dataproc に送信します。

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

Hudi CLI を使用する

Hudi CLI は、Dataproc クラスタ マスターノードの /usr/lib/hudi/cli/hudi-cli.sh にあります。Hudi CLI を使用して、Hudi テーブルのスキーマ、commit、統計情報を表示し、スケジュールの圧縮などの管理オペレーションを手動で実行できます(hudi-cli の使用をご覧ください)。

Hudi CLI を起動して Hudi テーブルに接続するには:

  1. マスターノードに SSH で接続します
  2. /usr/lib/hudi/cli/hudi-cli.sh を実行します。コマンド プロンプトが hudi-> に変わります。
  3. connect --path gs://my-bucket/my-hudi-table を実行します。
  4. テーブル スキーマを記述する desc や、commit 履歴を表示する commits show などのコマンドを実行します。
  5. CLI セッションを停止するには、exit を実行します。

次のステップ