Cloud Storage からの Parquet データの読み込み

このページでは、Cloud Storage から BigQuery への Parquet データの読み込みの概要を説明します。

Parquet は、Apache Hadoop エコシステムで広く使用されているオープンソースの列指向のデータ形式です。

Parquet データを Cloud Storage から読み込む際に、新しいテーブルまたはパーティションにデータを読み込むことができます。また、既存のテーブルまたはパーティションに対してデータの追加や上書きを行うこともできます。BigQuery に読み込まれたデータは Capacitor の列型(BigQuery のストレージ形式)に変換されます。

Cloud Storage から BigQuery テーブルにデータを読み込むとき、テーブルを含むデータセットが Cloud Storage バケットと同じリージョンまたはマルチリージョンのロケーションに存在している必要があります。

ローカル ファイルから Parquet データを読み込む方法については、ローカル ファイルからのデータの読み込みをご覧ください。

Parquet のスキーマ

Parquet ファイルを BigQuery に読み込むと、自己記述型ソースデータから自動的にテーブル スキーマが取得されます。BigQuery がソースデータからスキーマを取得する際は、アルファベット順で最後のファイルが使用されます。

たとえば、Cloud Storage に次の Parquet ファイルがあるとします。

gs://mybucket/00/
  a.parquet
  z.parquet
gs://mybucket/01/
  b.parquet

bq コマンドライン ツールでこのコマンドを実行すると、すべてのファイルがカンマ区切りのリストとして読み込まれ、mybucket/01/b.parquet からスキーマが取得されます。

bq load \
--source_format=PARQUET \
dataset.table \
"gs://mybucket/00/*.parquet","gs://mybucket/01/*.parquet"

異なるスキーマを持つ複数の Parquet ファイルを読み込む場合、複数のスキーマで指定された同一の列は、各スキーマ定義内で同じモードである必要があります。

BigQuery がスキーマを検出すると、一部の Parquet データ型は、BigQuery SQL 構文に対応するように BigQuery データ型に変換されます。詳細については、Parquet の変換をご覧ください。

Parquet 圧縮

BigQuery は、Parquet ファイル内のデータブロックに対して次の圧縮コードをサポートしています。

  • Snappy
  • GZip
  • LZO_1C and LZO_1X

必要な権限

BigQuery にデータを読み込むには、読み込みジョブを実行する権限が必要です。また、新規または既存の BigQuery テーブルやパーティションへのデータの読み込みが可能な権限も必要です。Cloud Storage からデータを読み込む場合は、データを含むバケットに対するアクセス権限も必要です。

BigQuery の権限

BigQuery にデータを読み込むには、少なくとも以下の権限が必要です。これらの権限は、データを新しいテーブルまたはパーティションに読み込む場合や、テーブルまたはパーティションに対してデータの追加や上書きを行う場合に必要になります。

  • bigquery.tables.create
  • bigquery.tables.updateData
  • bigquery.jobs.create

bigquery.tables.create 権限および bigquery.tables.updateData 権限はいずれも、事前定義された以下の IAM ロールに含まれています。

  • bigquery.dataEditor
  • bigquery.dataOwner
  • bigquery.admin

次の事前定義済みの IAM ロールには bigquery.jobs.create 権限が含まれています。

  • bigquery.user
  • bigquery.jobUser
  • bigquery.admin

また、bigquery.datasets.create 権限を持つユーザーがデータセットを作成すると、そのデータセットに対する bigquery.dataOwner アクセス権がユーザーに付与されます。bigquery.dataOwner アクセス権により、読み込みジョブを使用してデータセット内のテーブルを作成または更新できます。

BigQuery での IAM のロールと権限の詳細については、アクセス制御をご覧ください。

Cloud Storage の権限

Cloud Storage バケットからデータを読み込むには、storage.objects.get 権限が付与されている必要があります。URI のワイルドカードを使用する場合は storage.objects.list 権限も必要です。

事前定義された IAM ロール storage.objectViewer を付与して、storage.objects.get 権限と storage.objects.list 権限の両方を与えることができます。

Parquet データの新しいテーブルへの読み込み

次のいずれかの方法で、Parquet データを新しいテーブルに読み込むことができます。

  • Cloud Console
  • bq コマンドライン ツールの bq load コマンド
  • jobs.insert API メソッドと load ジョブの構成
  • クライアント ライブラリ

Parquet データを Cloud Storage から新しい BigQuery テーブルに読み込むには:

Console

  1. Cloud Console で [BigQuery] ページを開きます。

    [BigQuery] ページに移動

  2. ナビゲーション パネルの [リソース] セクションで、Google Cloud プロジェクトを展開し、データセットを選択します。

  3. ウィンドウの右側の詳細パネルで、[テーブルを作成] をクリックします。データを読み込むプロセスは、空のテーブルを作成するプロセスと同じです。

    テーブルの作成。

  4. [テーブルの作成] ページの [ソース] セクションで、次の操作を行います。

    • [テーブルの作成元] で [Cloud Storage] を選択します。

    • ソース フィールドで Cloud Storage URI を参照するかまたは入力します。Cloud Console で複数の URI を指定することはできませんが、ワイルドカードはサポートされています。Cloud Storage バケットは、作成するテーブルを含むデータセットと同じロケーションに存在する必要があります。

      ファイルを選択

    • [ファイル形式] で、[Parquet] を選択します。

  5. [テーブルの作成] ページの [送信先] セクションで、次の操作を行います。

    • [データセット名] で、該当するデータセットを選択します。

      データセットを表示

    • [テーブルタイプ] が [ネイティブ テーブル] に設定されていることを確認します。

    • [テーブル名] フィールドに、BigQuery で作成するテーブルの名前を入力します。

  6. [スキーマ] セクションでは、何もする必要はありません。スキーマは、Parquet ファイルで自己記述されます。

  7. (省略可)テーブルを分割するには、[パーティションとクラスタの設定] で次のオプションを選択します。

    • パーティション分割テーブルを作成するには、[パーティショニングなし] をクリックして [フィールドにより分割] を選択し、DATE または TIMESTAMP の列を選択します。スキーマに DATE または TIMESTAMP の列が含まれていない場合、このオプションは使用できません。
    • 取り込み時間パーティション分割テーブルを作成するには、[パーティショニングなし] をクリックして [取り込み時間により分割] を選択します。
  8. (省略可)クエリを実行するパーティションを指定する WHERE 句の使用を必須にするには、[パーティショニング フィルタ] で [パーティション フィルタを要求] ボックスをクリックします。パーティション フィルタを必須にすると、コストが削減され、パフォーマンスが向上する場合があります。詳細については、パーティション分割テーブルのクエリをご覧ください。[パーティショニングなし] を選択している場合、このオプションは使用できません。

  9. (省略可)テーブルをクラスタ化するには、[クラスタリング順序] ボックスに 1~4 個のフィールド名を入力します。

  10. (省略可)[詳細オプション] をクリックします。

    • [書き込み設定] で、[空の場合に書き込む] を選択したままにします。これにより、新しいテーブルが作成され、データが読み込まれます。
    • [許可されているエラー数] で、デフォルト値の 0 を使用するか、無視できる最大行数を入力します。エラーを含む行数がこの値を超えると、ジョブは invalid メッセージとなり、失敗します。
    • [不明な値] で [不明な値を無視する] をオフのままにします。このオプションは、CSV ファイルと JSON ファイルにのみ適用されます。
    • Cloud Key Management Service 鍵を使用するには、[暗号化] で [お客様が管理する鍵] クリックします。[Google が管理する鍵] の設定をそのままにすると、BigQuery は保存されているデータを暗号化します。
  11. [テーブルを作成] をクリックします。

bq

bq load コマンドを使用します。--source_format フラグを使用して PARQUET を指定し、Cloud Storage URI を設定します。単一の URI、URI のカンマ区切りのリスト、ワイルドカードを含む URI を指定できます。

(省略可)--location フラグを指定して、その値をロケーションに設定します。

次のフラグを使用することもできます。

  • --max_bad_records: ジョブ全体が失敗する前に許容される不良レコードの最大数を指定する整数。デフォルト値は 0 です。--max_bad_records の値にかかわらず、最大で 5 つの任意のタイプのエラーが返されます。
  • --time_partitioning_type: テーブルでの時間ベースのパーティショニングを有効にし、パーティション タイプを設定します。現在、唯一の有効な値は、1 日に 1 つのパーティションを生成する DAY です。DATE または TIMESTAMP の列でパーティション分割されたテーブルを作成する場合、このフラグは省略可能です。
  • --time_partitioning_expiration時間ベースのパーティションを削除する必要があるタイミングを指定する整数(秒単位)。パーティションの日付(UTC)に、この整数値を足した値が有効期限になります。
  • --time_partitioning_field: パーティション分割テーブルの作成に使用される DATE または TIMESTAMP の列。この値を指定せずに時間ベースのパーティショニングを有効にすると、取り込み時間パーティション分割テーブルが作成されます。
  • --require_partition_filter: 有効にすると、クエリの実行時に WHERE 句でパーティションを指定するようユーザーに求めます。パーティション フィルタを必須にすると、コストが削減され、パフォーマンスが向上する場合があります。詳細については、パーティション分割テーブルのクエリをご覧ください。
  • --clustering_fields: クラスタ化テーブルの作成に使用する列名のカンマ区切りのリスト。最大 4 個の列名を指定できます。
  • --destination_kms_key: テーブルデータの暗号化に使用される Cloud KMS 鍵。

    パーティション分割テーブルの詳細については、以下をご覧ください。

    クラスタ化テーブルの詳細については、以下をご覧ください。

    テーブルの暗号化の詳細については、以下をご覧ください。

Parquet データを BigQuery に読み込むには、次のコマンドを入力します。

bq --location=LOCATION load \
--source_format=FORMAT \
DATASET.TABLE \
PATH_TO_SOURCE

次のように置き換えます。

  • LOCATION: ロケーション。--location フラグは省略可能です。たとえば、BigQuery を東京リージョンで使用している場合は、このフラグの値を asia-northeast1 に設定します。.bigqueryrc ファイルを使用してロケーションのデフォルト値を設定できます。
  • FORMAT: PARQUET
  • DATASET: 既存のデータセット。
  • TABLE: データの読み込み先のテーブル名。
  • PATH_TO_SOURCE: 完全修飾の Cloud Storage URI または URI のカンマ区切りのリスト。ワイルドカードも使用できます。

例:

次のコマンドは、gs://mybucket/mydata.parquet から、mydataset 内の mytable というテーブルにデータを読み込みます。

    bq load \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

次のコマンドは、gs://mybucket/mydata.parquet から mydataset 内の mytable という取り込み時間パーティション分割テーブルにデータを読み込みます。

    bq load \
    --source_format=PARQUET \
    --time_partitioning_type=DAY \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

次のコマンドは、gs://mybucket/mydata.parquet からデータを読み込んで mydataset 内の mytable というパーティション分割テーブルに追加します。テーブルは mytimestamp 列でパーティション分割されます。

    bq load \
    --source_format=PARQUET \
    --time_partitioning_field mytimestamp \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

次のコマンドは、gs://mybucket/ の複数のファイルから mydataset 内の mytable という名前のテーブルにデータを読み込みます。Cloud Storage の URI ではワイルドカードを使用しています。

    bq load \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata*.parquet

次のコマンドは、gs://mybucket/ の複数のファイルから mydataset 内の mytable という名前のテーブルにデータを読み込みます。このコマンドでは、Cloud Storage の URI のカンマ区切りのリストをワイルドカード付きで使用しています。

    bq load \
    --source_format=PARQUET \
    mydataset.mytable \
    "gs://mybucket/00/*.parquet","gs://mybucket/01/*.parquet"

API

  1. Cloud Storage のソースデータを参照する load ジョブを作成します。

  2. (省略可)ジョブリソースjobReference セクションにある location プロパティでロケーションを指定します。

  3. source URIs プロパティは、完全修飾の gs://BUCKET/OBJECT の形式にする必要があります。各 URI にワイルドカード文字(*)を 1 つ含めることができます。

  4. sourceFormat プロパティを PARQUET に設定して、Parquet データ形式を指定します。

  5. ジョブのステータスを確認するには、jobs.get(JOB_ID*) を呼び出します。JOB_ID は、最初のリクエストで返されるジョブの ID で置き換えます。

    • status.state = DONE である場合、ジョブは正常に完了しています。
    • status.errorResult プロパティが存在する場合は、リクエストが失敗したことを意味し、該当するオブジェクトにエラーを説明する情報が格納されます。リクエストが失敗した場合、テーブルは作成されず、データは読み込まれません。
    • status.errorResult が存在しない場合、ジョブは正常に完了していますが、一部の行のインポートで問題があったなど、致命的でないエラーが発生した可能性があります。致命的でないエラーは、返されたジョブ オブジェクトの status.errors プロパティに格納されています。

API に関する注:

  • 読み込みジョブはアトミックで整合性があります。読み込みジョブが失敗した場合、データは一切利用できず、読み込みジョブが成功した場合はすべてのデータが利用可能になります。

  • おすすめの方法として、jobs.insert を呼び出して読み込みジョブを作成する際に、一意の ID を生成して、その ID を jobReference.jobId として渡すようにします。この手法を使用すると、ネットワーク障害時にクライアントは既知のジョブ ID を使ってポーリングまたは再試行できるので、頑健性が向上します。

  • 同じジョブ ID に対して jobs.insert を呼び出しても結果は同じになります。同じジョブ ID で何回でも再試行できますが、成功するのは、その中で 1 回だけです。

Go

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Go の手順に従って設定を行ってください。詳細については、BigQuery Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// importParquet demonstrates loading Apache Parquet data from Cloud Storage into a table.
func importParquet(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.parquet")
	gcsRef.SourceFormat = bigquery.Parquet
	gcsRef.AutoDetect = true
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;
import java.math.BigInteger;

public class LoadParquet {

  public static void runLoadParquet() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    loadParquet(datasetName);
  }

  public static void loadParquet(String datasetName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet";
      TableId tableId = TableId.of(datasetName, "us_states");

      LoadJobConfiguration configuration =
          LoadJobConfiguration.builder(tableId, sourceUri)
              .setFormatOptions(FormatOptions.parquet())
              .build();

      // For more information on Job see:
      // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
      // Load the table
      Job job = bigquery.create(JobInfo.of(configuration));

      // Blocks until this load table job completes its execution, either failing or succeeding.
      Job completedJob = job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to load the table due to an error: \n"
                + job.getStatus().getError());
        return;
      }

      // Check number of rows loaded into the table
      BigInteger numRows = bigquery.getTable(tableId).getNumRows();
      System.out.printf("Loaded %d rows. \n", numRows);

      System.out.println("GCS parquet loaded successfully.");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("GCS Parquet was not loaded. \n" + e.toString());
    }
  }
}

Node.js

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、BigQuery Node.js API のリファレンス ドキュメントをご覧ください。

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the Parquet file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.parquet
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.parquet';

async function loadTableGCSParquet() {
  // Imports a GCS file into a table with Parquet source format.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const metadata = {
    sourceFormat: 'PARQUET',
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), metadata);

  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

PHP

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある PHP 向けの手順に従って設定を行ってください。詳細については、BigQuery PHP API のリファレンス ドキュメントをご覧ください。

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId  = 'The Google project ID';
// $datasetId  = 'The BigQuery dataset ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table('us_states');

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.parquet';
$loadConfig = $table->loadFromStorage($gcsUri)->sourceFormat('PARQUET');
$job = $table->runJob($loadConfig);
// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});
// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Python の手順に従って設定を行ってください。詳細については、BigQuery Python API のリファレンス ドキュメントをご覧ください。

Client.load_table_from_uri() メソッドを使用して、Cloud Storage から読み込みジョブを開始します。Parquet を使用するには、LoadJobConfig.source_format プロパティを SourceFormat 定数 PARQUET に設定し、ジョブ構成を load_table_from_uri() メソッドの job_config 引数として渡します。

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name"

job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET,)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"

load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

Parquet データでのテーブルの追加または上書き

テーブルに追加のデータを読み込むには、ソースファイルを使用するか、クエリ結果を追加します。

Cloud Console では、[書き込み設定] オプションを使用して、ソースファイルやクエリ結果からデータを読み込むときに行う操作を指定します。

追加のデータをテーブルに読み込む場合、以下のオプションがあります。

Console のオプション bq ツールのフラグ BigQuery API のプロパティ 説明
Write if empty なし WRITE_EMPTY テーブルが空の場合にのみデータを書き込みます。
テーブルに追加する --noreplace または --replace=false--[no]replace を指定しない場合、デフォルトは追加) WRITE_APPEND (デフォルト)テーブルの末尾にデータを追加します。
テーブルを上書きする --replace または --replace=true WRITE_TRUNCATE 新しいデータを書き込む前に、テーブル内の既存のデータをすべて消去します。この操作を行うと、テーブル スキーマと Cloud KMS 鍵も削除されます。

既存のテーブルにデータを読み込む場合、読み込みジョブでデータの追加やテーブルの上書きを行うことができます。

次のいずれかの方法で、テーブルを追加または上書きできます。

  • Cloud Console
  • bq コマンドライン ツールの bq load コマンド
  • jobs.insert API メソッドと load ジョブの構成
  • クライアント ライブラリ

Parquet データをテーブルに追加または上書きするには、次の手順を行います。

Console

  1. Cloud Console で [BigQuery] ページを開きます。

    [BigQuery] ページに移動

  2. ナビゲーション パネルの [リソース] セクションで Cloud プロジェクトを展開し、データセットを選択します。

  3. 詳細パネルで [テーブルを作成] をクリックします。読み込みジョブでデータを追加または上書きするプロセスは、読み込みジョブでテーブルを作成するプロセスと同じです。

    テーブルの作成

  4. [テーブルの作成] ページの [ソース] セクションで、次の操作を行います。

    • [テーブルの作成元] で [Cloud Storage] を選択します。

    • ソース フィールドで Cloud Storage URI を参照するかまたは入力します。Cloud Console で複数の URI を指定することはできませんが、ワイルドカードはサポートされています。Cloud Storage バケットは、データを追加または上書きするテーブルを含むデータセットと同じロケーションに存在している必要があります。

      ファイルを選択

    • [ファイル形式] で、[Parquet] を選択します。

  5. [テーブルの作成] ページの [送信先] セクションで、次の操作を行います。

    • [データセット名] で、該当するデータセットを選択します。

      データセットの選択

    • [テーブル名] フィールドに、BigQuery で追加または上書きするテーブルの名前を入力します。

    • [テーブルタイプ] が [ネイティブ テーブル] に設定されていることを確認します。

  6. [スキーマ] セクションでは、何もする必要はありません。スキーマは、Parquet ファイルで自己記述されます。

  7. [パーティションとクラスタの設定] はデフォルト値のままにします。追加や上書きではテーブルをパーティション分割テーブルまたはクラスタ化テーブルに変換できません。Cloud Console では、読み込みジョブでパーティション分割テーブルやクラスタ化テーブルの追加または上書きを行うことはできません。

  8. [詳細オプション] をクリックします。

    • [書き込み設定] で、[テーブルに追加する] または [テーブルを上書きする] を選択します。
    • [許可されているエラー数] で、デフォルト値の 0 を使用するか、無視できる最大行数を入力します。エラーを含む行数がこの値を超えると、ジョブは invalid メッセージとなり、失敗します。
    • [不明な値] で [不明な値を無視する] をオフのままにします。このオプションは、CSV ファイルと JSON ファイルにのみ適用されます。
    • Cloud Key Management Service 鍵を使用するには、[暗号化] で [お客様が管理する鍵] クリックします。[Google が管理する鍵] の設定をそのままにすると、BigQuery は保存されているデータを暗号化します。

      テーブルを上書きする

  9. [テーブルを作成] をクリックします。

bq

テーブルを上書きするには、--replace フラグを指定して bq load コマンドを入力します。テーブルにデータを追加するには、--noreplace フラグを使用します。フラグを指定しない場合、デフォルトではデータが追加されます。--source_format フラグを指定し、PARQUET に設定します。Parquet スキーマは自己記述型ソースデータから自動的に取得されるため、スキーマ定義を指定する必要はありません。

(省略可)--location フラグを指定して、その値をロケーションに設定します。

次のフラグを使用することもできます。

  • --max_bad_records: ジョブ全体が失敗する前に許容される不良レコードの最大数を指定する整数。デフォルト値は 0 です。--max_bad_records の値にかかわらず、最大で 5 つの任意のタイプのエラーが返されます。
  • --destination_kms_key: テーブルデータの暗号化に使用される Cloud KMS 鍵。
bq --location=LOCATION load \
--[no]replace \
--source_format=FORMAT \
DATASET.TABLE \
PATH_TO_SOURCE

次のように置き換えます。

  • location: ロケーション--location フラグは省略可能です。ロケーションのデフォルト値は、.bigqueryrc ファイルを使用して設定できます。
  • format: PARQUET
  • dataset: 既存のデータセット。
  • table: データの読み込み先のテーブル名。
  • path_to_source: 完全修飾の Cloud Storage URI または URI のカンマ区切りのリスト。ワイルドカードも使用できます。

例:

次のコマンドは、gs://mybucket/mydata.parquet からデータを読み込んで mydataset 内の mytable というテーブルを上書きします。

    bq load \
    --replace \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

次のコマンドは、gs://mybucket/mydata.parquet からデータを読み込んで mydataset 内の mytable というテーブルに追加します。

    bq load \
    --noreplace \
    --source_format=PARQUET \
    mydataset.mytable \
    gs://mybucket/mydata.parquet

bq コマンドライン ツールを使用したパーティション分割テーブルに対する追加と上書きについては、パーティション分割テーブルのデータに対する追加と上書きをご覧ください。

API

  1. Cloud Storage のソースデータを参照する load ジョブを作成します。

  2. (省略可)ジョブリソースjobReference セクションにある location プロパティでロケーションを指定します。

  3. source URIs プロパティは、完全修飾の gs://BUCKET/OBJECT の形式にする必要があります。複数の URI をカンマ区切りのリストとして含めることができます。ワイルドカードも使用できます。

  4. configuration.load.sourceFormat プロパティを PARQUET に設定して、データ形式を指定します。

  5. configuration.load.writeDisposition プロパティを WRITE_TRUNCATE または WRITE_APPEND に設定して、書き込み設定を指定します。

Go

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Go の手順に従って設定を行ってください。詳細については、BigQuery Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// importParquetTruncate demonstrates loading Apache Parquet data from Cloud Storage into a table
// and overwriting/truncating existing data in the table.
func importParquetTruncate(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	gcsRef := bigquery.NewGCSReference("gs://cloud-samples-data/bigquery/us-states/us-states.parquet")
	gcsRef.SourceFormat = bigquery.Parquet
	gcsRef.AutoDetect = true
	loader := client.Dataset(datasetID).Table(tableID).LoaderFrom(gcsRef)
	loader.WriteDisposition = bigquery.WriteTruncate

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}
	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if status.Err() != nil {
		return fmt.Errorf("job completed with error: %v", status.Err())
	}
	return nil
}

Java

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Java の設定手順を実施してください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。


import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;
import java.math.BigInteger;

public class LoadParquetReplaceTable {

  public static void runLoadParquetReplaceTable() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    loadParquetReplaceTable(datasetName);
  }

  public static void loadParquetReplaceTable(String datasetName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Imports a GCS file into a table and overwrites table data if table already exists.
      // This sample loads CSV file at:
      // https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
      String sourceUri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet";
      TableId tableId = TableId.of(datasetName, "us_states");

      // For more information on LoadJobConfiguration see:
      // https://googleapis.dev/java/google-cloud-clients/latest/com/google/cloud/bigquery/LoadJobConfiguration.Builder.html
      LoadJobConfiguration configuration =
          LoadJobConfiguration.builder(tableId, sourceUri)
              .setFormatOptions(FormatOptions.parquet())
              // Set the write disposition to overwrite existing table data.
              .setWriteDisposition(WriteDisposition.WRITE_TRUNCATE)
              .build();

      // For more information on Job see:
      // https://googleapis.dev/java/google-cloud-clients/latest/index.html?com/google/cloud/bigquery/package-summary.html
      // Load the table
      Job job = bigquery.create(JobInfo.of(configuration));

      // Load data from a GCS parquet file into the table
      // Blocks until this load table job completes its execution, either failing or succeeding.
      Job completedJob = job.waitFor();
      if (completedJob == null) {
        System.out.println("Job not executed since it no longer exists.");
        return;
      } else if (completedJob.getStatus().getError() != null) {
        System.out.println(
            "BigQuery was unable to load into the table due to an error: \n"
                + job.getStatus().getError());
        return;
      }

      // Check number of rows loaded into the table
      BigInteger numRows = bigquery.getTable(tableId).getNumRows();
      System.out.printf("Loaded %d rows. \n", numRows);

      System.out.println("GCS parquet overwrote existing table successfully.");
    } catch (BigQueryException | InterruptedException e) {
      System.out.println("Table extraction job was interrupted. \n" + e.toString());
    }
  }
}

Node.js

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Node.js の設定手順を実施してください。詳細については、BigQuery Node.js API のリファレンス ドキュメントをご覧ください。

// Import the Google Cloud client libraries
const {BigQuery} = require('@google-cloud/bigquery');
const {Storage} = require('@google-cloud/storage');

// Instantiate clients
const bigquery = new BigQuery();
const storage = new Storage();

/**
 * This sample loads the CSV file at
 * https://storage.googleapis.com/cloud-samples-data/bigquery/us-states/us-states.csv
 *
 * TODO(developer): Replace the following lines with the path to your file.
 */
const bucketName = 'cloud-samples-data';
const filename = 'bigquery/us-states/us-states.parquet';

async function loadParquetFromGCSTruncate() {
  /**
   * Imports a GCS file into a table and overwrites
   * table data if table already exists.
   */

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = "my_dataset";
  // const tableId = "my_table";

  // Configure the load job. For full list of options, see:
  // https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad
  const metadata = {
    sourceFormat: 'PARQUET',
    // Set the write disposition to overwrite existing table data.
    writeDisposition: 'WRITE_TRUNCATE',
    location: 'US',
  };

  // Load data from a Google Cloud Storage file into the table
  const [job] = await bigquery
    .dataset(datasetId)
    .table(tableId)
    .load(storage.bucket(bucketName).file(filename), metadata);
  // load() waits for the job to finish
  console.log(`Job ${job.id} completed.`);

  // Check the job's status for errors
  const errors = job.status.errors;
  if (errors && errors.length > 0) {
    throw errors;
  }
}

PHP

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある PHP 向けの手順に従って設定を行ってください。詳細については、BigQuery PHP API のリファレンス ドキュメントをご覧ください。

use Google\Cloud\BigQuery\BigQueryClient;
use Google\Cloud\Core\ExponentialBackoff;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableID = 'The BigQuery table ID';

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$table = $bigQuery->dataset($datasetId)->table($tableId);

// create the import job
$gcsUri = 'gs://cloud-samples-data/bigquery/us-states/us-states.parquet';
$loadConfig = $table->loadFromStorage($gcsUri)->sourceFormat('PARQUET')->writeDisposition('WRITE_TRUNCATE');
$job = $table->runJob($loadConfig);

// poll the job until it is complete
$backoff = new ExponentialBackoff(10);
$backoff->execute(function () use ($job) {
    print('Waiting for job to complete' . PHP_EOL);
    $job->reload();
    if (!$job->isComplete()) {
        throw new Exception('Job has not yet completed', 500);
    }
});

// check if the job has errors
if (isset($job->info()['status']['errorResult'])) {
    $error = $job->info()['status']['errorResult']['message'];
    printf('Error running job: %s' . PHP_EOL, $error);
} else {
    print('Data imported successfully' . PHP_EOL);
}

Python

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Python の手順に従って設定を行ってください。詳細については、BigQuery Python API のリファレンス ドキュメントをご覧ください。

既存のテーブルの行を置き換えるには、LoadJobConfig.write_disposition プロパティを WriteDisposition 定数 WRITE_TRUNCATE に設定します。

import six

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name

job_config = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("post_abbr", "STRING"),
    ],
)

body = six.BytesIO(b"Washington,WA")
client.load_table_from_file(body, table_id, job_config=job_config).result()
previous_rows = client.get_table(table_id).num_rows
assert previous_rows > 0

job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    source_format=bigquery.SourceFormat.PARQUET,
)

uri = "gs://cloud-samples-data/bigquery/us-states/us-states.parquet"
load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.

load_job.result()  # Waits for the job to complete.

destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

Hive パーティション分割 Parquet データの読み込み

BigQuery では、Cloud Storage に保管されている Hive パーティション分割 Parquet データを読み取り可能であり、宛先 BigQuery マネージド テーブルの列として Hive パーティショニング列を取り込みます。詳細については、外部パーティション分割データの読み込みをご覧ください。

Parquet の変換

BigQuery は、次のように Parquet のデータ型を BigQuery のデータ型に変換します。

型変換

Parquet の型 Parquet の変換される型 BigQuery のデータ型
BOOLEAN NONE ブール値
INT32 NONE、UINT_8、UINT_16、UINT_32、INT_8、INT_16、INT_32 整数
INT32 DECIMAL(DECIMAL アノテーションを参照) 数値
INT32 DATE 日付
INT64 NONE、UINT_64、INT_64 整数
INT64 DECIMAL(DECIMAL アノテーションを参照) 数値
INT64 TIMESTAMP_MILLIS タイムスタンプ
INT64 TIMESTAMP_MICROS タイムスタンプ
INT96 NONE タイムスタンプ
FLOAT NONE 浮動小数点数
DOUBLE NONE 浮動小数点数
BYTE_ARRAY NONE バイト
BYTE_ARRAY UTF8 文字列
FIXED_LEN_BYTE_ARRAY DECIMAL(DECIMAL アノテーションを参照) 数値
FIXED_LEN_BYTE_ARRAY NONE バイト

Parquet の型と変換される型の上記以外の組み合わせはサポートされていません。

Decimal アノテーション

DECIMAL アノテーションを持つ Parquet 型の場合、precision の最大値は 38(合計桁数)、scale の最大値は 9(小数点以下の桁数)です。整数の桁数(precision から scale を減算したもの)。最大値は 29 です。たとえば DECIMAL(38, 9) は、precision が 38 であり、scale が 9 であるためサポートされます。この例では、整数の桁数は 29 です。DECIMAL(38, 5) は precision が 38 で、scale が 5 であるため、サポートされません。この例では、整数の桁数は 33 です。

列名の変換

列名には、英字(a~z、A~Z)、数字(0~9)、アンダースコア(_)のみを含める必要があり、英字またはアンダースコアで始まる必要があります。列名の最大長は 128 文字です。列名には、次のいずれの接頭辞も使用できません。

  • _TABLE_
  • _FILE_
  • _PARTITION

大文字と小文字が異なっている場合でも、重複する列名は使用できません。たとえば、Column1 という列は column1 という列と同じとみなされます。

現時点では、列名にピリオド(.)を持つ列を含む Parquet ファイルを読み込むことはできません。

Parquet の列名に他の文字(ピリオド以外)が含まれている場合、その文字はアンダースコアに置き換えられます。列名の末尾のアンダースコアを追加すると、競合を回避できます。たとえば、Parquet ファイルに Column1column1 の 2 つの列が含まれている場合、それらの列はそれぞれ Column1column1_ として読み込まれます。