以串流方式將資料傳入 BigQuery

除了透過工作將資料載入 BigQuery,您也可以選擇使用 tabledata.insertAll 方法進行資料串流,一次將一筆記錄傳入 BigQuery。這個方法讓您不必執行載入作業,即可查詢資料。本文件說明在選擇方法前需要考慮的幾個重要優缺點,包括串流配額、資料可用性以及資料一致性。

事前準備

  1. 確認您具備目的地資料表所屬資料集的寫入權限。除非您使用範本資料表,否則該資料表在開始寫入資料之前就必須存在。如要進一步瞭解範本資料表,請參閱使用範本資料表自動建立資料表

  2. 查看串流資料的配額政策

  3. 請確認您已啟用 Google Cloud Platform 專案的計費功能。

    瞭解如何啟用計費功能

    免費方案並不支援資料串流功能。如果不啟用計費功能,當您嘗試進行資料串流作業時,系統會顯示下列錯誤訊息:BigQuery: Streaming insert is not allowed in the free tier.

檢查資料可用性

在首次對資料表執行串流資料插入後的幾秒鐘內,串流資料即可用於即時分析。在少數情況下 (例如服務中斷),串流緩衝區中的資料可能暫時無法使用。如果資料無法使用,查詢會繼續執行,但會略過仍在串流緩衝區中的一些資料。這些查詢會在 bigquery.jobs.getQueryResults 中的 errors 欄位、bigquery.jobs.query 的回應,或 bigquery.jobs.getstatus.errors 欄位中包含警告。

您最多可能需要等待 90 分鐘的處理時間,才能複製及匯出資料。此外,將資料以串流方式傳入分區資料表時,串流緩衝區中的資料在 _PARTITIONTIME 虛擬資料欄的值為 NULL。如要確認資料可否用於複製和匯出作業,請檢查 tables.get 回應中是否有名稱為 streamingBuffer 的部分。假如缺少該部分,則代表您的資料處於可複製或匯出的狀態,而且 _PARTITIONTIME 虛擬資料欄應該會有非空值。此外,streamingBuffer.oldestEntryTime 欄位可用來識別串流緩衝區中的記錄存續時間。

確保資料一致性

為確保資料一致性,建議您針對每個插入列提供 insertId。BigQuery 會將這組 ID 記住至少一分鐘。假如您嘗試在一分鐘內對同一組資料列進行串流處理,而且 insertId 屬性已經設定,則 BigQuery 會運用 insertId 屬性儘可能地刪除重複的資料。

由於在某些錯誤情況下 (例如系統與 BigQuery 之間的網路發生錯誤,或是 BigQuery 發生內部錯誤),無法確定串流資料插入作業的狀態,因此您可能必須重試插入作業。假如您要重試插入作業,請針對同一組資料列使用相同的 insertId,這樣 BigQuery 才能嘗試刪除重複的資料。詳情請參閱排解串流資料插入的相關問題一節。

若 Google 資料中心意外失去連線能力 (極少發生),系統將無法自動刪除重複的資料。

如果您對資料有更高的要求,可改為使用支援交易作業Google Cloud Datastore 服務。

停用盡可能清除重複的功能

您可以透過不在每個插入的資料列中填入 insertId 欄位來停用盡可能清除重複的功能。當您選擇不填入 insertId 時,您可以讓 US 地區取得更多的串流擷取配額。詳情請參閱配額與限制頁面。

在資料位置之間以串流方式傳送資料

您能夠以串流方式將資料傳入位於美國和歐盟的資料集。當 BigQuery 處理 insertAll 要求時,資料可能會流經資料集所在位置以外的機器。假如您從資料集所在位置以外的地方以串流方式傳輸資料,可能會遭遇較高的延遲和錯誤率。

以串流方式將資料傳入擷取時間分區資料表

您可以使用 insertAll 要求以串流方式將個別資料列傳入分區資料表。根據預設,系統會根據世界標準時間的目前日期推斷插入資料的目的地分區。

如果您要以串流方式將資料傳入擷取時間分區資料表,可以在 insertAll 要求中加入分區修飾符來覆寫推斷的日期。例如,您可以使用以下分區修飾符,以串流方式將資料傳輸至資料表 mydataset.table 中與 2017-03-01 對應的分區:

mydataset.table$20170301

當新到達的資料位於串流緩衝區時,會暫時與 UNPARTITIONED 分區建立關聯。因此,查詢可以使用虛擬資料欄 ([_PARTITIONTIME] 或 [_PARTITIONDATE],視您偏好的資料類型而定) 從 UNPARTITIONED 分區篩選出 NULL 值,藉此從查詢中排除串流緩衝區的資料。

使用分區修飾符以串流方式傳輸資料時,可以根據目前的世界標準時間,以串流方式將資料傳輸至過去 31 天和未來 16 天之間 (相對於目前日期) 的分區。如要針對前述允許範圍外的日期寫入分區,您可以使用載入或查詢工作,如附加並覆寫分區資料表資料一文所述。

以串流方式將資料傳入分區資料表

您可以串流方式將資料傳入過去 1 年和未來 6 個月之間以 DATETIMESTAMP 資料欄分區的資料表。超出這個範圍的資料會遭拒。

當資料進行串流時,過去 7 天和未來 3 天之間的資料會放在串流緩衝區中,之後系統會擷取至對應的分區。超出這個時間範圍 (但在過去 1 年和未來 6 個月之間) 的資料會放在串流緩衝區中,之後系統會擷取至 UNPARTITIONED 分區。未分區資料累積到足夠的量以後,系統就會把資料載入對應的分區。

使用範本資料表自動建立資料表

以串流方式將資料傳入 BigQuery 的常見使用模式,是將邏輯資料表分成許多較小的資料表,以便建立較小的資料集 (例如,透過使用者 ID)。如要依照日期建立較小的資料集,請使用分區資料表。如果不想根據日期建立較小的資料表,請使用「範本資料表」,讓 BigQuery 為您建立資料表。

如要透過 BigQuery API 使用範本資料表,請將 templateSuffix 參數新增到 insertAll 要求。如需 bq 指令列工具,請將 template_suffix 標記新增到 insert 指令。假如 BigQuery 偵測到 templateSuffix 參數或 template_suffix 標記,就會將指定資料表視為基礎範本,並建立一份結構定義與指定資料表相同的新資料表,而且該資料表具有包含指定後置字元的名稱:

<targeted_table_name> + <templateSuffix>

只要使用範本資料表,您就可以省去個別建立資料表以及為每個資料表指定結構定義的負擔。您只需要建立一個範本,並提供不同的後置字元,BigQuery 就能為您建立新資料表。BigQuery 會將資料表放到相同的專案和資料集。範本還可協助您輕鬆更新結構定義,這是因為您只需要更新範本資料表。

透過範本資料表建立的資料表通常在幾秒鐘內即可使用。只有在極少數情況下,才需要等待較長的時間。

變更範本資料表結構定義

假如您變更範本資料表結構定義,所有後續產生的資料表皆會使用新的結構定義。除非現有的資料表仍有串流緩衝區,否則先前產生的資料表不受影響。

就仍有串流緩衝區的現有資料表而言,假如您以回溯相容的方式修改範本資料表結構定義,主動串流產生的資料表結構定義也會隨之更新。不過,如果您以非回溯相容的方式修改範本資料表結構定義,使用舊結構定義的所有緩衝資料都將會遺失。此外,如果已產生的現有資料表使用現已不相容的舊結構定義,您就無法將新資料串流傳入表內。

變更範本資料表結構定義之後,請等待系統推送變更內容後,再試著插入新資料或查詢已產生的資料表。插入新欄位的要求應該會在幾分鐘內成功,而查詢新欄位的嘗試可能需要最多 90 分鐘。

假如您想變更已產生資料表的結構定義,請等到透過範本資料表進行的串流作業停止,且已產生資料表的串流統計資料區塊已不存在 tables.get() 回應中 (表示資料表已沒有緩衝資料),否則請勿變更結構定義。

範本資料表詳細資料

範本後置值
templateSuffix (或 --template_suffix) 值只能包含英文字母 (a-z、A-Z)、數字 (0-9) 或底線 (_)。資料表名稱和資料表後置字元的合併長度上限為 1024 個字元。
配額
不論是透過範本或手動建立的資料表,都適用相同的配額。
存續時間
已產生的資料表會沿用資料集的到期時間。已產生的資料表跟一般的串流資料一樣,無法立即複製或匯出。
刪除重複資料
系統只會針對目的地資料表的相同參照內容刪除重複資料。 舉例來說,假設您同時使用範本資料表和一般 insertAll 指令,將資料串流至已產生的資料表,則系統不會針對由範本資料表和一般 insertAll 指令所插入的資料列刪除重複資料。
資料檢視畫面
範本資料表和已產生的資料表不得設為資料檢視畫面。

使用範例

大量事件記錄

如果您的應用程式會即時收集大量資料,串流資料插入可能是不錯的選擇。一般來說,這類應用程式具有下列標準:

  • 非交易型。 大量、連續的附加資料列。應用程式可容忍以下罕見的可能性:可能發生重複或資料暫時不可用。
  • 匯總分析。 查詢一般會用於趨勢分析,而不是單一或限定記錄選擇。

大量事件記錄的其中一個例子是事件追蹤。假設您有一個會追蹤事件的行動應用程式。您的應用程式 (或行動伺服器) 可獨立記錄使用者互動或系統錯誤,並且以串流方式將這些資料傳入 BigQuery。您可以分析這些資料來判斷整體趨勢,例如高度互動或問題的領域,並即時監控錯誤情況。

手動移除重複內容

您可以透過以下手動移除程序,確保在串流作業完成後,不會有重複的資料列。

  1. 在資料表結構定義中,將 insertId 新增為資料欄並在每列資料中納入 insertId 值。
  2. 在串流作業停止後,執行下列查詢以檢查是否有重複內容:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    假如結果大於 1,表示有重複的項目。
  3. 如要移除重複的項目,請執行下列查詢。您必須指定目的地資料表、允許大量結果,並且停用結果整併功能。

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

以下是重複項目移除查詢的相關注意事項:

  • 保守的重複項目移除查詢策略是指定一份新資料表,也可以指定包含 WRITE_TRUNCATE 寫入配置的來源資料表。
  • 重複項目移除查詢會將 row_number 資料欄 (包含 1 這個值) 加到資料表結構定義尾端。這項查詢會使用標準 SQLSELECT * EXCEPT 陳述式,從目標資料表中排除 row_number 資料欄,而 #standardSQL 前置字串會為該查詢啟用標準 SQL。此外,您也可以選取特定的資料欄名稱來略過此資料欄。
  • 如要在移除重複項目後查詢即時資料,您也可以使用重複項目移除查詢來對資料表建立資料檢視。請注意,資料檢視的查詢成本是根據在資料檢視中選取的資料欄計算,這可能會導致位元組掃描大小過大。

排解插入串流資料的相關問題

如要進一步瞭解如何排解串流資料插入的錯誤,請參閱「排解相關錯誤」頁面的排解串流資料插入疑難部分。

返回頁首

串流資料插入範例

C#

在試行此示例之前,請前往 BigQuery 快速入門導覽課程:使用用戶端程式庫頁面,按照 C# 設定說明進行操作。詳情請參閱 BigQuery C# API 參考說明文件


using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

在試行此示例之前,請前往 BigQuery 快速入門導覽課程:使用用戶端程式庫頁面,按照 Go 設定說明進行操作。詳情請參閱 BigQuery Go API 參考說明文件

// To run this sample, you will need to create (or reuse) a context and
// an instance of the bigquery client.  For example:
// import "cloud.google.com/go/bigquery"
// ctx := context.Background()
// client, err := bigquery.NewClient(ctx, "your-project-id")
u := client.Dataset(datasetID).Table(tableID).Uploader()
items := []*Item{
	// Item implements the ValueSaver interface.
	{Name: "Phred Phlyntstone", Age: 32},
	{Name: "Wylma Phlyntstone", Age: 29},
}
if err := u.Put(ctx, items); err != nil {
	return err
}

Java

在試行此示例之前,請前往 BigQuery 快速入門導覽課程:使用用戶端程式庫頁面,按照 Java 設定說明進行操作。詳情請參閱 BigQuery Java API 參考說明文件

TableId tableId = TableId.of(datasetName, tableName);
// Values of the row to insert
Map<String, Object> rowContent = new HashMap<>();
rowContent.put("booleanField", true);
// Bytes are passed in base64
rowContent.put("bytesField", "Cg0NDg0="); // 0xA, 0xD, 0xD, 0xE, 0xD in base64
// Records are passed as a map
Map<String, Object> recordsContent = new HashMap<>();
recordsContent.put("stringField", "Hello, World!");
rowContent.put("recordField", recordsContent);
InsertAllResponse response =
    bigquery.insertAll(
        InsertAllRequest.newBuilder(tableId)
            .addRow(rowContent)
            // More rows can be added in the same RPC by invoking .addRow() on the builder.
            // You can also supply optional unique row keys to support de-duplication scenarios.
            .build());
if (response.hasErrors()) {
  // If any of the insertions failed, this lets you inspect the errors
  for (Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
    // inspect row error
  }
}

Node.js

在試行此示例之前,請前往 BigQuery 快速入門導覽課程:使用用戶端程式庫頁面,按照 Node.js 設定說明進行操作。詳情請參閱 BigQuery Node.js API 參考說明文件

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  // const rows = [{name: 'Tom', age: 30}, {name: 'Jane', age: 32}];

  // Create a client
  const bigqueryClient = new BigQuery();

  // Insert data into a table
  await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();

PHP

在試行此示例之前,請前往 BigQuery 快速入門導覽課程:使用用戶端程式庫頁面,按照 PHP 設定說明進行操作。詳情請參閱 BigQuery PHP API 參考說明文件

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

在試行此示例之前,請前往 BigQuery 快速入門導覽課程:使用用戶端程式庫頁面,按照 Python 設定說明進行操作。詳情請參閱 BigQuery Python API 參考說明文件

# TODO(developer): Uncomment the lines below and replace with your values.
# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'  # replace with your dataset ID
# For this sample, the table must already exist and have a defined schema
# table_id = 'my_table'  # replace with your table ID
# table_ref = client.dataset(dataset_id).table(table_id)
# table = client.get_table(table_ref)  # API request

rows_to_insert = [
    (u'Phred Phlyntstone', 32),
    (u'Wylma Phlyntstone', 29),
]

errors = client.insert_rows(table, rows_to_insert)  # API request

assert errors == []

Ruby

在試用這個範例程式之前,請至 BigQuery 快速入門導覽課程:使用用戶端程式庫,按照 Ruby 設定說明進行操作。詳情請參閱 BigQuery Ruby API 參考說明文件

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

返回頁首

本頁內容對您是否有任何幫助?請提供意見:

傳送您對下列選項的寶貴意見...

這個網頁