Composant Hudi Dataproc facultatif

Vous pouvez installer des composants supplémentaires comme Hudi lorsque vous créez une instance Dataproc. à l'aide de la commande Composants facultatifs . Cette page explique comment installer le composant Hudi (facultatif) sur un cluster Dataproc.

Lorsqu'il est installé sur un cluster Dataproc, Apache Hudi installe les bibliothèques Hudi, et configure Spark et Hive dans le cluster pour travailler avec Hudi.

Versions d'images Dataproc compatibles

Vous pouvez installer le composant Hudi sur les clusters Dataproc créés avec la versions d'image Dataproc suivantes:

Lorsque vous créez un cluster Dataproc avec un cluster Hudi, les composants Spark et Hive suivants sont configurées pour fonctionner avec Hudi.

Fichier de configuration Propriété Valeur par défaut
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

Installer le composant

Installez le composant Hudi lorsque vous créez un cluster Dataproc.

Pages des versions d'image Dataproc Listez la version des composants Hudi inclus dans chaque version d'image Dataproc.

Console

  1. Activez le composant.
    • Dans la console Google Cloud, ouvrez Dataproc Créer un cluster . Le panneau Configurer le cluster est sélectionné.
    • Dans la section Composants :
      • Sous Composants facultatifs, sélectionnez Composant Hudi.

Commande gcloud

Pour créer un cluster Dataproc incluant le composant Hudi, exécutez la commande avec l'option --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

Remplacez les éléments suivants :

  • CLUSTER_NAME : valeur obligatoire. Nouveau nom du cluster.
  • REGION : valeur obligatoire. La région du cluster.
  • DATAPROC_IMAGE : facultatif. Vous pouvez utiliser cet indicateur facultatif pour spécifiez une version d'image Dataproc autre que celle par défaut (consultez Version par défaut de l'image Dataproc).
  • PROPERTIES : facultatif. Vous pouvez utiliser cet indicateur facultatif pour définir les propriétés du composant Hadoop ; spécifiés avec le paramètre Préfixe de fichier hudi: Exemple: properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE).
    • Propriété de version du composant Hudi: vous pouvez éventuellement spécifier le Propriété dataproc:hudi.version. Remarque:La version du composant Hudi est définie par Dataproc pour être compatible avec la version de l'image de cluster Dataproc. Si vous définissez cette propriété, la création du cluster peut échouer si la version spécifiée n'est pas compatible avec l'image du cluster.
    • Propriétés Spark et Hive: ensembles Dataproc Spark et Hive liés à Huudi lors de la création du cluster. Il n'est pas nécessaire de les configurer lors de la création du cluster ou de l'envoi de tâches.

API REST

Composant Hudi peuvent être installées via l'API Dataproc en utilisant SoftwareConfig.Component dans le cadre d'un clusters.create requête.

Envoyer un job pour lire et écrire des tables Hudi

Après avoir créé un cluster avec le composant Hudi, vous pouvez soumettre des jobs Spark et Hive qui lisent et écrivent des tables Hudi.

Exemple pour gcloud CLI:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

Exemple de job PySpark

Le fichier PySpark suivant crée, lit et écrit une table Hudi.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

La commande gcloud CLI suivante envoie l'exemple de fichier PySpark à Dataproc.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

Utiliser la CLI Hudi

La CLI Hudi se trouve à l'adresse /usr/lib/hudi/cli/hudi-cli.sh sur le Nœud maître de cluster Dataproc. Vous pouvez utiliser la CLI Hudi pour afficher les schémas, les commits et les statistiques de la table Hudi, et pour effectuer manuellement les opérations administratives, telles que les compactages de planification (voir Avec hudi-cli).

Pour démarrer la CLI Hudi et vous connecter à une table Hudi:

  1. Connectez-vous en SSH au nœud maître.
  2. Exécutez /usr/lib/hudi/cli/hudi-cli.sh. L'invite de commande passe à hudi->.
  3. Exécutez connect --path gs://my-bucket/my-hudi-table.
  4. Exécutez des commandes, telles que desc, qui décrit le schéma de la table, ou commits show. qui affiche l'historique des commits.
  5. Pour arrêter la session CLI, exécutez exit.

Pour en savoir plus