Componente Hudi opcional de Dataproc

Puedes instalar componentes adicionales, como Hudi, cuando creas un Dataproc. clúster mediante Componentes opcionales . En esta página, se describe cómo instalar opcionalmente el componente Hudi. en un clúster de Dataproc.

Cuando se instala en un clúster de Dataproc, Apache Hudi Este componente instala bibliotecas Hudi y configura Spark y Hive en el clúster. de trabajar con Hudi.

Versiones de imágenes compatibles de Dataproc

Puedes instalar el componente Hudi en los clústeres de Dataproc que se crearon con el siguientes versiones de imágenes de Dataproc:

Cuando creas un clúster de Dataproc con Hudi, los siguientes elementos de Spark y Hive están configuradas para funcionar con Hudi.

Archivo de configuración Propiedad Valor predeterminado
/etc/spark/conf/spark-defaults.conf spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.catalog.spark_catalog org.apache.spark.sql.hudi.catalog.HoodieCatalog
spark.sql.extensions org.apache.spark.sql.hudi.HoodieSparkSessionExtension
spark.driver.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
spark.executor.extraClassPath /usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar
/etc/hive/conf/hive-site.xml hive.aux.jars.path file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar

Instala el componente

Instala el componente Hudi cuando crees un clúster de Dataproc.

Las páginas de la versión de actualización de la imagen de Dataproc Enumera la versión del componente de Hudi incluida en cada versión de la imagen de Dataproc.

Console

  1. Habilita el componente.
    • En la consola de Google Cloud, abre Dataproc Crea un clúster . Se selecciona el panel Configurar clúster.
    • En la sección Componentes, sigue estos pasos:
      • En Componentes opcionales, selecciona la Hudi.

Comando de gcloud

Para crear un clúster de Dataproc que incluya el componente Hudi, usa el comando con la marca --optional-components.

gcloud dataproc clusters create CLUSTER_NAME \
    --region=REGION \
    --optional-components=HUDI \
    --image-version=DATAPROC_VERSION \
    --properties=PROPERTIES

Reemplaza lo siguiente:

  • CLUSTER_NAME: Obligatorio. El nombre del clúster nuevo.
  • REGION: Obligatorio. La región del clúster.
  • DATAPROC_IMAGE: Opcional Puedes usar esta marca opcional para especifica una versión de imagen de Dataproc no predeterminada (consulta Versión predeterminada de la imagen de Dataproc).
  • PROPERTIES: Opcional Puedes usar esta marca opcional para Establece las propiedades de los componentes de Hue. que se especifican con el Prefijo de archivo hudi: Ejemplo: properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE).
    • Propiedad de la versión del componente Hudi: Opcionalmente, puedes especificar la Propiedad dataproc:hudi.version. Nota: La versión del componente Hudi se establece por Dataproc para que sea compatible con la versión de imagen del clúster de Dataproc. Si configura esta propiedad, la creación del clúster puede fallar si la versión especificada no es compatible con la imagen del clúster.
    • Propiedades de Spark y Hive: conjuntos de Dataproc Spark y Hive relacionados con Hadoop las propiedades cuando se crea el clúster. No es necesario que las establezcas cuando creas el clúster o envías trabajos.

API de REST

El componente Hudi se puede instalar a través de la API de Dataproc con SoftwareConfig.Component como parte de una clusters.create para cada solicitud.

Envía un trabajo para leer y escribir tablas Hudi

Después de crear un clúster con el componente Hudi, puedes enviar trabajos de Spark y Hive que lean y escriban tablas de Hudi.

Ejemplo de gcloud CLI:

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    --region=region \
    JOB_FILE \
    -- JOB_ARGS

Trabajo de muestra de PySpark

El siguiente archivo de PySpark crea, lee y escribe una tabla Hudi.

#!/usr/bin/env python
"""Pyspark Hudi test."""

import sys
from pyspark.sql import SparkSession


def create_hudi_table(spark, table_name, table_uri):
  """Creates Hudi table."""
  create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      uuid string,
      begin_lat double,
      begin_lon double,
      end_lat double,
      end_lon double,
      driver string,
      rider string,
      fare double,
      partitionpath string,
      ts long
    ) USING hudi
    LOCATION '{table_uri}'
    TBLPROPERTIES (
      type = 'cow',
      primaryKey = 'uuid',
      preCombineField = 'ts'
    )
    PARTITIONED BY (partitionpath)
  """
  spark.sql(create_table_sql)


def generate_test_dataframe(spark, n_rows):
  """Generates test dataframe with Hudi's built-in data generator."""
  sc = spark.sparkContext
  utils = sc._jvm.org.apache.hudi.QuickstartUtils
  data_generator = utils.DataGenerator()
  inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
  return spark.read.json(sc.parallelize(inserts, 2))


def write_hudi_table(table_name, table_uri, df):
  """Writes Hudi table."""
  hudi_options = {
      'hoodie.table.name': table_name,
      'hoodie.datasource.write.recordkey.field': 'uuid',
      'hoodie.datasource.write.partitionpath.field': 'partitionpath',
      'hoodie.datasource.write.table.name': table_name,
      'hoodie.datasource.write.operation': 'upsert',
      'hoodie.datasource.write.precombine.field': 'ts',
      'hoodie.upsert.shuffle.parallelism': 2,
      'hoodie.insert.shuffle.parallelism': 2,
  }
  df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)


def query_commit_history(spark, table_name, table_uri):
  tmp_table = f'{table_name}_commit_history'
  spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT DISTINCT(_hoodie_commit_time)
    FROM {tmp_table}
    ORDER BY _hoodie_commit_time
    DESC
  """
  return spark.sql(query)


def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
  """Reads Hudi table at the given commit timestamp."""
  if commit_ts:
    options = {'as.of.instant': commit_ts}
  else:
    options = {}
  tmp_table = f'{table_name}_snapshot'
  spark.read.format('hudi').options(**options).load(
      table_uri
  ).createOrReplaceTempView(tmp_table)
  query = f"""
    SELECT _hoodie_commit_time, begin_lat, begin_lon,
        driver, end_lat, end_lon, fare, partitionpath,
        rider, ts, uuid
    FROM {tmp_table}
  """
  return spark.sql(query)


def main():
  """Test create write and read Hudi table."""
  if len(sys.argv) != 3:
    raise Exception('Expected arguments: <table_name> <table_uri>')

  table_name = sys.argv[1]
  table_uri = sys.argv[2]

  app_name = f'pyspark-hudi-test_{table_name}'
  print(f'Creating Spark session {app_name} ...')
  spark = SparkSession.builder.appName(app_name).getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

  print(f'Creating Hudi table {table_name} at {table_uri} ...')
  create_hudi_table(spark, table_name, table_uri)

  print('Generating test data batch 1...')
  n_rows1 = 10
  input_df1 = generate_test_dataframe(spark, n_rows1)
  input_df1.show(truncate=False)

  print('Writing Hudi table, batch 1 ...')
  write_hudi_table(table_name, table_uri, input_df1)

  print('Generating test data batch 2...')
  n_rows2 = 10
  input_df2 = generate_test_dataframe(spark, n_rows2)
  input_df2.show(truncate=False)

  print('Writing Hudi table, batch 2 ...')
  write_hudi_table(table_name, table_uri, input_df2)

  print('Querying commit history ...')
  commits_df = query_commit_history(spark, table_name, table_uri)
  commits_df.show(truncate=False)
  previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time

  print('Reading the Hudi table snapshot at the latest commit ...')
  output_df1 = read_hudi_table(spark, table_name, table_uri)
  output_df1.show(truncate=False)

  print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
  output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
  output_df2.show(truncate=False)

  print('Stopping Spark session ...')
  spark.stop()

  print('All done')


main()

El siguiente comando de gcloud CLI envía el archivo de PySpark de muestra a Dataproc.

gcloud dataproc jobs submit pyspark \
    --cluster=CLUSTER_NAME \
    gs://BUCKET_NAME/pyspark_hudi_example.py \
    -- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME

Usa la CLI de Hudi

La CLI de Hudi está ubicada en /usr/lib/hudi/cli/hudi-cli.sh, en el Nodo instancia principal del clúster de Dataproc. Puedes usar la CLI de Hudi para ver esquemas, confirmaciones y estadísticas de tablas de Hudi y ejecutar manualmente operaciones administrativas, como compactaciones de programas (consulta Usa hudi-cli).

Para iniciar la CLI de Hudi y conectarte a una tabla de Hudi, sigue estos pasos:

  1. Establece una conexión SSH al nodo principal.
  2. Ejecuta /usr/lib/hudi/cli/hudi-cli.sh. El símbolo del sistema cambia a hudi->.
  3. Ejecuta connect --path gs://my-bucket/my-hudi-table.
  4. Ejecuta comandos, como desc, que describe el esquema de la tabla, o commits show. que muestra el historial de confirmaciones.
  5. Para detener la sesión de la CLI, ejecuta exit.

Más información