Composant Hudi facultatif de Dataproc

Vous pouvez installer des composants supplémentaires tels que Hudi lorsque vous créez un cluster Dataproc à l'aide de la fonctionnalité Composants facultatifs. Cette page explique comment installer le composant Hudi sur un cluster Dataproc (facultatif).

Lorsqu'il est installé sur un cluster Dataproc, le composant Apache Hudi installe les bibliothèques Hudi, et configure Spark et Hive dans le cluster afin qu'il fonctionne avec Hudi.

Versions d'image Dataproc compatibles

Vous pouvez installer le composant Hudi sur des clusters Dataproc créés avec les versions d'image Dataproc suivantes:

Lorsque vous créez un cluster Dataproc avec Hudi, les propriétés Spark et Hive suivantes sont configurées pour fonctionner avec Hudi.

Fichier de configuration Propriété Valeur par défaut
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

Installer le composant

Installez le composant Hudi lorsque vous créez un cluster Dataproc.

Les pages des versions d'images Dataproc répertorient la version des composants Hudi incluses dans chaque version d'image Dataproc.

Console

  1. Activez le composant.
    • Dans la console Google Cloud, ouvrez la page Dataproc Créer un cluster. Le panneau Configurer le cluster est sélectionné.
    • Dans la section Composants :
      • Sous Composants facultatifs, sélectionnez le composant Hudi.

Commande gcloud

Pour créer un cluster Dataproc incluant le composant Hudi, exécutez la commande avec l'option --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

Remplacez les éléments suivants :

  • CLUSTER_NAME : valeur obligatoire. Nom du nouveau cluster.
  • REGION : valeur obligatoire. Région du cluster.
  • DATAPROC_IMAGE : facultatif. Vous pouvez utiliser cette option facultative pour spécifier une version d'image Dataproc autre que celle par défaut (consultez la section Version de l'image Dataproc par défaut).
  • PROPERTIES : facultatif. Vous pouvez utiliser cet indicateur facultatif pour définir les propriétés des composants Hudi, qui sont spécifiées avec le préfixe de fichier hudi: (exemple: properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE).
    • Propriété de version du composant Hudi: vous pouvez éventuellement spécifier la propriété dataproc:hudi.version. Remarque:La version du composant Hudi est définie par Dataproc comme étant compatible avec la version de l'image du cluster Dataproc. Si vous définissez cette propriété, la création du cluster peut échouer si la version spécifiée n'est pas compatible avec l'image du cluster.
    • Propriétés Spark et Hive: Dataproc définit les propriétés Spark et Hive liées à Hudi lors de la création du cluster. Vous n'avez pas besoin de les définir lors de la création du cluster ou de l'envoi de tâches.

API REST

Le composant Hudi peut être installé via l'API Dataproc à l'aide de SoftwareConfig.Component dans le cadre d'une requête clusters.create.

Envoyer une tâche pour lire et écrire des tables Hudi

Après avoir créé un cluster avec le composant Hudi, vous pouvez envoyer des tâches Spark et Hive qui lisent et écrivent des tables Hudi.

Exemple pour gcloud CLI:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

Exemple de job PySpark

Le fichier PySpark suivant crée, lit et écrit une table Hudi.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

La commande gcloud CLI suivante envoie l'exemple de fichier PySpark à Dataproc.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

Utiliser la CLI Hudi

La CLI Hudi se trouve à l'emplacement /usr/lib/hudi/cli/hudi-cli.sh sur le nœud maître du cluster Dataproc. La CLI Hudi vous permet d'afficher les schémas, les commits et les statistiques des tables Hudi, ainsi que d'effectuer manuellement des opérations administratives, telles que la planification du compactage (voir Utiliser hudi-cli).

Pour démarrer la CLI Hudi et vous connecter à une table Hudi:

  1. Connectez-vous en SSH au nœud maître.
  2. Exécutez /usr/lib/hudi/cli/hudi-cli.sh. L'invite de commande devient hudi->.
  3. Exécutez connect --path gs://my-bucket/my-hudi-table.
  4. Exécutez des commandes telles que desc, qui décrit le schéma de la table, ou commits show, qui affiche l'historique des commits.
  5. Pour arrêter la session de CLI, exécutez exit.

Pour en savoir plus