マニフェストを使用してオープン テーブル形式のクエリを実行する
このドキュメントでは、マニフェスト ファイルを使用して、Apache Hudi や Delta Lake などのオープン テーブル形式で保存されているデータをクエリする方法について説明します。
Hudi や Delta Lake などの一部のオープン テーブル形式では、現在の状態を 1 つ以上のマニフェスト ファイルとしてエクスポートします。マニフェスト ファイルには、テーブルを構成するデータファイルのリストが含まれています。BigQuery のマニフェスト サポートを使用すると、オープン テーブル形式で格納されたデータにクエリを実行して読み込むことができます。
始める前に
Enable the BigQuery Connection, BigQuery Reservation, and BigLake APIs.
BigLake テーブルを作成するには、次のいずれかの方法で Spark コマンドを実行します。
Dataproc クラスタを作成します。Hudi テーブルをクエリする場合は、
--optional-components
フィールドをHUDI
に設定します。Delta テーブルをクエリする場合は、--optional-components
をPresto
に設定します。BigQuery で Spark 用のストアド プロシージャを使用します。方法は次のとおりです。
マニフェスト ファイルを Cloud Storage に保存するため、Cloud Storage バケットを作成します。マニフェスト ファイルにアクセスするには、Cloud Storage バケットに接続する必要があります。方法は次のとおりです。
必要なロール
Hudi と Delta Lake のデータに基づいて BigLake テーブルをクエリするには、次のロールが必要です。
- BigQuery Connection ユーザー(
roles/bigquery.connectionUser
) - BigQuery データ閲覧者(
roles/bigquery.dataViewer
) - BigQuery ユーザー(
roles/bigquery.user
)
Hudi 外部テーブルをクエリすることもできますが、外部テーブルを BigLake にアップグレードすることをおすすめします。Hudi 外部テーブルをクエリするには、次のロールが必要です。
- BigQuery データ閲覧者(
roles/bigquery.dataViewer
) - BigQuery ユーザー(
roles/bigquery.user
) - Storage オブジェクト閲覧者(
roles/storage.objectViewer
)
権限に応じて、これらのロールを自身に付与するか、これらのロールを付与するよう管理者に依頼します。ロールの付与の詳細については、リソースに対して付与可能なロールの表示をご覧ください。
Spark BigLake テーブルのクエリに必要な権限については、「必要な権限」を開いて確認してください。
必要な権限
bigquery.connections.use
bigquery.jobs.create
bigquery.readsessions.create
(BigQuery Storage Read API を使用してデータを読み取る場合にのみ必要)bigquery.tables.get
bigquery.tables.getData
カスタムロールや他の事前定義ロールを使用して、これらの権限を取得することもできます。
Hudi ワークロードをクエリする
Hudi データをクエリするには、次の操作を行います。
- Huudi のデータに基づいて外部テーブルを作成します。
- 外部テーブルを BigLake にアップグレードします。
Hudi 外部テーブルを作成する
Hudi と BigQuery の同期ツールを使用してテーブルを同期するときに、use-bq-manifest-file
フラグを有効にして、マニフェスト ファイルを使用する方法に移行します。このフラグを使用すると、BigQuery でサポートされている形式でマニフェスト ファイルがエクスポートされます。このファイルを使用して、--table
パラメータに指定した名前で外部テーブルが作成されます。
Hudi 外部テーブルを作成するには、次の操作を行います。
Hudi 外部テーブルを作成するには、既存の Dataproc クラスタにジョブを送信します。Huudi-BigQuery コネクタをビルドする場合は、
use-bq-manifest-file
フラグを有効にして、マニフェスト ファイルを使用する方法に移行します。このフラグを使用すると、BigQuery でサポートされている形式でマニフェスト ファイルがエクスポートされます。このファイルを使用して、--table
パラメータに指定した名前で外部テーブルが作成されます。spark-submit \ --master yarn \ --packages com.google.cloud:google-cloud-bigquery:2.10.4 \ --class org.apache.hudi.gcp.bigquery.BigQuerySyncTool \ JAR \ --project-id PROJECT_ID \ --dataset-name DATASET \ --dataset-location LOCATION \ --table TABLE \ --source-uri URI \ --source-uri-prefix URI_PREFIX \ --base-path BASE_PATH \ --partitioned-by PARTITION_BY \ --use-bq-manifest-file
次のように置き換えます。
JAR
: Hudi-BigQuery コネクタを使用している場合は、hudi-gcp-bundle-0.14.0.jar
を指定します。Dataproc 2.1 の Hudi コンポーネントを使用している場合は、/usr/lib/hudi/tools/bq-sync-tool/hudi-gcp-bundle-0.12.3.1.jar
を指定します。PROJECT_ID
: Hudi BigLake テーブルを作成するプロジェクト IDDATASET
: Hudi BigLake テーブルを作成するデータセットLOCATION
: Hudi BigLake テーブルを作成するロケーションTABLE
: 作成するテーブルの名前。マニフェスト ファイルにビューを作成した以前のバージョンの Hudi-BigQuery コネクタ(0.13.0 以前)から移行する場合は、同じテーブル名を使用して、既存のダウンストリーム パイプライン コードを保持します。
URI
: Hudi マニフェスト ファイルを格納するために作成した Cloud Storage URIこの URI は第 1 レベルのパーティションを指します。パーティション キーを含めてください 例:
gs://mybucket/hudi/mydataset/EventDate=*
URI_PREFIX
: Cloud Storage URI パスの接頭辞。通常は Hudi テーブルのパスです。BASE_PATH
: Hudi テーブルのベースパス例:
gs://mybucket/hudi/mydataset/
PARTITION_BY
: パーティション値例:
EventDate
コネクタの構成については、Hudi-BigQuery コネクタをご覧ください。
適切なきめ細かいコントロールを設定したり、メタデータ キャッシュを有効にしてパフォーマンスを高速化する場合は、BigLake テーブルをアップグレードするをご覧ください。
Delta ワークロードをクエリする
Delta テーブルがネイティブにサポートされるようになりました。Delta ワークロードには Delta BigLake テーブルを作成することをおすすめします。Delta Lake BigLake テーブルは、列のリマッピングや削除ベクターのあるテーブルなど、より高度な Delta Lake テーブルをサポートしています。また、Delta BigLake テーブルは最新のスナップショットを直接読み取るため、更新がすぐに利用できます。
Delta ワークロードをクエリするには、次の操作を行います。
- マニフェスト ファイルを生成します。
- マニフェスト ファイルに基づいて BigLake テーブルを作成します。
- 適切なきめ細かい制御を設定するか、メタデータ キャッシュを有効にして、パフォーマンスを高速化します。BigLake テーブルをアップグレードするをご覧ください。
マニフェスト ファイルを生成する
BigQuery では、SymLinkTextInputFormat
形式のマニフェスト ファイルがサポートされています。これは、URI の改行区切りリストです。マニフェスト ファイルの生成の詳細については、Presto と Delta Lake の統合を設定して Delta テーブルをクエリするをご覧ください。
マニフェスト ファイルを生成するには、既存の Dataproc クラスタにジョブを送信します。
SQL
Spark を使用して、ロケーション path-to-delta-table
にある Delta テーブルに次のコマンドを実行します。
GENERATE symlink_format_manifest FOR TABLE delta.`<path-to-delta-table>`
Scala
Spark を使用して、ロケーション path-to-delta-table
にある Delta テーブルに次のコマンドを実行します。
val deltaTable = DeltaTable.forPath(<path-to-delta-table>) deltaTable.generate("symlink_format_manifest")
Java
Spark を使用して、ロケーション path-to-delta-table
にある Delta テーブルに次のコマンドを実行します。
DeltaTable deltaTable = DeltaTable.forPath(<path-to-delta-table>); deltaTable.generate("symlink_format_manifest");
Python
Spark を使用して、ロケーション path-to-delta-table
にある Delta テーブルに次のコマンドを実行します。
deltaTable = DeltaTable.forPath(<path-to-delta-table>) deltaTable.generate("symlink_format_manifest")
Delta BigLake テーブルを作成する
Delta BigLake テーブルを作成するには、file_set_spec_type
フィールドを NEW_LINE_DELIMITED_MANIFEST
に設定して CREATE EXTERNAL TABLE
ステートメントを使用します。
[BigQuery] ページに移動します。
クエリエディタで
CREATE EXTERNAL TABLE
ステートメントを実行します。CREATE EXTERNAL TABLE PROJECT_ID.DATASET_NAME.TABLE_NAME WITH PARTITION COLUMNS( `PARTITION_COLUMN PARTITION_COLUMN_TYPE`,) WITH CONNECTION `PROJECT_IDREGION.CONNECTION_NAME` OPTIONS ( format = "DATA_FORMAT", uris = ["URI"], file_set_spec_type = 'NEW_LINE_DELIMITED_MANIFEST', hive_partition_uri_prefix = "PATH_TO_DELTA_TABLE" max_staleness = STALENESS_INTERVAL, metadata_cache_mode = 'CACHE_MODE');
次のように置き換えます。
DATASET_NAME
: 作成したデータセットの名前。TABLE_NAME
: このテーブルに設定する名前。REGION
: 接続のあるロケーション(例:us-east1
)CONNECTION_NAME
: 作成した接続の名前。DATA_FORMAT
: サポートされている任意の形式(PARQUET
など)URI
: マニフェスト ファイルのパス(例:gs://mybucket/path
)PATH_TO_DELTA_TABLE
: パーティション キーをエンコードする前のすべてのソース URI に共通のプレフィックス。STALENESS_INTERVAL
: BigLake テーブルに対するオペレーションでキャッシュ内のメタデータを使用するかどうかを指定します。また、オペレーションでキャッシュ内のメタデータを使用する際の鮮度も指定します。メタデータ キャッシュに関する考慮事項の詳細については、メタデータ キャッシュによるパフォーマンスの向上をご覧ください。メタデータのキャッシュ保存を無効にするには、0 を指定します。これがデフォルトです。
メタデータ キャッシュを有効にするには、30 分から 7 日の間で間隔リテラルの値を指定します。たとえば、4 時間のステイルネス間隔には
INTERVAL 4 HOUR
を指定します。この値を指定すると、キャッシュに保存されたメタデータが過去 4 時間以内に更新されていれば、テーブルに対するオペレーションはそのメタデータを使用します。キャッシュに保存されているメタデータがそれより古い場合、オペレーションは代わりに Delta Lake からメタデータを取得します。CACHE_MODE
: メタデータ キャッシュを自動的に更新するか手動で更新するかを指定します。メタデータ キャッシュに関する考慮事項の詳細については、メタデータ キャッシュによるパフォーマンスの向上をご覧ください。AUTOMATIC
に設定すると、メタデータ キャッシュがシステムで定義された間隔(通常は 30~60 分)で更新されます。自身で決めたスケジュールでメタデータ キャッシュを更新する場合は、
MANUAL
に設定します。この場合は、BQ.REFRESH_EXTERNAL_METADATA_CACHE
システム プロシージャを呼び出してキャッシュを更新できます。STALENESS_INTERVAL
が 0 より大きい値に設定されている場合は、CACHE_MODE
を設定する必要があります。
例:
CREATE EXTERNAL TABLE mydataset.mytable WITH CONNECTION `us-east1.myconnection` OPTIONS ( format="PARQUET", uris=["gs://mybucket/path/partitionpath=*"], file_set_spec_type = 'NEW_LINE_DELIMITED_MANIFEST' hive_partition_uri_prefix = "gs://mybucket/path/" max_staleness = INTERVAL 1 DAY, metadata_cache_mode = 'AUTOMATIC' );
BigLake テーブルをアップグレードする
メタデータ キャッシュとマテリアライズド ビューを利用して、ワークロードのパフォーマンスを高速化できます。メタデータ キャッシュを使用する場合、この設定を同時に指定できます。ソース形式やソース URI など、テーブルの詳細情報を取得するには、テーブル情報を取得するをご覧ください。
外部テーブルを BigLake テーブルに更新したり、既存の BigLake を更新したりするには、次のいずれかのオプションを選択します。
SQL
CREATE OR REPLACE EXTERNAL TABLE
DDL ステートメントを使用してテーブルを更新します。
Google Cloud コンソールで [BigQuery] ページに移動します。
クエリエディタで次のステートメントを入力します。
CREATE OR REPLACE EXTERNAL TABLE `PROJECT_ID.DATASET.EXTERNAL_TABLE_NAME` WITH CONNECTION `REGION.CONNECTION_ID` OPTIONS( format ="TABLE_FORMAT", uris = ['BUCKET_PATH'], max_staleness = STALENESS_INTERVAL, metadata_cache_mode = 'CACHE_MODE' );
次のように置き換えます。
PROJECT_ID
: テーブルを含むプロジェクトの名前DATASET
: テーブルを含むデータセットの名前EXTERNAL_TABLE_NAME
: テーブルの名前REGION
: 接続を含むリージョンCONNECTION_ID
: 使用する接続の名前TABLE_FORMAT
: テーブルで使用される形式テーブルの更新時にこれを変更することはできません。
BUCKET_PATH
: 外部テーブルのデータを含む Cloud Storage バケットへのパス(['gs://bucket_name/[folder_name/]file_name']
形式)。パスにワイルドカードとしてアスタリスク(
*
)を 1 つ使用して、バケットから複数のファイルを選択することもできます。たとえば、['gs://mybucket/file_name*']
のようにします。詳細については、Cloud Storage の URI でのワイルドカードのサポートをご覧ください。複数のパスを指定して、
uris
オプションに複数のバケットを指定できます。次の例に、有効な
uris
値を示します。['gs://bucket/path1/myfile.csv']
['gs://bucket/path1/*.csv']
['gs://bucket/path1/*', 'gs://bucket/path2/file00*']
複数のファイルをターゲットとする
uris
値を指定する場合、それらのファイルはすべて互換性のあるスキーマを共有する必要があります。BigQuery での Cloud Storage URI の使用方法については、Cloud Storage リソースパスをご覧ください。
STALENESS_INTERVAL
: キャッシュ内のメタデータをテーブルに対するオペレーションで使用するかどうかを指定します。また、オペレーションがキャッシュ内のメタデータを使用するためにその鮮度を指定します。メタデータ キャッシュに関する考慮事項の詳細については、メタデータ キャッシュによるパフォーマンスの向上をご覧ください。
メタデータのキャッシュ保存を無効にするには、0 を指定します。これがデフォルトです。
メタデータ キャッシュを有効にするには、30 分から 7 日の間で間隔リテラルの値を指定します。たとえば、4 時間のステイルネス間隔には
INTERVAL 4 HOUR
を指定します。この値を指定すると、キャッシュに保存されたメタデータが過去 4 時間以内に更新されていれば、テーブルに対するオペレーションはそのメタデータを使用します。キャッシュに保存されているメタデータがそれより古い場合、オペレーションは代わりに Cloud Storage からメタデータを取得します。CACHE_MODE
: メタデータ キャッシュを自動的に更新するか手動で更新するかを指定します。メタデータ キャッシュに関する考慮事項の詳細については、メタデータ キャッシュのパフォーマンスをご覧ください。
AUTOMATIC
に設定すると、メタデータ キャッシュがシステムで定義された間隔(通常は 30~60 分)で更新されます。自身で決めたスケジュールでメタデータ キャッシュを更新する場合は、
MANUAL
に設定します。この場合は、BQ.REFRESH_EXTERNAL_METADATA_CACHE
システム プロシージャを呼び出してキャッシュを更新できます。STALENESS_INTERVAL
が 0 より大きい値に設定されている場合は、CACHE_MODE
を設定する必要があります。
[
実行] をクリックします。
クエリの実行方法について詳しくは、インタラクティブ クエリを実行するをご覧ください。
bq
テーブルを更新するには、bq mkdef
コマンドと bq update
コマンドを使用します。
変更するテーブルの要素を記述する外部テーブル定義を生成します。
bq mkdef --connection_id=PROJECT_ID.REGION.CONNECTION_ID \ --source_format=TABLE_FORMAT \ --metadata_cache_mode=CACHE_MODE \ "BUCKET_PATH" > /tmp/DEFINITION_FILE
次のように置き換えます。
PROJECT_ID
: 接続を含むプロジェクトの名前。REGION
: 接続を含むリージョン。CONNECTION_ID
: 使用する接続の名前。TABLE_FORMAT
: テーブルで使用される形式。テーブルの更新時にこれを変更することはできません。CACHE_MODE
: メタデータ キャッシュを自動的に更新するか手動で更新するかを指定します。メタデータ キャッシュに関する考慮事項の詳細については、メタデータ キャッシュによるパフォーマンスの向上をご覧ください。AUTOMATIC
に設定すると、メタデータ キャッシュがシステムで定義された間隔(通常は 30~60 分)で更新されます。自身で決めたスケジュールでメタデータ キャッシュを更新する場合は、
MANUAL
に設定します。この場合は、BQ.REFRESH_EXTERNAL_METADATA_CACHE
システム プロシージャを呼び出してキャッシュを更新できます。STALENESS_INTERVAL
が 0 より大きい値に設定されている場合は、CACHE_MODE
を設定する必要があります。BUCKET_PATH
: 外部テーブルのデータを含む Cloud Storage バケットへのパス(gs://bucket_name/[folder_name/]file_name
形式)。バケットから選択したファイルを制限するには、パスにワイルドカードとしてアスタリスク(
*
)を 1 つ指定します。たとえば、gs://mybucket/file_name*
のようにします。詳細については、Cloud Storage の URI でのワイルドカードのサポートをご覧ください。複数のパスを指定して、
uris
オプションに複数のバケットを指定できます。次の例に、有効な
uris
値を示します。gs://bucket/path1/myfile.csv
gs://bucket/path1/*.csv
gs://bucket/path1/*,gs://bucket/path2/file00*
複数のファイルをターゲットとする
uris
値を指定する場合、それらのファイルはすべて互換性のあるスキーマを共有する必要があります。BigQuery での Cloud Storage URI の使用方法については、Cloud Storage リソースパスをご覧ください。
DEFINITION_FILE
: 作成するテーブル定義ファイルの名前。
新しい外部テーブルの定義を使用してテーブルを更新します。
bq update --max_staleness=STALENESS_INTERVAL \ --external_table_definition=/tmp/DEFINITION_FILE \ PROJECT_ID:DATASET.EXTERNAL_TABLE_NAME
次のように置き換えます。
STALENESS_INTERVAL
: キャッシュ内のメタデータをテーブルに対するオペレーションで使用するかどうかを指定します。また、オペレーションがキャッシュ内のメタデータを使用するためにその鮮度を指定します。メタデータ キャッシュに関する考慮事項の詳細については、メタデータ キャッシュによるパフォーマンスの向上をご覧ください。メタデータのキャッシュ保存を無効にするには、0 を指定します。これがデフォルトです。
メタデータ キャッシュを有効にするには、
INTERVAL
データ型ドキュメントで説明されているY-M D H:M:S
形式を使用して、30 分から 7 日の間隔値を指定します。たとえば、4 時間のステイルネス間隔の場合、0-0 0 4:0:0
を指定します。この値を指定すると、キャッシュに保存されたメタデータが過去 4 時間以内に更新されていれば、テーブルに対するオペレーションはそのメタデータを使用します。キャッシュに保存されたメタデータがそれより古い場合、オペレーションは代わりに Cloud Storage からメタデータを取得します。DEFINITION_FILE
: 作成または更新したテーブル定義ファイルの名前。PROJECT_ID
: テーブルを含むプロジェクトの名前。DATASET
: テーブルを含むデータセットの名前EXTERNAL_TABLE_NAME
: テーブルの名前。
BigLake と外部テーブルをクエリする
Iceberg BigLake テーブルを作成すると、標準の BigQuery テーブルと同じように GoogleSQL 構文を使用してクエリを実行できます。例: SELECT field1, field2 FROM mydataset.my_cloud_storage_table;
制限事項
BigQuery は、Delta Lake Reader v1 テーブルのクエリのみをサポートします。
Hudi と BigQuery の統合は、Hive スタイルのパーティション分割
copy-on-write
テーブルでのみ機能します。
次のステップ
- BigQuery での SQL の使用について確認する。
- BigLake テーブルについて確認する。
- BigQuery の割り当てについて確認する。