Componente Hudi facoltativo Dataproc

Puoi installare componenti aggiuntivi come Hudi quando crei un progetto Dataproc utilizzando il cluster Componenti facoltativi funzionalità. In questa pagina viene descritto come installare facoltativamente il componente Hudi su un cluster Dataproc.

Quando è installato su un cluster Dataproc, Apache Hudi installa le librerie Hudi e configura Spark e Hive nel cluster per lavorare con Hudi.

Versioni immagine Dataproc compatibili

Puoi installare il componente Hudi sui cluster Dataproc creati con le seguenti versioni immagine di Dataproc:

Quando crei un cluster Dataproc con Hudi, vengono creati i seguenti elementi Spark e Hive sono configurate in modo da funzionare con Hudi.

File di configurazione Proprietà Valore predefinito
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

Installa il componente

Installa il componente Hudi quando crei un cluster Dataproc.

Le pagine delle versioni delle release delle immagini Dataproc indica la versione del componente Hudi inclusa in ogni release dell'immagine Dataproc.

Console

  1. Attiva il componente.
    • Nella console Google Cloud, apri Dataproc Crea un cluster . Il riquadro Configura cluster è selezionato.
    • Nella sezione Componenti:
        .
      • In Componenti facoltativi, seleziona Componente Hudi.

Comando g-cloud

Per creare un cluster Dataproc che includa il componente Hudi, usa il comando con il flag --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

Sostituisci quanto segue:

  • CLUSTER_NAME: campo obbligatorio. Il nome del nuovo cluster.
  • REGION: campo obbligatorio. La regione del cluster.
  • DATAPROC_IMAGE: facoltativo. Puoi usare questo flag facoltativo specificare una versione immagine Dataproc non predefinita (vedi Versione immagine Dataproc predefinita).
  • PROPERTIES: facoltativo. Puoi usare questo flag facoltativo imposta le proprietà del componente Hudi, che sono specificati con prefisso file di hudi: Esempio: properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE).
    • Proprietà della versione del componente Hudi: puoi specificare facoltativamente la proprietà Proprietà dataproc:hudi.version. Nota: la versione del componente Hudi è impostata da Dataproc sia compatibile con la versione immagine del cluster Dataproc. Se imposti questa proprietà, la creazione del cluster può non riuscire se la versione specificata compatibili con l'immagine del cluster.
    • Proprietà Spark e Hive: set Dataproc Spark e Hive correlati a Hudi quando viene creato il cluster. Non è necessario impostarle durante la creazione del cluster o l'invio di job.

API REST

Il componente Hudi può essere installato tramite l'API Dataproc utilizzando SoftwareConfig.Component nell'ambito di un clusters.create richiesta.

Invia un job per leggere e scrivere tabelle Hudi

Dopo aver creato un cluster con il componente Hudi, puoi inviare job Spark e Hive che leggono e scrivono tabelle Hudi.

Esempio di gcloud CLI:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

Esempio di job PySpark

Il seguente file PySpark crea, legge e scrive una tabella Hudi.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

Il seguente comando gcloud CLI invia il file PySpark di esempio a Dataproc.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

Utilizzare l'interfaccia a riga di comando Hudi

L'interfaccia a riga di comando Hudi si trova all'indirizzo /usr/lib/hudi/cli/hudi-cli.sh sulla Nodo master del cluster Dataproc. Puoi utilizzare Hudi CLI per visualizzare schemi, commit e statistiche delle tabelle Hudi ed eseguire manualmente operazioni amministrative, come la pianificazione di compattazioni (vedi utilizzando hudi-cli).

Per avviare Hudi CLI e connetterti a una tabella Hudi:

  1. Accedi tramite SSH al nodo master.
  2. Esegui /usr/lib/hudi/cli/hudi-cli.sh. Il prompt dei comandi diventa hudi->.
  3. Esegui connect --path gs://my-bucket/my-hudi-table.
  4. Esegui comandi, come desc, che descrive lo schema della tabella, oppure commits show, che mostra la cronologia dei commit.
  5. Per arrestare la sessione dell'interfaccia a riga di comando, esegui exit.

Per ulteriori informazioni