Menggunakan API streaming lama

Dokumen ini menjelaskan cara melakukan streaming data ke BigQuery menggunakan metode tabledata.insertAll yang lama.

Untuk project baru, sebaiknya gunakan BigQuery Storage Write API, bukan metode tabledata.insertAll. Storage Write API memiliki harga yang lebih rendah dan fitur yang lebih canggih, termasuk semantik pengiriman tepat satu kali. Jika Anda memigrasikan project yang ada dari metode tabledata.insertAll ke Storage Write API, sebaiknya pilih aliran default. Metode tabledata.insertAll masih didukung sepenuhnya.

Sebelum memulai

  1. Pastikan Anda memiliki akses tulis ke set data yang berisi tabel tujuan. Tabel tersebut harus ada sebelum Anda mulai menulis data ke tabel tersebut, kecuali jika Anda menggunakan tabel template. Untuk informasi selengkapnya tentang tabel template, lihat Membuat tabel secara otomatis menggunakan tabel template.

  2. Periksa kebijakan kuota untuk data streaming.

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Streaming tidak tersedia melalui paket gratis. Jika Anda mencoba menggunakan streaming tanpa mengaktifkan penagihan, Anda akan menerima error berikut: BigQuery: Streaming insert is not allowed in the free tier.

  5. Berikan peran Identity and Access Management (IAM) yang memberi pengguna izin yang diperlukan untuk melakukan setiap tugas dalam dokumen ini.

Izin yang diperlukan

Untuk melakukan streaming data ke BigQuery, Anda memerlukan izin IAM berikut:

  • bigquery.tables.updateData (memungkinkan Anda menyisipkan data ke dalam tabel)
  • bigquery.tables.get (memungkinkan Anda mendapatkan metadata tabel)
  • bigquery.datasets.get (memungkinkan Anda mendapatkan metadata set data)
  • bigquery.tables.create (wajib jika Anda menggunakan tabel template untuk membuat tabel secara otomatis)

Setiap peran IAM yang telah ditetapkan berikut mencakup izin yang diperlukan untuk melakukan streaming data ke BigQuery:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

Untuk mengetahui informasi lebih lanjut tentang peran dan izin IAM di BigQuery, baca Peran dan izin bawaan.

Melakukan streaming data ke BigQuery

C#

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C# di Panduan memulai BigQuery menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi BigQuery C# API.

Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.


using Google.Cloud.BigQuery.V2;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Panduan memulai BigQuery menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi BigQuery Go API.

Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Panduan memulai BigQuery menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi BigQuery Java API.

Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Node.js di Panduan memulai BigQuery menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi BigQuery Node.js API.

Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan PHP di Panduan memulai BigQuery menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi BigQuery PHP API.

Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Panduan memulai BigQuery menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi BigQuery Python API.

Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Ruby di Panduan memulai BigQuery menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi BigQuery Ruby API.

Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Anda tidak perlu mengisi kolom insertID saat menyisipkan baris. Contoh berikut menunjukkan cara menghindari pengiriman insertID untuk setiap baris saat melakukan streaming.

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Panduan memulai BigQuery menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi BigQuery Java API.

Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Panduan memulai BigQuery menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi BigQuery Python API.

Untuk melakukan autentikasi ke BigQuery, siapkan Kredensial Default Aplikasi. Untuk informasi selengkapnya, lihat Menyiapkan autentikasi untuk library klien.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Mengirim data tanggal dan waktu

Untuk kolom tanggal dan waktu, format data tersebut dalam metode tabledata.insertAll sebagai berikut:

Jenis Format
DATE String dalam format "YYYY-MM-DD"
DATETIME String dalam format "YYYY-MM-DD [HH:MM:SS]"
TIME String dalam format "HH:MM:SS"
TIMESTAMP Jumlah detik sejak 1970-01-01 (epoch Unix), atau string dalam format "YYYY-MM-DD HH:MM[:SS]"

Mengirim data rentang

Untuk kolom dengan jenis RANGE<T>, format data dalam metode tabledata.insertAll sebagai objek JSON dengan dua kolom, start dan end. Nilai yang tidak ada atau NULL untuk kolom start dan end mewakili batas yang tidak terbatas. Kolom ini harus memiliki format JSON yang didukung dengan jenis T yang sama, dengan T dapat berupa salah satu dari DATE, DATETIME, dan TIMESTAMP.

Pada contoh berikut, kolom f_range_date mewakili kolom RANGE<DATE> dalam tabel. Baris disisipkan ke dalam kolom ini menggunakan tabledata.insertAll API.

{
    "f_range_date": {
        "start": "1970-01-02",
        "end": null
    }
}

Ketersediaan data streaming

Data tersedia untuk analisis real-time menggunakan kueri GoogleSQL segera setelah BigQuery berhasil mengonfirmasi permintaan tabledata.insertAll.

Baris yang baru-baru ini di-streaming ke tabel berpartisi berdasarkan waktu penyerapan untuk sementara memiliki nilai NULL untuk kolom semu _PARTITIONTIME. Untuk baris semacam itu, BigQuery menetapkan nilai non-NULL akhir kolom PARTITIONTIME di latar belakang, biasanya dalam beberapa menit. Dalam kasus yang jarang terjadi, proses ini dapat memerlukan waktu hingga 90 menit.

Beberapa baris yang baru-baru ini di-streaming mungkin tidak tersedia untuk salinan tabel, biasanya selama beberapa menit. Dalam kasus yang jarang terjadi, proses ini dapat memerlukan waktu hingga 90 menit. Untuk melihat apakah data tersedia untuk penyalinan tabel, periksa respons tables.get untuk bagian yang bernama streamingBuffer. Jika bagian streamingBuffer tidak ada, data Anda tersedia untuk disalin. Anda juga dapat menggunakan kolom streamingBuffer.oldestEntryTime untuk mengidentifikasi usia kumpulan data dalam buffering streaming.

Penghapusan duplikat dengan upaya terbaik

Saat Anda menyediakan insertId untuk baris yang disisipkan, BigQuery akan menggunakan ID ini untuk mendukung upaya terbaik penghapusan duplikat hingga satu menit. Artinya, jika Anda melakukan streaming baris yang sama dengan insertId yang sama lebih dari sekali dalam jangka waktu tersebut ke tabel yang sama, BigQuery mungkin akan menghapus duplikat beberapa kemunculan dari baris tersebut, dan hanya mempertahankan satu dari kemunculan tersebut.

Sistem mengharapkan bahwa baris yang disediakan dengan insertId yang identik juga identik. Jika dua baris memiliki insertId yang identik, baris mana yang dipertahankan BigQuery tidak akan bersifat deterministik.

Penghapusan duplikat umumnya dimaksudkan untuk skenario percobaan ulang dalam sistem terdistribusi tanpa cara untuk menentukan status streaming insert dalam kondisi error tertentu, seperti error jaringan antara sistem Anda dan BigQuery atau internal error di dalam BigQuery. Jika Anda mencoba menyisipkannya lagi, gunakan insertId yang sama untuk kumpulan baris yang sama, sehingga BigQuery dapat mencoba menghapus duplikat data Anda. Untuk mengetahui informasi selengkapnya, lihat memecahkan masalah streaming insert.

Penghapusan duplikat yang ditawarkan oleh BigQuery adalah upaya terbaik, dan tidak boleh diandalkan sebagai mekanisme untuk menjamin tidak adanya duplikat dalam data Anda. Selain itu, BigQuery dapat menurunkan kualitas upaya terbaik penghapusan duplikat kapan saja untuk menjamin keandalan dan ketersediaan yang lebih tinggi untuk data Anda.

Jika Anda memiliki persyaratan penghapusan duplikat yang ketat untuk data, Google Cloud Datastore adalah layanan alternatif yang mendukung transaksi.

Menonaktifkan penghapusan duplikat dengan upaya terbaik

Anda dapat menonaktifkan penghilangan duplikat dengan upaya terbaik dengan tidak mengisi kolom insertId untuk setiap baris yang disisipkan. Ini adalah cara yang disarankan untuk menyisipkan data.

Apache Beam dan Dataflow

Untuk menonaktifkan penghapusan duplikat dengan upaya terbaik saat Anda menggunakan konektor I/O BigQuery Apache Beam untuk Java, gunakan metode ignoreInsertIds().

Menghapus duplikat secara manual

Untuk memastikan tidak ada baris duplikat setelah Anda selesai streaming, gunakan proses manual berikut:

  1. Tambahkan insertId sebagai kolom dalam skema tabel Anda dan sertakan nilai insertId dalam data untuk setiap baris.
  2. Setelah streaming dihentikan, lakukan kueri berikut untuk memeriksa apakah ada duplikat:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Jika hasilnya lebih besar dari 1, akan ada duplikat.
  3. Untuk menghapus duplikat, jalankan kueri berikut. Tentukan tabel tujuan, izinkan hasil yang besar, dan nonaktifkan perataan hasil.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Catatan tentang kueri penghapusan duplikat:

  • Strategi yang lebih aman untuk kueri penghapusan duplikat adalah menargetkan tabel baru. Atau, Anda dapat menargetkan tabel sumber dengan disposisi tulis WRITE_TRUNCATE.
  • Kueri penghapusan duplikat menambahkan kolom row_number dengan nilai 1 ke akhir skema tabel. Kueri tersebut menggunakan pernyataan SELECT * EXCEPT dari GoogleSQL untuk mengecualikan kolom row_number dari tabel tujuan. Awalan #standardSQL memungkinkan GoogleSQL untuk kueri ini. Atau, Anda dapat memilih berdasarkan nama kolom tertentu untuk menghilangkan kolom ini.
  • Untuk membuat kueri data langsung dengan menghapus duplikat, Anda juga dapat membuat tampilan atas tabel menggunakan kueri penghapusan duplikat. Perhatikan bahwa biaya kueri terhadap tampilan dihitung berdasarkan kolom yang dipilih dalam tampilan Anda, yang dapat menghasilkan ukuran pemindaian byte yang besar.

Streaming ke tabel berpartisi waktu

Saat Anda melakukan streaming data ke tabel berpartisi waktu, setiap partisi memiliki buffering streaming. Buffer streaming dipertahankan saat Anda melakukan tugas pemuatan, kueri, atau penyalinan yang menimpa partisi dengan menetapkan properti writeDisposition ke WRITE_TRUNCATE. Jika Anda ingin menghapus buffering streaming, pastikan buffer streaming kosong dengan memanggil tables.get pada partisi.

Partisi waktu penyerapan

Saat Anda melakukan streaming ke tabel berpartisi berdasarkan waktu penyerapan, BigQuery akan menyimpulkan partisi tujuan dari waktu UTC saat ini.

Data yang baru tiba akan ditempatkan untuk sementara di partisi __UNPARTITIONED__ saat berada di buffering streaming. Jika tersedia cukup data yang tidak dipartisi, BigQuery akan mempartisi data tersebut ke partisi yang benar. Namun, tidak ada SLA untuk waktu yang diperlukan data untuk dipindahkan dari partisi __UNPARTITIONED__. Kueri dapat mengecualikan data dalam buffering streaming dari kueri dengan memfilter nilai NULL dari partisi __UNPARTITIONED__ dengan menggunakan salah satu kolom pseudo (_PARTITIONTIME atau _PARTITIONDATE bergantung pada jenis data yang Anda inginkan).

Jika melakukan streaming data ke tabel berpartisi harian, Anda dapat mengganti inferensi tanggal dengan menyediakan dekorator partisi sebagai bagian dari permintaan insertAll. Sertakan dekorator dalam parameter tableId. Misalnya, Anda dapat melakukan streaming ke partisi yang sesuai dengan 2021-03-01 untuk tabel table1 menggunakan dekorator partisi:

table1$20210301

Saat melakukan streaming menggunakan dekorator partisi, Anda dapat melakukan streaming ke partisi dalam 31 hari terakhir dan 16 hari ke depan, relatif terhadap tanggal saat ini, berdasarkan waktu UTC saat ini. Untuk menulis ke partisi untuk tanggal di luar batas yang diizinkan ini, gunakan tugas pemuatan atau kueri, seperti yang dijelaskan dalam Menambahkan dan menimpa data tabel berpartisi.

Streaming menggunakan dekorator partisi hanya didukung untuk tabel berpartisi harian. Parameter ini tidak didukung untuk tabel berpartisi per jam, bulanan, atau tahunan.

Untuk melakukan pengujian, Anda dapat menggunakan alat command line bq dari perintah CLI bq insert. Misalnya, perintah berikut mengalirkan satu baris ke sebuah partisi untuk tanggal 1 Januari 2017 ($20170101) ke dalam tabel berpartisi bernama mydataset.mytable:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

Partisi kolom satuan waktu

Anda dapat mengalirkan data ke tabel yang dipartisi pada kolom DATE, DATETIME, atau TIMESTAMP yang berdurasi antara 5 tahun terakhir dan 1 tahun ke depan. Data di luar rentang ini ditolak.

Saat di-streaming, data awalnya ditempatkan di partisi __UNPARTITIONED__. Jika tersedia cukup data yang tidak dipartisi, BigQuery akan otomatis mempartisi ulang data tersebut, dan menempatkannya ke partisi yang sesuai. Namun, tidak ada SLA untuk waktu yang diperlukan data untuk dipindahkan dari partisi __UNPARTITIONED__.

  • Catatan: Partisi harian diproses secara berbeda dengan partisi per jam, bulanan, dan tahunan. Hanya data di luar rentang tanggal (7 hari terakhir hingga 3 hari mendatang) yang diekstrak ke partisi TIDAK BERPARTISI, menunggu untuk dipartisi ulang. Di sisi lain, untuk tabel berpartisi per jam, data selalu diekstrak ke partisi tanpa partisi, lalu dipartisi ulang.

Membuat tabel secara otomatis menggunakan tabel template

Tabel template menyediakan mekanisme untuk membagi tabel logis menjadi banyak tabel yang lebih kecil untuk membuat kumpulan data yang lebih kecil (misalnya, berdasarkan ID pengguna). Tabel template memiliki sejumlah batasan yang dijelaskan di bawah ini. Sebagai gantinya, tabel berpartisi dan tabel yang dikelompokkan adalah cara yang direkomendasikan untuk mencapai perilaku ini.

Untuk menggunakan tabel template melalui BigQuery API, tambahkan parameter templateSuffix ke permintaan insertAll Anda. Untuk alat command line bq, tambahkan flag template_suffix ke perintah insert Anda. Jika mendeteksi parameter templateSuffix atau flag template_suffix, BigQuery akan memperlakukan tabel yang ditargetkan sebagai template dasar. Opsi ini akan membuat tabel baru yang memiliki skema yang sama dengan tabel yang ditargetkan, dan memiliki nama yang menyertakan akhiran yang ditentukan:

<targeted_table_name> + <templateSuffix>

Dengan menggunakan tabel template, Anda dapat menghindari overhead pembuatan setiap tabel satu per satu dan menentukan skema untuk setiap tabel. Anda hanya perlu membuat satu template dan menyediakan akhiran yang berbeda sehingga BigQuery dapat membuat tabel baru untuk Anda. BigQuery menempatkan tabel dalam project dan set data yang sama.

Tabel yang dibuat menggunakan tabel template biasanya tersedia dalam beberapa detik. Terkadang, mungkin perlu waktu lebih lama untuk tersedia.

Mengubah skema tabel template

Jika Anda mengubah skema tabel template, semua tabel yang dihasilkan selanjutnya akan menggunakan skema yang diperbarui. Tabel yang dihasilkan sebelumnya tidak akan terpengaruh, kecuali jika tabel yang ada masih memiliki buffer streaming.

Untuk tabel yang sudah ada yang masih memiliki buffering streaming, jika Anda mengubah skema tabel template dengan cara yang kompatibel dengan versi lama, skema tabel yang dihasilkan yang di-streaming secara aktif tersebut juga akan diperbarui. Namun, jika Anda mengubah skema tabel template dengan cara yang tidak kompatibel dengan versi sebelumnya, semua data yang di-buffer dan menggunakan skema lama akan hilang. Selain itu, Anda tidak dapat melakukan streaming data baru ke tabel yang dihasilkan dan menggunakan skema lama yang kini sudah tidak kompatibel.

Setelah Anda mengubah skema tabel template, tunggu hingga perubahan diterapkan sebelum Anda mencoba menyisipkan data baru atau membuat kueri tabel yang dihasilkan. Permintaan untuk menyisipkan kolom baru akan berhasil dalam beberapa menit. Upaya untuk mengkueri kolom baru mungkin memerlukan waktu tunggu lebih lama hingga 90 menit.

Jika Anda ingin mengubah skema tabel yang dihasilkan, jangan ubah skema hingga streaming melalui tabel template berhenti dan bagian statistik streaming tabel yang dihasilkan tidak ada dalam respons tables.get(), yang menunjukkan bahwa tidak ada data yang di-buffer di tabel.

Tabel berpartisi dan tabel yang dikelompokkan tidak mengalami batasan sebelumnya dan merupakan mekanisme yang direkomendasikan.

Detail tabel template

Nilai akhiran template
Nilai templateSuffix (atau --template_suffix) hanya boleh berisi huruf (az, AZ), angka (0-9), atau garis bawah (_). Panjang gabungan maksimum nama tabel dan akhiran tabel adalah 1.024 karakter.
Kuota

Tabel template tunduk pada batasan kuota streaming. Project Anda dapat membuat hingga 10 tabel per detik dengan tabel template, mirip dengan tables.insert API. Kuota ini hanya berlaku untuk tabel yang sedang dibuat, bukan untuk tabel yang diubah.

Jika aplikasi Anda perlu membuat lebih dari 10 tabel per detik, sebaiknya gunakan tabel yang dikelompokkan. Misalnya, Anda dapat memasukkan ID tabel kardinalitas tinggi ke dalam kolom utama dari satu tabel pengelompokan.

Time to live (TTL)

Tabel yang dihasilkan akan mewarisi waktu habis masa berlakunya dari set data. Seperti data streaming normal, tabel yang dihasilkan tidak dapat langsung disalin.

Penghapusan Duplikat

Penghapusan duplikat hanya terjadi di antara referensi seragam ke tabel tujuan. Misalnya, jika Anda secara bersamaan melakukan streaming ke tabel yang dihasilkan menggunakan tabel template dan perintah insertAll reguler, tidak ada penghapusan duplikat yang terjadi antara baris yang disisipkan oleh tabel template dan perintah insertAll reguler.

Dilihat

Tabel template dan tabel yang dihasilkan tidak boleh berupa tabel virtual.

Memecahkan masalah streaming insert

Bagian berikut membahas cara memecahkan masalah yang terjadi saat Anda melakukan streaming data ke BigQuery menggunakan API streaming lama. Untuk informasi lebih lanjut tentang cara mengatasi error kuota untuk streaming insert, lihat Error kuota streaming insert.

Kode respons HTTP gagal

Jika Anda menerima kode respons HTTP yang gagal seperti error jaringan, tidak ada cara untuk mengetahui apakah streaming insert berhasil atau tidak. Jika Anda mencoba mengirim ulang permintaan, Anda mungkin akan mendapatkan baris duplikat di tabel Anda. Untuk membantu melindungi tabel Anda dari duplikasi, tetapkan properti insertId saat mengirim permintaan. BigQuery menggunakan properti insertId untuk penghapusan duplikat.

Jika Anda menerima pesan error izin, error nama tabel yang tidak valid, atau error kuota terlampaui, tidak ada baris yang disisipkan dan seluruh permintaan akan gagal.

Kode respons HTTP berhasil

Meskipun Anda menerima kode respons HTTP yang berhasil, Anda harus memeriksa properti insertErrors respons untuk menentukan apakah penyisipan baris berhasil atau tidak karena mungkin BigQuery hanya berhasil sebagian dalam menyisipkan baris. Anda mungkin mengalami salah satu skenario berikut:

  • Semua baris berhasil disisipkan. Jika properti insertErrors adalah daftar kosong, semua baris berhasil disisipkan.
  • Beberapa baris berhasil disisipkan. Kecuali jika ada ketidakcocokan skema di salah satu baris, baris yang ditunjukkan dalam properti insertErrors tidak akan disisipkan, dan semua baris lainnya berhasil disisipkan. Properti errors berisi informasi mendetail tentang penyebab kegagalan setiap baris yang gagal. Properti index menunjukkan indeks baris berbasis 0 pada permintaan tempat error diterapkan.
  • Tidak ada baris yang berhasil disisipkan. Jika BigQuery menemukan ketidakcocokan skema di setiap baris dalam permintaan, tidak ada satu pun baris yang disisipkan dan entri insertErrors ditampilkan untuk setiap baris, bahkan untuk baris yang tidak memiliki skema yang tidak cocok. Baris yang tidak memiliki ketidakcocokan skema akan mengalami error dengan properti reason yang disetel ke stopped, dan dapat dikirim kembali sebagaimana adanya. Baris yang gagal menyertakan informasi mendetail tentang ketidakcocokan skema. Untuk mempelajari jenis buffering protokol yang didukung untuk setiap jenis data BigQuery, lihat Konversi jenis data.

Error metadata untuk streaming insert

Karena streaming API BigQuery dirancang untuk tingkat penyisipan yang tinggi, modifikasi pada tampilan metadata tabel yang mendasarinya pada akhirnya akan konsisten saat berinteraksi dengan sistem streaming. Biasanya, perubahan metadata diterapkan dalam hitungan menit, tetapi selama periode ini, respons API mungkin mencerminkan status tabel yang tidak konsisten.

Beberapa skenario mencakup:

  • Perubahan Skema. Mengubah skema tabel yang baru-baru ini menerima streaming insert dapat menyebabkan respons dengan ketidakcocokan skema menjadi error karena sistem streaming mungkin tidak segera menerima perubahan skema.
  • Pembuatan/Penghapusan Tabel. Streaming ke tabel yang tidak ada akan menampilkan variasi respons notFound. Tabel yang dibuat sebagai respons mungkin tidak segera dikenali oleh streaming insert berikutnya. Demikian pula, menghapus atau membuat ulang tabel dapat menghasilkan jangka waktu saat streaming insert secara efektif dikirim ke tabel lama. Streaming insert mungkin tidak ada di tabel baru.
  • Pemotongan Tabel. Memotong data tabel (dengan menggunakan tugas kueri yang menggunakan writeDisposition WRITE_TRUNCATE) juga dapat menyebabkan penyisipan berikutnya selama periode konsistensi dihapus.

Data Tidak ada/Tidak tersedia

Streaming insert berada untuk sementara di penyimpanan yang dioptimalkan untuk penulisan, yang memiliki karakteristik ketersediaan berbeda dengan penyimpanan terkelola. Operasi tertentu di BigQuery tidak berinteraksi dengan penyimpanan yang dioptimalkan untuk penulisan, seperti tugas penyalinan tabel dan metode API seperti tabledata.list. Data streaming terbaru tidak akan ada di tabel atau output tujuan.