Komponen Hudi opsional Dataproc

Anda dapat menginstal komponen tambahan seperti Hudi saat membuat cluster Dataproc menggunakan fitur Optional components. Halaman ini menjelaskan cara untuk secara opsional menginstal komponen Hudi di cluster Dataproc.

Saat diinstal pada cluster Dataproc, komponen Apache Hudi menginstal library Hudi dan mengonfigurasi Spark dan Hive dalam cluster agar berfungsi dengan Hudi.

Versi gambar Dataproc yang kompatibel

Anda dapat menginstal komponen Hudi pada cluster Dataproc yang dibuat dengan versi gambar Dataproc berikut:

Saat Anda membuat Dataproc dengan cluster Hudi, properti Spark dan Hive berikut dikonfigurasi agar berfungsi dengan Hudi.

File konfigurasi Properti Nilai default
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

Menginstal komponen

Instal komponen Hudi saat Anda membuat cluster Dataproc.

Halaman versi rilis gambar Dataproc mencantumkan versi komponen Hudi yang disertakan dalam setiap rilis image Dataproc.

Konsol

  1. Aktifkan komponen.
    • Di konsol Google Cloud, buka halaman Create a cluster pada Dataproc. Panel Siapkan cluster dipilih.
    • Di bagian Komponen:
      • Di bagian Komponen opsional, pilih komponen Hudi.

Perintah gcloud

Untuk membuat cluster Dataproc yang menyertakan komponen Hudi, gunakan perintah dengan flag --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

Ganti kode berikut:

  • CLUSTER_NAME: Wajib diisi. Nama cluster baru.
  • REGION: Wajib diisi. Region cluster.
  • DATAPROC_IMAGE: Opsional. Anda dapat menggunakan tanda ini secara opsional untuk menentukan versi gambar Dataproc non-default (lihat Versi gambar Dataproc default).
  • PROPERTIES: Opsional. Anda dapat menggunakan flag ini secara opsional untuk menetapkan properti komponen Hudi, yang ditentukan dengan awalan file hudi: Contoh: properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE).
    • Properti versi komponen Hudi: Secara opsional, Anda dapat menentukan properti dataproc:hudi.version. Catatan: Versi komponen Hudi ditetapkan oleh Dataproc agar kompatibel dengan versi gambar cluster Dataproc. Jika Anda menetapkan properti ini, pembuatan cluster dapat gagal jika versi yang ditentukan tidak kompatibel dengan image cluster.
    • Properti Spark dan Hive: Dataproc menetapkan properti Spark dan Hive terkait Hudi saat cluster dibuat. Anda tidak perlu menetapkannya saat membuat cluster atau mengirimkan tugas.

REST API

Komponen Hudi dapat diinstal melalui Dataproc API menggunakan SoftwareConfig.Component sebagai bagian dari permintaan clusters.create.

Mengirim tugas untuk membaca dan menulis tabel Hudi

Setelah membuat cluster dengan komponen Hudi, Anda dapat mengirimkan tugas Spark dan Hive yang membaca dan menulis tabel Hudi.

Contoh gcloud CLI:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    --job-file=JOB_FILE \
    -- JOB_ARGS

Contoh tugas PySpark

File PySpark berikut membuat, membaca, dan menulis tabel Hudi.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession

def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)

def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))

def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)

def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)

def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)

def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')

main()

Perintah gcloud CLI berikut mengirimkan contoh file PySpark ke Dataproc.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

Menggunakan Hudi CLI

Hudi CLI terletak di /usr/lib/hudi/cli/hudi-cli.sh pada node master cluster Dataproc. Anda dapat menggunakan Hudi CLI untuk melihat skema, commit, dan statistik tabel Hudi, serta untuk melakukan operasi administratif secara manual, seperti pemadatan jadwal (lihat Menggunakan hudi-cli).

Untuk memulai Hudi CLI dan terhubung ke tabel Hudi:

  1. SSH ke node master.
  2. Jalankan /usr/lib/hudi/cli/hudi-cli.sh. Command prompt berubah menjadi hudi->.
  3. Jalankan connect --path gs://my-bucket/my-hudi-table.
  4. Jalankan perintah, seperti desc, yang menjelaskan skema tabel, atau commits show, yang menampilkan histori commit.
  5. Untuk menghentikan sesi CLI, jalankan exit.

Untuk informasi selengkapnya