マニフェストを使用してオープン テーブル形式のクエリを実行する

このドキュメントでは、マニフェスト ファイルを使用して、Apache HudiDelta Lake などのオープン テーブル形式で保存されているデータをクエリする方法について説明します。

Hudi や Delta Lake などの一部のオープン テーブル形式では、現在の状態を 1 つ以上のマニフェスト ファイルとしてエクスポートします。マニフェスト ファイルには、テーブルを構成するデータファイルのリストが含まれています。BigQuery のマニフェスト サポートを使用すると、オープン テーブル形式で格納されたデータにクエリを実行して読み込むことができます。

始める前に

必要なロール

Hudi と Delta Lake のデータに基づいて BigLake テーブルをクエリするには、次のロールが必要です。

  • BigQuery Connection ユーザー(roles/bigquery.connectionUser
  • BigQuery データ閲覧者(roles/bigquery.dataViewer
  • BigQuery ユーザー(roles/bigquery.user

Hudi 外部テーブルをクエリすることもできますが、外部テーブルを BigLake にアップグレードすることをおすすめします。Hudi 外部テーブルをクエリするには、次のロールが必要です。

  • BigQuery データ閲覧者(roles/bigquery.dataViewer
  • BigQuery ユーザー(roles/bigquery.user
  • Storage オブジェクト閲覧者(roles/storage.objectViewer

権限に応じて、これらのロールを自身に付与するか、これらのロールを付与するよう管理者に依頼します。ロールの付与の詳細については、リソースに対して付与可能なロールの表示をご覧ください。

Spark BigLake テーブルのクエリに必要な権限については、「必要な権限」を開いて確認してください。

必要な権限

カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。

Hudi ワークロードをクエリする

Hudi データをクエリするには、次の操作を行います。

  1. Huudi のデータに基づいて外部テーブルを作成します。
  2. 外部テーブルを BigLake にアップグレードします

Hudi 外部テーブルを作成する

Hudi と BigQuery の同期ツールを使用してテーブルを同期するときに、use-bq-manifest-file フラグを有効にして、マニフェスト ファイルを使用する方法に移行します。このフラグを使用すると、BigQuery でサポートされている形式でマニフェスト ファイルがエクスポートされます。このファイルを使用して、--table パラメータに指定した名前で外部テーブルが作成されます。

Hudi 外部テーブルを作成するには、次の操作を行います。

  1. Hudi 外部テーブルを作成するには、既存の Dataproc クラスタにジョブを送信します。Huudi-BigQuery コネクタをビルドする場合は、use-bq-manifest-file フラグを有効にして、マニフェスト ファイルを使用する方法に移行します。このフラグを使用すると、BigQuery でサポートされている形式でマニフェスト ファイルがエクスポートされます。このファイルを使用して、--table パラメータに指定した名前で外部テーブルが作成されます。

    spark-submit \
       --master yarn \
       --packages com.google.cloud:google-cloud-bigquery:2.10.4 \
       --class org.apache.hudi.gcp.bigquery.BigQuerySyncTool  \
       JAR \
       --project-id PROJECT_ID \
       --dataset-name DATASET \
       --dataset-location LOCATION \
       --table TABLE \
       --source-uri URI  \
       --source-uri-prefix URI_PREFIX \
       --base-path BASE_PATH  \
       --partitioned-by PARTITION_BY \
       --use-bq-manifest-file
    

    次のように置き換えます。

    • JAR: Hudi-BigQuery コネクタを使用している場合は、hudi-gcp-bundle-0.14.0.jar を指定します。Dataproc 2.1 の Hudi コンポーネントを使用している場合は、/usr/lib/hudi/tools/bq-sync-tool/hudi-gcp-bundle-0.12.3.1.jar を指定します。

    • PROJECT_ID: Hudi BigLake テーブルを作成するプロジェクト ID

    • DATASET: Hudi BigLake テーブルを作成するデータセット

    • LOCATION: Hudi BigLake テーブルを作成するロケーション

    • TABLE: 作成するテーブルの名前。

      マニフェスト ファイルにビューを作成した以前のバージョンの Hudi-BigQuery コネクタ(0.13.0 以前)から移行する場合は、同じテーブル名を使用して、既存のダウンストリーム パイプライン コードを保持します。

    • URI: Hudi マニフェスト ファイルを格納するために作成した Cloud Storage URI

      この URI は第 1 レベルのパーティションを指します。パーティション キーを含めてください 例: gs://mybucket/hudi/mydataset/EventDate=*

    • URI_PREFIX: Cloud Storage URI パスの接頭辞。通常は Hudi テーブルのパスです。

    • BASE_PATH: Hudi テーブルのベースパス

      例: gs://mybucket/hudi/mydataset/

    • PARTITION_BY: パーティション値

      例: EventDate

    コネクタの構成については、Hudi-BigQuery コネクタをご覧ください。

  2. 適切なきめ細かいコントロールを設定したり、メタデータ キャッシュを有効にしてパフォーマンスを高速化する場合は、BigLake テーブルをアップグレードするをご覧ください。

Delta ワークロードをクエリする

Delta テーブルがネイティブにサポートされるようになりました。Delta ワークロードには Delta BigLake テーブルを作成することをおすすめします。Delta Lake BigLake テーブルは、列のリマッピングや削除ベクターのあるテーブルなど、より高度な Delta Lake テーブルをサポートしています。また、Delta BigLake テーブルは最新のスナップショットを直接読み取るため、更新がすぐに利用できます。

Delta ワークロードをクエリするには、次の操作を行います。

  1. マニフェスト ファイルを生成します
  2. マニフェスト ファイルに基づいて BigLake テーブルを作成します。
  3. 適切なきめ細かい制御を設定するか、メタデータ キャッシュを有効にして、パフォーマンスを高速化します。BigLake テーブルをアップグレードするをご覧ください。

マニフェスト ファイルを生成する

BigQuery では、SymLinkTextInputFormat 形式のマニフェスト ファイルがサポートされています。これは、URI の改行区切りリストです。マニフェスト ファイルの生成の詳細については、Presto と Delta Lake の統合を設定して Delta テーブルをクエリするをご覧ください。

マニフェスト ファイルを生成するには、既存の Dataproc クラスタにジョブを送信します。

SQL

Spark を使用して、ロケーション path-to-delta-table にある Delta テーブルに次のコマンドを実行します。

GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`

Scala

Spark を使用して、ロケーション path-to-delta-table にある Delta テーブルに次のコマンドを実行します。

val deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

Java

Spark を使用して、ロケーション path-to-delta-table にある Delta テーブルに次のコマンドを実行します。

DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>);
deltaTable.generate("symlink_format_manifest");

Python

Spark を使用して、ロケーション path-to-delta-table にある Delta テーブルに次のコマンドを実行します。

deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

Delta BigLake テーブルを作成する

Delta BigLake テーブルを作成するには、file_set_spec_type フィールドを NEW_LINE_DELIMITED_MANIFEST に設定して CREATE EXTERNAL TABLE ステートメントを使用します。

  1. [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. クエリエディタで CREATE EXTERNAL TABLE ステートメントを実行します。

    CREATE EXTERNAL TABLE PROJECT_ID.DATASET_NAME.TABLE_NAME
    WITH PARTITION COLUMNS(
    `PARTITION_COLUMN PARTITION_COLUMN_TYPE`,)
    WITH CONNECTION `PROJECT_IDREGION.CONNECTION_NAME`
    OPTIONS (
       format = "DATA_FORMAT",
       uris = ["URI"],
       file_set_spec_type = 'NEW_LINE_DELIMITED_MANIFEST',
       hive_partition_uri_prefix = "PATH_TO_DELTA_TABLE"
       max_staleness = STALENESS_INTERVAL,
       metadata_cache_mode = 'CACHE_MODE');
    

    次のように置き換えます。

    • DATASET_NAME: 作成したデータセットの名前。
    • TABLE_NAME: このテーブルに設定する名前。
    • REGION: 接続のあるロケーション(例: us-east1
    • CONNECTION_NAME: 作成した接続の名前。
    • DATA_FORMAT: サポートされている任意の形式PARQUET など)
    • URI: マニフェスト ファイルのパス(例: gs://mybucket/path
    • PATH_TO_DELTA_TABLE: パーティション キーをエンコードする前のすべてのソース URI に共通のプレフィックス。
    • STALENESS_INTERVAL: BigLake テーブルに対するオペレーションでキャッシュ内のメタデータを使用するかどうかを指定します。また、オペレーションでキャッシュ内のメタデータを使用する際の鮮度も指定します。メタデータ キャッシュに関する考慮事項の詳細については、メタデータ キャッシュによるパフォーマンスの向上をご覧ください。

      メタデータのキャッシュ保存を無効にするには、0 を指定します。これがデフォルトです。

      メタデータ キャッシュを有効にするには、30 分から 7 日の間で間隔リテラルの値を指定します。たとえば、4 時間のステイルネス間隔には INTERVAL 4 HOUR を指定します。この値を指定すると、キャッシュに保存されたメタデータが過去 4 時間以内に更新されていれば、テーブルに対するオペレーションはそのメタデータを使用します。キャッシュに保存されているメタデータがそれより古い場合、オペレーションは代わりに Delta Lake からメタデータを取得します。

    • CACHE_MODE: メタデータ キャッシュを自動的に更新するか手動で更新するかを指定します。メタデータ キャッシュに関する考慮事項の詳細については、メタデータ キャッシュによるパフォーマンスの向上をご覧ください。

      AUTOMATIC に設定すると、メタデータ キャッシュがシステムで定義された間隔(通常は 30~60 分)で更新されます。

      自身で決めたスケジュールでメタデータ キャッシュを更新する場合は、MANUAL に設定します。この場合は、BQ.REFRESH_EXTERNAL_METADATA_CACHE システム プロシージャを呼び出してキャッシュを更新できます。

      STALENESS_INTERVAL が 0 より大きい値に設定されている場合は、CACHE_MODE を設定する必要があります。

    例:

    CREATE EXTERNAL TABLE mydataset.mytable
    WITH CONNECTION `us-east1.myconnection`
    OPTIONS (
        format="PARQUET",
        uris=["gs://mybucket/path/partitionpath=*"],
        file_set_spec_type = 'NEW_LINE_DELIMITED_MANIFEST'
        hive_partition_uri_prefix = "gs://mybucket/path/"
        max_staleness = INTERVAL 1 DAY,
        metadata_cache_mode = 'AUTOMATIC'
    );

BigLake テーブルをアップグレードする

メタデータ キャッシュマテリアライズド ビューを利用して、ワークロードのパフォーマンスを高速化できます。メタデータ キャッシュを使用する場合、この設定を同時に指定できます。ソース形式やソース URI など、テーブルの詳細情報を取得するには、テーブル情報を取得するをご覧ください。

外部テーブルを BigLake テーブルに更新したり、既存の BigLake を更新したりするには、次のいずれかのオプションを選択します。

SQL

CREATE OR REPLACE EXTERNAL TABLE DDL ステートメントを使用してテーブルを更新します。

  1. Google Cloud コンソールで [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. クエリエディタで次のステートメントを入力します。

    CREATE OR REPLACE EXTERNAL TABLE
      `PROJECT_ID.DATASET.EXTERNAL_TABLE_NAME`
      WITH CONNECTION `REGION.CONNECTION_ID`
      OPTIONS(
        format ="TABLE_FORMAT",
        uris = ['BUCKET_PATH'],
        max_staleness = STALENESS_INTERVAL,
        metadata_cache_mode = 'CACHE_MODE'
        );

    次のように置き換えます。

    • PROJECT_ID: テーブルを含むプロジェクトの名前
    • DATASET: テーブルを含むデータセットの名前
    • EXTERNAL_TABLE_NAME: テーブルの名前
    • REGION: 接続を含むリージョン
    • CONNECTION_ID: 使用する接続の名前
    • TABLE_FORMAT: テーブルで使用される形式

      テーブルの更新時にこれを変更することはできません。

    • BUCKET_PATH: 外部テーブルのデータを含む Cloud Storage バケットへのパス(['gs://bucket_name/[folder_name/]file_name'] 形式)。

      パスにワイルドカードとしてアスタリスク(*)を 1 つ使用して、バケットから複数のファイルを選択することもできます。たとえば、['gs://mybucket/file_name*'] のようにします。詳細については、Cloud Storage の URI でのワイルドカードのサポートをご覧ください。

      複数のパスを指定して、uris オプションに複数のバケットを指定できます。

      次の例に、有効な uris 値を示します。

      • ['gs://bucket/path1/myfile.csv']
      • ['gs://bucket/path1/*.csv']
      • ['gs://bucket/path1/*', 'gs://bucket/path2/file00*']

      複数のファイルをターゲットとする uris 値を指定する場合、それらのファイルはすべて互換性のあるスキーマを共有する必要があります。

      BigQuery での Cloud Storage URI の使用方法については、Cloud Storage リソースパスをご覧ください。

    • STALENESS_INTERVAL: キャッシュ内のメタデータをテーブルに対するオペレーションで使用するかどうかを指定します。また、オペレーションがキャッシュ内のメタデータを使用するためにその鮮度を指定します。

      メタデータ キャッシュに関する考慮事項の詳細については、メタデータ キャッシュによるパフォーマンスの向上をご覧ください。

      メタデータのキャッシュ保存を無効にするには、0 を指定します。これがデフォルトです。

      メタデータ キャッシュを有効にするには、30 分から 7 日の間で間隔リテラルの値を指定します。たとえば、4 時間のステイルネス間隔には INTERVAL 4 HOUR を指定します。この値を指定すると、キャッシュに保存されたメタデータが過去 4 時間以内に更新されていれば、テーブルに対するオペレーションはそのメタデータを使用します。キャッシュに保存されているメタデータがそれより古い場合、オペレーションは代わりに Cloud Storage からメタデータを取得します。

    • CACHE_MODE: メタデータ キャッシュを自動的に更新するか手動で更新するかを指定します。

      メタデータ キャッシュに関する考慮事項の詳細については、メタデータ キャッシュのパフォーマンスをご覧ください。

      AUTOMATIC に設定すると、メタデータ キャッシュがシステムで定義された間隔(通常は 30~60 分)で更新されます。

      自身で決めたスケジュールでメタデータ キャッシュを更新する場合は、MANUAL に設定します。この場合は、BQ.REFRESH_EXTERNAL_METADATA_CACHE システム プロシージャを呼び出してキャッシュを更新できます。

      STALENESS_INTERVAL が 0 より大きい値に設定されている場合は、CACHE_MODE を設定する必要があります。

  3. [実行] をクリックします。

クエリの実行方法について詳しくは、インタラクティブ クエリを実行するをご覧ください。

bq

テーブルを更新するには、bq mkdef コマンドと bq update コマンドを使用します。

  1. 変更するテーブルの要素を記述する外部テーブル定義を生成します。

    bq mkdef --connection_id=PROJECT_ID.REGION.CONNECTION_ID \
    --source_format=TABLE_FORMAT \
    --metadata_cache_mode=CACHE_MODE \
    "BUCKET_PATH" > /tmp/DEFINITION_FILE

    次のように置き換えます。

    • PROJECT_ID: 接続を含むプロジェクトの名前。
    • REGION: 接続を含むリージョン。
    • CONNECTION_ID: 使用する接続の名前。
    • TABLE_FORMAT: テーブルで使用される形式。テーブルの更新時にこれを変更することはできません。
    • CACHE_MODE: メタデータ キャッシュを自動的に更新するか手動で更新するかを指定します。メタデータ キャッシュに関する考慮事項の詳細については、メタデータ キャッシュによるパフォーマンスの向上をご覧ください。

      AUTOMATIC に設定すると、メタデータ キャッシュがシステムで定義された間隔(通常は 30~60 分)で更新されます。

      自身で決めたスケジュールでメタデータ キャッシュを更新する場合は、MANUAL に設定します。この場合は、BQ.REFRESH_EXTERNAL_METADATA_CACHE システム プロシージャを呼び出してキャッシュを更新できます。

      STALENESS_INTERVAL が 0 より大きい値に設定されている場合は、CACHE_MODE を設定する必要があります。

    • BUCKET_PATH: 外部テーブルのデータを含む Cloud Storage バケットへのパス(gs://bucket_name/[folder_name/]file_name 形式)。

      バケットから選択したファイルを制限するには、パスにワイルドカードとしてアスタリスク(*)を 1 つ指定します。たとえば、gs://mybucket/file_name* のようにします。詳細については、Cloud Storage の URI でのワイルドカードのサポートをご覧ください。

      複数のパスを指定して、uris オプションに複数のバケットを指定できます。

      次の例に、有効な uris 値を示します。

      • gs://bucket/path1/myfile.csv
      • gs://bucket/path1/*.csv
      • gs://bucket/path1/*,gs://bucket/path2/file00*

      複数のファイルをターゲットとする uris 値を指定する場合、それらのファイルはすべて互換性のあるスキーマを共有する必要があります。

      BigQuery での Cloud Storage URI の使用方法については、Cloud Storage リソースパスをご覧ください。

    • DEFINITION_FILE: 作成するテーブル定義ファイルの名前。

  2. 新しい外部テーブルの定義を使用してテーブルを更新します。

    bq update --max_staleness=STALENESS_INTERVAL \
    --external_table_definition=/tmp/DEFINITION_FILE \
    PROJECT_ID:DATASET.EXTERNAL_TABLE_NAME

    次のように置き換えます。

    • STALENESS_INTERVAL: キャッシュ内のメタデータをテーブルに対するオペレーションで使用するかどうかを指定します。また、オペレーションがキャッシュ内のメタデータを使用するためにその鮮度を指定します。メタデータ キャッシュに関する考慮事項の詳細については、メタデータ キャッシュによるパフォーマンスの向上をご覧ください。

      メタデータのキャッシュ保存を無効にするには、0 を指定します。これがデフォルトです。

      メタデータ キャッシュを有効にするには、INTERVAL データ型ドキュメントで説明されている Y-M D H:M:S 形式を使用して、30 分から 7 日の間隔値を指定します。たとえば、4 時間のステイルネス間隔の場合、0-0 0 4:0:0 を指定します。この値を指定すると、キャッシュに保存されたメタデータが過去 4 時間以内に更新されていれば、テーブルに対するオペレーションはそのメタデータを使用します。キャッシュに保存されたメタデータがそれより古い場合、オペレーションは代わりに Cloud Storage からメタデータを取得します。

    • DEFINITION_FILE: 作成または更新したテーブル定義ファイルの名前。

    • PROJECT_ID: テーブルを含むプロジェクトの名前。

    • DATASET: テーブルを含むデータセットの名前

    • EXTERNAL_TABLE_NAME: テーブルの名前。

BigLake と外部テーブルをクエリする

Iceberg BigLake テーブルを作成すると、標準の BigQuery テーブルと同じように GoogleSQL 構文を使用してクエリを実行できます。例: SELECT field1, field2 FROM mydataset.my_cloud_storage_table;

制限事項

  • BigQuery は、Delta Lake Reader v1 テーブルのクエリのみをサポートします。

  • Hudi と BigQuery の統合は、Hive スタイルのパーティション分割 copy-on-write テーブルでのみ機能します。

次のステップ