Apache Spark でメタデータにアクセスする

このページでは、Spark を実行する Dataproc クラスタを作成する方法について説明します。

概要

Dataproc Metastore サービス インスタンスが Dataplex レイクに関連付けられた後にクラスタを作成します。これにより、クラスタが Hive Metastore エンドポイントを使用して Dataplex メタデータにアクセスできるようになります。

Dataplex 内で管理されているメタデータには、Hive Metastore などの標準インターフェースを使用してアクセスし、Spark クエリを強化できます。クエリは Dataproc クラスタで実行されます。

Parquet データの場合は、Spark プロパティ spark.sql.hive.convertMetastoreParquetfalse に設定して、実行エラーを回避します。詳細

Dataproc クラスタを作成する

次のコマンドを実行して Dataproc クラスタを作成し、Dataplex レイクに関連付けられた Dataproc Metastore サービスを指定します。

  GRPC_ENDPOINT=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(endpointUri)" | cut -c9-)

  WHDIR=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.configOverrides.'hive.metastore.warehouse.dir')")

  METASTORE_VERSION=$(gcloud metastore services describe SERVICE_ID \
    --location LOCATION \
    --format "value(hiveMetastoreConfig.version)")

  # This command  creates a cluster with default settings. You can customize
  # it as needed. The --optional-components, --initialization-actions,
  # --metadata and --properties flags are used to to connect with
  # the associated metastore.
  gcloud dataproc clusters create CLUSTER_ID \
    --project PROJECT \
    --region LOCATION \
    --scopes "https://www.googleapis.com/auth/cloud-platform" \
    --image-version 2.0-debian10 \
    --optional-components=DOCKER \
    --initialization-actions "gs://metastore-init-actions/metastore-grpc-proxy/metastore-grpc-proxy.sh" \
    --metadata "proxy-uri=$GRPC_ENDPOINT,hive-version=$METASTORE_VERSION" \
    --properties "hive:hive.metastore.uris=thrift://localhost:9083,hive:hive.metastore.warehouse.dir=$WHDIR"

メタデータを探索する

DQL クエリを実行してメタデータを探索し、Spark クエリを実行してデータをクエリします。

始める前に

  1. Dataproc クラスタのプライマリ ノードで SSH セッションを開きます。

    VM_ZONE=$(gcloud dataproc clusters describe CLUSTER_ID \
      --project PROJECT \
      --region LOCATION \
      --format "value(config.gceClusterConfig.zoneUri)")
    gcloud compute ssh CLUSTER_ID-m --project PROJECT --zone $VM_ZONE
    
  2. プライマリ ノードのコマンド プロンプトで、新しい Python REPL を開きます。

    python3
    

データベースの一覧取得

レイク内の各 Dataplex ゾーンは、メタストア データベースにマッピングされます。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW DATABASES")
  df.show()

テーブルのリスト表示

いずれかのゾーン内のテーブルをリスト表示します。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("SHOW TABLES IN ZONE_ID")
  df.show()

データのクエリ

いずれかのテーブルのデータをクエリします。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  # Modify the SQL statement to retrieve or filter on table columns.
  df = session.sql("SELECT COLUMNS FROM ZONE_ID.TABLE_ID WHERE QUERY LIMIT 10")
  df.show()

メタデータにテーブルとパーティションを作成する

DDL クエリを実行して、Apache Spark を使用して Dataplex メタデータにテーブルとパーティションを作成します。

サポートされているデータ型、ファイル形式、行形式の詳細については、サポートされている値をご覧ください。

始める前に

テーブルを作成する前に、基になるデータを含む Cloud Storage バケットにマッピングする Dataplex アセットを作成します。詳しくは、アセットを追加するをご覧ください。

テーブルを作成する

Parquet、ORC、AVRO、CSV、JSON テーブルがサポートされています。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) PARTITIONED BY (COLUMN) STORED AS FILE_FORMAT ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'gs://MY_GCP_BUCKET/TABLE_LOCATION' TBLPROPERTIES('dataplex.entity.partition_style' = 'HIVE_COMPATIBLE')")
  df.show()

テーブルを変更する

Dataplex では、テーブルのロケーションを変更することや、テーブルのパーティション列を編集することはできません。テーブルを変更しても、userManaged が自動的に true に設定されることはありません。

Spark SQL では、テーブルの名前の変更列の追加、テーブルのファイル形式を設定することが可能です。

テーブルの名前を変更する

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE OLD_TABLE_NAME RENAME TO NEW_TABLE_NAME")
  df.show()

列を追加する

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME ADD COLUMN (COLUMN_NAME DATA_TYPE"))
  df.show()

ファイル形式を設定する

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE TABLE_NAME SET FILEFORMAT FILE_FORMAT")
  df.show()

テーブルを削除する

Dataplex のメタデータ API からテーブルを削除しても、Cloud Storage の基盤となるデータは削除されません。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("DROP TABLE ZONE_ID.TABLE_ID")
  df.show()

パーティションを追加する

Dataplex では、作成したパーティションを変更することはできません。ただし、パーティションは削除される可能性があります。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID ADD PARTITION (COLUMN1=VALUE1) PARTITION (COLUMN2=VALUE2)")
  df.show()

上記の例に示すように、同じパーティション キーと異なるパーティション値の複数のパーティションを追加できます。

パーティションを削除する

パーティションを削除するには、次のコマンドを実行します。

  import pyspark.sql as sql

  session = sql.SparkSession.builder.enableHiveSupport().getOrCreate()

  df = session.sql("ALTER TABLE ZONE_ID.TABLE_ID DROP PARTITION (COLUMN=VALUE)")
  df.show()

Iceberg テーブルをクエリする

Apache Spark を使用して Iceberg テーブルに対してクエリを実行できます。

始める前に

Iceberg を使用して Spark SQL セッションを設定する。

  spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Iceberg テーブルを作成する

Iceberg テーブルを作成するには、次のコマンドを実行します。

  CREATE TABLE ZONE_ID.TABLE_ID (COLUMNS DATA_TYPE) USING ICEBERG PARTITIONED BY (COLUMN) LOCATION 'gs://MY_GCP_BUCKET/TABLE_ID' TBLPROPERTIES ('write.format.default' = 'TABLE_FORMAT');

Iceberg のスナップショットと履歴を確認する

Apache Spark を使用して、Iceberg テーブルのスナップショットと履歴を取得できます。

始める前に

Iceberg をサポートする PySpark セッションを設定します。

  pyspark --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.1 --conf
  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf
  spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog --conf
  spark.sql.catalog.spark_catalog.type=hive --conf
  spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog --conf
  spark.sql.catalog.local.type=hadoop --conf
  spark.sql.catalog.local.warehouse=$PWD/warehouse

Iceberg テーブルの履歴を取得する

Iceberg テーブルの履歴を取得するには、次のコマンドを実行します。

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.history").show(truncate=False)

Iceberg テーブルのスナップショットを取得する

Iceberg テーブルのスナップショットを取得するには、次のコマンドを実行します。

  spark.read.format("iceberg").load("ZONE_ID.TABLE_ID.snapshots").show(truncate=False, vertical=True)

サポートされているデータ型とファイル形式

サポートされているデータ型は次のように定義されます。

データ型
基本ロール
  • TINYINT
  • SMALLINT
  • INT
  • BIGINT
  • BOOLEAN
  • FLOAT
  • DOUBLE
  • DOUBLE PRECISION
  • STRING
  • BINARY
  • TIMESTAMP
  • DECIMAL
  • DATE
配列 ARRAY < DATA_TYPE >
構造 STRUCT < COLUMN : DATA_TYPE >

サポートされているファイル形式は次のとおりです。

  • TEXTFILE
  • ORC
  • PARQUET
  • AVRO
  • JSONFILE

ファイル形式の詳細については、ストレージ形式をご覧ください。

サポートされている行形式は次のとおりです。

  • DELIMITED [FIELDS TERMINATED BY CHAR]
  • SERDE SERDE_NAME [WITH SERDEPROPERTIES (PROPERTY_NAME=PROPERTY_VALUE, PROPERTY_NAME=PROPERTY_VALUE, ...)]

次のステップ