このページでは、Spark を実行する Dataproc クラスタを作成する方法について説明します。
概要
Dataproc Metastore サービス インスタンスが Dataplex レイクに関連付けられた後にクラスタを作成して、クラスタが Hive Metastore エンドポイントを利用して Dataplex メタデータにアクセスできるようにします。
Dataplex 内で管理されるメタデータには、Hive Metastore などの標準インターフェースを使用してアクセスし、Spark クエリを強化できます。クエリは Dataproc クラスタで実行されます。
Parquet データの場合は、実行エラーを避けるために、Spark プロパティ spark.sql.hive.convertMetastoreParquet
を false
に設定します。詳細
Dataproc クラスタを作成する
次のコマンドを実行して Dataproc クラスタを作成します。その際、Dataplex レイクに関連付けられている Dataproc Metastore サービスを指定します。
GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(endpointUri)" | cut -c9-)
WHDIR=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")
METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
--location LOCATION \
--format "value(hiveMetastoreConfig.version)")
# This command creates a cluster with default settings. You can customize
# it as needed. The --optional-components, --initialization-actions,
# --metadata and --properties flags are used to to connect with
# the associated metastore.
gcloud dataproc clusters create CLUSTER_ID \
--project PROJECT \
--region LOCATION \
--scopes "https://www.googleapis.com/auth/cloud-platform" \
--image-version 2.0-debian10 \
--optional-components=DOCKER \
--initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
--metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
--properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"
メタデータを調べる
DQL クエリを実行してメタデータを調べ、Spark クエリを実行してデータをクエリします。
始める前に
Dataproc クラスタのプライマリ ノードで SSH セッションを開きます。
VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \ --project PROJECT \ --region LOCATION \ --format "value(config.gceClusterConfig.zoneUri)") gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
プライマリ ノードのコマンド プロンプトで、新しい Python REPL を開きます。
python3
データベースの一覧取得
レイク内の各 Dataplex ゾーンは、メタストア データベースにマッピングされます。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW DATABASES")
df.show()
テーブルのリスト表示
いずれかのゾーンのテーブルを一覧表示します。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("SHOW TABLES IN ZONE_ID")
df.show()
データのクエリ
いずれかのテーブルのデータに対してクエリを実行します。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
# Modify the SQL statement to retrieve or filter on table columns.
df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
df.show()
メタデータにテーブルとパーティションを作成する
DDL クエリを実行して、Apache Spark を使用して Dataplex メタデータ内にテーブルとパーティションを作成します。
サポートされているデータ型、ファイル形式、行形式の詳細については、サポートされている値をご覧ください。
始める前に
テーブルを作成する前に、基になるデータを含む Cloud Storage バケットにマッピングする Dataplex アセットを作成します。詳しくは、アセットを追加するをご覧ください。
テーブルを作成する
Parquet、ORC、AVRO、CSV、JSON の各テーブルがサポートされています。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
df.show()
テーブルの変更
Dataplex では、テーブルのロケーションの変更や、テーブルのパーティション列の編集はできません。テーブルを変更しても、userManaged が自動的に true
に設定されることはありません。
Spark SQL では、テーブルの名前変更、列の追加、テーブルのファイル形式の設定ができます。
テーブルの名前を変更する
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
df.show()
列を追加する
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
df.show()
ファイル形式を設定する
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
df.show()
テーブルを削除する
Dataplex のメタデータ API からテーブルを削除しても、基になる Cloud Storage のデータは削除されません。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
df.show()
パーティションを追加する
Dataplex では、一度作成したパーティションを変更できません。ただし、パーティションは削除される可能性があります。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
df.show()
上記の例のように、同じパーティション キーと異なるパーティション値の複数のパーティションを追加できます。
パーティションを削除する
パーティションを削除するには、次のコマンドを実行します。
import pyspark.sql as sql
session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()
df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
df.show()
Iceberg テーブルに対してクエリを実行する
Apache Spark を使用して Iceberg テーブルをクエリできます。
始める前に
Iceberg で Spark SQL セッションを設定します。
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Iceberg テーブルを作成する
Iceberg テーブルを作成するには、次のコマンドを実行します。
CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');
Iceberg のスナップショットと履歴を確認する
Apache Spark を使用して、Iceberg テーブルのスナップショットと履歴を取得できます。
始める前に
Iceberg のサポートとともに PySpark セッションを設定します。
pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
spark.sql.catalog.spark_catalog.type=hive --conf
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
spark.sql.catalog.local.type=hadoop --conf
spark.sql.catalog.local.warehouse=$PWD/warehouse
Iceberg のテーブルの履歴を取得する
Iceberg テーブルの履歴を取得するには、次のコマンドを実行します。
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)
Iceberg テーブルのスナップショットを取得する
Iceberg テーブルのスナップショットを取得するには、次のコマンドを実行します。
spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)
サポートされるデータ型とファイル形式
サポートされているデータ型は次のように定義されています。
データ型 | 値 |
---|---|
基本ロール |
|
配列 | ARRAY < DATA_TYPE > |
構造 | STRUCT < COLUMN : DATA_TYPE > |
サポートされているファイル形式は次のとおりです。
TEXTFILE
ORC
PARQUET
AVRO
JSONFILE
ファイル形式の詳細については、ストレージ形式をご覧ください。
サポートされている行形式は次のように定義されています。
- 期限 [CHAR によりフィールドが終了した後]
- SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]
次のステップ
- メタデータの管理について詳細を確認する。