Daten in BigQuery streamen

Wenn Sie Daten in BigQuery laden möchten, können Sie anstelle eines Jobs auch entscheiden, Ihre Daten mithilfe der Methode tabledata().insertAll() datensatzweise in BigQuery zu streamen. Bei diesem Ansatz können Sie ohne die Verzögerung, die durch die Ausführung eines Lastjobs entsteht, eine Abfrage auf die Daten ausführen. In diesem Dokument werden mehrere wichtige Alternativen behandelt, die vor der Auswahl eines Ansatzes berücksichtigt werden sollten, darunter Streamingquoten, Datenverfügbarkeit und Datenkonsistenz.

Vorbereitung

  1. Sorgen Sie dafür, dass Sie Schreibzugriff auf das Dataset haben, das die Zieltabelle enthält. Die Tabelle muss bereits vorhanden sein, bevor Sie damit beginnen, Daten hineinzuschreiben. Die einzige Ausnahme zu dieser Vorgehensweise sind Vorlagentabellen. Weitere Informationen zu Vorlagentabellen finden Sie unter Tabellen mithilfe von Vorlagentabellen automatisch erstellen.

  2. Prüfen Sie die Kontingentrichtlinie für das Streamen von Daten.

Datenverfügbarkeit prüfen

Gestreamte Daten stehen innerhalb weniger Sekunden nach dem Streamen der ersten Insert-Anweisung für Echtzeitanalysen zur Verfügung.

Bei Kopier- und Export-Vorgängen kann es bis zu 90 Minuten dauern, bis die Daten verfügbar sind. Beim Streamen in eine partitionierte Tabelle kommt hinzu, dass die Daten im Streaming-Zwischenspeicher für die Pseudospalte _PARTITIONTIME den Wert NULL besitzen. Um herauszufinden, ob Daten zum Kopieren und Exportieren zur Verfügung stehen, sehen Sie in der tables.get-Antwort nach, ob sie einen Abschnitt namens streamingBuffer enthält. Fehlt der Abschnitt, sollten Ihre Daten zum Kopieren und Exportieren verfügbar sein und in der Pseudospalte _PARTITIONTIME über einen Nicht-Null-Wert verfügen.

Datenkonsistenz sicherstellen

Zum Sicherstellen der Datenkonsistenz können Sie für jede eingefügte Zeile insertId angeben. BigQuery merkt sich diese ID mindestens eine Minute lang. Wenn Sie innerhalb dieses Zeitraums versuchen, denselben Satz Zeilen noch einmal zu streamen und die insertId-Property definiert ist, verwendet BigQuery die insertId-Eigenschaft um Duplikate auf einer Best-Effort-Basis aus Ihren Daten zu entfernen.

Unter Umständen müssen Sie eine Insert-Anweisung wiederholen, da bei bestimmten Fehlerbedingungen wie Netzwerkfehlern zwischen Ihrem System und BigQuery oder internen Fehlern in BigQuery keine Möglichkeit zum Ermitteln des Zustands eines Streaming-Insert besteht. Verwenden Sie beim Wiederholen eines Insert dieselbe insertID für denselben Satz Zeilen, damit BigQuery versuchen kann, Duplikate aus Ihren Daten zu entfernen. Weitere Informationen finden Sie unter Fehlerbehebung bei Streaming-Insert-Anweisungen.

In den seltenen Fällen, in denen es zu einer unerwarteten Unterbrechung der Verbindung mit einem Google-Rechenzentrum kommt, ist ein automatisches Entfernen von Duplikaten ggf. nicht möglich.

Sollten Sie höhere Anforderungen in Bezug auf Ihre Daten haben, ist Google Cloud Datastore ein alternativer Service, der Transaktionen unterstützt.

Daten zonenübergreifend streamen

Daten können sowohl in Datasets in den USA als auch in der EU gestreamt werden. Während BigQuery die insertAll-Anfrage verarbeitet, können die Daten über Rechner bewegt werden, die sich nicht innerhalb der Zone des Datasets befinden. Werden Daten von einer Zone außerhalb der Zone des Datasets gestreamt, könnte dies zu erhöhter Latenz und höheren Fehlerquoten führen.

Tabellen automatisch mithilfe von Vorlagentabellen erstellen

Ein häufiges Nutzverhalten des Streamings von Daten in BigQuery ist das Aufteilen einer logischen Tabelle in viele kleinere Tabellen; entweder zum Erstellen kleinerer Datasets (z. B. pro Datum oder Nutzer-ID) oder aus Skalierbarkeitsgründen (z. B. zum Streamen von mehr als dem aktuellen Grenzwert von 100.000 Zeilen pro Sekunde). Um eine Tabelle in viele kleinere Tabellen aufzuteilen, ohne dass dies zusätzlichen komplexen Code auf Client-Seite verursacht, können Sie die Vorlagentabellen-Funktion in BigQuery nutzen, damit die Anwendung die Tabellen für Sie erstellt.

Um eine Vorlagentabelle über die BigQuery-API zu verwenden, fügen Sie zu Ihrer insertAll-Anfrage einen templateSuffix-Parameter hinzu. Im bq-Befehlszeilentool muss dem insert-Befehl das template_suffix-Flag hinzugefügt werden. Findet BigQuery einen templateSuffix-Parameter oder das template_suffix-Flag, wird die ausgewählte Tabelle als Basistabelle behandelt und eine neue Tabelle erstellt, die über das gleiche Schema wie die ausgewählte Tabelle verfügt und deren Name das angegebene Suffix enthält:

<targeted_table_name> + <templateSuffix>

Durch die Verwendung einer Vorlagentabelle kann der Overhead vermieden werden, der dabei entsteht, wenn jede Tabelle einzeln erstellt wird und für jede Tabelle das Schema angegeben werden muss. Damit BigQuery die neuen Tabellen für Sie erstellen kann, brauchen Sie nur eine einzige Vorlage zu erstellen und können verschiedene Suffixe angeben. BigQuery speichert die Tabellen im selben Projekt und Dataset. Auch können die Schemas mithilfe von Vorlagen einfacher aktualisiert werden, da nur die Vorlagentabelle aktualisiert werden muss.

Tabellen, die mithilfe von Vorlagentabellen erstellt werden, stehen normalerweise innerhalb weniger Sekunden zur Verfügung. In seltenen Fällen kann es länger dauern, bis sie verfügbar sind.

Das Schema der Vorlagentabelle ändern

Wird das Schema einer Vorlagentabelle geändert, verwenden alle in Folge generierten Tabellen das aktualisierte Schema. Zuvor generierte Tabellen werden davon nur dann beeinflusst, wenn die vorhandene Tabelle noch über einen Streaming-Zwischenspeicher verfügt.

Wird das Vorlagentabellenschema in vorhandenen Tabellen, die noch über einen Streaming-Zwischenspeicher verfügen, rückwärtskompatibel geändert, dann wird das Schema dieser aktiv gestreamten generierten Tabellen ebenfalls aktualisiert. Wird das Vorlagentabellenschema jedoch auf nicht-rückwärtskompatible Weise geändert, gehen alle zwischengespeicherten Daten, die das alte Schema verwenden, verloren. Auch können neue Daten nicht in vorhandene generierte Tabellen gestreamt werden, die das alte, nun jedoch nicht mehr kompatible Schema verwenden.

Warten Sie nach dem Ändern eines Vorlagentabellenschemas, bis die Änderungen wirksam sind, bevor Sie versuchen, neue Daten einzufügen oder eine Abfrage für die generierten Tabellen auszuführen. Anfragen, neue Felder einzufügen, sollten innerhalb weniger Minuten möglich sein. Für das Durchführen von Abfragen auf die neuen Felder könnte eine längere Wartezeit (bis zu 90 Minuten) erforderlich sein.

Wenn Sie das Schema einer generierten Tabelle ändern möchten, sollten Sie dies erst dann tun, wenn das Streaming über die Vorlagentabelle beendet wurde und die tables.get()-Antwort den Streaming-Statistik-Abschnitt der generierten Tabelle nicht mehr enthält, also keine Daten mehr in der Tabelle zwischengespeichert werden.

Vorlagentabellendetails

Wert des Vorlagensuffixes
Der Wert für templateSuffix (oder --template_suffix) darf ausschließlich Buchstaben (a-z, A-Z), Zahlen (0-9) oder Unterstriche enthalten (_). Die maximale gemeinsame Länge von Tabellenname und Tabellensuffix beträgt 1024 Zeichen.
Kontingent
Für alle Tabellen gelten die gleichen Kontingente. Dabei spielt es keine Rolle, ob sie auf Vorlagen basieren oder manuell erstellt werden.
Time to live
Die generierte Tabelle erbt die Ablaufzeit aus dem Dataset. Wie gewöhnliche Streamingdaten können auch generierte Tabellen nicht sofort kopiert oder exportiert werden.
Entfernen von Duplikaten
Das Entfernen von Duplikaten findet nur bei einheitlichen Referenzen auf eine Zieltabelle statt. Wenn Sie beispielsweise gleichzeitig sowohl mithilfe von Vorlagentabellen als auch mit einem regulären insertAll-Befehl in eine generierte Tabelle streamen, werden keine Duplikate zwischen den Zeilen, die von Vorlagentabellen und über einen regulären insertAll-Befehl eingefügt wurden, entfernt.
Ansichten
Bei der Vorlagentabelle und den generierten Tabellen sollte es sich nicht um Ansichten handeln.

Beispielanwendungsfälle

Protokollierung von Ereignissen mit hohem Volumen

Wenn Sie eine App nutzen, die große Mengen an Daten in Echtzeit sammelt, können Streaming-Inserts eine gute Wahl sein. Diese Arten von Apps verfügen normalerweise über die folgenden Kriterien:

  • Nicht transaktional. Hohes Volumen, ständiges Anhängen von Zeilen. Die App kann die seltene Möglichkeit tolerieren, dass es zu einer Duplikatbildung kommt oder dass Daten temporär nicht zur Verfügung stehen.
  • Aggregatanalyse. Abfragen werden normalerweise auf Trendanalysen ausgeführt; es findet keine Auswahl einzelner Datensätze oder kleiner Datenmengen statt.

Ein Beispiel für die Protokollierung von Ereignissen mit hohem Volumen ist das Ereignis-Tracking. Angenommen, Sie haben eine mobile App, die Ereignisse verfolgt. Ihre App – oder mobile Server – könnten Benutzerinteraktionen oder Systemfehler unabhängig aufzeichnen und nach BigQuery streamen. Sie könnten diese Daten analysieren, um Gesamttrends wie Bereiche hoher Interaktion oder Probleme zu ermitteln und Fehlerbedingungen in Echtzeit zu überwachen.

Echtzeit-Dashboards und -Abfragen

In bestimmten Situationen ermöglicht das Streaming von Daten in BigQuery eine Echtzeitanalyse von Transaktionsdaten. Da das Streaming von Daten mit der Möglichkeit von Datenduplikaten einhergeht, sollten Sie sicherstellen, dass ein primärer Transaktionsdatenspeicher außerhalb von BigQuery existiert.

Sie können einige Vorkehrungen treffen, um sicherzustellen, dass Sie Transaktionsdaten analysieren können und Ihnen eine aktuelle Ansicht Ihrer Daten vorliegt:

  1. Erstellen Sie zwei Tabellen mit einem identischen Schema. Die erste Tabelle ist für die abgeglichenen Daten und die zweite für die nicht-abgeglichenen Echtzeitdaten.
  2. Pflegen Sie auf Client-Seite einen Transaktionsdatenspeicher für Datensätze.
  3. Stoßen Sie insertAll()-Anfragen für diese Datensätze an. Die insertAll()-Anfrage sollte die nicht-abgeglichene Echtzeittabelle als Zieltabelle angeben.
  4. Nach einem gewissen Intervall sollten die abgeglichenen Daten aus dem Transaktionsdatenspeicher angehängt und die nicht angeglichene Datentabelle gekürzt werden.
  5. Für Echtzeit-Dashboards und -Abfragen können Daten aus beiden Tabellen ausgewählt werden. Die nicht-abgeglichene Datentabelle kann Duplikate oder verworfene Datensätze enthalten.

Duplikate manuell entfernen

Mithilfe des folgenden manuellen Prozesses können Sie nach dem Abschluss des Streamings sicherstellen, dass keine Duplikatzeilen vorliegen.

  1. Fügen Sie die insertID als Spalte zu Ihrem Tabellenschema hinzu und nehmen Sie den Wert von insertID für jede Zeile in Ihre Daten auf.
  2. Führen Sie nach dem Abschluss des Streamings die folgende Abfrage durch, um nach Duplikaten zu suchen:

    SELECT max(count) FROM(
    SELECT <id_column>, count(*) as count
    FROM <table_name>
    GROUP BY <id_column>)

    Ist das Ergebnis größer als 1, existieren Duplikate.

  3. Führen Sie die folgende Abfrage durch, um Duplikate zu entfernen. Sie sollten eine Zieltabelle angeben, große Ergebnisse zulassen und die Ergebnisvereinfachung deaktivieren.

    #standardSQL
    SELECT * EXCEPT(row_number)
    FROM (
      SELECT
          *,
          ROW_NUMBER()
              OVER (PARTITION BY <id_column>)
              row_number
      FROM <table_name&gt;
    )
    WHERE row_number = 1

Hinweise zur Abfrage zum Entfernen von Duplikaten:

  • Die sicherere Strategie für die Abfrage zum Entfernen von Duplikaten ist die Anwendung auf eine neue Tabelle. Alternativ kann sie mit der WRITE_TRUNCATE-Schreibanordnung auf die Zieltabelle angewendet werden.
  • Die Abfrage zum Entfernen von Duplikaten fügt am Ende des Tabellenschemas eine row_number-Spalte mit dem Wert 1 hinzu. Die Abfrage verwendet eine SELECT * EXCEPT-Anweisung aus dem Standard SQL-Dialekt, um die row_number-Spalte aus der Zieltabelle auszuschließen. Das #standardSQL-Präfix aktiviert den Standard-SQL-Dialekt für diese Abfrage. Alternativ kann eine Auswahl nach Spaltennamen stattfinden, um diese Spalte wegzulassen.
  • Um eine Abfrage auf Live-Daten mit entfernten Duplikaten auszuführen, können Sie mithilfe der Abfrage zum Entfernen von Duplikaten auch eine Ansicht Ihrer Tabelle erstellen. Bitte beachten Sie, dass die Abfragekosten für die Ansicht basierend auf den in der Ansicht ausgewählten Spalten berechnet werden. Dies kann zu hohen Gescannte-Byte-Größen führen.

Mehr als 10.000 Zeilen pro Sekunde streamen

Eine einzelne Tabelle unterstützt nur das Streamen in den Raten, die im Abschnitt Kontingentrichtlinie aufgeführt sind. Wenn Sie mit einer höheren Rate streamen müssen, als von einer einzigen Tabelle unterstützt wird, können Sie Vorlagentabellen verwenden, um Ihre logische Zieltabelle zu fragmentieren, da jedes einzigartige Vorlagentabellensuffix das gesamte Tabellenstreamingkontingent unterstützen kann.

Muss Ihre logische Tabelle beispielsweise 250.000 Zeilen pro Sekunde empfangen, können Sie eine Basisvorlagentabelle erstellen und Ihre Zeilen-Inserts gleichmäßig auf die Vorlagensuffixe "1", "2" und "3" verteilen. Dies erhöht das effektive Kontingent der logischen Tabelle ungefähr auf das Dreifache des Grenzwerts der einzelnen Tabelle. Achten Sie darauf, in dasselbe Fragment zu schreiben, wenn Sie versuchen, eine Zeile zu schreiben.

Zurück nach oben

Fehlerbehebung bei Streaming-Insert-Anweisungen

Weitere Informationen zum Beheben von Fehlern beim Streamen von Insert-Anweisungen finden Sie auf der Seite "Fehlerbehebung" im Bereich Fehlerbehebung bei Streaming-Insert-Anweisungen.

Zurück nach oben

Beispiele für Streaming-Insert-Anweisungen

C#

In den BigQuery-Clientbibliotheken wird beschrieben, wie ein BigQuery-Client installiert und erstellt wird.

public void UploadJson(string datasetId, string tableId, BigQueryClient client)
{
    // Note that there's a single line per JSON object. This is not a JSON array.
    IEnumerable<string> jsonRows = new string[]
    {
        "{ 'title': 'exampleJsonFromStream', 'unique_words': 1}",
        "{ 'title': 'moreExampleJsonFromStream', 'unique_words': 1}",
        //add more rows here...
    }.Select(row => row.Replace('\'', '"')); // Simple way of representing C# in JSON to avoid escaping " everywhere.

    // Normally we'd be uploading from a file or similar. Any readable stream can be used.
    var stream = new MemoryStream(Encoding.UTF8.GetBytes(string.Join("\n", jsonRows)));

    // This example uploads data to an existing table. If the upload will create a new table
    // or if the schema in the JSON isn't identical to the schema in the table,
    // create a schema to pass into the call instead of passing in a null value.
    BigQueryJob job = client.UploadJson(datasetId, tableId, null, stream);
    // Use the job to find out when the data has finished being inserted into the table,
    // report errors etc.

    // Wait for the job to complete.
    job.PollUntilCompleted();
}

Zu Zeitpunkt springen

In den BigQuery-Clientbibliotheken wird beschrieben, wie ein BigQuery-Client installiert und erstellt wird.

u := client.Dataset(datasetID).Table(tableID).Uploader()
items := []*Item{
	// Item implements the ValueSaver interface.
	{Name: "n1", Count: 7},
	{Name: "n2", Count: 2},
	{Name: "n3", Count: 1},
}
if err := u.Put(ctx, items); err != nil {
	return err
}

Java

In den BigQuery-Clientbibliotheken wird beschrieben, wie ein BigQuery-Client installiert und erstellt wird.

TableId tableId = TableId.of(datasetName, tableName);
// Values of the row to insert
Map<String, Object> rowContent = new HashMap<>();
rowContent.put("booleanField", true);
// Bytes are passed in base64
rowContent.put("bytesField", "Cg0NDg0="); // 0xA, 0xD, 0xD, 0xE, 0xD in base64
// Records are passed as a map
Map<String, Object> recordsContent = new HashMap<>();
recordsContent.put("stringField", "Hello, World!");
rowContent.put("recordField", recordsContent);
InsertAllResponse response = bigquery.insertAll(InsertAllRequest.newBuilder(tableId)
    .addRow("rowId", rowContent)
    // More rows can be added in the same RPC by invoking .addRow() on the builder
    .build());
if (response.hasErrors()) {
  // If any of the insertions failed, this lets you inspect the errors
  for (Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
    // inspect row error
  }
}

Node.js

In den BigQuery-Clientbibliotheken wird beschrieben, wie ein BigQuery-Client installiert und erstellt wird.

function insertRowsAsStream (datasetId, tableId, rows, projectId) {
  // Instantiates a client
  const bigquery = BigQuery({
    projectId: projectId
  });

  // References an existing dataset, e.g. "my_dataset"
  const dataset = bigquery.dataset(datasetId);
  // References an existing dataset, e.g. "my_dataset"
  const table = dataset.table(tableId);

  // Inserts data into a table
  return table.insert(rows)
    .then((insertErrors) => {
      console.log('Inserted:');
      rows.forEach((row) => console.log(row));
      return insertErrors;
    });
}

PHP

In den BigQuery-Clientbibliotheken wird beschrieben, wie ein BigQuery-Client installiert und erstellt wird.

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream a row of data into your BigQuery table
 * Example:
 * ```
 * $data = [
 *     "field1" => "value1",
 *     "field2" => "value2",
 * ];
 * stream_row($projectId, $datasetId, $tableId, $data);
 * ```.
 *
 * @param string $projectId The Google project ID.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId   The BigQuery table ID.
 * @param string $data      An associative array representing a row of data.
 * @param string $insertId  An optional unique ID to guarantee data consistency.
 */
function stream_row($projectId, $datasetId, $tableId, $data, $insertId = null)
{
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
        'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $insertResponse = $table->insertRows([
        ['insertId' => $insertId, 'data' => $data],
        // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

In den BigQuery-Clientbibliotheken wird beschrieben, wie ein BigQuery-Client installiert und erstellt wird.

def stream_data(dataset_name, table_name, json_data):
    bigquery_client = bigquery.Client()
    dataset = bigquery_client.dataset(dataset_name)
    table = dataset.table(table_name)
    data = json.loads(json_data)

    # Reload the table to get the schema.
    table.reload()

    rows = [data]
    errors = table.insert_data(rows)

    if not errors:
        print('Loaded 1 row into {}:{}'.format(dataset_name, table_name))
    else:
        print('Errors:')
        pprint(errors)

Ruby

In den BigQuery-Clientbibliotheken wird beschrieben, wie ein BigQuery-Client installiert und erstellt wird.

# project_id = "Your Google Cloud project ID"
# dataset_id = "ID of the dataset containing table"
# table_id   = "ID of the table to import data into"
# row_data   = [{ column1: value, column2: value }, ...]

require "google/cloud/bigquery"

bigquery = Google::Cloud::Bigquery.new project: project_id
dataset  = bigquery.dataset dataset_id
table    = dataset.table table_id

response = table.insert row_data

if response.success?
  puts "Inserted rows successfully"
else
  puts "Failed to insert #{response.error_rows.count} rows"
end

Zurück nach oben

Ressourcen unterwegs überwachen

Projekte jetzt einfach in der Google Cloud Console App verwalten.

Feedback geben zu...

BigQuery-Dokumentation