Componente Hudi opcional de Dataproc

Puedes instalar componentes adicionales, como Hudi, al crear un clúster de Dataproc mediante la función Componentes opcionales. En esta página se describe cómo puedes instalar el componente Hudi en un clúster de Dataproc (opcional).

Cuando se instala en un clúster de Dataproc, el componente Apache Hudi instala las bibliotecas de Hudi y configura Spark y Hive en el clúster para que funcionen con Hudi.

Versiones de imagen de Dataproc compatibles

Puedes instalar el componente Hudi en clústeres de Dataproc creados con las siguientes versiones de imagen de Dataproc:

Cuando creas un clúster de Dataproc con Hudi, se configuran las siguientes propiedades de Spark y Hive para que funcionen con Hudi.

Archivo de configuración Propiedad Valor predeterminado
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

Instalar el componente

Instala el componente Hudi al crear un clúster de Dataproc.

En las páginas de versiones de Dataproc se indica la versión del componente Hudi incluida en cada versión de Dataproc.

Consola

  1. Habilita el componente.
    • En la consola de Google Cloud , abre la página de Dataproc Crear un clúster. El panel Configurar clúster está seleccionado.
    • En la sección Componentes, haz lo siguiente:
      • En Componentes opcionales, selecciona el componente Hudi.

Comando gcloud

Para crear un clúster de Dataproc que incluya el componente Hudi, usa el comando con la marca --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

Haz los cambios siguientes:

  • CLUSTER_NAME: obligatorio. El nuevo nombre del clúster.
  • REGION: obligatorio. La región del clúster.
  • DATAPROC_IMAGE: opcional. Puedes usar esta marca opcional para especificar una versión de imagen de Dataproc que no sea la predeterminada (consulta Versión de imagen de Dataproc predeterminada).
  • PROPERTIES: opcional. Puedes usar esta marca opcional para definir las propiedades de los componentes de Hudi, que se especifican con el prefijo de archivo hudi:. Por ejemplo, properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE).
    • Propiedad de la versión del componente Hudi: puede especificar la propiedad dataproc:hudi.version. Nota: Dataproc define la versión del componente Hudi para que sea compatible con la versión de la imagen del clúster de Dataproc. Si define esta propiedad, es posible que no se pueda crear el clúster si la versión especificada no es compatible con la imagen del clúster.
    • Propiedades de Spark y Hive: Dataproc define las propiedades de Spark y Hive relacionadas con Hudi cuando se crea el clúster. No es necesario definirlos al crear el clúster o enviar trabajos.

API REST

El componente Hudi se puede instalar a través de la API de Dataproc mediante SoftwareConfig.Component como parte de una solicitud clusters.create.

Enviar un trabajo para leer y escribir tablas Hudi

Después de crear un clúster con el componente Hudi, puedes enviar tareas de Spark y Hive que lean y escriban tablas Hudi.

Ejemplo de gcloud CLI:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

Tarea de PySpark de ejemplo

El siguiente archivo de PySpark crea, lee y escribe una tabla Hudi.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

El siguiente comando de gcloud CLI envía el archivo PySpark de ejemplo a Dataproc.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

Usar la CLI de Hudi

La CLI de Hudi se encuentra en /usr/lib/hudi/cli/hudi-cli.sh en el nodo maestro del clúster de Dataproc. Puedes usar la CLI de Hudi para ver los esquemas, las confirmaciones y las estadísticas de las tablas de Hudi, así como para realizar manualmente operaciones administrativas, como programar compactaciones (consulta Usar hudi-cli).

Para iniciar la CLI de Hudi y conectarte a una tabla de Hudi, haz lo siguiente:

  1. Accede al nodo maestro mediante SSH.
  2. Ejecuta /usr/lib/hudi/cli/hudi-cli.sh. El símbolo del sistema cambia a hudi->.
  3. Ejecuta connect --path gs://my-bucket/my-hudi-table.
  4. Ejecuta comandos, como desc, que describe el esquema de la tabla, o commits show, que muestra el historial de confirmaciones.
  5. Para detener la sesión de la CLI, ejecuta exit.

Siguientes pasos