Cloud Storage からの Parquet データの読み込み

このページでは、Cloud Storage から BigQuery への Parquet データの読み込みの概要を説明します。

Parquet は、Apache Hadoop エコシステムで広く使用されているオープンソースの列指向のデータ形式です。

Parquet データを Cloud Storage から読み込む際に、新しいテーブルまたはパーティションにデータを読み込むことも、既存のテーブルまたはパーティションにデータを追加したり、上書きしたりすることもできます。BigQuery に読み込まれたデータは Capacitor の列型(BigQuery のストレージ形式)に変換されます。

Cloud Storage から BigQuery テーブルにデータを読み込むとき、テーブルを含むデータセットが Cloud Storage バケットと同じリージョンまたはマルチリージョンのロケーションに存在している必要があります。

ローカル ファイルから Parquet データを読み込む方法については、ローカル ファイルからのデータの読み込みをご覧ください。

制限事項

Cloud Storage バケットから BigQuery にデータを読み込む際には、次の制限があります。

  • データセットのロケーションが US マルチリージョン以外の値に設定されている場合、Cloud Storage バケットはデータセットと同じロケーションに存在する必要があります。
  • BigQuery では外部データソースに対して整合性が保証されません。クエリの実行中に基になるデータを変更すると、予期しない動作が発生する可能性があります。
  • BigQuery では、Cloud Storage オブジェクトのバージョニングはサポートされていません。Cloud Storage URI に世代番号を含めると、読み込みジョブは失敗します。

入力ファイルの要件

Parquet ファイルを BigQuery に読み込むときに resourcesExceeded エラーを回避するには、次のガイドラインに従ってください。

  • レコードのサイズは 50 MB 以下にします。
  • 入力データの列が 100 を超える場合は、ページサイズをデフォルトのページサイズ(1 × 1,024 × 1,024 バイト)より小さくすることを検討してください。これは、大幅に圧縮している場合に特に便利です。

始める前に

このドキュメントの各タスクを実行するために必要な権限をユーザーに与える Identity and Access Management(IAM)のロールを付与します。

必要な権限

BigQuery にデータを読み込むには、読み込みジョブを実行してデータを BigQuery のテーブルとパーティションに読み込む IAM 権限が必要です。Cloud Storage からデータを読み込む場合は、データを含むバケットに対する IAM アクセス権限も必要です。

BigQuery にデータを読み込む権限

新しい BigQuery テーブルやパーティションにデータを読み込む場合、または既存のテーブルやパーティションにデータの追加や上書きを行う場合は、次の IAM 権限が必要です。

  • bigquery.tables.create
  • bigquery.tables.updateData
  • bigquery.tables.update
  • bigquery.jobs.create

以下の各事前定義 IAM ロールには、BigQuery テーブルやパーティションにデータを読み込むために必要な権限が含まれています。

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.adminbigquery.jobs.create 権限を含む)
  • bigquery.userbigquery.jobs.create 権限を含む)
  • bigquery.jobUserbigquery.jobs.create 権限を含む)

また、bigquery.datasets.create 権限がある場合は、作成するデータセットで読み込みジョブを使用してテーブルの作成と更新を行えます。

BigQuery での IAM のロールと権限については、事前定義ロールと権限をご覧ください。

Cloud Storage からデータを読み込む権限

Cloud Storage バケットからデータを読み込むには、次の IAM 権限が必要です。

IAM 事前定義ロール roles/storage.objectViewer には、Cloud Storage バケットからデータを読み込むために必要なすべての権限が含まれています。

Parquet のスキーマ

Parquet ファイルを BigQuery に読み込むと、自己記述型ソースデータから自動的にテーブル スキーマが取得されます。BigQuery がソースデータからスキーマを取得する際は、アルファベット順で最後のファイルが使用されます。

たとえば、Cloud Storage に次の Parquet ファイルがあるとします。

gs://mybucket/00/
  a.parquet
  z.parquet
gs://mybucket/01/
  b.parquet

bq コマンドライン ツールでこのコマンドを実行すると、すべてのファイルがカンマ区切りのリストとして読み込まれ、mybucket/01/b.parquet からスキーマが取得されます。

bq load \
--source_format=PARQUET \
dataset.table \
"gs://mybucket/00/*.parquet","gs://mybucket/01/*.parquet"

異なるスキーマを持つ複数の Parquet ファイルを読み込む場合、複数のスキーマで指定された同一の列は、各スキーマ定義内で同じモードである必要があります。

BigQuery がスキーマを検出すると、一部の Parquet データ型は、BigQuery SQL 構文に対応するように BigQuery データ型に変換されます。詳細については、Parquet の変換をご覧ください。

Parquet 圧縮

BigQuery は、Parquet ファイル内のデータブロックに対して次の圧縮コーデックをサポートしています。

  • GZip
  • LZO_1CLZO_1X
  • Snappy
  • ZSTD

Parquet データを新しいテーブルに読み込む

次のいずれかの方法で、Parquet データを新しいテーブルに読み込むことができます。

  • コンソール
  • bq コマンドライン ツールの bq load コマンド
  • jobs.insert API メソッドと load ジョブの構成
  • クライアント ライブラリ

Parquet データを Cloud Storage から新しい BigQuery テーブルに読み込むには:

コンソール

  1. コンソールで、[BigQuery] ページに移動します。

    BigQuery に移動

  2. [エクスプローラ] ペインでプロジェクトを展開し、データセットを選択します。
  3. [データセット情報] セクションで、 [テーブルを作成] をクリックします。
  4. [テーブルの作成] パネルで、次の詳細を指定します。
    1. [ソース] セクションの [テーブルの作成元] リストで [Google Cloud Storage] を選択します。次に、以下の操作を行います。
      1. Cloud Storage バケットからファイルを選択するか、Cloud Storage URI を入力します。コンソールで複数の URI を指定することはできませんが、ワイルドカードはサポートされています。Cloud Storage バケットは、作成、追加、または上書きするテーブルを含むデータセットと同じロケーションに存在している必要があります。BigQuery テーブルを作成するためのソースファイルを選択する
      2. [ファイル形式] で、[Parquet] を選択します。
    2. [送信先] セクションで、次の詳細を指定します。
      1. [データセット] で、テーブルを作成するデータセットを選択します。
      2. [テーブル] フィールドに、作成するテーブルの名前を入力します。
      3. [テーブルタイプ] フィールドが [ネイティブ テーブル] に設定されていることを確認します。
    3. [スキーマ] セクションでは、何もする必要はありません。スキーマは、Parquet ファイルで自己記述されます。
    4. 省略可: [パーティションとクラスタの設定] を指定します。詳細については、パーティション分割テーブルの作成クラスタ化テーブルの作成と使用をご覧ください。
    5. [詳細オプション] をクリックして、次の操作を行います。
      • [書き込み設定] で、[空の場合に書き込む] を選択したままにします。これにより、新しいテーブルが作成され、データが読み込まれます。
      • テーブルのスキーマに存在しない行の値を無視する場合は、[不明な値] を選択します。
      • Cloud Key Management Service 鍵を使用するには、[暗号化] で [顧客管理の暗号鍵] をクリックします。[Google が管理する暗号鍵] の設定をそのままにすると、BigQuery は保存されているデータを暗号化します。
    6. [テーブルを作成] をクリックします。

SQL

LOAD DATA DDL ステートメントを使用します。 次の例では、Parquet ファイルを新しいテーブル mytable に読み込みます。

  1. コンソールで、BigQuery ページに移動します。

    BigQuery に移動

  2. クエリエディタで次のステートメントを入力します。

    LOAD DATA OVERWRITE mydataset.mytable
    FROM FILES (
      format = 'PARQUET',
      uris = ['gs://bucket/path/file.parquet']);
    

  3. [ 実行] をクリックします。

クエリの実行方法については、インタラクティブ クエリの実行をご覧ください。

bq

bq load コマンドを使用します。--source_format フラグを使用して PARQUET を指定し、Cloud Storage URI を設定します。単一の URI、URI のカンマ区切りのリスト、ワイルドカードを含む URI を指定できます。

(省略可)--location フラグを指定して、その値をロケーションに設定します。

次のフラグを使用することもできます。

  • --time_partitioning_type: テーブルでの時間ベースのパーティショニングを有効にし、パーティション タイプを設定します。有効な値は HOURDAYMONTHYEAR です。DATEDATETIME または TIMESTAMP 列でパーティション分割されたテーブルを作成する場合、このフラグは省略可能です。時間ベースのパーティショニングのデフォルト パーティション タイプは DAY です。既存のテーブルのパーティショニング仕様を変更することはできません。
  • --time_partitioning_expiration: 時間ベースのパーティションを削除する必要があるタイミングを指定する整数(秒単位)。パーティションの日付(UTC)に、この整数値を足した値が有効期限になります。
  • --time_partitioning_field: パーティション分割テーブルの作成に使用される DATE または TIMESTAMP の列。この値を指定せずに時間ベースのパーティショニングを有効にすると、取り込み時間パーティション分割テーブルが作成されます。
  • --require_partition_filter: 有効にすると、クエリの実行時に WHERE 句でパーティションを指定するようユーザーに求めます。パーティション フィルタを必須にすると、コストが削減され、パフォーマンスが向上する場合があります。詳細については、パーティション分割テーブルのクエリをご覧ください。
  • --clustering_fields: クラスタ化テーブルの作成に使用する列名のカンマ区切りのリスト。最大 4 個の列名を指定できます。
  • --destination_kms_key: テーブルデータの暗号化に使用される Cloud KMS 鍵。

    パーティション分割テーブルの詳細については、以下をご覧ください。

    クラスタ化テーブルの詳細については、以下をご覧ください。

    テーブルの暗号化の詳細については、以下をご覧ください。

Parquet データを BigQuery に読み込むには、次のコマンドを入力します。

bq --location=LOCATION load \
--source_format=FORMAT \
DATASET.TABLE \
PATH_TO_SOURCE

次のように置き換えます。

  • LOCATION: ロケーション。--location フラグは省略可能です。たとえば、BigQuery を東京リージョンで使用している場合は、このフラグの値を asia-northeast1 に設定します。.bigqueryrc ファイルを使用してロケーションのデフォルト値を設定できます。
  • FORMAT: PARQUET
  • DATASET: 既存のデータセット。
  • TABLE: データの読み込み先のテーブル名。
  • PATH_TO_SOURCE: 完全修飾の Cloud Storage URI または URI のカンマ区切りのリスト。ワイルドカードも使用できます。

例:

次のコマンドは、gs://mybucket/mydata.parquet から、mydataset 内の mytable というテーブルにデータを読み込みます。

    bq load \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

次のコマンドは、gs://mybucket/mydata.parquet からデータを読み込んで mydataset 内の mytable という新しい取り込み時間パーティション分割テーブルに追加します。

    bq load \
    --source_format=PARQUET \
    --time_partitioning_type=DAY \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

次のコマンドは、gs://mybucket/mydata.parquet からデータを読み込んで mydataset 内の mytable というパーティション分割テーブルに追加します。テーブルは mytimestamp 列でパーティション分割されます。

    bq load \
    --source_format=PARQUET \
    --time_partitioning_field mytimestamp \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

次のコマンドは、gs://mybucket/ の複数のファイルから mydataset 内の mytable という名前のテーブルにデータを読み込みます。Cloud Storage の URI ではワイルドカードを使用しています。

    bq load \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata*.parquet

次のコマンドは、gs://mybucket/ の複数のファイルから mydataset 内の mytable という名前のテーブルにデータを読み込みます。このコマンドでは、Cloud Storage の URI のカンマ区切りのリストをワイルドカード付きで使用しています。

    bq load \
    --source_format=PARQUET \
    mydataset.mytable \
    "gs://mybucket/00/*.parquet","gs://mybucket/01/*.parquet"

API

  1. Cloud Storage のソースデータを参照する load ジョブを作成します。

  2. (省略可)ジョブリソースjobReference セクションにある location プロパティでロケーションを指定します。

  3. source URIs プロパティは、完全修飾の gs://BUCKET/OBJECT の形式にする必要があります。各 URI にワイルドカード文字(*)を 1 つ含めることができます。

  4. sourceFormat プロパティを PARQUET に設定して、Parquet データ形式を指定します。

  5. ジョブのステータスを確認するには、jobs.get(JOB_ID*) を呼び出します。JOB_ID は、最初のリクエストで返されるジョブの ID で置き換えます。

    • status.state = DONE である場合、ジョブは正常に完了しています。
    • status.errorResult プロパティが存在する場合は、リクエストが失敗したことを意味し、該当するオブジェクトにエラーを説明する情報が格納されます。リクエストが失敗した場合、テーブルは作成されず、データは読み込まれません。
    • status.errorResult が存在しない場合、ジョブは正常に完了していますが、一部の行のインポートで問題があったなど、致命的でないエラーが発生した可能性があります。致命的でないエラーは、返されたジョブ オブジェクトの status.errors プロパティに格納されています。

API に関する注:

  • 読み込みジョブはアトミックで整合性があります。読み込みジョブが失敗した場合、データは一切利用できず、読み込みジョブが成功した場合はすべてのデータが利用可能になります。

  • おすすめの方法として、jobs.insert を呼び出して読み込みジョブを作成する際に、一意の ID を生成して、その ID を jobReference.jobId として渡すようにします。この手法を使用すると、ネットワーク障害時にクライアントは既知のジョブ ID を使ってポーリングまたは再試行できるので、頑健性が向上します。

  • 同じジョブ ID に対して jobs.insert を呼び出しても結果は同じになります。同じジョブ ID で何回でも再試行できますが、成功するのは、その中で 1 回だけです。

Go

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Go の手順に沿って設定を行ってください。詳細については、BigQuery Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// importParquet demonstrates loading Apache Parquet data from Cloud Storage into a table.
func importParquet(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.parquet")
	gcsRef.SourceFormat = bigquery.Parquet
	gcsRef.AutoDetect = true
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある Java の設定手順を行ってください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;
import java.math.BigInteger;

public class LoadParquet {

  public static void runLoadParquet() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    loadParquet(datasetName);
  }

  public static void loadParquet(String datasetName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet";
      TableId tableId = TableId.of(datasetName, "us_states");

      LoadJobConfiguration configuration =
          LoadJobConfiguration.builder(tableId, sourceUri)
              .setFormatOptions(FormatOptions.parquet())
              .build();

      // For more information on Job see:
      // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
      // Load the table
      Job job = bigquery.create(JobInfo.of(configuration));

      // Blocks until this load table job completes its execution, either failing or succeeding.
      Job completedJob = job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to load the table due to an error: \n"
                + job.getStatus().getError());
        return;
      }

      // Check number of rows loaded into the table
      BigInteger numRows = bigquery.getTable(tableId).getNumRows();
      System.out.printf("Loaded %d rows. \n", numRows);

      System.out.println("GCS parquet loaded successfully.");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("GCS Parquet was not loaded. \n" + e.toString());
    }
  }
}

Node.js

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Node.js の手順に沿って設定を行ってください。詳細については、BigQuery Node.js API のリファレンス ドキュメントをご覧ください。

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the Parquet file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.parquet
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.parquet';

async function loadTableGCSParquet() {
  // Imports a GCS file into a table with Parquet source format.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const metadata = {
    sourceFormat: 'PARQUET',
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), metadata);

  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

PHP

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある PHP の設定手順を行ってください。詳細については、BigQuery PHP API のリファレンス ドキュメントをご覧ください。

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId  = 'The Google project ID';
// $datasetId  = 'The BigQuery dataset ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table('us_states');

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.parquet';
$loadConfig = $table->loadFromStorage($gcsUri)->sourceFormat('PARQUET');
$job = $table->runJob($loadConfig);
// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});
// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある Python の設定手順を行ってください。詳細については、BigQuery Python API のリファレンス ドキュメントをご覧ください。

Client.load_table_from_uri() メソッドを使用して、Cloud Storage から読み込みジョブを開始します。Parquet を使用するには、LoadJobConfig.source_format プロパティを文字列 PARQUET に設定し、ジョブ構成を load_table_from_uri() メソッドの job_config 引数として渡します。
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

Parquet データでのテーブルの追加または上書き

テーブルに追加のデータを読み込むには、ソースファイルを使用するか、クエリ結果を追加します。

コンソールでは、[書き込み設定] オプションを使用して、ソースファイルやクエリ結果からデータを読み込むときに行う操作を指定します。

追加のデータをテーブルに読み込む場合、以下のオプションがあります。

Console のオプション bq ツールのフラグ BigQuery API のプロパティ 説明
空の場合に書き込む 非対応 WRITE_EMPTY テーブルが空の場合にのみデータを書き込みます。
テーブルに追加する --noreplace または --replace=false--[no]replace を指定しない場合、デフォルトは追加) WRITE_APPEND デフォルト)テーブルの末尾にデータを追加します。
テーブルを上書きする --replace または --replace=true WRITE_TRUNCATE 新しいデータを書き込む前に、テーブル内の既存のデータをすべて消去します。この操作を行うと、テーブル スキーマと Cloud KMS 鍵も削除されます。

既存のテーブルにデータを読み込む場合、読み込みジョブでデータの追加やテーブルの上書きを行うことができます。

次のいずれかの方法で、テーブルを追加または上書きできます。

  • コンソール
  • bq コマンドライン ツールの bq load コマンド
  • jobs.insert API メソッドと load ジョブの構成
  • クライアント ライブラリ

Parquet データをテーブルに追加または上書きするには、次の手順を行います。

コンソール

  1. コンソールで、[BigQuery] ページに移動します。

    BigQuery に移動

  2. [エクスプローラ] ペインでプロジェクトを展開し、データセットを選択します。
  3. [データセット情報] セクションで、 [テーブルを作成] をクリックします。
  4. [テーブルの作成] パネルで、次の詳細を指定します。
    1. [ソース] セクションの [テーブルの作成元] リストで [Google Cloud Storage] を選択します。次に、以下の操作を行います。
      1. Cloud Storage バケットからファイルを選択するか、Cloud Storage URI を入力します。コンソールで複数の URI を指定することはできませんが、ワイルドカードはサポートされています。Cloud Storage バケットは、作成、追加、または上書きするテーブルを含むデータセットと同じロケーションに存在している必要があります。BigQuery テーブルを作成するためのソースファイルを選択する
      2. [ファイル形式] で、[Parquet] を選択します。
    2. [送信先] セクションで、次の詳細を指定します。
      1. [データセット] で、テーブルを作成するデータセットを選択します。
      2. [テーブル] フィールドに、作成するテーブルの名前を入力します。
      3. [テーブルタイプ] フィールドが [ネイティブ テーブル] に設定されていることを確認します。
    3. [スキーマ] セクションでは、何もする必要はありません。スキーマは、Parquet ファイルで自己記述されます。
    4. 省略可: [パーティションとクラスタの設定] を指定します。詳細については、パーティション分割テーブルの作成クラスタ化テーブルの作成と使用をご覧ください。追加や上書きではテーブルをパーティション分割テーブルまたはクラスタ化テーブルに変換できません。コンソールでは、読み込みジョブでパーティション分割テーブルやクラスタ化テーブルの追加または上書きを行うことはできません。
    5. [詳細オプション] をクリックして、次の操作を行います。
      • [書き込み設定] で、[テーブルに追加する] または [テーブルを上書きする] を選択します。
      • テーブルのスキーマに存在しない行の値を無視する場合は、[不明な値] を選択します。
      • Cloud Key Management Service 鍵を使用するには、[暗号化] で [顧客管理の暗号鍵] をクリックします。[Google が管理する暗号鍵] の設定をそのままにすると、BigQuery は保存されているデータを暗号化します。
    6. [テーブルを作成] をクリックします。

SQL

LOAD DATA DDL ステートメントを使用します。 次の例では、テーブル mytable に Parquet ファイルを追加します。

  1. コンソールで、BigQuery ページに移動します。

    BigQuery に移動

  2. クエリエディタで次のステートメントを入力します。

    LOAD DATA INTO mydataset.mytable
    FROM FILES (
      format = 'PARQUET',
      uris = ['gs://bucket/path/file.parquet']);
    

  3. [ 実行] をクリックします。

クエリの実行方法については、インタラクティブ クエリの実行をご覧ください。

bq

テーブルを上書きするには、--replace フラグを指定して bq load コマンドを入力します。テーブルにデータを追加するには、--noreplace フラグを使用します。フラグを指定しない場合、デフォルトではデータが追加されます。--source_format フラグを指定し、PARQUET に設定します。Parquet スキーマは自己記述型ソースデータから自動的に取得されるため、スキーマ定義を指定する必要はありません。

(省略可)--location フラグを指定して、その値をロケーションに設定します。

次のフラグを使用することもできます。

  • --destination_kms_key: テーブルデータの暗号化に使用される Cloud KMS 鍵。
bq --location=LOCATION load \
--[no]replace \
--source_format=FORMAT \
DATASET.TABLE \
PATH_TO_SOURCE

次のように置き換えます。

  • location: ロケーション--location フラグは省略可能です。ロケーションのデフォルト値は、.bigqueryrc ファイルを使用して設定できます。
  • format: PARQUET
  • dataset: 既存のデータセット。
  • table: データの読み込み先のテーブル名。
  • path_to_source: 完全修飾の Cloud Storage URI または URI のカンマ区切りのリスト。ワイルドカードも使用できます。

例:

次のコマンドは、gs://mybucket/mydata.parquet からデータを読み込んで mydataset 内の mytable というテーブルを上書きします。

    bq load \
    --replace \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

次のコマンドは、gs://mybucket/mydata.parquet からデータを読み込んで mydataset 内の mytable というテーブルに追加します。

    bq load \
    --noreplace \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

bq コマンドライン ツールを使用したパーティション分割テーブルに対する追加と上書きについては、パーティション分割テーブルのデータに対する追加と上書きをご覧ください。

API

  1. Cloud Storage のソースデータを参照する load ジョブを作成します。

  2. (省略可)ジョブリソースjobReference セクションにある location プロパティでロケーションを指定します。

  3. source URIs プロパティは、完全修飾の gs://BUCKET/OBJECT の形式にする必要があります。複数の URI をカンマ区切りのリストとして含めることができます。ワイルドカードも使用できます。

  4. configuration.load.sourceFormat プロパティを PARQUET に設定して、データ形式を指定します。

  5. configuration.load.writeDisposition プロパティを WRITE_TRUNCATE または WRITE_APPEND に設定して、書き込み設定を指定します。

Go

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Go の手順に沿って設定を行ってください。詳細については、BigQuery Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// importParquetTruncate demonstrates loading Apache Parquet data from Cloud Storage into a table
// and overwriting/truncating existing data in the table.
func importParquetTruncate(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.parquet")
	gcsRef.SourceFormat = bigquery.Parquet
	gcsRef.AutoDetect = true
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
	loader.WriteDisposition = bigquery.WriteTruncate

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある Java の設定手順を行ってください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。


import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;
import java.math.BigInteger;

public class LoadParquetReplaceTable {

  public static void runLoadParquetReplaceTable() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    loadParquetReplaceTable(datasetName);
  }

  public static void loadParquetReplaceTable(String datasetName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Imports a GCS file into a table and overwrites table data if table already exists.
      // This sample loads CSV file at:
      // https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
      String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet";
      TableId tableId = TableId.of(datasetName, "us_states");

      // For more information on LoadJobConfiguration see:
      // https://googleapis.dev/java/google-cloud-clients/latest/com/google/cloud/bigquery/LoadJobConfiguration.Builder.html
      LoadJobConfiguration configuration =
          LoadJobConfiguration.builder(tableId, sourceUri)
              .setFormatOptions(FormatOptions.parquet())
              // Set the write disposition to overwrite existing table data.
              .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
              .build();

      // For more information on Job see:
      // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
      // Load the table
      Job job = bigquery.create(JobInfo.of(configuration));

      // Load data from a GCS parquet file into the table
      // Blocks until this load table job completes its execution, either failing or succeeding.
      Job completedJob = job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to load into the table due to an error: \n"
                + job.getStatus().getError());
        return;
      }

      // Check number of rows loaded into the table
      BigInteger numRows = bigquery.getTable(tableId).getNumRows();
      System.out.printf("Loaded %d rows. \n", numRows);

      System.out.println("GCS parquet overwrote existing table successfully.");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("Table extraction job was interrupted. \n" + e.toString());
    }
  }
}

Node.js

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Node.js の手順に沿って設定を行ってください。詳細については、BigQuery Node.js API のリファレンス ドキュメントをご覧ください。

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the CSV file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.parquet';

async function loadParquetFromGCSTruncate() {
  /**
   * Imports a GCS file into a table and overwrites
   * table data if table already exists.
   */

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = "my_dataset";
  // const tableId = "my_table";

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const metadata = {
    sourceFormat: 'PARQUET',
    // Set the write disposition to overwrite existing table data.
    writeDisposition: 'WRITE_TRUNCATE',
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), metadata);
  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

PHP

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある PHP の設定手順を行ってください。詳細については、BigQuery PHP API のリファレンス ドキュメントをご覧ください。

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableID = 'The BigQuery table ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$table = $bigQuery->dataset($datasetId)->table($tableId);

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.parquet';
$loadConfig = $table->loadFromStorage($gcsUri)->sourceFormat('PARQUET')->writeDisposition('WRITE_TRUNCATE');
$job = $table->runJob($loadConfig);

// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});

// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある Python の設定手順を行ってください。詳細については、BigQuery Python API のリファレンス ドキュメントをご覧ください。

既存のテーブルの行を置換するには、LoadJobConfig.write_disposition プロパティWRITE_TRUNCATE に設定します。
import io

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
)

body = io.BytesIO(b"Washington,WA")
client.load_table_from_file(body, table_id, job_config=job_config).result()
previous_rows = client.get_table(table_id).num_rows
assert previous_rows > 0

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    source_format=bigquery.SourceFormat.PARQUET,
)

uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"
load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

Hive パーティション分割 Parquet データの読み込み

BigQuery では、Cloud Storage に保管されている Hive パーティション分割 Parquet データを読み取り可能であり、宛先 BigQuery マネージド テーブルの列として Hive パーティショニング列を取り込みます。詳細については、外部パーティション分割データの読み込みをご覧ください。

Parquet の変換

このセクションでは、Parquet データを読み込むときに BigQuery がさまざまなデータ型を解析する方法について説明します。

一部の Parquet データ型(INT32INT64BYTE_ARRAYFIXED_LEN_BYTE_ARRAY など)は、複数の BigQuery データ型に変換できます。BigQuery が Parquet データ型を正しく変換できるようにするには、Parquet ファイルで適切なデータ型を指定します。

たとえば、Parquet INT32 データ型を BigQuery DATE データ型に変換するには、次のように指定します。

optional int32 date_col (DATE);

BigQuery は、Parquet データ型を以下のセクションで説明する BigQuery データ型に変換します。

型変換

Parquet の型 Parquet の論理型 BigQuery のデータ型
BOOLEAN なし BOOLEAN
INT32 なし、INTEGERUINT_8UINT_16UINT_32INT_8INT_16INT_32 INTEGER
INT32 DECIMAL NUMERIC、BIGNUMERIC、または STRING
INT32 DATE DATE
INT64 なし、INTEGERUINT_64INT_64 INTEGER
INT64 DECIMAL NUMERIC、BIGNUMERIC、または STRING
INT64 TIMESTAMPprecision=MILLISTIMESTAMP_MILLIS TIMESTAMP
INT64 TIMESTAMPprecision=MICROSTIMESTAMP_MICROS TIMESTAMP
INT96 なし TIMESTAMP
FLOAT なし FLOAT
DOUBLE なし FLOAT
BYTE_ARRAY なし BYTES
BYTE_ARRAY STRINGUTF8 STRING
FIXED_LEN_BYTE_ARRAY DECIMAL NUMERIC、BIGNUMERIC、または STRING
FIXED_LEN_BYTE_ARRAY なし BYTES

ネストされたグループは、STRUCT 型に変換されます。Parquet の型と変換される型の他の組み合わせはサポートされていません。

符号なし論理型

Parquet UINT_8UINT_16UINT_32UINT_64 型は符号なしです。 BigQuery は、BigQuery の符号付き INTEGER 列に読み込むときに、これらの型の値を符号なしとして扱います。UINT_64 の場合、符号なし値が INTEGER の最大値 9,223,372,036,854,775,807 を超えるとエラーが返されます。

decimal 論理型

Decimal の論理型は、NUMERICBIGNUMERICSTRING の型に変換できます。変換される型は、decimal 論理型の精度とスケールのパラメータ、また指定されたターゲットの固定小数点型によって異なります。ターゲットの固定小数点型は次のように指定します。

Enum の論理型

Enum の論理型は、STRING または BYTES に変換できます。ターゲットの変換された型は次のように指定します。

LIST の論理型

Parquet の LIST 論理型でスキーマ推定を有効にできます。BigQuery は、LIST ノードが標準形式か、下位互換性ルールに記載されている形式かをチェックします。

// standard form
<optional | required> group <name> (LIST) {
  repeated group list {
    <optional | required> <element-type> element;
  }
}

標準形式である場合、変換されたスキーマの LIST ノードに対応するフィールドは、ノードに次のスキーマがあるものとして処理されます。

repeated <element-type> <name>

ノード「list」と「element」は省略されます。

列名の変換

列名には、英字(a-z、A-Z)、数字(0-9)、アンダースコア(_)のみを使用できます。先頭文字は英字またはアンダースコアにする必要があります。列名の最大長は 300 文字です。列名には、次のいずれの接頭辞も使用できません。

  • _TABLE_
  • _FILE_
  • _PARTITION
  • _ROW_TIMESTAMP
  • __ROOT__
  • _COLIDENTIFIER

大文字と小文字が異なっている場合でも、重複する列名は使用できません。たとえば、Column1 という列は column1 という列と同じとみなされます。

列名にピリオド(.)を持つ列を含む Parquet ファイルを読み込むことはできません。

Parquet の列名に他の文字(ピリオド以外)が含まれている場合、その文字はアンダースコアに置き換えられます。列名の末尾のアンダースコアを追加すると、競合を回避できます。たとえば、Parquet ファイルに Column1column1 の 2 つの列が含まれている場合、それらの列はそれぞれ Column1column1_ として読み込まれます。