Dataproc で BigQuery Metastore を使用する

このドキュメントでは、 Compute Engine 上の Dataproc で BigQuery Metastore を使用する方法について説明します。この接続により、Apache Spark などのオープンソース ソフトウェア エンジン間で機能する単一の共有メタストアが提供されます。

始める前に

  1. Google Cloud プロジェクトで課金を有効にします。詳しくは、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
  2. BigQuery API と Dataproc API を有効にします。

    API を有効にする

  3. 省略可: BigQuery メタストアの仕組みと、使用する理由を理解します。

必要なロール

メタデータ ストアとして BigQuery Metastore で Spark と Dataproc を使用するのに必要な権限を取得するには、管理者に次の IAM ロールを付与するよう依頼します。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

全般的なワークフロー

BigQuery メタストアで Compute Engine で Dataproc を使用する一般的な手順は次のとおりです。

  1. Dataproc クラスタを作成するか、既存のクラスタを構成します。
  2. 任意のオープンソース ソフトウェア エンジン(Spark など)に接続します。
  3. JAR ファイルを使用して、Apache Iceberg カタログ プラグインをクラスタにインストールします。
  4. 使用しているオープンソース ソフトウェア エンジンに応じて、必要に応じて BigQuery Metastore リソースを作成して管理します。
  5. BigQuery で BigQuery メタストア リソースにアクセスして使用します。

BigQuery Metastore を Spark に接続する

次の手順では、インタラクティブな Spark SQL を使用して Dataproc を BigQuery Metastore に接続する方法を示します。

Iceberg カタログ プラグインをダウンロードする

BigQuery メタストアを Dataproc と Spark に接続するには、BigQuery メタストア Iceberg カタログ プラグイン jar ファイルを使用する必要があります。

このファイルは、Dataproc イメージ バージョン 2.2 にデフォルトで含まれています。Dataproc クラスタがインターネットに直接アクセスできない場合は、プラグインをダウンロードして、Dataproc クラスタがアクセスできる Cloud Storage バケットにアップロードする必要があります。

BigQuery Metastore Apache Iceberg カタログ プラグインをダウンロードします

Dataproc クラスタを構成する

BigQuery Metastore に接続する前に、Dataproc クラスタを設定する必要があります。

これを行うには、新しいクラスタを作成するか、既存のクラスタを使用します。その後、このクラスタを使用してインタラクティブな Spark SQL を実行し、BigQuery メタストア リソースを管理します。

  • クラスタが作成されるリージョンのサブネットで、限定公開の Google アクセス(PGA)を有効にする必要があります。デフォルトでは、2.2(デフォルト)以降のイメージ バージョンで作成された Dataproc クラスタ VM には、内部 IP アドレスのみがあります。クラスタ VM が Google API と通信できるようにするには、クラスタが作成されたリージョンの default(またはユーザー指定のネットワーク名)ネットワーク サブネットで限定公開の Google アクセスを有効にします

  • このガイドの Zeppelin ウェブ インターフェースの例を実行する場合は、Zeppelin オプション コンポーネントが有効になっている Dataproc クラスタを使用または作成する必要があります。

新しいクラスタ

新しい Dataproc クラスタを作成するには、次の gcloud dataproc clusters create コマンドを実行します。この構成には、BigQuery メタストアを使用するのに必要な設定が含まれています。

gcloud dataproc clusters create CLUSTER_NAME \
  --project=PROJECT_ID \
  --region=LOCATION \
  --optional-components=ZEPPELIN \
  --enable-component-gateway \
  --single-node

次のように置き換えます。

  • CLUSTER_NAME: Dataproc クラスタの名前。
  • PROJECT_ID: クラスタを作成する Google Cloud プロジェクトの ID。
  • LOCATION: クラスタを作成する Google Cloud リージョン。

既存のクラスタ

既存のクラスタを構成するには、次の Iceberg Spark ランタイムをクラスタに追加します。

org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2

ランタイムは、次のいずれかの方法で追加できます。

Spark ジョブの送信

Spark ジョブを送信するには、次のいずれかの方法を使用します。

gcloud CLI

gcloud dataproc jobs submit spark-sql \
--project=PROJECT_ID \
--cluster=CLUSTER_NAME \
--region==REGION \
--jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar \
--properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
--execute="SPARK_SQL_COMMAND"

次のように置き換えます。

  • PROJECT_ID: Dataproc クラスタを含む Google Cloud プロジェクトの ID。
  • CLUSTER_NAME: Spark SQL ジョブの実行に使用している Dataproc クラスタの名前。
  • REGION: クラスタが配置されている Compute Engine のリージョン
  • LOCATION: BigQuery リソースのロケーション。
  • CATALOG_NAME: SQL ジョブで使用している Spark カタログの名前。
  • WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダ。この値は gs:// で始まります。
  • SPARK_SQL_COMMAND: 実行する Spark SQL クエリ。このクエリには、リソースを作成するコマンドを含めます。たとえば、名前空間とテーブルを作成します。

インタラクティブなスパーク

Spark に接続してカタログ プラグインをインストールする

BigQuery Metastore のカタログ プラグインをインストールするには、SSH を使用して Dataproc クラスタに接続します。

  1. Google Cloud コンソールで、[VM インスタンス] ページに移動します。
  2. Dataproc VM インスタンスに接続するには、仮想マシン インスタンスのリストで [SSH] をクリックします。出力は次のようになります。

    Connected, host fingerprint: ssh-rsa ...
    Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
    ...
    example-cluster@cluster-1-m:~$
    
  3. ターミナルで、次の BigQuery メタストアの初期化コマンドを実行します。

    spark-sql \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    次のように置き換えます。

    • CATALOG_NAME: SQL ジョブで使用している Spark カタログの名前。
    • PROJECT_ID: Spark カタログがリンクされる BigQuery Metastore カタログの Google Cloud プロジェクト ID。
    • LOCATION: BigQuery メタストアの Google Cloud のロケーション。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダ。この値は gs:// で始まります。

    クラスタに正常に接続すると、Spark ターミナルに spark-sql プロンプトが表示されます。

    spark-sql (default)>
    

BigQuery Metastore リソースを管理する

これで BigQuery メタストアに接続されました。BigQuery Metastore に保存されているメタデータに基づいて、既存のリソースを表示したり、新しいリソースを作成したりできます。

たとえば、インタラクティブな Spark SQL セッションで次のコマンドを実行して、Iceberg Namespace とテーブルを作成します。

  • カスタムの Iceberg カタログを使用します。

    USE `CATALOG_NAME`;
  • Namespace を作成します。

    CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
  • 作成した Namespace を使用します。

    USE NAMESPACE_NAME;
  • Iceberg テーブルを作成します。

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
  • 表の行を挿入します。

    INSERT INTO TABLE_NAME VALUES (1, "first row");
  • テーブルの列を追加します。

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
  • テーブルのメタデータを表示する:

    DESCRIBE EXTENDED TABLE_NAME;
  • Namespace 内のテーブルを一覧表示します。

    SHOW TABLES;

Zeppelin ノートブック

  1. Google Cloud コンソールで、[Dataproc Clusters] ページに移動します。

    Dataproc クラスタに移動

  2. 使用するクラスタの名前をクリックします。

    [クラスタの詳細] ページが開きます。

  3. ナビゲーション メニューで [Web interfaces](ウェブ インターフェース)をクリックします。

  4. [コンポーネント ゲートウェイ] で [Zeppelin] をクリックします。Zeppelin ノートブック ページが開きます。

  5. ナビゲーション メニューで [ノートブック]、[+ 新しいメモを作成] の順にクリックします。

  6. ダイアログでノートブック名を入力します。デフォルトのインタープリタとして [Spark] が選択されたままにします。

  7. [作成] をクリックします。新しいノートブックが作成されます。

  8. ノートブックで、設定メニューをクリックし、[インタープリタ] をクリックします。

  9. [Search interpreters] フィールドで「Spark」を検索します。

  10. [編集] をクリックします。

  11. [Spark.jars] フィールドに、Spark JAR の URI を入力します。

    https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
    
  12. [保存] をクリックします。

  13. [OK] をクリックします。

  14. 次の PySpark コードを Zeppelin ノートブックにコピーします。

    %pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.sql("select version()").show()
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME;").show()

    次のように置き換えます。

    • CATALOG_NAME: SQL ジョブに使用する Spark カタログの名前。
    • PROJECT_ID: Dataproc クラスタを含む Google Cloud プロジェクトの ID。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスを含む Cloud Storage フォルダ。この値は gs:// で始まります。
    • NAMESPACE_NAME: Spark テーブルを参照する Namespace 名。
    • WAREHOUSE_DIRECTORY: データ ウェアハウスが保存されている Cloud Storage フォルダの URI。
    • TABLE_NAME: Spark テーブルのテーブル名。
  15. 実行アイコンをクリックするか、Shift-Enter キーを押してコードを実行します。ジョブが完了すると、ステータス メッセージに「Spark Job Finished」と表示され、出力にテーブルの内容が表示されます。

次のステップ