使用可选组件功能创建 Dataproc 集群时,您可以安装 Hudi 等其他组件。本页面介绍了如何选择在 Dataproc 集群上安装 Hudi 组件。
当安装在 Dataproc 集群上时,Apache Hudi 组件会安装 Hudi 库,并将集群中的 Spark 和 Hive 配置为与 Hudi 搭配使用。
兼容的 Dataproc 映像版本
您可以在使用以下 Dataproc 映像版本创建的 Dataproc 集群上安装 Hudi 组件:
当您创建 Dataproc with Hudi 集群时,以下 Spark 和 Hive 属性将配置为与 Hudi 搭配使用。
配置文件 |
属性 |
默认值 |
/etc/spark/conf/spark-defaults.conf |
spark.serializer |
org.apache.spark.serializer.KryoSerializer |
spark.sql.catalog.spark_catalog |
org.apache.spark.sql.hudi.catalog.HoodieCatalog |
spark.sql.extensions |
org.apache.spark.sql.hudi.HoodieSparkSessionExtension |
spark.driver.extraClassPath |
/usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar |
spark.executor.extraClassPath |
/usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar |
/etc/hive/conf/hive-site.xml |
hive.aux.jars.path |
file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar |
安装组件
创建 Dataproc 集群时,安装 Hudi 组件。
Dataproc 映像发布版本页面列出了每个 Dataproc 映像版本中包含的 Hudi 组件版本。
控制台
- 启用组件。
- 在 Google Cloud 控制台中,打开 Dataproc 创建集群页面。已选择设置集群面板。
- 在组件部分中执行以下操作:
gcloud 命令
如需创建包含 Hudi 组件的 Dataproc 集群,请使用带有 --optional-components
标志的命令。
gcloud dataproc clusters create CLUSTER_NAME \
--region=REGION \
--optional-components=HUDI \
--image-version=DATAPROC_VERSION \
--properties=PROPERTIES
替换以下内容:
- CLUSTER_NAME:必填。新集群名称。
- REGION:必填。集群区域。
- DATAPROC_IMAGE:可选。您可以使用此可选此标志来指定非默认 Dataproc 映像版本(请参阅默认 Dataproc 映像版本)。
- PROPERTIES:可选。您可以使用此可选标志来设置 Hudi 组件属性,这些属性使用
hudi:
文件前缀指定。
properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE
- Hudi 组件版本属性:您可以选择指定
dataproc:hudi.version
属性。注意:Dataproc 将 Hudi 组件版本设置为与 Dataproc 集群映像版本兼容。如果您设置了此属性,那么当指定的版本与集群映像不兼容时,集群创建可能会失败。
- Spark 和 Hive 属性:创建集群时,Dataproc 会设置与 Hudi 相关的 Spark 和 Hive 属性。创建集群或提交作业时无需设置它们。
提交作业以读取和写入 Hudi 表
使用 Hudi 组件创建集群后,您可以提交读取和写入 Hudi 表的 Spark 和 Hive 作业。
gcloud CLI
示例:
gcloud dataproc jobs submit pyspark \
--cluster=CLUSTER_NAME \
--region=region \
--job-file=JOB_FILE \
-- JOB_ARGS
PySpark 作业示例
以下 PySpark 文件会创建、读取和写入 Hudi 表。
#!/usr/bin/env python
"""Pyspark Hudi test."""
import sys
from pyspark.sql import SparkSession
def create_hudi_table(spark, table_name, table_uri):
"""Creates Hudi table."""
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
uuid string,
begin_lat double,
begin_lon double,
end_lat double,
end_lon double,
driver string,
rider string,
fare double,
partitionpath string,
ts long
) USING hudi
LOCATION '{table_uri}'
TBLPROPERTIES (
type = 'cow',
primaryKey = 'uuid',
preCombineField = 'ts'
)
PARTITIONED BY (partitionpath)
"""
spark.sql(create_table_sql)
def generate_test_dataframe(spark, n_rows):
"""Generates test dataframe with Hudi's built-in data generator."""
sc = spark.sparkContext
utils = sc._jvm.org.apache.hudi.QuickstartUtils
data_generator = utils.DataGenerator()
inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
return spark.read.json(sc.parallelize(inserts, 2))
def write_hudi_table(table_name, table_uri, df):
"""Writes Hudi table."""
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': table_name,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2,
}
df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)
def query_commit_history(spark, table_name, table_uri):
tmp_table = f'{table_name}_commit_history'
spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
query = f"""
SELECT DISTINCT(_hoodie_commit_time)
FROM {tmp_table}
ORDER BY _hoodie_commit_time
DESC
"""
return spark.sql(query)
def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
"""Reads Hudi table at the given commit timestamp."""
if commit_ts:
options = {'as.of.instant': commit_ts}
else:
options = {}
tmp_table = f'{table_name}_snapshot'
spark.read.format('hudi').options(**options).load(
table_uri
).createOrReplaceTempView(tmp_table)
query = f"""
SELECT _hoodie_commit_time, begin_lat, begin_lon,
driver, end_lat, end_lon, fare, partitionpath,
rider, ts, uuid
FROM {tmp_table}
"""
return spark.sql(query)
def main():
"""Test create write and read Hudi table."""
if len(sys.argv) != 3:
raise Exception('Expected arguments: <table_name> <table_uri>')
table_name = sys.argv[1]
table_uri = sys.argv[2]
app_name = f'pyspark-hudi-test_{table_name}'
print(f'Creating Spark session {app_name} ...')
spark = SparkSession.builder.appName(app_name).getOrCreate()
spark.sparkContext.setLogLevel('WARN')
print(f'Creating Hudi table {table_name} at {table_uri} ...')
create_hudi_table(spark, table_name, table_uri)
print('Generating test data batch 1...')
n_rows1 = 10
input_df1 = generate_test_dataframe(spark, n_rows1)
input_df1.show(truncate=False)
print('Writing Hudi table, batch 1 ...')
write_hudi_table(table_name, table_uri, input_df1)
print('Generating test data batch 2...')
n_rows2 = 10
input_df2 = generate_test_dataframe(spark, n_rows2)
input_df2.show(truncate=False)
print('Writing Hudi table, batch 2 ...')
write_hudi_table(table_name, table_uri, input_df2)
print('Querying commit history ...')
commits_df = query_commit_history(spark, table_name, table_uri)
commits_df.show(truncate=False)
previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time
print('Reading the Hudi table snapshot at the latest commit ...')
output_df1 = read_hudi_table(spark, table_name, table_uri)
output_df1.show(truncate=False)
print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
output_df2.show(truncate=False)
print('Stopping Spark session ...')
spark.stop()
print('All done')
main()
以下 gcloud CLI 命令会将示例 PySpark 文件提交到 Dataproc。
gcloud dataproc jobs submit pyspark \
--cluster=CLUSTER_NAME \
gs://BUCKET_NAME/pyspark_hudi_example.py \
-- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME
使用 Hudi CLI
Hudi CLI 位于 Dataproc 集群主服务器节点上的 /usr/lib/hudi/cli/hudi-cli.sh
。您可以使用 Hudi CLI 查看 Hudi 表架构、提交内容和统计信息,以及手动执行管理操作,如安排压缩(请参阅使用 hudi-cli)。
如需启动 Hudi CLI 并连接到 Hudi 表,请执行以下操作:
- 通过 SSH 连接到主节点。
- 运行
/usr/lib/hudi/cli/hudi-cli.sh
。命令提示符会更改为 hudi->
。
- 运行
connect --path gs://my-bucket/my-hudi-table
。
- 运行命令,例如描述表架构的
desc
或显示提交历史记录的 commits show
。
- 如需停止 CLI 会话,请运行
exit
。