创建 Dataproc 时,您可以安装 Hudi 等其他组件
使用
可选组件
功能。本页介绍了如何选择安装 Hudi 组件
Dataproc 集群上的资源。
安装在 Dataproc 集群上时,
Apache Hudi
组件安装 Hudi 库并在集群中配置 Spark 和 Hive
与 Hudi 合作。
兼容的 Dataproc 映像版本
您可以在使用
以下 Dataproc 映像版本:
使用 Hudi 集群创建 Dataproc 时,以下 Spark 和 Hive 代码
属性配置为使用 Hudi。
配置文件 |
属性 |
默认值 |
/etc/spark/conf/spark-defaults.conf |
spark.serializer |
org.apache.spark.serializer.KryoSerializer |
spark.sql.catalog.spark_catalog |
org.apache.spark.sql.hudi.catalog.HoodieCatalog |
spark.sql.extensions |
org.apache.spark.sql.hudi.HoodieSparkSessionExtension |
spark.driver.extraClassPath |
/usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar |
spark.executor.extraClassPath |
/usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar |
/etc/hive/conf/hive-site.xml |
hive.aux.jars.path |
file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar |
安装组件
创建 Dataproc 集群时安装 Hudi 组件。
Dataproc 映像发布版本页面
列出每个 Dataproc 映像版本中包含的 Hudi 组件版本。
控制台
- 启用组件。
- 在 Google Cloud 控制台中,打开 Dataproc
创建集群
页面。已选择设置集群面板。
- 在组件部分中执行以下操作:
gcloud 命令
要创建包含 Hudi 组件的 Dataproc 集群,
请使用带有 --optional-components
标志的命令。
gcloud dataproc clusters create CLUSTER_NAME \
--region=REGION \
--optional-components=HUDI \
--image-version=DATAPROC_VERSION \
--properties=PROPERTIES
替换以下内容:
- CLUSTER_NAME:必填。新集群名称。
- REGION:必填。集群区域。
- DATAPROC_IMAGE:可选。您可以使用以下可选标志
指定非默认的 Dataproc 映像版本(请参阅
默认 Dataproc 映像版本)。
- PROPERTIES:可选。您可以使用以下可选标志
设置 Hudi 组件属性,
使用
hudi:
文件前缀
示例:properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE
)。
提交作业以读取和写入 Hudi 表
使用 Hudi 组件创建集群后,
您可以提交读取和写入 Hudi 表的 Spark 和 Hive 作业。
gcloud CLI
示例:
gcloud dataproc jobs submit pyspark \
--cluster=CLUSTER_NAME \
--region=region \
JOB_FILE \
-- JOB_ARGS
示例 PySpark 作业
以下 PySpark 文件可创建、读取和写入 Hudi 表。
#!/usr/bin/env python
"""Pyspark Hudi test."""
import sys
from pyspark.sql import SparkSession
def create_hudi_table(spark, table_name, table_uri):
"""Creates Hudi table."""
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
uuid string,
begin_lat double,
begin_lon double,
end_lat double,
end_lon double,
driver string,
rider string,
fare double,
partitionpath string,
ts long
) USING hudi
LOCATION '{table_uri}'
TBLPROPERTIES (
type = 'cow',
primaryKey = 'uuid',
preCombineField = 'ts'
)
PARTITIONED BY (partitionpath)
"""
spark.sql(create_table_sql)
def generate_test_dataframe(spark, n_rows):
"""Generates test dataframe with Hudi's built-in data generator."""
sc = spark.sparkContext
utils = sc._jvm.org.apache.hudi.QuickstartUtils
data_generator = utils.DataGenerator()
inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
return spark.read.json(sc.parallelize(inserts, 2))
def write_hudi_table(table_name, table_uri, df):
"""Writes Hudi table."""
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': table_name,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2,
}
df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)
def query_commit_history(spark, table_name, table_uri):
tmp_table = f'{table_name}_commit_history'
spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
query = f"""
SELECT DISTINCT(_hoodie_commit_time)
FROM {tmp_table}
ORDER BY _hoodie_commit_time
DESC
"""
return spark.sql(query)
def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
"""Reads Hudi table at the given commit timestamp."""
if commit_ts:
options = {'as.of.instant': commit_ts}
else:
options = {}
tmp_table = f'{table_name}_snapshot'
spark.read.format('hudi').options(**options).load(
table_uri
).createOrReplaceTempView(tmp_table)
query = f"""
SELECT _hoodie_commit_time, begin_lat, begin_lon,
driver, end_lat, end_lon, fare, partitionpath,
rider, ts, uuid
FROM {tmp_table}
"""
return spark.sql(query)
def main():
"""Test create write and read Hudi table."""
if len(sys.argv) != 3:
raise Exception('Expected arguments: <table_name> <table_uri>')
table_name = sys.argv[1]
table_uri = sys.argv[2]
app_name = f'pyspark-hudi-test_{table_name}'
print(f'Creating Spark session {app_name} ...')
spark = SparkSession.builder.appName(app_name).getOrCreate()
spark.sparkContext.setLogLevel('WARN')
print(f'Creating Hudi table {table_name} at {table_uri} ...')
create_hudi_table(spark, table_name, table_uri)
print('Generating test data batch 1...')
n_rows1 = 10
input_df1 = generate_test_dataframe(spark, n_rows1)
input_df1.show(truncate=False)
print('Writing Hudi table, batch 1 ...')
write_hudi_table(table_name, table_uri, input_df1)
print('Generating test data batch 2...')
n_rows2 = 10
input_df2 = generate_test_dataframe(spark, n_rows2)
input_df2.show(truncate=False)
print('Writing Hudi table, batch 2 ...')
write_hudi_table(table_name, table_uri, input_df2)
print('Querying commit history ...')
commits_df = query_commit_history(spark, table_name, table_uri)
commits_df.show(truncate=False)
previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time
print('Reading the Hudi table snapshot at the latest commit ...')
output_df1 = read_hudi_table(spark, table_name, table_uri)
output_df1.show(truncate=False)
print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
output_df2.show(truncate=False)
print('Stopping Spark session ...')
spark.stop()
print('All done')
main()
以下 gcloud CLI 命令可提交示例 PySpark 文件
Dataproc。
gcloud dataproc jobs submit pyspark \
--cluster=CLUSTER_NAME \
gs://BUCKET_NAME/pyspark_hudi_example.py \
-- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME
使用 Hudi CLI
Hudi CLI 位于 /usr/lib/hudi/cli/hudi-cli.sh
Dataproc 集群主服务器节点。您可以使用 Hudi CLI
查看 Hudi 表架构、提交和统计信息,以及手动执行
管理操作,例如调度压缩(请参阅
使用 hudi-cli)。
如需启动 Hudi CLI 并连接到 Hudi 表格,请执行以下操作:
- 通过 SSH 连接到主节点。
- 运行
/usr/lib/hudi/cli/hudi-cli.sh
。命令提示符将更改为 hudi->
。
- 运行
connect --path gs://my-bucket/my-hudi-table
。
- 运行命令,例如描述表架构的
desc
或 commits show
。
显示提交历史记录
- 如需停止 CLI 会话,请运行
exit
。