선택적 구성요소 기능을 사용하여 Dataproc 클러스터를 만들 때 Hudi과 같은 추가 구성요소를 설치할 수 있습니다. 이 페이지에서는 Dataproc 클러스터에 Hudi 구성요소를 선택적으로 설치하는 방법을 설명합니다.
Dataproc 클러스터에 설치하면 Apache Hudi 구성요소가 Hudi 라이브러리를 설치하고 클러스터에서 Hudi와 작동하도록 Spark 및 Hive를 구성합니다.
호환되는 Dataproc 이미지 버전
다음 Dataproc 이미지 버전으로 생성된 Dataproc 클러스터에 Hudi 구성요소를 설치할 수 있습니다.
Hudi 클러스터로 Dataproc을 만들면 다음 Spark 및 Hive 속성이 Hudi와 함께 작동하도록 구성됩니다.
구성 파일 |
속성 |
기본값 |
/etc/spark/conf/spark-defaults.conf |
spark.serializer |
org.apache.spark.serializer.KryoSerializer |
spark.sql.catalog.spark_catalog |
org.apache.spark.sql.hudi.catalog.HoodieCatalog |
spark.sql.extensions |
org.apache.spark.sql.hudi.HoodieSparkSessionExtension |
spark.driver.extraClassPath |
/usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar |
spark.executor.extraClassPath |
/usr/lib/hudi/lib/hudi-sparkspark-version-bundle_scala-version-hudi-version.jar |
/etc/hive/conf/hive-site.xml |
hive.aux.jars.path |
file:///usr/lib/hudi/lib/hudi-hadoop-mr-bundle-version.jar |
구성요소 설치
Dataproc 클러스터를 만들 때 Hudi 구성요소를 설치합니다.
Dataproc 이미지 출시 버전 페이지에는 각 Dataproc 이미지 출시에 포함된 Hudi 구성요소 버전이 나열되어 있습니다.
콘솔
- 구성요소를 사용 설정합니다.
- Google Cloud 콘솔에서 Dataproc 클러스터 만들기 페이지를 엽니다. 클러스터 설정 패널이 선택되었습니다.
- 구성요소 섹션에서 다음을 수행합니다.
- 선택적 구성요소 아래에서 Hudi 구성요소를 선택합니다.
gcloud 명령어
Hudi 구성요소가 포함된 Dataproc 클러스터를 만들려면 --optional-components
플래그와 함께 명령어를 사용합니다.
gcloud dataproc clusters create CLUSTER_NAME \
--region=REGION \
--optional-components=HUDI \
--image-version=DATAPROC_VERSION \
--properties=PROPERTIES
다음을 바꿉니다.
- CLUSTER_NAME: 필수. 새 클러스터 이름입니다.
- REGION: 필수. 클러스터 리전입니다.
- DATAPROC_IMAGE: 선택사항. 이 선택적 플래그를 사용하여 기본이 아닌 Dataproc 이미지 버전을 지정할 수 있습니다(기본 Dataproc 이미지 버전 참조).
- PROPERTIES: 선택사항. 이 선택적 플래그를 사용하여
hudi:
파일 프리픽스로 지정된 Hudi 구성요소 속성을 설정할 수 있습니다(예: properties=hudi:hoodie.datasource.write.table.type=COPY_ON_WRITE
).- Hudi 구성요소 버전 속성: 선택적으로
dataproc:hudi.version
속성을 지정할 수 있습니다. 참고: Hudi 구성요소 버전은 Dataproc 클러스터 이미지 버전과 호환되도록 Dataproc에서 설정됩니다. 이 속성을 설정한 경우 지정된 버전이 클러스터 이미지와 호환되지 않으면 클러스터 생성이 실패할 수 있습니다.
- Spark 및 Hive 속성: Dataproc은 클러스터가 생성될 때 Hudi 관련 Spark 및 Hive 속성을 설정합니다. 클러스터를 만들거나 작업을 제출할 때 이를 설정할 필요가 없습니다.
Hudi 테이블을 읽고 쓰는 작업 제출
Hudi 구성요소로 클러스터를 만든 후 Hudi 테이블을 읽고 쓰는 Spark 및 Hive 작업을 제출할 수 있습니다.
gcloud CLI
예시:
gcloud dataproc jobs submit pyspark \
--cluster=CLUSTER_NAME \
--region=region \
JOB_FILE \
-- JOB_ARGS
샘플 PySpark 작업
다음 PySpark 파일은 Hudi 테이블을 만들고, 읽고, 작성합니다.
#!/usr/bin/env python
"""Pyspark Hudi test."""
import sys
from pyspark.sql import SparkSession
def create_hudi_table(spark, table_name, table_uri):
"""Creates Hudi table."""
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
uuid string,
begin_lat double,
begin_lon double,
end_lat double,
end_lon double,
driver string,
rider string,
fare double,
partitionpath string,
ts long
) USING hudi
LOCATION '{table_uri}'
TBLPROPERTIES (
type = 'cow',
primaryKey = 'uuid',
preCombineField = 'ts'
)
PARTITIONED BY (partitionpath)
"""
spark.sql(create_table_sql)
def generate_test_dataframe(spark, n_rows):
"""Generates test dataframe with Hudi's built-in data generator."""
sc = spark.sparkContext
utils = sc._jvm.org.apache.hudi.QuickstartUtils
data_generator = utils.DataGenerator()
inserts = utils.convertToStringList(data_generator.generateInserts(n_rows))
return spark.read.json(sc.parallelize(inserts, 2))
def write_hudi_table(table_name, table_uri, df):
"""Writes Hudi table."""
hudi_options = {
'hoodie.table.name': table_name,
'hoodie.datasource.write.recordkey.field': 'uuid',
'hoodie.datasource.write.partitionpath.field': 'partitionpath',
'hoodie.datasource.write.table.name': table_name,
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.precombine.field': 'ts',
'hoodie.upsert.shuffle.parallelism': 2,
'hoodie.insert.shuffle.parallelism': 2,
}
df.write.format('hudi').options(**hudi_options).mode('append').save(table_uri)
def query_commit_history(spark, table_name, table_uri):
tmp_table = f'{table_name}_commit_history'
spark.read.format('hudi').load(table_uri).createOrReplaceTempView(tmp_table)
query = f"""
SELECT DISTINCT(_hoodie_commit_time)
FROM {tmp_table}
ORDER BY _hoodie_commit_time
DESC
"""
return spark.sql(query)
def read_hudi_table(spark, table_name, table_uri, commit_ts=''):
"""Reads Hudi table at the given commit timestamp."""
if commit_ts:
options = {'as.of.instant': commit_ts}
else:
options = {}
tmp_table = f'{table_name}_snapshot'
spark.read.format('hudi').options(**options).load(
table_uri
).createOrReplaceTempView(tmp_table)
query = f"""
SELECT _hoodie_commit_time, begin_lat, begin_lon,
driver, end_lat, end_lon, fare, partitionpath,
rider, ts, uuid
FROM {tmp_table}
"""
return spark.sql(query)
def main():
"""Test create write and read Hudi table."""
if len(sys.argv) != 3:
raise Exception('Expected arguments: <table_name> <table_uri>')
table_name = sys.argv[1]
table_uri = sys.argv[2]
app_name = f'pyspark-hudi-test_{table_name}'
print(f'Creating Spark session {app_name} ...')
spark = SparkSession.builder.appName(app_name).getOrCreate()
spark.sparkContext.setLogLevel('WARN')
print(f'Creating Hudi table {table_name} at {table_uri} ...')
create_hudi_table(spark, table_name, table_uri)
print('Generating test data batch 1...')
n_rows1 = 10
input_df1 = generate_test_dataframe(spark, n_rows1)
input_df1.show(truncate=False)
print('Writing Hudi table, batch 1 ...')
write_hudi_table(table_name, table_uri, input_df1)
print('Generating test data batch 2...')
n_rows2 = 10
input_df2 = generate_test_dataframe(spark, n_rows2)
input_df2.show(truncate=False)
print('Writing Hudi table, batch 2 ...')
write_hudi_table(table_name, table_uri, input_df2)
print('Querying commit history ...')
commits_df = query_commit_history(spark, table_name, table_uri)
commits_df.show(truncate=False)
previous_commit_ts = commits_df.collect()[1]._hoodie_commit_time
print('Reading the Hudi table snapshot at the latest commit ...')
output_df1 = read_hudi_table(spark, table_name, table_uri)
output_df1.show(truncate=False)
print(f'Reading the Hudi table snapshot at {previous_commit_ts} ...')
output_df2 = read_hudi_table(spark, table_name, table_uri, previous_commit_ts)
output_df2.show(truncate=False)
print('Stopping Spark session ...')
spark.stop()
print('All done')
main()
다음 gcloud CLI 명령어는 샘플 PySpark 파일을 Dataproc에 제출합니다.
gcloud dataproc jobs submit pyspark \
--cluster=CLUSTER_NAME \
gs://BUCKET_NAME/pyspark_hudi_example.py \
-- TABLE_NAME gs://BUCKET_NAME/TABLE_NAME
Hudi CLI 사용
Hudi CLI는 Dataproc 클러스터 마스터 노드의 /usr/lib/hudi/cli/hudi-cli.sh
에 있습니다. Hudi CLI를 사용하여 Hudi 테이블 스키마, 커밋, 통계를 보고 일정 압축과 같은 관리 작업을 수동으로 수행할 수 있습니다(hudi-cli 사용 참조).
Hudi CLI를 시작하고 Hudi 테이블에 연결하려면 다음 안내를 따르세요.
- 마스터 노드에 SSH를 통해 연결합니다.
/usr/lib/hudi/cli/hudi-cli.sh
를 실행합니다. 명령 프롬프트가 hudi->
로 변경됩니다.
connect --path gs://my-bucket/my-hudi-table
를 실행합니다.
- 테이블 스키마를 설명하는
desc
또는 커밋 기록을 표시하는 commits show
와 같은 명령어를 실행합니다.
- CLI 세션을 중지하려면
exit
을 실행합니다.