Best Practices für die BigQuery Storage Write API
Dieses Dokument enthält Best Practices für die Verwendung der BigQuery Storage Write API. Bevor Sie dieses Dokument lesen, sollten Sie die Übersicht über die BigQuery Storage Write API lesen.
Die Rate des Streams begrenzen
Bevor Sie einen Stream erstellen, sollten Sie prüfen, ob Sie den Standardstream verwenden können. Für Streamingszenarien hat der Standardstream weniger Kontingentlimits und kann besser skaliert werden als von Anwendungen erstellte Streams. Wenn Sie einen von der Anwendung erstellten Stream verwenden, achten Sie darauf, den maximalen Durchsatz für jeden Stream zu nutzen, bevor Sie zusätzliche Streams erstellen. Verwenden Sie beispielsweise asynchrone Schreibvorgänge.
Vermeiden Sie bei von Anwendungen erstellten Streams einen häufigen Aufruf von CreateWriteStream
. Im Allgemeinen erhöht sich die Latenz der API-Aufrufe erheblich, wenn Sie 40–50 Aufrufe pro Sekunde überschreiten (> 25s). Vergewissern Sie sich, dass Ihre Anwendung einen Kaltstart akzeptiert und die Anzahl der Streams schrittweise erhöht werden kann, und begrenzen Sie die Anzahl der CreateWriteStream
-Aufrufe. Sie können auch eine längere Frist für den Abschluss des Aufrufs festlegen, damit dieser nicht mit dem Fehler DeadlineExceeded
fehlschlägt. Außerdem gibt es ein längerfristiges Kontingent für die maximale Anzahl von CreateWriteStream
-Aufrufen. Das Erstellen von Streams ist ein ressourcenintensiver Prozess. Daher sollten Sie die Rate des Streams erstellen und vorhandene Streams vollständig nutzen, um dieses Limit nicht zu überschreiten.
Verwaltung von Verbindungspools
Die Methode AppendRows
erstellt eine Verbindung zu einem Stream. Sie können mehrere Verbindungen im Standardstream öffnen, aber nur eine einzige aktive Verbindung für von der Anwendung erstellte Streams.
Wenn Sie den Standardstream verwenden, können Sie das Multiplexing der Storage Write API verwenden, um in mehrere Zieltabellen mit freigegebenen Verbindungen zu schreiben. Multiplex-Poolverbindungen für besseren Durchsatz und bessere Ressourcennutzung. Wenn Ihr Workflow mehr als 20 gleichzeitige Verbindungen hat, empfehlen wir die Verwendung von Multiplexing. Multiplexing ist in Java und Go verfügbar. Weitere Informationen zu Java-Implementierungen finden Sie unter Multiplexing verwenden. Weitere Informationen zur Go-Implementierung finden Sie unter Verbindungsfreigabe (Multiplexing). Wenn Sie den Beam-Connector mit "Mindestens einmal"-Semantik verwenden, können Sie das Multiplexing über UseStorageApiConnectionPool aktivieren. Im Dataproc Spark-Connector ist Multiplexing standardmäßig aktiviert.
Für eine optimale Leistung sollten Sie eine Verbindung für so viele Datenschreibvorgänge wie möglich verwenden. Verwenden Sie eine Verbindung nicht nur für einen einzigen Schreibvorgang oder öffnen und schließen Sie keine Streams für viele kleine Schreibvorgänge.
Es gibt ein Kontingent für die Anzahl der gleichzeitigen Verbindungen, die pro Projekt gleichzeitig geöffnet sein können. Wenn das Limit überschritten wird, schlagen Aufruf an AppendRows
fehl.
Das Kontingent für gleichzeitige Verbindungen kann jedoch erhöht werden und sollte normalerweise kein begrenzender Faktor für die Skalierung sein.
Bei jedem Aufruf von AppendRows
wird ein neues Datenautor-Objekt erstellt. Bei Verwendung eines von der Anwendung erstellten Streams entspricht die Anzahl der Verbindungen also der Anzahl der erstellten Streams. Im Allgemeinen unterstützt eine einzelne Verbindung einen Durchsatz von mindestens 1 MB/s. Die obere Grenze hängt von mehreren Faktoren ab, z. B. von der Netzwerkbandbreite, dem Schema der Daten und der Serverlast, kann aber 10 MBps überschreiten.
Außerdem gibt es ein Kontingent für den Gesamtdurchsatz pro Projekt. Dies stellt die Byte pro Sekunde für alle Verbindungen dar, die über den Storage Write API-Dienst fließen. Wenn Ihr Projekt dieses Kontingent überschreitet, können Sie ein höheres Kontingentlimit anfordern. Dazu müssen in der Regel zugehörige Kontingente wie das Kontingent für gleichzeitige Verbindungen in einem gleichen Verhältnis erhöht werden.
Stream-Offsets für eine „Exactly-Once“-Semantik verwalten
Die Storage Write API lässt nur Schreibvorgänge an das aktuelle Ende des Streams zu, das mit dem Anhängen von Daten verschoben wird. Die aktuelle Position im Stream wird als Versatz ab dem Beginn des Streams angegeben.
Wenn Sie in einen von der Anwendung erstellten Stream schreiben, können Sie den Stream-Offset angeben, um eine genau einmalige Schreibsemantik zu erreichen.
Wenn Sie ein Offset angeben, ist der Schreibvorgang idempotent, d. h., er kann bei Netzwerkfehlern oder Nichtbeantwortung durch den Server sicher wiederholt werden. Verarbeiten Sie die folgenden Fehler im Zusammenhang mit Offsets:
ALREADY_EXISTS
(StorageErrorCode.OFFSET_ALREADY_EXISTS
): Die Zeile wurde bereits geschrieben. Sie können diesen Fehler einfach ignorieren.OUT_OF_RANGE
(StorageErrorCode.OFFSET_OUT_OF_RANGE
): Ein vorheriger Schreibvorgang ist fehlgeschlagen. Wiederholen Sie den Vorgang ab dem letzten erfolgreichen Schreibvorgang.
Beachten Sie, dass diese Fehler auch auftreten können, wenn Sie den falschen Offset-Wert festlegen. Daher müssen Sie Offsets sorgfältig verwalten.
Überlegen Sie vor Verwendung von Stream-Offsets, ob Sie eine „Exactly-Once“-Semantik benötigen. Wenn Ihre vorgelagerte Datenpipeline beispielsweise nur „At-Least-Once“-Schreibvorgänge garantiert oder wenn Sie Duplikate nach der Datenaufnahme leicht erkennen können, benötigen Sie möglicherweise keine „Exactly-Once“-Schreibvorgänge. In diesem Fall empfehlen wir die Verwendung des Standardstreams, bei dem keine Zeilen-Offsets erfasst werden müssen.
AppendRows
-Aufrufe nicht blockieren
Die Methode AppendRows
ist asynchron. Sie können eine Reihe von Schreibvorgängen senden, ohne eine Antwort für jeden einzelnen Vorgang zu blockieren. Die Antwortnachrichten auf der bidirektionalen Verbindung treffen in der gleichen Reihenfolge ein, in der die Anfragen in die Warteschlange gestellt wurden.
Für den höchsten Durchsatz rufen Sie AppendRows
auf, ohne zu blockieren und auf die Antwort zu warten.
Schemaaktualisierungen verarbeiten
Bei Datenstreamingszenarien werden Tabellenschemas normalerweise außerhalb der Streamingpipeline verwaltet. Es ist üblich, dass das Schema im Laufe der Zeit weiterentwickelt wird, beispielsweise durch Hinzufügen neuer Felder mit zulässigen Nullwerten. Eine robuste Pipeline muss „Out-of-Band“-Schemaaktualisierungen verarbeiten.
Die Storage Write API unterstützt Tabellenschemas so:
- Die erste Schreibanfrage enthält das Schema.
- Sie senden jede Datenzeile als binären Protokollpuffer. BigQuery ordnet die Daten dem Schema zu.
- Sie können Feldern weglassen, in denen Nullwerte zulässig sind, aber Sie können keine Felder aufnehmen, die im aktuellen Schema nicht vorhanden sind. Wenn Sie Zeilen mit zusätzlichen Feldern senden, gibt die Storage Write API einen
StorageError
mitStorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELD
zurück.
Wenn Sie neue Felder in der Nutzlast senden möchten, müssen Sie zuerst das Tabellenschema in BigQuery aktualisieren. Die Storage Write API erkennt Schemaänderungen nach kurzer Zeit, in der Größenordnung von Minuten. Wenn die Storage Write API die Schemaänderung erkennt, enthält die AppendRowsResponse
-Antwortnachricht ein TableSchema
-Objekt, das das neue Schema beschreibt.
Damit Sie Daten mit dem aktualisierten Schema senden können, müssen Sie vorhandene Verbindungen schließen und neue Verbindungen mit dem neuen Schema öffnen.
Java-Client. Die Java-Clientbibliothek bietet über die Klasse JsonStreamWriter
einige zusätzliche Funktionen für Schemaaktualisierungen. Nach einer Schemaaktualisierung stellt JsonStreamWriter
automatisch wieder eine Verbindung zum aktualisierten Schema her. Sie müssen die Verbindung nicht explizit schließen und neu öffnen.
Um programmatisch auf Schemaänderungen zu prüfen, rufen Sie AppendRowsResponse.hasUpdatedSchema
auf, nachdem die Methode append
abgeschlossen ist.
Sie können den JsonStreamWriter
auch so konfigurieren, dass unbekannte Felder in den Eingabedaten ignoriert werden. Rufen Sie zum Festlegen dieses Verhaltens setIgnoreUnknownFields
auf. Dieses Verhalten ähnelt der Option ignoreUnknownValues
, wenn Sie die Legacy-tabledata.insertAll
API verwenden. Dies kann jedoch zu einem unbeabsichtigten Datenverlust führen, da unbekannte Felder ohne Rückmeldung verworfen werden.