Daten in BigQuery streamen

Wenn Sie Daten in BigQuery laden möchten, können Sie Ihre Daten auch mithilfe der Methode tabledata.insertAll datensatzweise in BigQuery streamen, anstatt einen Job zu verwenden. Bei dieser Vorgehensweise können Sie Daten ohne die Verzögerung abfragen, die durch die Ausführung eines Ladejobs entsteht.

In diesem Dokument werden mehrere wichtige Alternativen behandelt, die vor der Auswahl eines Ansatzes berücksichtigt werden sollten, darunter Streamingkontingente, Datenverfügbarkeit und Datenkonsistenz.

Hinweise

  1. Sorgen Sie dafür, dass Sie Schreibzugriff auf das Dataset haben, das die Zieltabelle enthält. Die Tabelle muss bereits vorhanden sein, bevor Sie damit beginnen, Daten hineinzuschreiben. Die einzige Ausnahme zu dieser Vorgehensweise sind Vorlagentabellen. Weitere Informationen zu Vorlagentabellen finden Sie unter Tabellen mithilfe von Vorlagentabellen automatisch erstellen.

  2. Prüfen Sie die Kontingentrichtlinie für das Streaming von Daten.

  3. Die Abrechnung für das Google Cloud-Projekt muss aktiviert sein. So prüfen Sie, ob die Abrechnung für Ihr Projekt aktiviert ist.

Das Streaming ist in der kostenlosen Stufe nicht verfügbar. Wenn Sie versuchen, das Streaming ohne aktivierte Abrechnung auszuführen, wird folgende Fehlermeldung angezeigt: BigQuery: Streaming insert is not allowed in the free tier.

Datenverfügbarkeit prüfen

Gestreamte Daten stehen innerhalb weniger Sekunden nach dem Streamen der ersten Insert-Anweisung für Echtzeitanalysen zur Verfügung. In seltenen Fällen (z. B. bei einem Ausfall) sind Daten möglicherweise im Streaming-Zwischenspeicher vorübergehend nicht verfügbar. Wenn nicht alle Daten verfügbar sind, werden Abfragen zwar weiterhin erfolgreich ausgeführt, jedoch werden einige der Daten übersprungen, die sich noch im Streaming-Zwischenspeicher befinden. Diese Abfragen enthalten im Feld errors von bigquery.jobs.getQueryResults, in der Antwort auf bigquery.jobs.query oder im Feld status.errors von bigquery.jobs.get eine Warnung.

Bei Kopiervorgängen kann es bis zu 90 Minuten dauern, bis die Daten verfügbar sind. Beim Streaming in eine partitionierte Tabelle kommt hinzu, dass die Daten im Streaming-Zwischenspeicher für die Pseudospalte _PARTITIONTIME den Wert NULL haben. Wenn Sie herausfinden möchten, ob Daten zum Kopieren verfügbar sind, sehen Sie in der Antwort tables.get nach, ob ein Abschnitt mit dem Namen streamingBuffer vorhanden ist. Fehlt der Abschnitt, sollten Ihre Daten zum Kopieren verfügbar sein und in der Pseudospalte _PARTITIONTIME einen Nicht-Null-Wert haben. Darüber hinaus kann das Feld streamingBuffer.oldestEntryTime verwendet werden, um das Alter von Datensätzen im Streaming-Zwischenspeicher zu ermitteln.

Best-Effort-Deduplizierung

Wenn Sie insertId für eine eingefügte Zeile angeben, verwendet BigQuery diese ID, um Best-Effort-Deduplizierung für bis zu eine Minute zu gewährleisten. Dadurch könnte BigQuery, wenn Sie innerhalb dieses Zeitraums mehrmals versuchen, dieselbe Zeile mit derselben insertId in dieselbe Tabelle zu streamen, die mehrfachen Vorkommen dieser Zeile deduplizieren und nur eines beibehalten.

Dies ist im Allgemeinen für Wiederholungsszenarien in einem verteilten System gedacht, wo bei bestimmten Fehlerbedingungen wie Netzwerkfehlern zwischen Ihrem System und BigQuery oder internen Fehlern in BigQuery keine Möglichkeit zum Ermitteln des Zustands eines Streaming-Insert besteht. Verwenden Sie beim Wiederholen einer Insert-Anweisung dieselbe insertId für denselben Satz Zeilen, damit BigQuery versuchen kann, Ihre Daten zu deduplizieren. Weitere Informationen finden Sie unter Fehlerbehebung bei Streaming-Insert-Anweisungen.

Die Deduplizierung von BigQuery gewährleistet keine bestimmte Qualität der Ergebnisse. Sie sollte daher nicht als Mechanismus verwendet werden, um Ihre Daten garantiert von Duplikaten zu bereinigen. Darüber hinaus kann BigQuery die Qualität der Best-Effort-Deduplizierung jederzeit beeinträchtigen, um eine höhere Zuverlässigkeit und Verfügbarkeit Ihrer Daten zu gewährleisten.

Wenn Sie höhere Anforderungen an die Deduplizierung Ihrer Daten haben, steht mit Google Cloud Datastore ein alternativer Dienst zur Verfügung, der Transaktionen unterstützt.

Best-Effort-Deduplizierung deaktivieren

Sie können die Best-Effort-Deduplizierung deaktivieren, wenn Sie das Feld insertId für keine der eingefügten Zeilen ausfüllen. Wenn Sie das Feld insertId nicht ausfüllen, erhalten Sie höhere Streaming-Aufnahmekontingente in bestimmten Regionen. Dies ist die empfohlene Methode, um höhere Kontingentlimits für die Streamingaufnahme zu erhalten. Weitere Informationen finden Sie unter Kontingente und Limits.

Apache Beam und Dataflow

Verwenden Sie die Methode ignoreInsertIds(), um die Best-Effort-Deduplizierung bei der Verwendung des BigQuery-E/A-Connectors für Java zu deaktivieren.

Daten zonenübergreifend streamen

Daten können sowohl in Datasets in den USA als auch in der EU gestreamt werden. Während BigQuery die insertAll-Anfrage verarbeitet, können die Daten über Rechner bewegt werden, die sich nicht innerhalb der Zone des Datasets befinden. Werden Daten von einer Zone außerhalb der Zone des Datasets gestreamt, kann dies zu erhöhter Latenz und höheren Fehlerquoten führen.

In nach Aufnahmezeit partitionierte Tabellen streamen

Mit insertAll-Anfragen können Sie einzelne Zeilen in eine partitionierte Tabelle streamen. Die Zielpartition für eingefügte Daten wird standardmäßig vom aktuellen Datum basierend auf der UTC-Zeit abgeleitet.

Wenn Sie Daten in eine nach Aufnahmezeit partitionierte Tabelle streamen, können Sie die Datumsinferenz überschreiben. Geben Sie dazu einen Partition-Decorator als Teil der insertAll-Anfrage an. Beispielsweise können Sie mit dem Partition-Decorator in die Partition für den 1. März 2017 der Tabelle mydataset.table streamen:

mydataset.table$20170301

Neu eintreffende Daten werden im Streaming-Zwischenspeicher vorübergehend der Partition UNPARTITIONED zugeordnet. Bei einer Abfrage können daher Daten im Streaming-Zwischenspeicher ausgenommen werden. Dazu werden die NULL-Werte aus der Partition UNPARTITIONED mit einer der Pseudospalten (je nach bevorzugtem Datentyp [_PARTITIONTIME] oder [_PARTITIONDATE]) herausgefiltert.

Mit einem Partition-Decorator können Sie in Partitionen mit einem Datum innerhalb der letzten 31 Tage in der Vergangenheit bzw. 16 Tage in der Zukunft streamen, abhängig vom aktuellen Datum und basierend auf der aktuellen UTC-Zeit. Zum Schreiben in Partitionen für Datumsangaben außerhalb dieser zulässigen Grenzen können Sie Lade- oder Abfragejobs verwenden, wie in Daten in partitionierten Tabellen anfügen oder überschreiben beschrieben.

Streaming in partitionierte Tabellen

Sie können Daten in eine Tabelle streamen, die in einer DATE- oder TIMESTAMP-Spalte partitioniert ist und zwischen fünf Jahren in der Vergangenheit und einem Jahr in der Zukunft liegt. Daten außerhalb dieses Bereichs werden abgelehnt.

Wenn die Daten gestreamt werden, werden sie zuerst in die Partition UNPARTITIONED extrahiert. Wenn genügend nicht partitionierte Daten vorhanden sind, werden die Daten in die entsprechenden Partitionen partitioniert.

Tabellen automatisch mithilfe von Vorlagentabellen erstellen

Vorlagentabellen bieten einen Mechanismus zum Aufteilen einer logischen Tabelle in viele kleinere Tabellen, um kleinere Datasets zu erstellen (z. B. nach Nutzer-ID). Für Vorlagentabellen gelten einige der unten beschriebenen Einschränkungen. Stattdessen wird dieses Verhalten mit partitionierten Tabellen und geclusterten Tabellen erreicht.

Zum Verwenden einer Vorlagentabelle über die BigQuery API fügen Sie zu Ihrer Anfrage insertAll den Parameter templateSuffix hinzu. Fügen Sie für das bq-Befehlszeilentool dem Befehl insert das Flag template_suffix hinzu. Findet BigQuery den Parameter templateSuffix oder das Flag template_suffix, wird die ausgewählte Tabelle als Basisvorlage behandelt und es wird eine neue Tabelle erstellt, die das gleiche Schema wie die ausgewählte Tabelle hat und deren Name das angegebene Suffix enthält:

<targeted_table_name> + <templateSuffix>

Durch die Verwendung einer Vorlagentabelle kann der Overhead vermieden werden, der dabei entsteht, wenn jede Tabelle einzeln erstellt wird und für jede Tabelle das Schema angegeben werden muss. Damit BigQuery die neuen Tabellen für Sie erstellen kann, brauchen Sie nur eine einzige Vorlage zu erstellen und verschiedene Suffixe anzugeben. BigQuery speichert die Tabellen im selben Projekt und Dataset.

Tabellen, die mithilfe von Vorlagentabellen erstellt werden, stehen normalerweise innerhalb weniger Sekunden zur Verfügung. In seltenen Fällen kann es länger dauern, bis sie verfügbar sind.

Das Schema der Vorlagentabelle ändern

Wird das Schema einer Vorlagentabelle geändert, verwenden alle in Folge generierten Tabellen das aktualisierte Schema. Zuvor generierte Tabellen werden davon nur dann beeinflusst, wenn die vorhandene Tabelle über einen Streaming-Zwischenspeicher verfügt.

Wird das Vorlagentabellenschema in vorhandenen Tabellen, die einen Streaming-Zwischenspeicher haben, rückwärtskompatibel geändert, dann wird das Schema dieser aktiv gestreamten, generierten Tabellen ebenfalls aktualisiert. Wird das Vorlagentabellenschema jedoch auf nicht rückwärtskompatible Weise geändert, gehen alle zwischengespeicherten Daten verloren, die das alte Schema verwenden. Auch können neue Daten nicht in vorhandene generierte Tabellen gestreamt werden, die das alte, nun jedoch nicht mehr kompatible Schema verwenden.

Warten Sie nach dem Ändern eines Vorlagentabellenschemas, bis die Änderungen wirksam sind, bevor Sie versuchen, neue Daten einzufügen oder eine Abfrage für die generierten Tabellen auszuführen. Anfragen zum Einfügen neuer Felder sollten innerhalb weniger Minuten möglich sein. Für Abfragen auf die neuen Felder könnte eine längere Wartezeit (bis zu 90 Minuten) erforderlich sein.

Wenn Sie das Schema einer erzeugten Tabelle ändern möchten, sollten Sie dies erst dann tun, wenn das Streaming über die Vorlagentabelle beendet wurde und die Antwort tables.get() den Streaming-Statistik-Abschnitt der erzeugten Tabelle nicht mehr enthält, also keine Daten mehr in der Tabelle zwischengespeichert werden.

Partitionierte Tabellen und geclusterte Tabellen unterliegen nicht den oben genannten Einschränkungen und sind der empfohlene Mechanismus.

Vorlagentabellendetails

Wert des Vorlagensuffixes
Der Wert für templateSuffix (oder --template_suffix) darf ausschließlich Buchstaben (a–z, A–Z), Zahlen (0–9) oder Unterstriche (_) enthalten. Die maximale gemeinsame Länge von Tabellenname und Tabellensuffix beträgt 1024 Zeichen.
Kontingent
Vorlagentabellen unterliegen ähnlichen Streamingkontingenten wie andere Tabellen. Beim Streamen in Vorlagentabellen erhalten Sie durch das Deaktivieren der Best-Effort-Deduplizierung außerdem keine höheren Kontingente.
Gültigkeitsdauer
Die erzeugte Tabelle übernimmt die Ablaufzeit aus dem Dataset. Wie gewöhnliche Streamingdaten können auch generierte Tabellen nicht sofort kopiert oder exportiert werden.
Entfernen von Duplikaten
Die Deduplizierung findet nur bei einheitlichen Referenzen auf eine Zieltabelle statt. Wenn Sie beispielsweise gleichzeitig sowohl mithilfe von Vorlagentabellen als auch mit einem regulären insertAll-Befehl in eine generierte Tabelle streamen, wird keine Deduplizierung zwischen den Zeilen durchgeführt, die von Vorlagentabellen und über einen regulären insertAll-Befehl eingefügt wurden.
Aufrufe
Bei der Vorlagentabelle und den generierten Tabellen sollte es sich nicht um Ansichten handeln.

Beispielanwendungsfälle

Protokollierung von Ereignissen mit hohem Volumen

Wenn Sie eine Anwendung nutzen, die große Mengen an Daten in Echtzeit sammelt, können Streaming-Insert-Anweisungen eine gute Wahl sein. Anwendungen dieser Art entsprechen normalerweise den folgenden Kriterien:

  • Nicht transaktional: Hohes Volumen, ständiges Anhängen von Zeilen. Die Anwendung kann es tolerieren, dass es in seltenen Fällen zu einer Duplikatbildung kommt oder dass Daten temporär nicht zur Verfügung stehen.
  • Aggregatanalyse: Abfragen werden normalerweise für Trendanalysen ausgeführt. Es findet keine Auswahl einzelner Datensätze oder kleiner Datenmengen statt.

Ein Beispiel für das Logging von Ereignissen mit hohem Volumen ist das Ereignis-Tracking. Angenommen, Sie haben eine mobile App, die Ereignisse verfolgt. Ihre App – oder mobilen Server – könnten Benutzerinteraktionen oder Systemfehler unabhängig aufzeichnen und an BigQuery streamen. Sie könnten diese Daten analysieren, um Gesamttrends wie Bereiche mit hoher Interaktion oder Probleme zu ermitteln und Fehlerbedingungen in Echtzeit zu überwachen.

Duplikate manuell entfernen

Mithilfe des folgenden manuellen Prozesses können Sie nach dem Abschluss des Streamings sicherstellen, dass keine Duplikatzeilen vorliegen.

  1. Fügen Sie Ihrem Tabellenschema die insertId als Spalte hinzu und nehmen Sie den insertId-Wert für jede Zeile in Ihre Daten auf.
  2. Führen Sie nach dem Abschluss des Streamings die folgende Abfrage durch, um nach Duplikaten zu suchen:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Ist das Ergebnis größer als 1, existieren Duplikate.
  3. Führen Sie die folgende Abfrage durch, um Duplikate zu entfernen. Sie sollten eine Zieltabelle angeben, große Ergebnisse zulassen und die Ergebnisvereinfachung deaktivieren.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Hinweise zur Abfrage zum Entfernen von Duplikaten:

  • Die sicherere Strategie für die Abfrage zum Entfernen von Duplikaten ist die Anwendung auf eine neue Tabelle. Alternativ kann sie mit der Schreibanordnung WRITE_TRUNCATE auf die Zieltabelle angewendet werden.
  • Die Abfrage zum Entfernen von Duplikaten fügt am Ende des Tabellenschemas eine row_number-Spalte mit dem Wert 1 ein. In der Abfrage wird mit der Anweisung SELECT * EXCEPT aus dem Standard-SQL-Dialekt die Spalte row_number aus der Zieltabelle ausgeschlossen. Mit dem Präfix #standardSQL wird der Standard-SQL-Dialekt für diese Abfrage aktiviert. Alternativ ist eine Auswahl nach Spaltennamen möglich, um diese Spalte auszulassen.
  • Zum Abfragen von Livedaten mit entfernten Duplikaten können Sie mithilfe der Abfrage zum Entfernen von Duplikaten auch eine Ansicht Ihrer Tabelle erstellen. Beachten Sie, dass die Abfragekosten für die Ansicht basierend auf den in der Ansicht ausgewählten Spalten berechnet werden. Dies kann zu hohen Werten für gescannte Byte führen.

Fehlerbehebung bei Streaming-Insert-Anweisungen

Weitere Informationen zum Beheben von Fehlern beim Streamen von Insert-Anweisungen finden Sie auf der Seite "Fehlerbehebung" im Bereich Fehlerbehebung bei Streaming-Insert-Anweisungen.

Nach oben

Beispiele für Streaming-Insert-Anweisungen

C#

Bevor Sie dieses Beispiel anwenden, folgen Sie den Schritten zur Einrichtung von C# in der BigQuery-Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur BigQuery C# API.


using Google.Cloud.BigQuery.V2;
using System;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Bevor Sie dieses Beispiel ausprobieren, folgen Sie den Schritten zur Einrichtung von Go in der BigQuery-Kurzanleitung: Clientbibliotheken verwenden. Weitere Angaben finden Sie in der Referenzdokumentation zur BigQuery Go API.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

Bevor Sie dieses Beispiel anwenden, folgen Sie den Schritten zur Einrichtung von Java in der BigQuery-Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur BigQuery Java API.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void runTableInsertRows() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");

    tableInsertRows(datasetName, tableName, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can also supply optional unique row keys to support de-duplication
                  // scenarios.
                  .addRow(rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

Bevor Sie dieses Beispiel anwenden, folgen Sie den Schritten zur Einrichtung von Node.js in der BigQuery-Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur BigQuery Node.js API.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  // const rows = [{name: 'Tom', age: 30}, {name: 'Jane', age: 32}];

  // Create a client
  const bigqueryClient = new BigQuery();

  // Insert data into a table
  await bigqueryClient
    .dataset(datasetId)
    .table(tableId)
    .insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}
insertRowsAsStream();

PHP

Bevor Sie dieses Beispiel anwenden, folgen Sie den Schritten zur Einrichtung von PHP in der BigQuery-Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur BigQuery PHP API.

use Google\Cloud\BigQuery\BigQueryClient;

/** Uncomment and populate these variables in your code */
// $projectId = 'The Google project ID';
// $datasetId = 'The BigQuery dataset ID';
// $tableId   = 'The BigQuery table ID';
// $data = [
//     "field1" => "value1",
//     "field2" => "value2",
// ];

// instantiate the bigquery table service
$bigQuery = new BigQueryClient([
    'projectId' => $projectId,
]);
$dataset = $bigQuery->dataset($datasetId);
$table = $dataset->table($tableId);

$insertResponse = $table->insertRows([
    ['data' => $data],
    // additional rows can go here
]);
if ($insertResponse->isSuccessful()) {
    print('Data streamed into BigQuery successfully' . PHP_EOL);
} else {
    foreach ($insertResponse->failedRows() as $row) {
        foreach ($row['errors'] as $error) {
            printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
        }
    }
}

Python

Bevor Sie dieses Beispiel anwenden, folgen Sie den Schritten zur Einrichtung von Python in der BigQuery-Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur  Python API.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {u"full_name": u"Phred Phlyntstone", u"age": 32},
    {u"full_name": u"Wylma Phlyntstone", u"age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

Bevor Sie dieses Beispiel anwenden, folgen Sie den Schritten zur Einrichtung von Ruby in der BigQuery-Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur BigQuery Ruby API.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Sie müssen das Feld insertId nicht ausfüllen, wenn Sie Zeilen hinzufügen. Die folgenden Beispiele zeigen, wie Sie vermeiden können, dass insertId beim Streamen für jede Zeile gesendet wird.

Java

Bevor Sie dieses Beispiel anwenden, folgen Sie den Schritten zur Einrichtung von Java in der BigQuery-Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur BigQuery Java API.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void runTableInsertRowsWithoutRowIds() {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create rows to insert
    Map<String, Object> rowContent1 = new HashMap<>();
    rowContent1.put("stringField", "Phred Phlyntstone");
    rowContent1.put("numericField", 32);
    Map<String, Object> rowContent2 = new HashMap<>();
    rowContent2.put("stringField", "Wylma Phlyntstone");
    rowContent2.put("numericField", 29);
    List<InsertAllRequest.RowToInsert> rowContent = new ArrayList<>();
    // insertId is null if not specified
    rowContent.add(InsertAllRequest.RowToInsert.of(rowContent1));
    rowContent.add(InsertAllRequest.RowToInsert.of(rowContent2));
    tableInsertRowsWithoutRowIds(datasetName, tableName, rowContent);
  }

  public static void tableInsertRowsWithoutRowIds(
      String datasetName, String tableName, Iterable<InsertAllRequest.RowToInsert> rows) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(InsertAllRequest.newBuilder(tableId).setRows(rows).build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

Bevor Sie dieses Beispiel anwenden, folgen Sie den Schritten zur Einrichtung von Python in der BigQuery-Kurzanleitung: Clientbibliotheken verwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur  Python API.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {u"full_name": u"Phred Phlyntstone", u"age": 32},
    {u"full_name": u"Wylma Phlyntstone", u"age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Nach oben