Streaming data into BigQuery

Instead of using a job to load data into BigQuery, you can choose to stream your data into BigQuery one record at a time by using the tabledata.insertAll method. This approach enables querying data without the delay of running a load job.

This document discusses several important trade-offs to consider before choosing an approach, including streaming quotas, data availability, and data consistency.

Before you begin

  1. Ensure that you have write access to the dataset that contains your destination table. The table must exist before you begin writing data to it unless you are using template tables. For more information on template tables, see Creating tables automatically using template tables.

  2. Check the quota policy for streaming data.

  3. Make sure that billing is enabled for your Google Cloud project. Learn how to confirm billing is enabled for your project.

Streaming is not available via the free tier. If you attempt to use streaming without enabling billing, you receive the following error: BigQuery: Streaming insert is not allowed in the free tier.

Checking for data availability

Streamed data is available for real-time analysis within a few seconds of the first streaming insertion into a table. In rare circumstances (such as an outage), data in the streaming buffer may be temporarily unavailable. When data is unavailable, queries continue to run successfully, but they skip some of the data that is still in the streaming buffer. These queries will contain a warning in the errors field of the jobs.getQueryResults response, in the response to jobs.query, or in the status.errors field of jobs.get.

Data can take up to 90 minutes to become available for copy operations. Also, when streaming to a partitioned table, data in the streaming buffer has a NULL value for the _PARTITIONTIME pseudo column. To see whether data is available for copy, check the tables.get response for a section named streamingBuffer. If that section is absent, your data should be available for copy and should have a non-null value for the _PARTITIONTIME pseudo column. Additionally, the streamingBuffer.oldestEntryTime field can be used to identify the age of records in the streaming buffer.
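
For example, the following is a minimal sketch of this check using the Python client library (the table ID is a placeholder); the streaming_buffer property corresponds to the streamingBuffer section of the tables.get response:

    from google.cloud import bigquery

    client = bigquery.Client()
    table = client.get_table("your-project.your_dataset.your_table")  # calls tables.get

    if table.streaming_buffer is None:
        print("No streaming buffer: data is available for copy operations.")
    else:
        # Timestamp of the oldest record still held in the streaming buffer.
        print("Oldest buffered entry:", table.streaming_buffer.oldest_entry_time)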

Ensuring data consistency

To help ensure data consistency, you can supply insertId for each inserted row. BigQuery remembers this ID for at least one minute. If you try to stream the same set of rows within that time period and the insertId property is set, BigQuery uses the insertId property to de-duplicate your data on a best effort basis.

You might have to retry an insert because there's no way to determine the state of a streaming insert under certain error conditions, such as network errors between your system and BigQuery or internal errors within BigQuery. If you retry an insert, use the same insertId for the same set of rows so that BigQuery can attempt to de-duplicate your data. For more information, see troubleshooting streaming inserts.
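
As an illustration, the following sketch supplies an explicit insertId for each row through the Python client library's insert_rows_json method (the table ID and row IDs are placeholders); retrying the same call with the same row_ids lets BigQuery attempt de-duplication:

    from google.cloud import bigquery

    client = bigquery.Client()
    rows = [
        {"full_name": "Phred Phlyntstone", "age": 32},
        {"full_name": "Wylma Phlyntstone", "age": 29},
    ]
    # One insertId per row; reuse the same IDs if you retry this request.
    errors = client.insert_rows_json(
        "your-project.your_dataset.your_table",
        rows,
        row_ids=["event-0001", "event-0002"],
    )
    if errors:
        print("Insert errors:", errors)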

In the rare instance of a Google datacenter losing connectivity unexpectedly, automatic deduplication may not be possible.

If you have stronger requirements for your data, Google Cloud Datastore is an alternative service that supports transactions.

Disabling best effort de-duplication

You can disable best effort de-duplication by not populating the insertId field for each row inserted. When you do not populate insertId, you get higher streaming ingest quotas in the us and eu multi-regions, and in the asia-northeast1 region. For more information, see Quotas and limits.
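
For example, recent versions of the Python client library let you skip insertId generation when calling insert_rows_json; this is a sketch under that assumption (the table ID is a placeholder):

    from google.cloud import bigquery

    client = bigquery.Client()
    rows = [{"full_name": "Phred Phlyntstone", "age": 32}]
    # AutoRowIDs.DISABLED leaves insertId unset, disabling best effort de-duplication.
    errors = client.insert_rows_json(
        "your-project.your_dataset.your_table",
        rows,
        row_ids=bigquery.AutoRowIDs.DISABLED,
    )
    if errors:
        print("Insert errors:", errors)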

Apache Beam and Dataflow

To disable best effort de-duplication when you use Apache Beam's BigQuery I/O connector for Java, use the ignoreInsertIds() method.

Streaming data across data locations

You can stream data to datasets in both the US and EU. Data can flow through machines outside the dataset's location while BigQuery processes the insertAll request. If you are streaming data from a location outside of the dataset's location, you might experience increased latency and error rates.

Streaming into ingestion-time partitioned tables

You can stream individual rows into a partitioned table by using insertAll requests. By default, the destination partition for inserted data is inferred from the current date based on UTC time.

If you are streaming data into an ingestion-time partitioned table, you can override the date inference by supplying a partition decorator as part of the insertAll request. For example, you can stream to the partition corresponding to 2017-03-01 for table mydataset.table by using the partition decorator:

    mydataset.table$20170301

Newly arriving data is temporarily associated with the UNPARTITIONED partition while in the streaming buffer. A query can therefore exclude data in the streaming buffer by filtering out the NULL values from the UNPARTITIONED partition, using the _PARTITIONTIME or _PARTITIONDATE pseudocolumn, depending on your preferred data type.
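
For instance, the following is a minimal sketch of such a filter using the Python client library (the table name is a placeholder):

    from google.cloud import bigquery

    client = bigquery.Client()
    query = """
    SELECT *
    FROM `your-project.your_dataset.your_partitioned_table`
    WHERE _PARTITIONTIME IS NOT NULL  -- excludes rows still in the streaming buffer
    """
    results = client.query(query).result()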

When streaming using a partition decorator, you can stream to partitions within the last 31 days in the past and 16 days in the future relative to the current date, based on current UTC time. To write to partitions for dates outside these allowed bounds, you can use load or query jobs, as described in Appending to and overwriting partitioned table data.

Streaming into partitioned tables

You can stream data into a table partitioned on a DATE or TIMESTAMP column that is between 1 year in the past and 6 months in the future. Data outside this range is rejected.

When the data is streamed, data between 7 days in the past and 3 days in the future is placed in the streaming buffer and then extracted to the corresponding partitions. Data outside of this window (but inside the 1 year, 6 month range) is placed in the streaming buffer and then extracted to the UNPARTITIONED partition. When there's enough unpartitioned data, it is loaded to the corresponding partitions.

Creating tables automatically using template tables

A common usage pattern for streaming data into BigQuery is to split a logical table into many smaller tables to create smaller sets of data (for example, by user ID). To create smaller sets of data by date, use partitioned tables. To create smaller tables that are not date-based, use template tables and BigQuery creates the tables for you.

To use a template table via the BigQuery API, add a templateSuffix parameter to your insertAll request. For the bq command-line tool, add the template_suffix flag to your insert command. If BigQuery detects a templateSuffix parameter or the template_suffix flag, it treats the targeted table as a base template, and creates a new table that shares the same schema as the targeted table and has a name that includes the specified suffix:

<targeted_table_name> + <templateSuffix>

By using a template table, you avoid the overhead of creating each table individually and specifying the schema for each table. You need only create a single template, and supply different suffixes so that BigQuery can create the new tables for you. BigQuery places the tables in the same project and dataset. Templates also make it easier to update the schema because you need only update the template table.
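
As an illustration, here is a minimal sketch using the Python client library's template_suffix parameter (the table ID and suffix are placeholders):

    from google.cloud import bigquery

    client = bigquery.Client()
    rows = [{"full_name": "Phred Phlyntstone", "age": 32}]
    # Streams to a table named your_template_table_2023 (template name + suffix),
    # created with the template's schema if it does not already exist.
    errors = client.insert_rows_json(
        "your-project.your_dataset.your_template_table",
        rows,
        template_suffix="_2023",
    )
    if errors:
        print("Insert errors:", errors)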

Tables created via template tables are usually available within a few seconds. On rare occasions they may take longer to become available.

Changing the template table schema

If you change a template table schema, all subsequently generated tables will use the updated schema. Previously generated tables will not be affected, unless the existing table still has a streaming buffer.

For existing tables that still have a streaming buffer, if you modify the template table schema in a backward compatible way, the schema of those actively streamed generated tables will also be updated. However, if you modify the template table schema in a non-backward compatible way, any buffered data that uses the old schema will be lost. Additionally, you will not be able to stream new data to existing generated tables that use the old, but now incompatible, schema.

After you change a template table schema, wait until the changes have propagated before you try to insert new data or query generated tables. Requests to insert new fields should succeed within a few minutes. Attempts to query the new fields might require a longer wait of up to 90 minutes.

If you want to change a generated table's schema, do not change the schema until streaming via the template table has ceased and the generated table's streaming statistics section is absent from the tables.get() response, which indicates that no data is buffered on the table.

Template table details

Template suffix value
The templateSuffix (or --template_suffix) value must contain only letters (a-z, A-Z), numbers (0-9), or underscores (_). The maximum combined length of the table name and the table suffix is 1024 characters.
Quota
The same quotas apply to all tables, whether they are based on templates or created manually.
Time to live
The generated table inherits its expiration time from the dataset. As with normal streaming data, generated tables cannot be copied or exported immediately.
Deduplication
Deduplication only happens between uniform references to a destination table. For example, if you simultaneously stream to a generated table by using both template tables and a regular insertAll command, no deduplication occurs between the rows inserted by template tables and the rows inserted by the regular insertAll command.
Views
The template table and the generated tables should not be views.

Example use cases

High volume event logging

If you have an app that collects a large amount of data in real-time, streaming inserts can be a good choice. Generally, these types of apps have the following criteria:

  • Not transactional. High volume, continuously appended rows. The app can tolerate a rare possibility that duplication might occur or that data might be temporarily unavailable.
  • Aggregate analysis. Queries generally are performed for trend analysis, as opposed to single or narrow record selection.

One example of high volume event logging is event tracking. Suppose you have a mobile app that tracks events. Your app, or mobile servers, could independently record user interactions or system errors and stream them into BigQuery. You could analyze this data to determine overall trends, such as areas of high interaction or problems, and monitor error conditions in real-time.

Manually removing duplicates

You can use the following manual process to ensure that no duplicate rows exist after you are done streaming.

  1. Add the insertId as a column in your table schema and include the insertId value in the data for each row.
  2. After streaming has stopped, perform the following query to check for duplicates:

      #standardSQL
      SELECT
        MAX(count)
      FROM(
        SELECT
          ID_COLUMN,
          count(*) as count
        FROM
          `TABLE_NAME`
        GROUP BY
          ID_COLUMN)

    If the result is greater than 1, duplicates exist.
  3. To remove duplicates, perform the following query. You should specify a destination table, allow large results, and disable result flattening.

      #standardSQL
      SELECT
        * EXCEPT(row_number)
      FROM (
        SELECT
          *,
          ROW_NUMBER()
                OVER (PARTITION BY ID_COLUMN) row_number
        FROM
          `TABLE_NAME`)
      WHERE
        row_number = 1

Notes about the duplicate removal query:

  • The safer strategy for the duplicate removal query is to target a new table. Alternatively, you can target the source table with write disposition WRITE_TRUNCATE.
  • The duplicate removal query adds a row_number column with the value 1 to the end of the table schema. The query uses a SELECT * EXCEPT statement from standard SQL to exclude the row_number column from the destination table. The #standardSQL prefix enables standard SQL for this query. Alternatively, you can select by specific column names to omit this column.
  • For querying live data with duplicates removed, you can also create a view over your table using the duplicate removal query (see the sketch after this list). Be aware that query costs against the view will be calculated based on the columns selected in your view, which can result in large bytes scanned sizes.
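
For example, the following sketch creates such a view with the Python client library; the view ID, source table, and ID_COLUMN are placeholders:

    from google.cloud import bigquery

    client = bigquery.Client()
    view = bigquery.Table("your-project.your_dataset.your_table_deduplicated")
    view.view_query = """
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER() OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `your-project.your_dataset.your_table`)
    WHERE
      row_number = 1
    """
    view = client.create_table(view)  # API request to create the view.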

Troubleshooting streaming inserts

For information about how to troubleshoot errors during streaming inserts, see Troubleshooting streaming inserts on the Troubleshooting Errors page.


Streaming insert examples


Before trying this sample, follow the C# setup instructions in the BigQuery Quickstart Using Client Libraries. For more information, see the BigQuery C# API reference documentation.

using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}


Before trying this sample, follow the Go setup instructions in the BigQuery Quickstart Using Client Libraries. For more information, see the BigQuery Go API reference documentation.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, "", nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}


Before trying this sample, follow the Java setup instructions in the BigQuery Quickstart Using Client Libraries. For more information, see the BigQuery Java API reference documentation.

// Assumes an initialized client and existing datasetName/tableName variables.
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

TableId tableId = TableId.of(datasetName, tableName);
// Values of the row to insert
Map<String, Object> rowContent = new HashMap<>();
rowContent.put("booleanField", true);
// Bytes are passed in base64
rowContent.put("bytesField", "Cg0NDg0="); // 0xA, 0xD, 0xD, 0xE, 0xD in base64
// Records are passed as a map
Map<String, Object> recordsContent = new HashMap<>();
recordsContent.put("stringField", "Hello, World!");
rowContent.put("recordField", recordsContent);
InsertAllResponse response =
    bigquery.insertAll(
        InsertAllRequest.newBuilder(tableId)
            .addRow("rowId", rowContent)
            // More rows can be added in the same RPC by invoking .addRow() on the builder.
            // You can also supply optional unique row keys to support de-duplication scenarios.
            .build());
if (response.hasErrors()) {
  // If any of the insertions failed, this lets you inspect the errors
  for (Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
    // inspect row error
  }
}

Before trying this sample, follow the Node.js setup instructions in the BigQuery Quickstart Using Client Libraries. For more information, see the BigQuery Node.js API reference documentation.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  // const rows = [{name: 'Tom', age: 30}, {name: 'Jane', age: 32}];

  // Create a client
  const bigqueryClient = new BigQuery();

  // Insert data into a table
  await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}


Before trying this sample, follow the PHP setup instructions in the BigQuery Quickstart Using Client Libraries. For more information, see the BigQuery PHP API reference documentation.

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Before trying this sample, follow the Python setup instructions in the BigQuery Quickstart Using Client Libraries. For more information, see the BigQuery Python API reference documentation.

# TODO(developer): Import the client library.
# from google.cloud import bigquery

# TODO(developer): Construct a BigQuery client object.
# client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to insert rows into.
# table_id = "your-project.your_dataset.your_table"

table = client.get_table(table_id)  # Make an API request.
rows_to_insert = [(u"Phred Phlyntstone", 32), (u"Wylma Phlyntstone", 29)]

errors = client.insert_rows(table, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")


Before trying this sample, follow the Ruby setup instructions in the BigQuery Quickstart Using Client Libraries. For more information, see the BigQuery Ruby API reference documentation.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery =
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
    puts "Failed to insert #{response.error_rows.count} rows"
