Komponen Hudi opsional Dataproc

Anda dapat menginstal komponen tambahan seperti Hudi saat membuat cluster Dataproc menggunakan fitur Komponen opsional. Halaman ini menjelaskan cara menginstal komponen Hudi secara opsional di cluster Dataproc.

Saat diinstal di cluster Dataproc, komponen Apache Hudi akan menginstal library Hudi dan mengonfigurasi Spark dan Hive di cluster agar berfungsi dengan Hudi.

Versi image Dataproc yang kompatibel

Anda dapat menginstal komponen Hudi di cluster Dataproc yang dibuat dengan versi image Dataproc berikut:

Saat Anda membuat Dataproc dengan cluster Hudi, properti Spark dan Hive berikut akan dikonfigurasi agar berfungsi dengan Hudi.

File konfigurasi Properti Nilai default
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

Menginstal komponen

Instal komponen Hudi saat Anda membuat cluster Dataproc.

Halaman versi rilis image Dataproc mencantumkan versi komponen Hudi yang disertakan dalam setiap rilis image Dataproc.

Konsol

  1. Aktifkan komponen.
    • Di konsol Google Cloud, buka halaman Create a cluster Dataproc. Panel Siapkan cluster dipilih.
    • Di bagian Components:
      • Di bagian Komponen opsional, pilih komponen Hudi.

Perintah gcloud

Untuk membuat cluster Dataproc yang menyertakan komponen Hudi, gunakan perintah dengan flag --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

Ganti kode berikut:

  • CLUSTER_NAME: Wajib diisi. Nama cluster baru.
  • REGION: Wajib diisi. Region cluster.
  • DATAPROC_IMAGE: Opsional. Anda dapat menggunakan tanda opsional ini untuk menentukan versi image Dataproc non-default (lihat Versi image Dataproc default).
  • PROPERTIES: Opsional. Anda dapat menggunakan flag opsional ini untuk menetapkan properti komponen Hudi, yang ditentukan dengan hudi: file-prefix Contoh: properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE).
    • Properti versi komponen Hudi: Anda dapat menentukan properti dataproc:hudi.version secara opsional. Catatan: Versi komponen Hudi ditetapkan oleh Dataproc agar kompatibel dengan versi image cluster Dataproc. Jika Anda menetapkan properti ini, pembuatan cluster dapat gagal jika versi yang ditentukan tidak kompatibel dengan image cluster.
    • Properti Spark dan Hive: Dataproc menetapkan properti Spark dan Hive terkait Hudi saat cluster dibuat. Anda tidak perlu menetapkannya saat membuat cluster atau mengirimkan tugas.

REST API

Komponen Hudi dapat diinstal melalui Dataproc API menggunakan SoftwareConfig.Component sebagai bagian dari permintaan clusters.create.

Mengirimkan tugas untuk membaca dan menulis tabel Hudi

Setelah membuat cluster dengan komponen Hudi, Anda dapat mengirimkan tugas Spark dan Hive yang membaca dan menulis tabel Hudi.

Contoh gcloud CLI:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

Contoh tugas PySpark

File PySpark berikut membuat, membaca, dan menulis tabel Hudi.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

Perintah gcloud CLI berikut mengirimkan file PySpark contoh ke Dataproc.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

Menggunakan Hudi CLI

CLI Hudi terletak di /usr/lib/hudi/cli/hudi-cli.sh pada node master cluster Dataproc. Anda dapat menggunakan Hudi CLI untuk melihat skema, commit, dan statistik tabel Hudi, serta untuk melakukan operasi administratif secara manual, seperti jadwalkan pemadatan (lihat Menggunakan hudi-cli).

Untuk memulai Hudi CLI dan terhubung ke tabel Hudi:

  1. Gunakan SSH untuk mengakses node master.
  2. Jalankan /usr/lib/hudi/cli/hudi-cli.sh. Command prompt berubah menjadi hudi->.
  3. Jalankan connect --path gs://my-bucket/my-hudi-table.
  4. Jalankan perintah, seperti desc, yang menjelaskan skema tabel, atau commits show, yang menampilkan histori commit.
  5. Untuk menghentikan sesi CLI, jalankan exit.

Untuk informasi selengkapnya