BigQuery へのデータのストリーミング

BigQuery にデータを読み込むジョブを使用する代わりに tabledata.insertAll メソッドを使用して、一度に 1 レコードずつ BigQuery にデータをストリーミングできます。このアプローチを使用すると、読み込みジョブの実行遅延を発生させることなく、データのクエリを実行できます。このドキュメントでは、アプローチを選択する前に考慮する必要がある、重要なトレードオフ(ストリーミング割り当て、データ可用性、データ整合性など)について説明します。

始める前に

  1. 宛先テーブルを含んだデータセットへの書き込みアクセス権があることを確認します。テーブルにデータを書き込む前に、まずテーブルが存在している必要があります。ただし、テンプレート テーブルを使用する場合は異なります。テンプレート テーブルについて詳しくは、テンプレート テーブルを使用した自動的なテーブル作成をご覧ください。

  2. データ ストリーミングの割り当てポリシーをチェックします。

  3. Google Cloud Platform プロジェクトに対して課金が有効になっていることを確認します。

    課金を有効にする方法について

    ストリーミングは、無料枠では利用できません。課金を有効にせずにストリーミングを使用しようとすると、次のエラーが表示されます。BigQuery: Streaming insert is not allowed in the free tier.

データ可用性のチェック

ストリーミングされたデータは、テーブルへの最初のストリーミング挿入から数秒以内にリアルタイム分析に使用可能になります。まれに(停電時など)、ストリーミング バッファ内のデータが一時的に使用できなくなることがあります。データが使用可能でなくてもクエリは正常に動作し続けますが、ストリーミング バッファに残っている一部のデータはスキップされます。このような場合は、bigquery.jobs.getQueryResultserrors フィールド、bigquery.jobs.query に対するレスポンス、または bigquery.jobs.getstatus.errors フィールドに警告が格納されます。

データがコピーやエクスポートのオペレーションに使用可能になるまでに最大 90 分かかることがあります。また、分割テーブルにストリーミングする場合、ストリーミング バッファ内のデータに含まれる _PARTITIONTIME 疑似列の値は NULL になります。データがコピーやエクスポートに使用できるかどうかを確認するには、tables.get に対するレスポンスで streamingBuffer というセクションをチェックします。このセクションがない場合、データはコピーやエクスポートに使用可能であり、_PARTITIONTIME 疑似列には非 NULL 値が入っています。また、streamingBuffer.oldestEntryTime フィールドを使用してストリーミング バッファ内のレコードの滞在時間を調べることができます。

データ整合性の維持

データの整合性を維持するために、挿入された各行に対し insertId を指定することができます。BigQuery は、この ID を少なくとも 1 分間記憶します。この時間内に同じ行セットのストリーミングを試行した場合、insertId プロパティが設定されていれば、BigQuery は insertId プロパティを使用してデータの重複をベスト エフォートで排除します。

特定のエラー状況下(システムと BigQuery 間のネットワーク エラーや、BigQuery の内部エラーなど)では、ストリーミング挿入の状態を確認することができないため、挿入を再試行しなければならない場合があります。挿入を再試行する場合は、BigQuery がデータの重複排除を試行できるよう、同じ行セットに同じ insertId を使用するようにしてください。詳細については、ストリーミング挿入に関するトラブルシューティングをご覧ください。

Google データセンターの接続が予期せず失われる稀なケースにおいては、自動重複排除を実行できない場合があります。

データの要件が厳しい場合は、代替サービスとして、トランザクションをサポートする Google Cloud Datastore の使用を検討してください。

複数のデータ ロケーションをまたいだデータ ストリーミング

データは米国と EU の両方のデータセットにストリーミングできます。BigQuery が insertAll リクエストを処理する際、データはデータセットのロケーションの外部にあるマシンを通過することがあります。データセットのロケーションの外部からデータをストリーミングした場合は、レイテンシやエラー率が高まる可能性があります。

取り込み時間分割テーブルへのデータのストリーミング

insertAll リクエストを使用して、個々の行を分割テーブルにストリーミングできます。挿入されるデータの宛先パーティションはデフォルトで UTC 時間に基づく現在の日付から推定されます。

取り込み時間分割テーブルにデータをストリーミングする場合は、insertAll リクエストの一部としてパーティション デコレータを指定することで、推定された日付をオーバーライドできます。たとえば、次のようにパーティション デコレータを使用して、mydataset.table テーブルの 2017-03-01 に対応するパーティションにストリーミングできます。

mydataset.table$20170301

新しく到着したデータは、ストリーミング バッファ内にある間、一時的に UNPARTITIONED パーティションに関連付けられます。その結果、疑似列([_PARTITIONTIME] と [_PARTITIONDATE] のいずれかをデータ型により選択)を使って UNPARTITIONED パーティション由来の NULL 値を除外することにより、ストリーミング バッファ内のデータをクエリから除くことができます。

パーティション デコレータを使用してストリーミングを行う際は、現在の UTC 時間に基づき、過去 31 日以内のパーティションと現在の日付から 16 日後までのパーティションにストリーミングできます。これらの許容範囲に収まらない日付のパーティションに書き込むには、分割テーブルデータの追加と上書きの説明に沿って、読み込みジョブまたはクエリジョブを使用します。

分割テーブルへのストリーミング

DATE 列または TIMESTAMP 列で分割されたテーブルに、過去 1 年間と将来の 6 か月間のデータをストリーミングできます。この範囲外のデータは拒否されます。

データがストリーミングされると、過去 7 日間と将来の 3 日間のデータがストリーミング バッファに置かれ、対応するパーティションに抽出されます。この範囲外のデータ(ただし過去 1 年、将来 6 か月の範囲内)は、ストリーミング バッファに配置され、次に UNPARTITIONED パーティションへ抽出されます。パーティショニングされていないデータが十分蓄積されると、対応するパーティションに読み込まれます。

テンプレート テーブルを使用したテーブルの自動作成

BigQuery にデータをストリーミングする場合の典型的な使用パターンは、論理テーブルを多数の小さいテーブルに分割して、より小さいデータセット(ユーザー ID 別のデータセットなど)を作成することです。日付別の小さいデータセットを作成するには、分割テーブルを使用します。日付ベースでない小さいテーブルを作成するには、テンプレート テーブルを使用し、テーブルを BigQuery に自動作成させます。

BigQuery API を介してテンプレート テーブルを使用するには、insertAll リクエストに templateSuffix パラメータを追加します。bq コマンドライン ツールの場合は、insert コマンドに template_suffix フラグを追加します。BigQuery は、templateSuffix パラメータまたは template_suffix フラグを検出した場合、ターゲット テーブルをベース テンプレートとして扱い、ターゲット テーブルと同じスキーマを共有する新しいテーブルを、指定されたサフィックスを含んだ名前で作成します。

<targeted_table_name> + <templateSuffix>

テンプレート テーブルを使用すると、各テーブルを個別に作成したり、各テーブルのスキーマを指定したりするためのオーバーヘッドを回避できます。テンプレートを 1 つ作成し、複数のサフィックスを指定するだけで、新規テーブルを BigQuery に自動作成させることができます。BigQuery は、各テーブルを同じプロジェクトとデータセット内に配置します。また、テンプレートを使用した場合、ユーザーはテンプレート テーブルを更新するだけでよいので、スキーマの更新も容易になります。

テンプレート テーブルを通じて作成されたテーブルは、通常数秒以内に利用可能になります。ただし、それより長い時間を要する場合もまれにあります。

テンプレート テーブル スキーマの変更

テンプレート テーブル スキーマを変更した場合は、それ以降に生成されるすべてのテーブルで、更新後のスキーマが使用されます。以前に生成されたテーブルには影響はありません(既存のテーブルにストリーミング バッファが残っている場合を除く)。

ストリーミング バッファが残っている既存のテーブルについては、テンプレート テーブル スキーマに対する変更が後方互換性のあるものであれば、現にストリーミングが行われているそれらの生成済テーブルのスキーマも更新されます。ただし、テンプレート テーブル スキーマに対する変更が後方互換性のないものである場合は、古いスキーマを使用するバッファデータがすべて失われます。また、互換性がなくなった古いスキーマを使用する既存の生成済テーブルに、新しいデータをストリーミングすることはできません。

テンプレート テーブル スキーマを変更した後は、その変更が伝播されるまで、新しいデータの挿入や、生成されたテーブルに対するクエリを行わないでください。新しいフィールドを挿入するリクエストは、数分以内に処理されます。新規フィールドに対するクエリ実行には、最大 90 分の待機時間を要する場合があります。

生成済みテーブルのスキーマを変更する場合は、テンプレート テーブルを経由したストリーミングが停止し、生成済みテーブルのストリーミング統計セクションが tables.get() レスポンスからなくなる(テーブルでバッファされているデータがなくなる)まで、スキーマを変更しないようにしてください。

テンプレート テーブルの詳細

テンプレート サフィックス値
templateSuffix(または --template_suffix)の値に使用できるのは、英字(a-z、A-Z)、数字(0-9)、アンダースコア(_)です。テーブル名とテーブル サフィックスの最大連結文字数は 1,024 文字です。
割り当て
すべてのテーブルには、同じ割り当てが適用されます(テーブルがテンプレートに基づいているか、手動で作成されたかにかかわらない)。
有効期間
生成されたテーブルの有効期間はデータセットから継承されます。通常のストリーミング データと同様、生成されたテーブルを直ちにコピーしたりエクスポートしたりすることはできません。
重複排除
重複排除は、宛先テーブルへの同質な参照間でのみ実行されます。たとえば、テンプレート テーブルと通常の insertAll コマンドの両方を使用して生成済テーブルへのストリーミングを同時に実行した場合、テンプレート テーブルと通常の insertAll コマンドによって挿入された各行の間で重複排除は実行されません。
ビュー
ビューをテンプレート テーブルとして使用することはできません。また、テンプレート テーブルを元にビューを自動生成させることもできません。

サンプル ユースケース

大容量のイベント ロギング

大量のデータをリアルタイムに収集するアプリがある場合には、ストリーミング挿入が有効な選択肢となります。一般的に、この種のアプリには次の基準が存在します。

  • 非トランザクショナル。 行数が大量で、継続的に追加される。重複が発生する、データが一時的に利用できなくなるといったケースが稀に発生してもアプリにとって許容できる。
  • 集計分析。 クエリは一般に、(単一または少数のレコード選択ではなく)傾向分析のために実行される。

大容量のイベント ロギングの一例として挙げられるのが、イベント追跡です。たとえば、イベントを追跡するモバイルアプリを使用しているとします。そのアプリ(またはモバイル サーバー)は、ユーザーとのやりとりやシステムエラーを自立的に記録し、それらを BigQuery にストリーミングします。管理者はそれらのデータを分析して、全体的な傾向(やりとりや問題の多い領域など)を判断したり、エラーの状態をリアルタイムにモニタリングしたりすることができます。

手動の重複排除

次の手動プロセスを使用して、ストリーミングの実行後に重複行が残らないようにすることができます。

  1. テーブル スキーマ内の列として insertId を追加し、各行のデータに insertId 値を含めます。
  2. ストリーミングが停止した後に、次のクエリを実行して重複をチェックします。

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    結果が 1 より大きい場合は、重複が存在します。
  3. 重複を排除するには、次のクエリを実行します。宛先テーブルを指定し、サイズの大きい結果を許容し、結果のフラット化を無効にしてください。

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

重複排除クエリに関する注記:

  • 重複排除クエリの戦略として、新規テーブルをターゲットにしたほうが安全です。なお、書き込み処理 WRITE_TRUNCATE を使用して、ソーステーブルをターゲットにすることもできます。
  • 重複排除クエリは、テーブル スキーマの末尾に値を 1 に設定した row_number 列を追加します。クエリでは、標準 SQLSELECT * EXCEPT ステートメントを使用して、この row_number 列を宛先テーブルから除外します。#standardSQL プレフィックス接頭辞を使用することにより、このクエリで標準 SQL が有効になります。また、特定の列名を指定して、この列を省略することもできます。
  • ライブデータのクエリで重複を排除するには、重複排除クエリを使用してテーブルのビューを作成するという方法もあります。なお、ビューに対するクエリの料金はビュー内の選択列に基づいて計算されるため、スキャンされるバイトサイズが大きくなる可能性があります。

ストリーミング挿入に関するトラブルシューティング

ストリーミング挿入時に発生したエラーの解決方法については、エラーのトラブルシューティング ページのストリーミング挿入に関するトラブルシューティングをご覧ください。

トップへ戻る

ストリーミング挿入の例

C#

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある C# 向けの手順に従って設定を行ってください。詳細については、BigQuery C# API のリファレンス ドキュメントをご覧ください。

using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Go の設定手順に従ってください。詳細については、BigQuery Go API のリファレンス ドキュメントをご覧ください。

// To run this sample, you will need to create (or reuse) a context and
// an instance of the bigquery client.  For example:
// import "cloud.google.com/go/bigquery"
// ctx := context.Background()
// client, err := bigquery.NewClient(ctx, "your-project-id")
u := client.Dataset(datasetID).Table(tableID).Uploader()
items := []*Item{
	// Item implements the ValueSaver interface.
	{Name: "Phred Phlyntstone", Age: 32},
	{Name: "Wylma Phlyntstone", Age: 29},
}
if err := u.Put(ctx, items); err != nil {
	return err
}

Java

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。

TableId tableId = TableId.of(datasetName, tableName);
// Values of the row to insert
Map<String, Object> rowContent = new HashMap<>();
rowContent.put("booleanField", true);
// Bytes are passed in base64
rowContent.put("bytesField", "Cg0NDg0="); // 0xA, 0xD, 0xD, 0xE, 0xD in base64
// Records are passed as a map
Map<String, Object> recordsContent = new HashMap<>();
recordsContent.put("stringField", "Hello, World!");
rowContent.put("recordField", recordsContent);
InsertAllResponse response =
    bigquery.insertAll(
        InsertAllRequest.newBuilder(tableId)
            .addRow("rowId", rowContent)
            // More rows can be added in the same RPC by invoking .addRow() on the builder
            .build());
if (response.hasErrors()) {
  // If any of the insertions failed, this lets you inspect the errors
  for (Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
    // inspect row error
  }
}

Node.js

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用の Node.js の設定手順に従ってください。詳細については、BigQuery Node.js API のリファレンス ドキュメントをご覧ください。

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  // const rows = [{name: 'Tom', age: 30}, {name: 'Jane', age: 32}];

  // Create a client
  const bigqueryClient = new BigQuery();

  // Insert data into a table
  await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();

PHP

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある PHP 向けの手順に従って設定を行ってください。詳細については、BigQuery PHP API のリファレンス ドキュメントをご覧ください。

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある Python 向けの手順に従って設定を行ってください。詳細については、BigQuery Python API のリファレンス ドキュメントをご覧ください。

# TODO(developer): Uncomment the lines below and replace with your values.
# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'  # replace with your dataset ID
# For this sample, the table must already exist and have a defined schema
# table_id = 'my_table'  # replace with your table ID
# table_ref = client.dataset(dataset_id).table(table_id)
# table = client.get_table(table_ref)  # API request

rows_to_insert = [
    (u'Phred Phlyntstone', 32),
    (u'Wylma Phlyntstone', 29),
]

errors = client.insert_rows(table, rows_to_insert)  # API request

assert errors == []

Ruby

このサンプルを試す前に、BigQuery クイックスタート: クライアント ライブラリの使用にある Ruby 向けの手順に従って設定を行ってください。詳細については、BigQuery Ruby API のリファレンス ドキュメントをご覧ください。

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

トップへ戻る

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。