Composant Hudi facultatif de Dataproc

Vous pouvez installer des composants supplémentaires tels que Hudi lorsque vous créez un cluster Dataproc à l'aide de la fonctionnalité Composants facultatifs. Cette page explique comment installer le composant Hudi sur un cluster Dataproc.

Lorsqu'il est installé sur un cluster Dataproc, le composant Apache Hudi installe les bibliothèques Hudi et configure Spark et Hive dans le cluster pour qu'ils fonctionnent avec Hudi.

Versions d'image Dataproc compatibles

Vous pouvez installer le composant Hudi sur les clusters Dataproc créés avec les versions d'image Dataproc suivantes:

Lorsque vous créez un cluster Dataproc avec Hudi, les propriétés Spark et Hive suivantes sont configurées pour fonctionner avec Hudi.

Fichier de configuration Propriété Valeur par défaut
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

Installer le composant

Installez le composant Hudi lorsque vous créez un cluster Dataproc.

Les pages de version des images Dataproc indiquent la version du composant Hudi incluse dans chaque version d'image Dataproc.

Console

  1. Activez le composant.

Commande gcloud

Pour créer un cluster Dataproc incluant le composant Hudi, exécutez la commande avec l'option --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

Remplacez les éléments suivants :

API REST

Le composant Hudi peut être installé via l'API Dataproc à l'aide de SoftwareConfig.Component dans le cadre d'une requête clusters.create.

Envoyer une tâche pour lire et écrire des tables Hudi

Après avoir créé un cluster avec le composant Hudi, vous pouvez envoyer des tâches Spark et Hive qui lisent et écrivent des tables Hudi.

Exemple gcloud CLI:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

Exemple de tâche PySpark

Le fichier PySpark suivant crée, lit et écrit une table Hudi.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

La commande gcloud CLI suivante envoie l'exemple de fichier PySpark à Dataproc.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

Utiliser la CLI Hudi

La CLI Hudi se trouve à /usr/lib/hudi/cli/hudi-cli.sh sur le nœud maître du cluster Dataproc. Vous pouvez utiliser la CLI Hudi pour afficher les schémas, les commits et les statistiques des tables Hudi, et pour effectuer manuellement des opérations administratives, telles que des compactions planifiées (voir Utiliser hudi-cli).

Pour démarrer la CLI Hudi et vous connecter à une table Hudi:

  1. Connectez-vous en SSH au nœud maître.
  2. Exécutez /usr/lib/hudi/cli/hudi-cli.sh. L'invite de commande passe à hudi->.
  3. Exécutez connect --path gs://my-bucket/my-hudi-table.
  4. Exécutez des commandes telles que desc, qui décrit le schéma de la table, ou commits show, qui affiche l'historique des commits.
  5. Pour arrêter la session de la CLI, exécutez exit.

Pour en savoir plus