Apache Spark のメタデータにアクセスする

このページでは、Spark を実行する Dataproc クラスタを作成する方法について説明します。

概要

Dataproc Metastore サービス インスタンスが Dataplex レイクに関連付けられた後にクラスタを作成して、クラスタが Hive Metastore エンドポイントを利用して Dataplex メタデータにアクセスできるようにします。

Dataplex 内で管理されるメタデータには、Hive Metastore などの標準インターフェースを使用してアクセスし、Spark クエリを強化できます。クエリは Dataproc クラスタで実行されます。

Parquet データの場合は、実行エラーを避けるために、Spark プロパティ spark.sql.hive.convertMetastoreParquetfalse に設定します。詳細

Dataproc クラスタを作成する

次のコマンドを実行して Dataproc クラスタを作成します。その際、Dataplex レイクに関連付けられている Dataproc Metastore サービスを指定します。

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

メタデータを調べる

DQL クエリを実行してメタデータを調べ、Spark クエリを実行してデータをクエリします。

始める前に

  1. Dataproc クラスタのプライマリ ノードで SSH セッションを開きます。

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. プライマリ ノードのコマンド プロンプトで、新しい Python REPL を開きます。

    python3
    

データベースの一覧取得

レイク内の各 Dataplex ゾーンは、メタストア データベースにマッピングされます。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

テーブルのリスト表示

いずれかのゾーンのテーブルを一覧表示します。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

データのクエリ

いずれかのテーブルのデータに対してクエリを実行します。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

メタデータにテーブルとパーティションを作成する

DDL クエリを実行して、Apache Spark を使用して Dataplex メタデータ内にテーブルとパーティションを作成します。

サポートされているデータ型、ファイル形式、行形式の詳細については、サポートされている値をご覧ください。

始める前に

テーブルを作成する前に、基になるデータを含む Cloud Storage バケットにマッピングする Dataplex アセットを作成します。詳しくは、アセットを追加するをご覧ください。

テーブルを作成する

Parquet、ORC、AVRO、CSV、JSON の各テーブルがサポートされています。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

テーブルの変更

Dataplex では、テーブルのロケーションの変更や、テーブルのパーティション列の編集はできません。テーブルを変更しても、userManaged が自動的に true に設定されることはありません。

Spark SQL では、テーブルの名前変更列の追加、テーブルのファイル形式の設定ができます。

テーブルの名前を変更する

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

列を追加する

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

ファイル形式を設定する

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

テーブルを削除する

Dataplex のメタデータ API からテーブルを削除しても、基になる Cloud Storage のデータは削除されません。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

パーティションを追加する

Dataplex では、一度作成したパーティションを変更できません。ただし、パーティションは削除される可能性があります。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

上記の例のように、同じパーティション キーと異なるパーティション値の複数のパーティションを追加できます。

パーティションを削除する

パーティションを削除するには、次のコマンドを実行します。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Iceberg テーブルに対してクエリを実行する

Apache Spark を使用して Iceberg テーブルをクエリできます。

始める前に

Iceberg で Spark SQL セッションを設定します。

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Iceberg テーブルを作成する

Iceberg テーブルを作成するには、次のコマンドを実行します。

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Iceberg のスナップショットと履歴を確認する

Apache Spark を使用して、Iceberg テーブルのスナップショットと履歴を取得できます。

始める前に

Iceberg のサポートとともに PySpark セッションを設定します。

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Iceberg のテーブルの履歴を取得する

Iceberg テーブルの履歴を取得するには、次のコマンドを実行します。

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Iceberg テーブルのスナップショットを取得する

Iceberg テーブルのスナップショットを取得するには、次のコマンドを実行します。

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

サポートされるデータ型とファイル形式

サポートされているデータ型は次のように定義されています。

データ型
基本ロール
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
配列 ARRAY < DATA_TYPE >
構造 STRUCT < COLUMN : DATA_TYPE >

サポートされているファイル形式は次のとおりです。

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

ファイル形式の詳細については、ストレージ形式をご覧ください。

サポートされている行形式は次のように定義されています。

  • 期限 [CHAR によりフィールドが終了した後]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

次のステップ