Legacy-Streaming-API verwenden

Auf dieser Seite wird erläutert, wie Sie mit der Methode tabledata.insertAll Daten in BigQuery streamen.

Für neue Projekte empfehlen wir die Verwendung der BigQuery Storage Write API anstelle der Methode tabledata.insertAll. Die Storage Write API bietet niedrigere Preise und robustere Features, einschließlich der Semantik genau einer Übermittlung. Wenn Sie ein vorhandenes Projekt von der Methode tabledata.insertAll zur Storage Write API migrieren, empfehlen wir die Auswahl des Standardstreams. Die Methode tabledata.insertAll wird weiterhin vollständig unterstützt.

Hinweis

  1. Sorgen Sie dafür, dass Sie Schreibzugriff auf das Dataset haben, das die Zieltabelle enthält. Die Tabelle muss bereits vorhanden sein, bevor Sie damit beginnen, Daten hineinzuschreiben. Die einzige Ausnahme zu dieser Vorgehensweise sind Vorlagentabellen. Weitere Informationen zu Vorlagentabellen finden Sie unter Tabellen mithilfe von Vorlagentabellen automatisch erstellen.

  2. Prüfen Sie die Kontingentrichtlinie für das Streaming von Daten.

  3. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein.

  4. Das Streaming ist in der kostenlosen Stufe nicht verfügbar. Wenn Sie versuchen, das Streaming ohne aktivierte Abrechnung auszuführen, wird folgende Fehlermeldung angezeigt: BigQuery: Streaming insert is not allowed in the free tier.

  5. Erteilen Sie IAM-Rollen (Identity and Access Management), die Nutzern die erforderlichen Berechtigungen zum Ausführen der einzelnen Aufgaben in diesem Dokument geben.

Erforderliche Berechtigungen

Zum Streamen von Daten in BigQuery benötigen Sie die folgenden IAM-Berechtigungen:

  • bigquery.tables.updateData (kann Daten in die Tabelle einfügen)
  • bigquery.tables.get (kann Tabellenmetadaten abrufen)
  • bigquery.datasets.get (kann Dataset-Metadaten abrufen)
  • bigquery.tables.create (erforderlich, wenn Sie die Tabelle mithilfe einer Vorlagentabelle automatisch erstellen)

Jede der folgenden vordefinierten IAM-Rollen enthält die Berechtigungen, die Sie benötigen, um Daten in BigQuery zu streamen:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

Weitere Informationen zu IAM-Rollen und Berechtigungen in BigQuery finden Sie unter Vordefinierte Rollen und Berechtigungen.

Daten nach BigQuery streamen

C#

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der C#-Einrichtungsanleitung in der BigQuery-Kurzanleitung zur Verwendung von Clientbibliotheken. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery C# API.

Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.


using Google.Cloud.BigQuery.V2;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Go-Einrichtungsanleitung in der BigQuery-Kurzanleitung zur Verwendung von Clientbibliotheken. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Go API.

Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Java-Einrichtungsanleitung in der BigQuery-Kurzanleitung zur Verwendung von Clientbibliotheken. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Java API.

Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Node.js-Einrichtungsanleitung in der BigQuery-Kurzanleitung zur Verwendung von Clientbibliotheken. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Node.js API.

Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der PHP-Einrichtungsanleitung in der BigQuery-Kurzanleitung zur Verwendung von Clientbibliotheken. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery PHP API.

Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Python-Einrichtungsanleitung in der BigQuery-Kurzanleitung zur Verwendung von Clientbibliotheken. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Python API.

Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Ruby-Einrichtungsanleitung in der BigQuery-Kurzanleitung zur Verwendung von Clientbibliotheken. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Ruby API.

Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Sie müssen das insertID-Feld nicht ausfüllen, wenn Sie Zeilen hinzufügen. Das folgende Beispiel zeigt, wie Sie vermeiden können, dass beim Streamen für jede Zeile eine insertID gesendet wird.

Java

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Java-Einrichtungsanleitung in der BigQuery-Kurzanleitung zur Verwendung von Clientbibliotheken. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Java API.

Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Python-Einrichtungsanleitung in der BigQuery-Kurzanleitung zur Verwendung von Clientbibliotheken. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Python API.

Richten Sie zur Authentifizierung bei BigQuery die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter Authentifizierung für Clientbibliotheken einrichten.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Datums- und Uhrzeitdaten senden

Formatieren Sie für Datums- und Uhrzeitfelder die Daten in der Methode tabledata.insertAll folgendermaßen:

Typ Format
DATE Ein String im Format "YYYY-MM-DD".
DATETIME Ein String im Format "YYYY-MM-DD [HH:MM:SS]".
TIME Ein String im Format "HH:MM:SS".
TIMESTAMP Die Anzahl der Sekunden seit 01.01.1970 (die Unix-Epoche) oder ein String im Format "YYYY-MM-DD HH:MM[:SS]".

Bereichsdaten senden

Formatieren Sie für Felder vom Typ RANGE<T> die Daten in der Methode tabledata.insertAll als JSON-Objekt mit zwei Feldern: start und end. Nullwerte für die Felder start und end stellen unbegrenzte Grenzen dar. Diese Felder müssen das gleiche unterstützte JSON-Format vom Typ T haben, wobei T entweder DATE, DATETIME oder TIMESTAMP sein kann.

Im folgenden Beispiel stellt das Feld f_range_date eine RANGE<DATE>-Spalte in einer Tabelle dar. Mit der tabledata.insertAll API wird eine Zeile in diese Spalte eingefügt.

{
    "f_range_date": {
        "start": "1970-01-02",
        "end": null
    }
}

Verfügbarkeit von Streamdaten

Daten stehen für Echtzeitanalysen mit GoogleSQL-Abfragen zur Verfügung, unmittelbar nachdem BigQuery eine tabledata.insertAll-Anfrage erfolgreich bestätigt hat.

Kürzlich gestreamte Zeilen in eine nach Aufnahmezeit partitionierte Tabelle haben vorübergehend einen NULL-Wert für die Pseudospalte _PARTITIONTIME. Für solche Zeilen weist BigQuery den endgültigen Nicht-NULL-Wert der Spalte PARTITIONTIME im Hintergrund zu, normalerweise innerhalb weniger Minuten. In seltenen Fällen kann dies bis zu 90 Minuten dauern.

Einige kürzlich gestreamte Zeilen sind in der Regel für einige Minuten nicht zum Kopieren von Tabellen verfügbar. In seltenen Fällen kann dies bis zu 90 Minuten dauern. Wenn Sie herausfinden möchten, ob Daten zum Kopieren von Tabellen verfügbar sind, sehen Sie in der Antwort tables.get nach, ob ein Abschnitt mit dem Namen streamingBuffer vorhanden ist. Wenn der Abschnitt streamingBuffer nicht vorhanden ist, stehen Ihre Daten zum Kopieren zur Verfügung. Sie können das Feld streamingBuffer.oldestEntryTime auch verwenden, um das Alter von Datensätzen im Streaming-Zwischenspeicher zu ermitteln.

Best-Effort-Deduplizierung

Wenn Sie insertId für eine eingefügte Zeile angeben, verwendet BigQuery diese ID, um Best-Effort-Deduplizierung für bis zu eine Minute zu gewährleisten. Das heißt, wenn Sie dieselbe Zeile mit derselben insertId mehr als einmal innerhalb dieses Zeitraums in dieselbe Tabelle streamen, kann BigQuery die mehrfachen Vorkommen dieser Zeile deduplizieren und nur eines beibehalten.

Das System erwartet, dass Zeilen, die mit identischen insertId bereitgestellt werden, identisch sind. Wenn zwei Zeilen identische insertId haben, ist es nicht deterministisch, welche Zeile BigQuery speichert.

Das Entfernen von Duplikaten ist im Allgemeinen für Wiederholungsszenarien in einem verteilten System gedacht, wo bei bestimmten Fehlerbedingungen wie Netzwerkfehlern zwischen Ihrem System und BigQuery oder internen Fehlern in BigQuery keine Möglichkeit zum Ermitteln des Zustands eines Streaming-Insert besteht. Verwenden Sie beim Wiederholen einer Insert-Anweisung dieselbe insertId für denselben Satz Zeilen, damit BigQuery versuchen kann, Ihre Daten zu deduplizieren. Weitere Informationen finden Sie unter Fehlerbehebung bei Streaming-Insert-Anweisungen.

Die Deduplizierung von BigQuery gewährleistet keine bestimmte Qualität der Ergebnisse. Sie sollte daher nicht als Mechanismus verwendet werden, um Ihre Daten garantiert von Duplikaten zu bereinigen. Darüber hinaus kann BigQuery die Qualität der Best-Effort-Deduplizierung jederzeit beeinträchtigen, um eine höhere Zuverlässigkeit und Verfügbarkeit Ihrer Daten zu gewährleisten.

Wenn Sie höhere Anforderungen an die Deduplizierung Ihrer Daten haben, steht mit Google Cloud Datastore ein alternativer Dienst zur Verfügung, der Transaktionen unterstützt.

Best-Effort-Deduplizierung deaktivieren

Sie können die Best-Effort-Deduplizierung deaktivieren, wenn Sie das Feld insertId für keine der eingefügten Zeilen ausfüllen. Dies ist die empfohlene Methode zum Einfügen von Daten.

Apache Beam und Dataflow

Verwenden Sie die Methode ignoreInsertIds(), um die Best-Effort-Deduplizierung bei der Verwendung des BigQuery-E/A-Connectors für Java zu deaktivieren.

Duplikate manuell entfernen

Führen Sie den folgenden manuellen Prozess aus, damit nach dem Abschluss des Streamings keine doppelten Zeilen vorhanden sind:

  1. Fügen Sie Ihrem Tabellenschema die insertId als Spalte hinzu und nehmen Sie den insertId-Wert für jede Zeile in Ihre Daten auf.
  2. Führen Sie nach dem Abschluss des Streamings die folgende Abfrage durch, um nach Duplikaten zu suchen:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Ist das Ergebnis größer als 1, existieren Duplikate.
  3. Führen Sie die folgende Abfrage aus, um Duplikate zu entfernen. Geben Sie eine Zieltabelle an, lassen Sie große Ergebnisse zu und deaktivieren Sie die Ergebnisvereinfachung.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Hinweise zur Abfrage zum Entfernen von Duplikaten:

  • Die sicherere Strategie für die Abfrage zum Entfernen von Duplikaten ist die Anwendung auf eine neue Tabelle. Alternativ kann sie mit der Schreibanordnung WRITE_TRUNCATE auf die Zieltabelle angewendet werden.
  • Die Abfrage zum Entfernen von Duplikaten fügt am Ende des Tabellenschemas eine row_number-Spalte mit dem Wert 1 ein. In der Abfrage wird mit der Anweisung SELECT * EXCEPT aus dem GoogleSQL die Spalte row_number aus der Zieltabelle ausgeschlossen. Mit dem Präfix #standardSQL wird der Google SQL-Dialekt für diese Abfrage aktiviert. Alternativ ist eine Auswahl nach Spaltennamen möglich, um diese Spalte auszulassen.
  • Zum Abfragen von Livedaten mit entfernten Duplikaten können Sie mithilfe der Abfrage zum Entfernen von Duplikaten auch eine Ansicht Ihrer Tabelle erstellen. Beachten Sie, dass die Abfragekosten für die Ansicht basierend auf den in der Ansicht ausgewählten Spalten berechnet werden. Dies kann zu hohen Werten für gescannte Byte führen.

In zeitpartitionierte Tabellen streamen

Wenn Sie Daten in eine zeitpartitionierte Tabelle streamen, hat jede Partition einen Streaming-Zwischenspeicher. Der Streaming-Zwischenspeicher bleibt erhalten, wenn Sie das Attribut writeDisposition auf WRITE_TRUNCATE setzen, um einen Lade-, Abfrage- oder Kopierjob auszuführen, der eine Partition überschreibt. Wenn Sie den Streaming-Zwischenspeicher entfernen möchten, prüfen Sie, ob er leer ist. Dazu rufen Sie tables.get für die Partition auf.

Partitionierung nach Aufnahmezeit

Wenn Sie in eine nach Aufnahmezeit partitionierte Tabelle streamen, leitet BigQuery die Zielpartition aus der aktuellen UTC-Zeit ab.

Neu eintreffende Daten werden vorübergehend im Streaming-Zwischenspeicher der Partition __UNPARTITIONED__ abgelegt. Wenn genügend nicht partitionierte Daten vorhanden sind, partitioniert BigQuery die Daten in die richtige Partition. Es gibt jedoch keine SLA dafür, wie lange es dauert, bis Daten aus der Partition __UNPARTITIONED__ verschoben werden. Bei einer Abfrage können Daten im Streaming-Zwischenspeicher ausgenommen werden. Dazu werden die NULL-Werte aus der Partition __UNPARTITIONED__ mit einer der Pseudospalten (je nach bevorzugtem Datentyp _PARTITIONTIME oder _PARTITIONDATE) herausgefiltert.

Wenn Sie Daten in eine täglich partitionierte Tabelle streamen, können Sie die Datumsinferenz überschreiben. Geben Sie dazu einen Partition-Decorator als Teil der insertAll-Anfrage an. Füge den Decorator in den Parameter tableId ein. Beispielsweise können Sie mit dem Partition-Decorator in die Partition für den 1. März 2021 der Tabelle table1 streamen:

table1$20210301

Mit einem Partition-Decorator können Sie in Partitionen mit einem Datum innerhalb der letzten 31 Tage in der Vergangenheit bzw. 16 Tage in der Zukunft streamen, abhängig vom aktuellen Datum und basierend auf der aktuellen UTC-Zeit. Wenn Sie in Partitionen mit Datumsangaben schreiben möchten, die außerhalb der zulässigen Grenzen liegen, verwenden Sie stattdessen einen Lade- oder Abfragejob, wie in Daten in partitionierten Tabellen anfügen oder überschreiben beschrieben.

Das Streaming mit einem Partitions-Decorator wird nur für täglich partitionierte Tabellen unterstützt. Für stündlich, monatlich oder jährlich partitionierte Tabellen wird es nicht unterstützt.

Zum Testen können Sie den Befehlszeilenbefehl bq insert des bq-Befehlszeilentools verwenden. Mit dem folgenden Befehl wird beispielsweise eine einzelne Zeile in eine Partition für das Datum 1. Januar 2017 ($20170101) in eine partitionierte Tabelle mit dem Namen mydataset.mytable gestreamt:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

Spaltenpartitionierung nach Zeiteinheit

Sie können Daten in eine Tabelle streamen, die in einer DATE-, DATETIME oder TIMESTAMP-Spalte partitioniert ist und zwischen fünf Jahren in der Vergangenheit und einem Jahr in der Zukunft liegt. Daten außerhalb dieses Bereichs werden abgelehnt.

Wenn die Daten gestreamt werden, werden sie anfangs in der Partition __UNPARTITIONED__ abgelegt. Wenn genügend nicht partitionierte Daten vorhanden sind, partitioniert BigQuery die Daten automatisch neu und platziert sie in der entsprechenden Partition. Es gibt jedoch kein SLA dafür, wie lange es dauert, bis Daten aus der Partition __UNPARTITIONED__ verschoben werden.

  • Hinweis: Tägliche Partitionen werden anders als stündliche, monatliche und jährliche Partitionen verarbeitet. Nur Daten außerhalb des Datumsbereichs (letzte 7 Tage bis zukünftige 3 Tage) werden in die Partition UNPARTITIONED extrahiert, bis sie neu partitioniert sind. Bei einer stündlich partitionierten Tabelle werden die Daten immer in die Partition UNPARTITIONED extrahiert und später neu partitioniert.

Tabellen automatisch mithilfe von Vorlagentabellen erstellen

Vorlagentabellen bieten einen Mechanismus zum Aufteilen einer logischen Tabelle in viele kleinere Tabellen, um kleinere Datasets zu erstellen (z. B. nach Nutzer-ID). Für Vorlagentabellen gelten einige der unten beschriebenen Einschränkungen. Stattdessen wird dieses Verhalten mit partitionierten Tabellen und geclusterten Tabellen erreicht.

Zum Verwenden einer Vorlagentabelle über die BigQuery API fügen Sie zu Ihrer Anfrage insertAll den Parameter templateSuffix hinzu. Im bq-Befehlszeilentool muss dem Befehl insert das Flag template_suffix hinzugefügt werden. Findet BigQuery den Parameter templateSuffix oder das Flag template_suffix, wird die ausgewählte Tabelle als Basisvorlage behandelt. Es wird eine neue Tabelle erstellt, die das gleiche Schema wie die ausgewählte Tabelle hat und deren Name das angegebene Suffix enthält:

<targeted_table_name> + <templateSuffix>

Durch die Verwendung einer Vorlagentabelle kann der Overhead vermieden werden, der dabei entsteht, wenn jede Tabelle einzeln erstellt wird und für jede Tabelle das Schema angegeben werden muss. Damit BigQuery die neuen Tabellen für Sie erstellen kann, brauchen Sie nur eine einzige Vorlage zu erstellen und verschiedene Suffixe anzugeben. BigQuery speichert die Tabellen im selben Projekt und Dataset.

Tabellen, die mithilfe von Vorlagentabellen erstellt werden, stehen normalerweise innerhalb weniger Sekunden zur Verfügung. In seltenen Fällen kann es länger dauern, bis sie verfügbar sind.

Das Schema der Vorlagentabelle ändern

Wird das Schema einer Vorlagentabelle geändert, verwenden alle in Folge generierten Tabellen das aktualisierte Schema. Zuvor generierte Tabellen werden davon nur dann beeinflusst, wenn die vorhandene Tabelle über einen Streaming-Zwischenspeicher verfügt.

Wird das Vorlagentabellenschema in vorhandenen Tabellen, die einen Streaming-Zwischenspeicher haben, rückwärtskompatibel geändert, dann wird das Schema dieser aktiv gestreamten, generierten Tabellen ebenfalls aktualisiert. Wird das Vorlagentabellenschema jedoch auf nicht rückwärtskompatible Weise geändert, gehen alle zwischengespeicherten Daten verloren, die das alte Schema verwenden. Außerdem können Sie keine neuen Daten in vorhandene generierte Tabellen streamen, die das alte, nun jedoch nicht mehr kompatible Schema verwenden.

Warten Sie nach dem Ändern eines Vorlagentabellenschemas, bis die Änderungen wirksam sind, bevor Sie versuchen, neue Daten einzufügen oder eine Abfrage für die generierten Tabellen auszuführen. Anfragen zum Einfügen neuer Felder sollten innerhalb weniger Minuten möglich sein. Für Abfragen auf die neuen Felder könnte eine längere Wartezeit (bis zu 90 Minuten) erforderlich sein.

Wenn Sie das Schema einer generierten Tabelle ändern möchten, sollten Sie dies erst dann tun, wenn das Streaming über die Vorlagentabelle beendet wurde und die Antwort tables.get() den Streaming-Statistik-Abschnitt der generierten Tabelle nicht mehr enthält, also keine Daten mehr in der Tabelle zwischengespeichert werden.

Partitionierte Tabellen und geclusterte Tabellen unterliegen nicht den vorherigen Einschränkungen und sind der empfohlene Mechanismus.

Vorlagentabellendetails

Wert des Vorlagensuffixes
Der Wert für templateSuffix (oder --template_suffix) darf ausschließlich Buchstaben (a–z, A–Z), Zahlen (0–9) oder Unterstriche (_) enthalten. Die maximale gemeinsame Länge von Tabellenname und Tabellensuffix beträgt 1024 Zeichen.
Kontingent

Vorlagentabellen unterliegen den Einschränkungen für Streamingkontingente. Ihr Projekt kann mit Vorlagentabellen bis zu 10 Tabellen pro Sekunde erstellen, ähnlich wie die tables.insert API. Dieses Kontingent gilt nur für erstellte Tabellen, nicht für geänderte Tabellen.

Wenn Ihre Anwendung mehr als 10 Tabellen pro Sekunde erstellen muss, empfehlen wir die Verwendung von geclusterten Tabellen. Beispiel: Sie können die Tabellen-ID mit hoher Kardinalität in die Schlüsselspalte einer einzelnen Clustering-Tabelle einfügen.

Gültigkeitsdauer

Die generierte Tabelle übernimmt die Ablaufzeit aus dem Dataset. Wie gewöhnliche Streamingdaten können auch generierte Tabellen nicht sofort kopiert werden.

Duplikate entfernen

Das Entfernen von Duplikaten findet nur bei einheitlichen Referenzen auf eine Zieltabelle statt. Wenn Sie beispielsweise gleichzeitig sowohl mithilfe von Vorlagentabellen als auch mit einem regulären insertAll-Befehl in eine generierte Tabelle streamen, wird keine Deduplizierung zwischen den Zeilen durchgeführt, die von Vorlagentabellen und über einen regulären insertAll-Befehl eingefügt wurden.

Aufrufe

Bei der Vorlagentabelle und den generierten Tabellen sollte es sich nicht um Ansichten handeln.

Fehler bei Streaming-Insert-Anweisungen beheben

In den folgenden Abschnitten wird die Fehlerbehebung beim Streamen von Daten in BigQuery mit der Legacy-Streaming-API erörtert. Weitere Informationen zum Beheben von Kontingentfehlern für Streaming-Insert-Anweisungen finden Sie unter Kontingentfehler für Streaming-Insert-Anweisungen.

HTTP-Fehlerantwortcodes

Wenn Sie einen HTTP-Fehlerantwortcode erhalten, z. B. bezüglich eines Netzwerkfehlers, lässt sich nicht feststellen, ob die Streaming-Insert-Anweisung erfolgreich war. Wenn Sie die Anfrage noch einmal senden, könnte eine Tabellenzeile doppelt auftreten. Um eine Duplizierung in Ihrer Tabelle zu verhindern, legen Sie beim Senden der Anfrage das Attribut insertId fest. BigQuery verwendet das Attribut insertId zur Deduplizierung.

Wenn Sie einen Berechtigungsfehler oder einen Fehler bezüglich eines ungültigen Tabellennamens oder eines überschrittenen Kontingents erhalten, werden keine Zeilen eingefügt und die gesamte Anfrage schlägt fehl.

HTTP-Erfolgsantwortcodes

Selbst wenn Sie einen HTTP-Erfolgsantwortcode erhalten, müssen Sie anhand des Attributs insertErrors der Antwort prüfen, ob die Zeileneinfügungen durch BigQuery erfolgreich waren. Es kann nämlich sein, dass sie nur teilweise gelungen sind. Es kann eine der folgenden Szenarien geben:

  • Alle Zeilen wurden erfolgreich eingefügt. Wenn das Attribut insertErrors eine leere Liste ist, wurden alle Zeilen erfolgreich eingefügt.
  • Einige Zeilen wurden erfolgreich eingefügt. Außer in Fällen, in denen in einer der Zeilen ein Schema nicht übereinstimmt, werden die im Attribut insertErrors angegebenen Zeilen nicht eingefügt und alle anderen Zeilen wurden erfolgreich eingefügt. Das Attribut errors enthält detaillierte Informationen darüber, warum die entsprechenden Zeilen nicht eingefügt werden konnten. Das Attribut index gibt den 0-basierten Zeilenindex der Anfrage an, auf die sich der Fehler bezieht.
  • Keine der Zeilen wurde erfolgreich eingefügt. Wenn BigQuery bei einzelnen Zeilen der Anfrage auf einen Schemakonflikt stößt, wird keine der Zeilen eingefügt und für jede Zeile wird ein insertErrors-Eintrag zurückgegeben, selbst für Zeilen ohne Schemakonflikt. Für Zeilen ohne Schemakonflikt wird ein Fehler angegeben, bei dem das Attribut reason auf stopped gesetzt ist. Diese Zeilen können unverändert noch einmal gesendet werden. Fehlgeschlagene Zeilen enthalten detaillierte Informationen zum Schemakonflikt. Weitere Informationen zu den unterstützten Protokollpuffertypen für jeden BigQuery-Datentyp finden Sie unter Datentypkonvertierungen.

Metadatenfehler bei Streaming-Insert-Anweisungen

Da die Streaming-API von BigQuery auf hohe Einfügungsraten ausgelegt ist, sind Änderungen an den zugrunde liegenden Tabellenmetadaten beim Interagieren mit dem Streamingsystem letztendlich konsistent. In den meisten Fällen werden Metadatenänderungen innerhalb von Minuten wirksam. In diesem Zeitraum kann es jedoch vorkommen, dass API-Antworten den uneinheitlichen Zustand der Tabelle widerspiegeln.

Hier einige Szenarien:

  • Schemaänderungen Das Ändern des Schemas einer Tabelle, die kürzlich Streaming-Insert-Anweisungen erhalten hat, kann zu Schemakonfliktfehlern führen, da das Streamingsystem die Schemaänderung unter Umständen nicht sofort übernimmt.
  • Erstellen/Löschen einer Tabelle Beim Streamen an eine nicht vorhandene Tabelle wird eine Variante der notFound-Antwort zurückgegeben. Wenn Sie daraufhin die Tabelle erstellen, wird diese von nachfolgenden Streaming-Insert-Anweisungen unter Umständen nicht sofort erkannt. Ebenso kann das Löschen oder erneute Erstellen einer Tabelle dazu führen, dass für einen bestimmten Zeitraum Streaming-Insert-Anweisungen an die alte Tabelle gesendet werden und in der neu erstellten Tabelle nicht vorhanden sind.
  • Tabellenkürzung: Das Verkürzen der Daten einer Tabelle (z. B. mithilfe eines Abfragejobs, der für das Attribut "writeDisposition" den Wert "WRITE_TRUNCATE" verwendet) kann in gleicher Weise dazu führen, dass nachfolgende Insert-Anweisungen während des Konsistenzzeitraums entfernt werden.

Fehlende bzw. nicht verfügbare Daten

Streaming-Insert-Anweisungen werden vorübergehend im schreiboptimierten Speicher abgelegt, der andere Verfügbarkeitsmerkmale als ein verwalteter Speicher aufweist. Bestimmte Vorgänge in BigQuery interagieren nicht mit dem schreiboptimierten Speicher, z. B. Tabellenkopierjobs und API-Methoden wie tabledata.list. Aktuelle Streamingdaten sind nicht in der Zieltabelle oder -ausgabe enthalten.