BigQuery へのデータのストリーミング

ジョブを使用して BigQuery 内にデータを読み込む代わりに、tabledata().insertAll() メソッドを使用して、一度に 1 レコードずつデータを BigQuery にストリーミングすることができます。このアプローチを使用すると、読み込みジョブの実行遅延を発生させることなく、データのクエリを実行できます。このドキュメントでは、アプローチを選択する前に考慮する必要がある、重要なトレードオフ(ストリーミング割り当て、データ可用性、データ整合性など)について説明します。

始める前に

  1. 宛先テーブルを含んだデータセットへの書き込みアクセス権があることを確認します。テーブルにデータを書き込む前に、まずテーブルが存在している必要があります。ただしテンプレート テーブルを使用する場合は異なります。テンプレート テーブルについて詳しくは、テンプレート テーブルを使用した自動的なテーブル作成をご覧ください。

  2. データ ストリーミングの割り当てポリシーをチェックします。

データ可用性のチェック

ストリーミングされたデータは、テーブル内への最初のストリーミング挿入から数秒以内に、リアルタイム分析に使用できるようになります。

データをコピーやエクスポートのオペレーションに使用できるようになるまでには、最大 90 分かかることがあります。また、分割テーブルにストリーミングする場合、ストリーミング バッファのデータは、_PARTITIONTIME 疑似列の値が NULL になります。データをコピーやエクスポートに使用できるかどうかを確認するには、tables.get というセクションの streamingBuffer レスポンスをチェックします。このセクションがない場合は、データをコピーやエクスポートに使用することができ、_PARTITIONTIME 疑似列が非 NULL 値になります。

データ整合性の維持

データの整合性を維持するために、挿入された各行の insertId を提供できます。BigQuery では、この ID は少なくとも 1 分間記憶されます。この時間内に同じ行セットのストリーミングを試行した場合、insertId プロパティが設定されていれば、BigQuery は insertId プロパティを使用してデータの重複をベスト エフォートで排除します。

特定のエラー状況下(システムと BigQuery 間のネットワーク エラーや、BigQuery 内の内部エラーなど)では、ストリーミング挿入の状態を確認することができないため、挿入を再試行しなければならない場合があります。挿入を再試行する場合は、BigQuery がデータの重複排除を試行できるよう、同じ行セットに同じ insertId を使用するようにしてください。詳細については、ストリーミング挿入に関するトラブルシューティングをご覧ください。

Google データセンターの接続が予期せず失われる稀なケースにおいては、自動重複排除を実行できない場合があります。

データの要件がより厳格な場合は、Google Cloud Datastoreトランザクションをサポートする代替サービスとなります。

複数のデータ ロケーションにわたるデータ ストリーミング

データは米国と EU の両方のデータセットにストリーミングできます。BigQuery が insertAll リクエストを処理する際、データはデータセットのロケーションの外部にあるマシンを通過することがあります。データセットのロケーションの外部からデータをストリーミングした場合は、レイテンシやエラー率が高まる可能性があります。

分割テーブルへのストリーミング

分割テーブルにストリーミングを行う場合、テーブルに直接ストリーミングできます。この場合、行の [_PARTITIONTIME] 疑似列に NULL 値が設定されます。ストリーミング バッファから送信された時間の UTC タイムスタンプに応じて [_PARTITIONTIME] の値が行に割り当てられます。分割テーブルのパーティションに直接ストリーミングすることもできます。たとえば、パーティション デコレータを使用すると、テーブル mydataset.table の 2017-03-01 に対応するパーティションにストリーミングできます。

mydataset.table$20170301

パーティション デコレータを使用してストリーミングを行うと、現在の UTC 時間に基づき、過去 30 日以内のパーティションと現在の日付から 5 日後までのパーティションにストリーミングできます。この範囲に含まれない日付のパーティションに書き込むには、パーティションのデータの書き換えの手順に沿ってジョブを読み込むか、クエリを実行します。

テンプレート テーブルを使用した自動的なテーブル作成

BigQuery にデータをストリーミングする場合の典型的な使用ケースは、論理テーブルを多数の小規模テーブルに分割するケースです。これは、より小さいデータセット(日付別やユーザー ID 別など)を作成したり、スケーラビリティを確保したりする(たとえば、1 秒あたり 10 万行という現在の制限を超えてストリーミングする)ために行われます。クライアント側に複雑なコードを追加することなく、テーブルを多数の小さなテーブルに分割するには、BigQuery のテンプレート テーブル機能を使用して、BigQuery にテーブルを作成させるようにします。

テンプレート テーブルを BigQuery API 経由で使用するには、insertAll リクエストに templateSuffix パラメータを追加します。bq コマンドライン ツールの場合は、insert コマンドに template_suffix フラグを追加します。BigQuery は、templateSuffix パラメータまたは template_suffix フラグを検出した場合、ターゲット テーブルをベース テンプレートとして扱い、ターゲット テーブルと同じスキーマを共有する新しいテーブルを、指定されたサフィックスを含んだ名前で作成します。

<targeted_table_name> + <templateSuffix>

テンプレート テーブルを使用すると、各テーブルを個別に作成したり、各テーブルのスキーマを指定したりするためのオーバーヘッドを回避できます。テンプレートを 1 つ作成し、複数のサフィックスを指定するだけで、新規テーブルを BigQuery に自動作成させることができます。BigQuery は、各テーブルを同じプロジェクトとデータセット内に配置します。また、テンプレートを使用した場合、ユーザーはテンプレート テーブルを更新するだけでよいので、スキーマの更新も容易になります。

テンプレート テーブルを通じて作成されたテーブルは、通常数秒以内に利用可能になります。ただし、それより長い時間を要する場合もまれにあります。

テンプレート テーブル スキーマの変更

テンプレート テーブル スキーマを変更した場合は、それ以降に生成されるすべてのテーブルで、更新後のスキーマが使用されます。以前に生成されたテーブルには影響はありません(既存のテーブルにストリーミング バッファが残っている場合を除く)。

ストリーミング バッファが残っている既存のテーブルについては、テンプレート テーブル スキーマを下位互換性のある方法で変更すれば、活発にストリーミングされるそれらの生成済テーブルのスキーマも更新されます。ただし、下位互換性のない方法でテンプレート テーブル スキーマを変更した場合は、古いスキーマを使用するバッファデータがすべて失われます。また、互換性がなくなった古いスキーマを使用する既存の生成済テーブルに、新しいデータをストリーミングすることはできません。

テンプレート テーブル スキーマを変更した後は、その変更が伝播されるまで、新しいデータやクエリで生成されたテーブルを挿入しないでください。新しいフィールドを挿入するリクエストは、数分以内に処理されます。新規フィールドに対するクエリ実行には、最大 90 分の待機時間を要する場合があります。

生成済みテーブルのスキーマを変更する場合は、テンプレート テーブルを経由したストリーミングが停止し、生成済みテーブルのストリーミング統計セクションが tables.get() レスポンスからなくなる(テーブルでバッファされているデータがなくなる)まで、スキーマを変更しないようにしてください。

テンプレート テーブルの詳細

テンプレート サフィックス値
templateSuffix(または --template_suffix)の値に使用できるのは、英字(a-z、A-Z)、数字(0-9)、アンダースコア(_)です。テーブル名とテーブル サフィックスの最大連結文字数は 1,024 文字です。
割り当て
すべてのテーブルには、同じ割り当てが適用されます(テーブルがテンプレートに基づいているか、手動で作成されたかにかかわらない)。
有効期間
生成されたテーブルの有効期間はデータセットから継承されます。通常のストリーミング データの場合、生成されたテーブルを直ちにコピーしたりエクスポートしたりすることはできません。
重複排除
重複排除は、宛先テーブルへの均一なリファレンス間でのみ実行されます。たとえば、テンプレート テーブルと通常の insertAll コマンドの両方を使用して生成済テーブルへのストリーミングを同時に実行した場合、テンプレート テーブルと通常の insertAll コマンドによって挿入された各行の間で重複排除は実行されません。
ビュー
テンプレート テーブルと生成済テーブルはビューにはなりません。

サンプル ユースケース

大容量のイベント ロギング

大量のデータをリアルタイムに収集するアプリケーションがある場合には、ストリーミング挿入が有効な選択肢となります。一般的に、この種のアプリケーションには次の基準が存在します。

  • 非トランザクショナル。 行数が大量で、継続的に追加される。重複が発生したり、データが一時的に利用できなくなる稀なケースにアプリケーションが対応できる。
  • 集計分析。 クエリは通常、傾向分析のために実行される(単一または少数のレコード選択ではなく)。

大容量のイベント ロギングの一例として挙げられるのが、イベント追跡です。たとえば、イベントを追跡するモバイルアプリを使用している場合、そのアプリ(またはモバイル サーバー)は、ユーザーとのやりとりやシステムエラーを自立的に記録し、それらを BigQuery にストリーミングします。管理者はそれらのデータを分析して、全体的な傾向(やりとりや問題の多い領域など)を判断したり、エラーの状態をリアルタイムにモニタリングしたりすることができます。

リアルタイムのダッシュボードとクエリ

特定の状況においては、BigQuery にデータをストリーミングすることで、トランザクショナル データに対するリアルタイム分析を実行できます。データのストリーミングにはデータ重複の可能性が伴うため、必ず、BigQuery の外部にトランザクショナル データのプライマリ データストアを確保するようにしてください。

トランザクショナル データの分析を確実に実行し、最新のデータビューを確保するためには、次の対策を講じることが有効です。

  1. 同一のスキーマを使用して 2 つのテーブルを作成します。1 つ目のテーブルは調整済データ用で、2 つ目のテーブルはリアルタイムの未調整データ用です。
  2. クライアント側で、記録用のトランザクショナル データストアを管理します。
  3. これらのレコードに対する insertAll() リクエストをファイア アンド フォーゲット方式で送ります。insertAll() リクエストでは、リアルタイムの未調整テーブルを宛先テーブルとして指定します。
  4. 一定の間隔で、トランザクショナル データストアから調整済データを追加し、未調整データテーブルを切り捨てます。
  5. リアルタイムのダッシュボードやクエリについては、両方のテーブルからデータを選択できます。未調整データテーブルには、重複するレコードや破棄されたレコードが含まれる場合があります。

手動の重複排除

次の手動プロセスを使用して、ストリーミングの実行後に重複行が残らないようにすることができます。

  1. テーブル スキーマ内の列として insertId を追加し、各行のデータに insertId 値を含めます。
  2. ストリーミングが停止した後に、次のクエリを実行して重複をチェックします。

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        [ID_COLUMN],
        count(*) as count
      FROM
        `[TABLE_NAME]`
      GROUP BY
        [ID_COLUMN])

    結果が 1 より大きい場合は、重複が存在します。
  3. 重複を排除するには、次のクエリを実行します。宛先テーブルを指定し、サイズの大きい結果を許容し、結果のフラット化を無効にしてください。

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY [ID_COLUMN]) row_number
      FROM
        `[TABLE_NAME]`)
    WHERE
      row_number = 1

重複排除クエリに関する注記:

  • 重複排除クエリの戦略として、新規テーブルをターゲットにしたほうが安全です。なお、書き込み処理 WRITE_TRUNCATE を使用して、ソーステーブルをターゲットにすることもできます。
  • 重複排除クエリは、テーブル スキーマの末尾に値が 1row_number 列を追加します。このクエリでは標準 SQLSELECT * EXCEPT ステートメントを使用して、宛先のテーブルから row_number 列を削除します。#standardSQL のプレフィックスを使用することで、このクエリに標準 SQL が使用できるようになります。または固有の列名を指定して、この列を省略することもできます。
  • 重複排除されたライブデータの照会用に、重複排除クエリを使用してテーブルのビューを作成することもできます。なお、ビューに対するクエリコストはビュー内の選択列に基づいて計算されるため、スキャンされるバイトサイズが大きくなる可能性があります。

ストリーミング挿入に関するトラブルシューティング

ストリーミング挿入時に発生したエラーの解決方法については、エラーのトラブルシューティング ページのストリーミング挿入に関するトラブルシューティングをご覧ください。

トップへ戻る

ストリーミング挿入の例

C#

BigQuery クライアントのインストールと作成について詳しくは、BigQuery クライアント ライブラリをご覧ください。

public void UploadJson(string datasetId, string tableId, BigQueryClient client)
{
    // Note that there's a single line per JSON object. This is not a JSON array.
    IEnumerable<string> jsonRows = new string[]
    {
        "{ 'title': 'exampleJsonFromStream', 'unique_words': 1}",
        "{ 'title': 'moreExampleJsonFromStream', 'unique_words': 1}",
        //add more rows here...
    }.Select(row => row.Replace('\'', '"')); // Simple way of representing C# in JSON to avoid escaping " everywhere.

    // Normally we'd be uploading from a file or similar. Any readable stream can be used.
    var stream = new MemoryStream(Encoding.UTF8.GetBytes(string.Join("\n", jsonRows)));

    // This example uploads data to an existing table. If the upload will create a new table
    // or if the schema in the JSON isn't identical to the schema in the table,
    // create a schema to pass into the call instead of passing in a null value.
    BigQueryJob job = client.UploadJson(datasetId, tableId, null, stream);
    // Use the job to find out when the data has finished being inserted into the table,
    // report errors etc.

    // Wait for the job to complete.
    job.PollUntilCompleted();
}

Go

BigQuery クライアントのインストールと作成について詳しくは、BigQuery クライアント ライブラリをご覧ください。

u := client.Dataset(datasetID).Table(tableID).Uploader()
items := []*Item{
	// Item implements the ValueSaver interface.
	{Name: "n1", Count: 7},
	{Name: "n2", Count: 2},
	{Name: "n3", Count: 1},
}
if err := u.Put(ctx, items); err != nil {
	return err
}

Java

BigQuery クライアントのインストールと作成について詳しくは、BigQuery クライアント ライブラリをご覧ください。

TableId tableId = TableId.of(datasetName, tableName);
// Values of the row to insert
Map<String, Object> rowContent = new HashMap<>();
rowContent.put("booleanField", true);
// Bytes are passed in base64
rowContent.put("bytesField", "Cg0NDg0="); // 0xA, 0xD, 0xD, 0xE, 0xD in base64
// Records are passed as a map
Map<String, Object> recordsContent = new HashMap<>();
recordsContent.put("stringField", "Hello, World!");
rowContent.put("recordField", recordsContent);
InsertAllResponse response = bigquery.insertAll(InsertAllRequest.newBuilder(tableId)
    .addRow("rowId", rowContent)
    // More rows can be added in the same RPC by invoking .addRow() on the builder
    .build());
if (response.hasErrors()) {
  // If any of the insertions failed, this lets you inspect the errors
  for (Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
    // inspect row error
  }
}

Node.js

BigQuery クライアントのインストールと作成について詳しくは、BigQuery クライアント ライブラリをご覧ください。

// Imports the Google Cloud client library
const BigQuery = require('@google-cloud/bigquery');

// The project ID to use, e.g. "your-project-id"
// const projectId = "your-project-id";

// The ID of the dataset of the table into which data should be inserted, e.g. "my_dataset"
// const datasetId = "my_dataset";

// The ID of the table into which data should be inserted, e.g. "my_table"
// const tableId = "my_table";

// Instantiates a client
const bigquery = BigQuery({
  projectId: projectId
});

// Inserts data into a table
bigquery
  .dataset(datasetId)
  .table(tableId)
  .insert(rows)
  .then((insertErrors) => {
    console.log('Inserted:');
    rows.forEach((row) => console.log(row));

    if (insertErrors && insertErrors.length > 0) {
      console.log('Insert errors:');
      insertErrors.forEach((err) => console.error(err));
    }
  })
  .catch((err) => {
    console.error('ERROR:', err);
  });

PHP

BigQuery クライアントのインストールと作成について詳しくは、BigQuery クライアント ライブラリをご覧ください。

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream a row of data into your BigQuery table
 * Example:
 * ```
 * $data = [
 *     "field1" => "value1",
 *     "field2" => "value2",
 * ];
 * stream_row($projectId, $datasetId, $tableId, $data);
 * ```.
 *
 * @param string $projectId The Google project ID.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId   The BigQuery table ID.
 * @param string $data      An associative array representing a row of data.
 * @param string $insertId  An optional unique ID to guarantee data consistency.
 */
function stream_row($projectId, $datasetId, $tableId, $data, $insertId = null)
{
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
        'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $insertResponse = $table->insertRows([
        ['insertId' => $insertId, 'data' => $data],
        // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

BigQuery クライアントのインストールと作成について詳しくは、BigQuery クライアント ライブラリをご覧ください。

def stream_data(dataset_name, table_name, json_data):
    bigquery_client = bigquery.Client()
    dataset = bigquery_client.dataset(dataset_name)
    table = dataset.table(table_name)
    data = json.loads(json_data)

    # Reload the table to get the schema.
    table.reload()

    rows = [data]
    errors = table.insert_data(rows)

    if not errors:
        print('Loaded 1 row into {}:{}'.format(dataset_name, table_name))
    else:
        print('Errors:')
        pprint(errors)

Ruby

BigQuery クライアントのインストールと作成について詳しくは、BigQuery クライアント ライブラリをご覧ください。

# project_id = "Your Google Cloud project ID"
# dataset_id = "ID of the dataset containing table"
# table_id   = "ID of the table to import data into"
# row_data   = [{ column1: value, column2: value }, ...]

require "google/cloud/bigquery"

bigquery = Google::Cloud::Bigquery.new project: project_id
dataset  = bigquery.dataset dataset_id
table    = dataset.table table_id

response = table.insert row_data

if response.success?
  puts "Inserted rows successfully"
else
  puts "Failed to insert #{response.error_rows.count} rows"
end

トップへ戻る

外出先でもリソースをモニタリング

Google Cloud Console アプリを入手して、プロジェクトの管理にお役立てください。

フィードバックを送信...

BigQuery のドキュメント