以前のストリーミング API を使用する

このドキュメントでは、以前の tabledata.insertAll メソッドを使用して BigQuery にデータをストリーミングする方法について説明します。

新しいプロジェクトでは、tabledata.insertAll メソッドの代わりに BigQuery Storage Write API を使用することをおすすめします。Storage Write API は、1 回限りの配信セマンティクスなど、低価格でより堅牢な機能を備えています。既存のプロジェクトを tabledata.insertAll メソッドから Storage Write API に移行する場合は、デフォルト ストリームを選択することをおすすめします。tabledata.insertAll メソッドは引き続き完全にサポートされます。

始める前に

  1. 宛先テーブルを含んだデータセットへの書き込みアクセス権があることを確認します。テーブルにデータを書き込む前に、まずテーブルが存在している必要があります。ただし、テンプレート テーブルを使用する場合は異なります。テンプレート テーブルについて詳しくは、テンプレート テーブルを使用した自動的なテーブル作成をご覧ください。

  2. データ ストリーミングの割り当てポリシーをチェックします。

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. 無料枠でストリーミングは利用できません。課金を有効にせずストリーミングの使用を試みると、次のエラーが表示されます。BigQuery: Streaming insert is not allowed in the free tier.

  5. このドキュメントの各タスクを実行するために必要な権限をユーザーに与える Identity and Access Management(IAM)のロールを付与します。

必要な権限

BigQuery にデータをストリーミングするには、次の IAM 権限が必要です。

  • bigquery.tables.updateData(テーブルにデータを挿入)
  • bigquery.tables.get(テーブルのメタデータを取得)
  • bigquery.datasets.get(データセットのメタデータを取得)
  • bigquery.tables.createテンプレート テーブルを使用してテーブルを自動的に作成する場合は必須)

次の各 IAM 事前定義ロールには、BigQuery にデータをストリーミングするために必要な権限が含まれています。

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

BigQuery での IAM のロールと権限について詳しくは、事前定義ロールと権限をご覧ください。

BigQuery へのデータのストリーミング

C#

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートC# の手順に沿って設定を行ってください。詳細については、BigQuery C# API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。


using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートGo の手順に沿って設定を行ってください。詳細については、BigQuery Go API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートJava の手順に沿って設定を行ってください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートNode.js の手順に沿って設定を行ってください。詳細については、BigQuery Node.js API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートPHP の手順に沿って設定を行ってください。詳細については、BigQuery PHP API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートPython の手順に沿って設定を行ってください。詳細については、BigQuery Python API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートRuby の手順に沿って設定を行ってください。詳細については、BigQuery Ruby API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

行の挿入時に insertID フィールドに値を入力する必要はありません。次の例は、ストリーミング時に各行の insertID の送信を防ぐ方法を示しています。

Java

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートJava の手順に沿って設定を行ってください。詳細については、BigQuery Java API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

このサンプルを試す前に、クライアント ライブラリを使用した BigQuery クイックスタートPython の手順に沿って設定を行ってください。詳細については、BigQuery Python API のリファレンス ドキュメントをご覧ください。

BigQuery に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

日時データの送信

日時フィールドの場合、tabledata.insertAll メソッドで次のようにデータをフォーマットします。

種類 形式
DATE "YYYY-MM-DD" 形式の文字列
DATETIME "YYYY-MM-DD [HH:MM:SS]" 形式の文字列
TIME "HH:MM:SS" 形式の文字列
TIMESTAMP 1970-01-01(Unix エポック)からの経過秒数、または "YYYY-MM-DD HH:MM[:SS]" 形式の文字列

ストリーミング データの可用性

GoogleSQL クエリを使用したリアルタイム分析にデータは、BigQuery が tabledata.insertAll リクエストの確認応答に成功した直後に使用できるようになります。

最近取り込み時間パーティション分割テーブルにストリーミングした行は、_PARTITIONTIME 疑似列の値が一時的に NULL になります。このような行に対して、BigQuery はバックグラウンドで PARTITIONTIME 列の最終的な NULL 以外の値を割り当てます。これは通常、数分以内に行われます。まれに最長 90 分かかることがあります。

最近ストリーミングされた行の一部は、通常、数分間はテーブルのコピーに使用できない可能性があります。まれに最長 90 分かかることがあります。データがテーブルのコピーに使用できるかどうかを確認するには、tables.get に対するレスポンスで streamingBuffer というセクションをチェックします。streamingBuffer セクションがなければ、データはコピーに利用できます。streamingBuffer.oldestEntryTime フィールドを使用して、ストリーミング バッファ内のレコードの経過時間を特定することもできます。

ベスト エフォート型の重複排除

挿入された行に対して insertId を指定した場合、BigQuery はこの ID を使用して、ベスト エフォート型の重複排除を最大 1 分間サポートします。つまり、その期間内に同じ insertId の同じ行を同じテーブルに複数回ストリーミングしようとすると、BigQuery はその行の複数のオカレンスを重複排除して、それらのオカレンスの一つだけを保持する可能性があります。

このシステムでは、同じ insertId が指定された行も同一のものであると想定されます。2 つの行が同じ insertId を持つ場合、どちらの行を BigQuery が保持するかは非決定的になります。

重複排除は一般に、システムと BigQuery 間のネットワーク エラーや BigQuery の内部エラーといった特定のエラー状態でストリーミング挿入の状態を判断する方法がない分散システムでの再試行シナリオ向けです。挿入を再試行する場合は、BigQuery がデータの重複排除を試行できるよう、同じ行セットに同じ insertId を使用するようにしてください。詳細については、ストリーミング挿入に関するトラブルシューティングをご覧ください。

BigQuery の重複排除はベスト エフォート型であり、データの重複がないことを保証するメカニズムとしての使用には適していません。さらに、データの高い信頼性と可用性を保証するために、BigQuery はベスト エフォート型の重複排除の品質を低下させる可能性があります。

データの重複排除に関して厳密な要件がある場合は、Google Cloud Datastoreトランザクションをサポートする代替サービスとなります。

ベスト エフォート型の重複排除の無効化

ベスト エフォート型の重複排除を無効にするには、挿入された各行の insertId フィールドに値を設定しないようにします。これは、データの挿入を行う場合に推奨される方法です。

Apache Beam と Dataflow

Apache Beam の Java 用 BigQuery I/O コネクタを使用しているときにベスト エフォート型の重複排除を無効にするには、ignoreInsertIds() メソッドを使用します。

手動の重複排除

ストリーミングの実行後に重複行が残らないようにするには、次の手動プロセスを使用します。

  1. テーブル スキーマ内の列として insertId を追加し、各行のデータに insertId 値を含めます。
  2. ストリーミングが停止した後に、次のクエリを実行して重複をチェックします。

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    結果が 1 より大きい場合は、重複が存在します。
  3. 重複を排除するには、次のクエリを実行します。宛先テーブルを指定してサイズの大きい結果を許容し、結果のフラット化を無効にします。

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

重複排除クエリに関する注:

  • 重複排除クエリの戦略として、新規テーブルをターゲットにした方が安全です。なお、書き込み処理 WRITE_TRUNCATE を使用して、ソーステーブルをターゲットにすることもできます。
  • 重複排除クエリは、テーブル スキーマの末尾に値を 1 に設定した row_number 列を追加します。クエリでは、GoogleSQLSELECT * EXCEPT ステートメントを使用して、この row_number 列を宛先テーブルから除外します。#standardSQL 接頭辞を使用すると、このクエリで GoogleSQL が有効になります。また、特定の列名を指定して、この列を省略することもできます。
  • ライブデータのクエリで重複を排除するには、重複排除クエリを使用してテーブルのビューを作成するという方法もあります。なお、ビューに対するクエリコストはビュー内の選択列に基づいて計算されるため、スキャンされるバイトサイズが大きくなる可能性があります。

時間パーティション分割テーブルへのストリーミング

時間パーティション分割テーブルにデータをストリーミングする場合は、各パーティションにストリーミング バッファが存在します。writeDisposition プロパティを WRITE_TRUNCATE に設定すると、パーティションを上書きする読み込み、クエリ、またはコピーのジョブを実行したときにストリーミング バッファが保持されます。ストリーミング バッファを削除するには、そのパーティションに対して tables.get を呼び出し、ストリーミング バッファが空であることを確認します。

取り込み時間パーティショニング

取り込み時間パーティション分割テーブルにストリーミングすると、BigQuery は現在の UTC 時間から宛先パーティションを推測します。

新しく到着したデータは、ストリーミング バッファ内にある間、一時的に __UNPARTITIONED__ パーティションに配置されます。パーティション分割されていないデータが十分蓄積されると、BigQuery はデータを正しいパーティションに分割します。ただし、データが __UNPARTITIONED__ パーティションから移動するために要する時間に関する SLA はありません。いずれかの疑似列(優先するデータ型に応じて _PARTITIONTIME または _PARTITIONDATE)を使用して、__UNPARTITIONED__ パーティションからの NULL 値を除外することにより、ストリーミング バッファ内のデータをクエリの対象外にできます。

日次パーティション分割テーブルにデータをストリーミングする場合は、insertAll リクエストの一部としてパーティション デコレータを指定することで、日付の推定をオーバーライドできます。tableId パラメータにデコレータを含めます。たとえば、次のようにパーティション デコレータを使用して、table1 テーブルの 2021-03-01 に対応するパーティションにストリーミングできます。

table1$20210301

パーティション デコレータを使用してストリーミングを行う際は、現在の UTC 時間に基づき、過去 31 日以内のパーティションと現在の日付から 16 日後までのパーティションにストリーミングできます。この範囲に含まれない日付のパーティションに書き込むには、パーティション分割テーブルデータの追加と上書きで説明するように、読み込みジョブまたはクエリジョブを使用します。

パーティション デコレータを使用したストリーミングは、日単位のパーティション分割テーブルでのみサポートされています。時間単位、月単位、年単位のパーティション分割テーブルではサポートされていません。

テストには、 コマンドライン ツールの bq insert CLI コマンドを使用できます。たとえば、次のコマンドを実行すると、2017 年 1 月 1 日($20170101)のパーティション全体のデータを、mydataset.mytable という名前のパーティション分割テーブルに 1 行がストリーミングされます。

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

時間単位列パーティショニング

DATE 列、DATETIME 列、または TIMESTAMP 列で分割されたテーブルに、過去 5 年間、向こう 1 年間のデータをストリーミングできます。この範囲外のデータは拒否されます。

データがストリーミングされると、最初に __UNPARTITIONED__ パーティションに配置されます。パーティション分割されていないデータが十分蓄積されると、BigQuery はデータのパーティション再設定を自動的に行い、適切なパーティションに配置します。ただし、データが __UNPARTITIONED__ パーティションから移動するために要する時間に関する SLA はありません。

  • 注: 日単位のパーティションは、時間、月、年単位のパーティションとは異なる方法で処理されます。期間(過去 7 日間から将来の 3 日間まで)外のデータのみがパーティション分割されていないパーティションに抽出され、パーティションの再分割を待ちます。一方、時間単位のパーティション分割テーブルの場合、データは常にパーティション分割されていないパーティションに抽出され、後でパーティションの再分割が行われます。

テンプレート テーブルを使用したテーブルの自動作成

テンプレート テーブルは、論理テーブルを多数の小さなテーブルに分割して、より小さなデータの集合(たとえば、ユーザー ID ごと)を作成するメカニズムを提供します。テンプレート テーブルには、以下に説明するいくつかの制限があります。代わりにパーティション分割テーブルクラスタ化テーブルを使用してこの動作を実現することをおすすめします。

BigQuery API を介してテンプレート テーブルを使用するには、insertAll リクエストに templateSuffix パラメータを追加します。bq コマンドライン ツールの場合は、insert コマンドに template_suffix フラグを追加します。BigQuery は、templateSuffix パラメータまたは template_suffix フラグを検出した場合、ターゲット テーブルをベース テンプレートとして扱います。さらに、ターゲット テーブルと同じスキーマを共有する新しいテーブルを、指定されたサフィックスを含む名前で作成します。

<targeted_table_name> + <templateSuffix>

テンプレート テーブルを使用すると、各テーブルを個別に作成し、各テーブルのスキーマを指定するためのオーバーヘッドを回避できます。テンプレートを 1 つ作成し、複数のサフィックスを指定するだけで、新規テーブルを BigQuery に自動作成させることができます。BigQuery は、各テーブルを同じプロジェクトとデータセット内に配置します。

テンプレート テーブルを通じて作成されたテーブルは、通常数秒以内に利用可能になります。ただし、それより長い時間を要する場合もまれにあります。

テンプレート テーブル スキーマの変更

テンプレート テーブル スキーマを変更した場合は、それ以降に生成されるすべてのテーブルで、更新後のスキーマが使用されます。以前に生成されたテーブルには影響はありません(既存のテーブルにストリーミング バッファが残っている場合を除く)。

ストリーミング バッファが残っている既存のテーブルについては、テンプレート テーブル スキーマに対する変更が後方互換性のあるものであれば、現にストリーミングが行われているそれらの生成済テーブルのスキーマも更新されます。ただし、テンプレート テーブル スキーマに対する変更が後方互換性のないものである場合は、古いスキーマを使用するバッファデータがすべて失われます。また、互換性がなくなった古いスキーマを使用する既存の生成済テーブルに、新しいデータをストリーミングすることはできません。

テンプレート テーブル スキーマを変更した後は、その変更が伝播されるまで、新しいデータの挿入や、生成されたテーブルに対するクエリを行わないでください。新しいフィールドを挿入するリクエストは、数分以内に処理されます。新規フィールドに対するクエリ実行には、最大 90 分の待機時間を要する場合があります。

生成されたテーブルのスキーマを変更する場合は、テンプレート テーブルを経由したストリーミングが停止し、生成済みテーブルのストリーミング統計セクションが tables.get() レスポンスからなくなる(テーブルでバッファされているデータがなくなる)まで、スキーマを変更しないようにしてください。

パーティション分割テーブルクラスタ化テーブルには前述の制限がないため、おすすめのメカニズムです。

テンプレート テーブルの詳細

テンプレート サフィックス値
templateSuffix(または --template_suffix)値には、英字(a~z、A~Z)、数字(0~9)、アンダースコア(_)のみを含める必要があります。テーブル名とテーブル サフィックスの最大連結文字数は 1,024 文字です。
割り当て

テンプレート テーブルには、ストリーミング割り当ての制限が適用されます。tables.insert API と同様に、プロジェクトでは、テンプレート テーブルを使用して、1 秒あたり最大 10 個のテーブルを作成できます。この割り当ては作成されるテーブルにのみ適用され、変更されるテーブルには適用されません。

アプリケーションで 1 秒あたり 10 個を超えるテーブルを作成する必要がある場合は、クラスタ化テーブルの使用をおすすめします。たとえば、カーディナリティが高いテーブル ID を単一のクラスタリング テーブルのキー列に配置できます。

有効期間

生成されたテーブルの有効期間はデータセットから継承されます。通常のストリーミング データと同様、生成されたテーブルをすぐにコピーすることはできません。

重複排除

重複排除は、宛先テーブルへの同質な参照間でのみ実行されます。たとえば、テンプレート テーブルと通常の insertAll コマンドの両方を使用して生成済テーブルへのストリーミングを同時に実行した場合、テンプレート テーブルと通常の insertAll コマンドによって挿入された各行の間で重複排除は実行されません。

ビュー

ビューをテンプレート テーブルとして使用することはできません。また、テンプレート テーブルを元にビューを自動生成させることもできません。

ストリーミング挿入のトラブルシューティング

以降のセクションでは、以前のストリーミング API を使用して BigQuery にデータをストリーミングする際に発生するエラーのトラブルシューティングについて説明します。ストリーミング挿入の割り当てエラーを解決する詳しい方法については、ストリーミング挿入の割り当てエラーをご覧ください。

失敗 HTTP レスポンス コード

ネットワーク エラーなどの失敗 HTTP レスポンス コードが返された場合、ストリーミング挿入が成功したかどうかを確認する方法はありません。リクエストを再送信しようとすると、テーブル内に重複行が発生する可能性があります。テーブルを重複から保護するには、リクエストの送信時に insertId プロパティを設定します。insertId プロパティは BigQuery によって重複排除に使用されます。

権限エラー、無効なテーブル名エラー、または割り当て超過エラーが返された場合、行は挿入されず、リクエスト全体が失敗します。

成功 HTTP レスポンス コード

成功 HTTP レスポンス コードが返された場合でも、BigQuery による行の挿入が部分的にしか成功しなかった可能性があります。レスポンスの insertErrors プロパティをチェックして、行の挿入が成功したかどうかを確認する必要があります。次のシナリオのいずれかが発生することがあります。

  • すべての行が正常に挿入されている。insertErrors プロパティが空のリストになっている場合は、すべての行が正常に挿入されています。
  • 一部の行が正常に挿入されている。いずれかの行にスキーマの不一致がある場合を除き、insertErrors プロパティで示された行が挿入されておらず、それ以外の行はすべて正常に挿入されています。errors プロパティには、行の挿入が失敗した理由に関する詳細情報が含まれます。index プロパティは、エラーに該当するリクエストの 0 ベースの行インデックスを示します。
  • 挿入された行がない。BigQuery がリクエスト内で個別行のスキーマ不一致を検出した場合は、いずれの行も挿入されず、スキーマ不一致がなかった行も含めて、各行に対して insertErrors エントリが返されます。スキーマ不一致がなかった行のエラーは reason プロパティが stopped に設定され、そのまま再送信することができます。失敗した行には、スキーマ不一致に関する詳細情報が含まれます。BigQuery データ型でサポートされているプロトコル バッファの型については、データ型の変換をご覧ください。

ストリーミング挿入のメタデータ エラー

BigQuery のストリーミング API はインサート率が高くなるように設計されているため、ストリーミング システムを操作する際、基盤となるテーブルのメタデータの変更は結果整合が保たれます。ほとんどの場合、メタデータの変更は数分以内にプロパゲートされますが、その間は API レスポンスで、対象テーブルの不整合な状態が反映されることがあります。

次のようなシナリオがあります。

  • スキーマの変更。スキーマの変更がストリーミング システムにすぐに反映されない場合があるため、ストリーミング挿入を最近受け取ったテーブルのスキーマを変更すると、スキーマの不一致エラーが発生することがあります。
  • テーブルの作成 / 削除。存在しないテーブルへのストリーミングによって、notFound レスポンスのバリエーションが返されます。レスポンスで作成されたテーブルは、後続のストリーミング挿入ですぐには認識されない可能性があります。同様に、テーブルを削除または、再作成すると、古いテーブルにストリーミング挿入が実質的に配信される期間が生じることがあります。このストリーミング挿入は新しいテーブルには存在しない可能性があります。
  • テーブルの切り捨て - (WRITE_TRUNCATE の writeDisposition を使用するクエリジョブによって)テーブルのデータを切り捨てる場合も同様に、整合性期間中の後続の挿入が破棄されることがあります。

データが見つからない、または利用できない

ストリーミング挿入は、書き込み用に最適化されたストレージに一時的に存在します。このストレージは、マネージド ストレージと可用性の特性が異なります。BigQuery の一部のオペレーション(テーブル コピー ジョブや tabledata.list などの API メソッドなど)では、書き込み用に最適化されたストレージは操作されません。最新のストリーミング データが宛先テーブルまたは出力に存在しません。