Daten in BigQuery streamen

Wenn Sie Daten in BigQuery laden möchten, benötigen Sie dazu nicht unbedingt einen eigenen Job. Sie können Ihre Daten auch mit der Methode tabledata.insertAll datensatzweise in BigQuery streamen. Bei diesem Ansatz können Sie ohne die Verzögerung, die durch die Ausführung eines Ladejobs entsteht, Daten abfragen. In diesem Dokument werden mehrere wichtige Alternativen behandelt, die vor der Auswahl eines Ansatzes berücksichtigt werden sollten, darunter Streamingkontingente, Datenverfügbarkeit und Datenkonsistenz.

Vorbereitung

  1. Sorgen Sie dafür, dass Sie Schreibzugriff auf das Dataset haben, das die Zieltabelle enthält. Die Tabelle muss bereits vorhanden sein, bevor Sie damit beginnen, Daten hineinzuschreiben. Die einzige Ausnahme zu dieser Vorgehensweise sind Vorlagentabellen. Weitere Informationen zu Vorlagentabellen finden Sie unter Tabellen mithilfe von Vorlagentabellen automatisch erstellen.

  2. Prüfen Sie die Kontingentrichtlinie für das Streaming von Daten.

  3. Prüfen Sie, ob die Abrechnung für Ihr Google Cloud Platform-Projekt aktiviert ist.

    Informationen zum Aktivieren der Abrechnung

    Das Streaming ist in der kostenlose Stufe nicht verfügbar. Wenn Sie versuchen, das Streaming ohne aktivierte Abrechnung auszuführen, wird folgende Fehlermeldung angezeigt: BigQuery: Streaming insert is not allowed in the free tier.

Datenverfügbarkeit prüfen

Gestreamte Daten stehen innerhalb weniger Sekunden nach dem Streamen der ersten Insert-Anweisung für Echtzeitanalysen zur Verfügung. In seltenen Fällen (z. B. bei einem Ausfall) sind Daten möglicherweise im Streaming-Zwischenspeicher vorübergehend nicht verfügbar. Wenn nicht alle Daten verfügbar sind, werden Abfragen zwar weiterhin erfolgreich ausgeführt, jedoch werden einige der Daten übersprungen, die sich noch im Streaming-Zwischenspeicher befinden. Diese Abfragen enthalten im Feld errors von bigquery.jobs.getQueryResults, in der Antwort auf bigquery.jobs.query oder im Feld status.errors von bigquery.jobs.get eine Warnung.

Bei Kopier- und Exportvorgängen kann es bis zu 90 Minuten dauern, bis die Daten verfügbar sind. Beim Streaming in eine partitionierte Tabelle kommt hinzu, dass die Daten im Streaming-Zwischenspeicher für die Pseudospalte _PARTITIONTIME den Wert NULL haben. Um herauszufinden, ob Daten zum Kopieren und Exportieren zur Verfügung stehen, sehen Sie in der tables.get-Antwort nach, ob sie einen Abschnitt namens streamingBuffer enthält. Fehlt der Abschnitt, sollten Ihre Daten zum Kopieren und Exportieren verfügbar sein und in der Pseudospalte _PARTITIONTIME einen Nicht-Null-Wert haben. Darüber hinaus kann das Feld streamingBuffer.oldestEntryTime verwendet werden, um das Alter von Datensätzen im Streaming-Zwischenspeicher zu ermitteln.

Datenkonsistenz sicherstellen

Um für Datenkonsistenz zu sorgen, können Sie für jede eingefügte Zeile die insertId angeben. BigQuery merkt sich diese ID mindestens eine Minute lang. Wenn Sie innerhalb dieses Zeitraums versuchen, denselben Satz Zeilen noch einmal zu streamen und das Attribut insertId definiert ist, verwendet BigQuery das Attribut insertId, um Duplikate auf einer Best-Effort-Basis aus Ihren Daten zu entfernen.

Unter Umständen müssen Sie eine Insert-Anweisung wiederholen, da bei bestimmten Fehlerbedingungen wie Netzwerkfehlern zwischen Ihrem System und BigQuery oder internen Fehlern in BigQuery keine Möglichkeit zum Ermitteln des Zustands einer Streaming-Insert-Anweisung besteht. Verwenden Sie beim Wiederholen einer Insert-Anweisung dieselbe insertId für denselben Satz Zeilen, damit BigQuery versuchen kann, Duplikate aus Ihren Daten zu entfernen. Weitere Informationen finden Sie unter Fehlerbehebung bei Streaming-Insert-Anweisungen.

In den seltenen Fällen, in denen es zu einer unerwarteten Unterbrechung der Verbindung zu einem Rechenzentrum von Google kommt, ist ein automatisches Entfernen von Duplikaten ggf. nicht möglich.

Wenn Sie höhere Anforderungen an Ihre Daten haben, steht mit Google Cloud Datastore ein alternativer Dienst zur Verfügung, der Transaktionen unterstützt.

Daten zonenübergreifend streamen

Daten können sowohl in Datasets in den USA als auch in der EU gestreamt werden. Während BigQuery die insertAll-Anfrage verarbeitet, können die Daten über Rechner bewegt werden, die sich nicht innerhalb der Zone des Datasets befinden. Werden Daten von einer Zone außerhalb der Zone des Datasets gestreamt, könnte dies zu erhöhter Latenz und höheren Fehlerquoten führen.

In nach Aufnahmezeit partitionierte Tabellen streamen

Mit insertAll-Anfragen können Sie einzelne Zeilen in eine partitionierte Tabelle streamen. Die Zielpartition für eingefügte Daten wird standardmäßig vom aktuellen Datum basierend auf der UTC-Zeit abgeleitet.

Wenn Sie Daten in eine partitionierte Tabelle für die Aufnahmezeit streamen, können Sie die Datumsinferenz überschreiben. Geben Sie dazu einen Partition-Decorator als Teil der insertAll-Anfrage an. Beispielsweise können Sie mit dem Partition-Decorator in die Partition für den 1. März 2017 der Tabelle mydataset.table streamen:

mydataset.table$20170301

Neu eintreffende Daten werden im Streaming-Zwischenspeicher vorübergehend der Partition UNPARTITIONED zugeordnet. Bei einer Abfrage können daher Daten im Streaming-Zwischenspeicher ausgenommen werden, indem die NULL-Werte aus der Partition UNPARTITIONED mit einer der Pseudospalten (je nach bevorzugtem Datentyp [_PARTITIONTIME] oder [_PARTITIONDATE]) herausgefiltert werden.

Mit einem Partition-Decorator können Sie in Partitionen mit einem Datum innerhalb der letzten 31 Tage in der Vergangenheit bzw. 16 Tage in der Zukunft streamen, abhängig vom aktuellen Datum und basierend auf der aktuellen UTC-Zeit. Zum Schreiben in Partitionen für Datumsangaben außerhalb dieser zulässigen Grenzen können Sie Lade- oder Abfragejobs verwenden, wie in Daten in partitionierten Tabellen anfügen oder überschreiben beschrieben.

Streaming in partitionierte Tabellen

Sie können Daten in eine Tabelle streamen, die in einer DATE- oder TIMESTAMP-Spalte partitioniert ist und zwischen einem Jahr in der Vergangenheit und sechs Monaten in der Zukunft liegt. Daten außerhalb dieses Bereichs werden abgelehnt.

Wenn die Daten gestreamt werden, werden die Daten zwischen sieben Tagen in der Vergangenheit und drei Tagen in der Zukunft in den Streamingpuffer gestellt und dann in die entsprechenden Partitionen extrahiert. Daten außerhalb dieses Fensters (aber innerhalb eines Zeitraums von einem Jahr/sechs Monaten) werden in den Streamingpuffer gestellt und dann in die UNPARTITIONED-Partition extrahiert. Wenn genügend nicht partitionierte Daten vorhanden sind, werden sie in die entsprechenden Partitionen geladen.

Tabellen automatisch mithilfe von Vorlagentabellen erstellen

Beim Streaming von Daten in BigQuery werden logische Tabellen vom Nutzer häufig in viele kleinere Tabellen aufgeteilt, um kleinere Datasets (z. B. nach Nutzer-ID) zu erstellen. Wenn Sie kleinere Datasets nach Datum erstellen möchten, verwenden Sie partitionierte Tabellen. Wenn Sie kleinere Tabellen unabhängig vom Datum erstellen möchten, verwenden Sie Vorlagentabellen. Die Tabellen werden dann von BigQuery entsprechend erstellt.

Zum Verwenden einer Vorlagentabelle über die BigQuery API fügen Sie zu Ihrer Anfrage insertAll den Parameter templateSuffix hinzu. Im bq-Befehlszeilentool muss dem Befehl insert das Flag template_suffix hinzugefügt werden. Findet BigQuery den Parameter templateSuffix oder das Flag template_suffix, wird die ausgewählte Tabelle als Basistabelle behandelt und eine neue Tabelle erstellt, die das gleiche Schema wie die ausgewählte Tabelle hat und deren Name das angegebene Suffix enthält:

<targeted_table_name> + <templateSuffix>

Durch die Verwendung einer Vorlagentabelle kann der Overhead vermieden werden, der dabei entsteht, wenn jede Tabelle einzeln erstellt wird und für jede Tabelle das Schema angegeben werden muss. Damit BigQuery die neuen Tabellen für Sie erstellen kann, brauchen Sie nur eine einzige Vorlage zu erstellen und verschiedene Suffixe anzugeben. BigQuery speichert die Tabellen im selben Projekt und Dataset. Auch können die Schemas mithilfe von Vorlagen einfacher aktualisiert werden, da nur die Vorlagentabelle aktualisiert werden muss.

Tabellen, die mithilfe von Vorlagentabellen erstellt werden, stehen normalerweise innerhalb weniger Sekunden zur Verfügung. In seltenen Fällen kann es länger dauern, bis sie verfügbar sind.

Das Schema der Vorlagentabelle ändern

Wird das Schema einer Vorlagentabelle geändert, verwenden alle in Folge generierten Tabellen das aktualisierte Schema. Zuvor generierte Tabellen werden davon nur dann beeinflusst, wenn die vorhandene Tabelle über einen Streaming-Zwischenspeicher verfügt.

Wird das Vorlagentabellenschema in vorhandenen Tabellen, die einen Streaming-Zwischenspeicher haben, rückwärtskompatibel geändert, dann wird das Schema dieser aktiv gestreamten, generierten Tabellen ebenfalls aktualisiert. Wird das Vorlagentabellenschema jedoch auf nicht rückwärtskompatible Weise geändert, gehen alle zwischengespeicherten Daten verloren, die das alte Schema verwenden. Auch können neue Daten nicht in vorhandene generierte Tabellen gestreamt werden, die das alte, nun jedoch nicht mehr kompatible Schema verwenden.

Warten Sie nach dem Ändern eines Vorlagentabellenschemas, bis die Änderungen wirksam sind, bevor Sie versuchen, neue Daten einzufügen oder eine Abfrage für die generierten Tabellen auszuführen. Anfragen zum Einfügen neuer Felder sollten innerhalb weniger Minuten möglich sein. Für Abfragen auf die neuen Felder könnte eine längere Wartezeit (bis zu 90 Minuten) erforderlich sein.

Wenn Sie das Schema einer generierten Tabelle ändern möchten, sollten Sie dies erst dann tun, wenn das Streaming über die Vorlagentabelle beendet wurde und die Antwort tables.get() den Streaming-Statistik-Abschnitt der generierten Tabelle nicht mehr enthält, also keine Daten mehr in der Tabelle zwischengespeichert werden.

Vorlagentabellendetails

Wert des Vorlagensuffixes
Der Wert für templateSuffix (oder --template_suffix) darf ausschließlich Buchstaben (a-z, A-Z), Zahlen (0-9) oder Unterstriche (_) enthalten. Die maximale gemeinsame Länge von Tabellenname und Tabellensuffix beträgt 1024 Zeichen.
Kontingent
Für alle Tabellen gelten die gleichen Kontingente. Dabei spielt es keine Rolle, ob sie auf Vorlagen basieren oder manuell erstellt werden.
Gültigkeitsdauer
Die generierte Tabelle übernimmt die Ablaufzeit aus dem Dataset. Wie gewöhnliche Streamingdaten können auch generierte Tabellen nicht sofort kopiert oder exportiert werden.
Entfernen von Duplikaten
Das Entfernen von Duplikaten findet nur bei einheitlichen Referenzen auf eine Zieltabelle statt. Wenn Sie beispielsweise gleichzeitig sowohl mithilfe von Vorlagentabellen als auch mit einem regulären insertAll-Befehl in eine generierte Tabelle streamen, werden keine Duplikate zwischen den Zeilen, die von Vorlagentabellen und über einen regulären insertAll-Befehl eingefügt wurden, entfernt.
Ansichten
Bei der Vorlagentabelle und den generierten Tabellen sollte es sich nicht um Ansichten handeln.

Beispielanwendungsfälle

Protokollierung von Ereignissen mit hohem Volumen

Wenn Sie eine Anwendung nutzen, die große Mengen an Daten in Echtzeit sammelt, können Streaming-Insert-Anweisungen eine gute Wahl sein. Anwendungen dieser Art entsprechen normalerweise den folgenden Kriterien:

  • Nicht transaktional: Hohes Volumen, ständiges Anhängen von Zeilen. Die Anwendung kann es tolerieren, dass es in seltenen Fällen zu einer Duplikatbildung kommt oder dass Daten temporär nicht zur Verfügung stehen.
  • Aggregatanalyse: Abfragen werden normalerweise für Trendanalysen ausgeführt. Es findet keine Auswahl einzelner Datensätze oder kleiner Datenmengen statt.

Ein Beispiel für das Logging von Ereignissen mit hohem Volumen ist das Ereignis-Tracking. Angenommen, Sie haben eine mobile App, die Ereignisse verfolgt. Ihre App – oder mobilen Server – könnten Benutzerinteraktionen oder Systemfehler unabhängig aufzeichnen und an BigQuery streamen. Sie könnten diese Daten analysieren, um Gesamttrends wie Bereiche mit hoher Interaktion oder Probleme zu ermitteln und Fehlerbedingungen in Echtzeit zu überwachen.

Duplikate manuell entfernen

Mithilfe des folgenden manuellen Prozesses können Sie nach dem Abschluss des Streamings sicherstellen, dass keine Duplikatzeilen vorliegen.

  1. Fügen Sie Ihrem Tabellenschema die insertId als Spalte hinzu und nehmen Sie den insertId-Wert für jede Zeile in Ihre Daten auf.
  2. Führen Sie nach dem Abschluss des Streamings die folgende Abfrage durch, um nach Duplikaten zu suchen:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Ist das Ergebnis größer als 1, existieren Duplikate.
  3. Führen Sie die folgende Abfrage durch, um Duplikate zu entfernen. Sie sollten eine Zieltabelle angeben, große Ergebnisse zulassen und die Ergebnisvereinfachung deaktivieren.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Hinweise zur Abfrage zum Entfernen von Duplikaten:

  • Die sicherere Strategie für die Abfrage zum Entfernen von Duplikaten ist die Anwendung auf eine neue Tabelle. Alternativ kann sie mit der Schreibanordnung WRITE_TRUNCATE auf die Zieltabelle angewendet werden.
  • Die Abfrage zum Entfernen von Duplikaten fügt am Ende des Tabellenschemas eine Spalte row_number mit dem Wert 1 ein. In der Abfrage wird mit der Anweisung SELECT * EXCEPT von Standard-SQL die Spalte row_number aus der Zieltabelle ausgeschlossen. Mit dem Präfix #standardSQL wird der Standard-SQL-Dialekt für diese Abfrage aktiviert. Alternativ ist eine Auswahl nach Spaltennamen möglich, um diese Spalte auszulassen.
  • Zum Abfragen von Livedaten mit entfernten Duplikaten können Sie mithilfe der Abfrage zum Entfernen von Duplikaten auch eine Ansicht Ihrer Tabelle erstellen. Beachten Sie, dass die Abfragekosten für die Ansicht basierend auf den in der Ansicht ausgewählten Spalten berechnet werden. Dies kann zu hohen Werten für gescannte Byte führen.

Fehlerbehebung bei Streaming-Insert-Anweisungen

Weitere Informationen zum Beheben von Fehlern beim Streamen von Insert-Anweisungen finden Sie auf der Seite "Fehlerbehebung" im Bereich Fehlerbehebung bei Streaming-Insert-Anweisungen.

Nach oben

Beispiele für Streaming-Insert-Anweisungen

C#

Lesen Sie die Anleitung zur Einrichtung von C# in der BigQuery-Schnellstart-Anleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery C# API.

using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Folgen Sie der Anleitung zur Einrichtung von Go in der BigQuery-Schnellstart-Anleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur BigQuery Go API.

// To run this sample, you will need to create (or reuse) a context and
// an instance of the bigquery client.  For example:
// import "cloud.google.com/go/bigquery"
// ctx := context.Background()
// client, err := bigquery.NewClient(ctx, "your-project-id")
u := client.Dataset(datasetID).Table(tableID).Uploader()
items := []*Item{
	// Item implements the ValueSaver interface.
	{Name: "Phred Phlyntstone", Age: 32},
	{Name: "Wylma Phlyntstone", Age: 29},
}
if err := u.Put(ctx, items); err != nil {
	return err
}

Java

Folgen Sie der Anleitung zur Einrichtung von Java in der BigQuery-Schnellstart-Anleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur BigQuery Java API.

TableId tableId = TableId.of(datasetName, tableName);
// Values of the row to insert
Map<String, Object> rowContent = new HashMap<>();
rowContent.put("booleanField", true);
// Bytes are passed in base64
rowContent.put("bytesField", "Cg0NDg0="); // 0xA, 0xD, 0xD, 0xE, 0xD in base64
// Records are passed as a map
Map<String, Object> recordsContent = new HashMap<>();
recordsContent.put("stringField", "Hello, World!");
rowContent.put("recordField", recordsContent);
InsertAllResponse response =
    bigquery.insertAll(
        InsertAllRequest.newBuilder(tableId)
            .addRow("rowId", rowContent)
            // More rows can be added in the same RPC by invoking .addRow() on the builder
            .build());
if (response.hasErrors()) {
  // If any of the insertions failed, this lets you inspect the errors
  for (Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
    // inspect row error
  }
}

Node.js

Lesen Sie die Anleitung zur Einrichtung von Node.js in der BigQuery-Schnellstart-Anleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Node.js API.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  // const rows = [{name: 'Tom', age: 30}, {name: 'Jane', age: 32}];

  // Create a client
  const bigqueryClient = new BigQuery();

  // Insert data into a table
  await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();

PHP

Lesen Sie die Anleitung zur Einrichtung von PHP in der BigQuery-Schnellstart-Anleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery PHP API.

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

Folgen Sie der Anleitung zur Einrichtung von Python in der BigQuery-Schnellstart-Anleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Informationen finden Sie in der Referenzdokumentation zur BigQuery Python API.

# TODO(developer): Uncomment the lines below and replace with your values.
# from google.cloud import bigquery
# client = bigquery.Client()
# dataset_id = 'my_dataset'  # replace with your dataset ID
# For this sample, the table must already exist and have a defined schema
# table_id = 'my_table'  # replace with your table ID
# table_ref = client.dataset(dataset_id).table(table_id)
# table = client.get_table(table_ref)  # API request

rows_to_insert = [
    (u'Phred Phlyntstone', 32),
    (u'Wylma Phlyntstone', 29),
]

errors = client.insert_rows(table, rows_to_insert)  # API request

assert errors == []

Ruby

Lesen Sie die Anleitung zur Einrichtung von Ruby in der BigQuery-Schnellstart-Anleitung zur Verwendung von Clientbibliotheken, bevor Sie dieses Beispiel ausprobieren. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Ruby API.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Nach oben

Hat Ihnen diese Seite weitergeholfen? Teilen Sie uns Ihr Feedback mit:

Feedback geben zu...