In diesem Dokument wird beschrieben, wie Sie mit dem BigQuery-E/A-Connector von Apache Beam Daten aus Dataflow in BigQuery schreiben.
Der BigQuery-E/A-Connector ist im Apache Beam SDK verfügbar. Wir empfehlen die Verwendung der aktuellsten SDK-Version. Weitere Informationen finden Sie unter Apache Beam 2.x SDKs.
Für Python steht außerdem eine sprachübergreifende Unterstützung zur Verfügung.
Übersicht
Der BigQuery-E/A-Connector unterstützt folgende Methoden zum Schreiben in BigQuery:
STORAGE_WRITE_API
. In diesem Modus führt der Connector direkte Schreibvorgänge in BigQuery-Speicher über die BigQuery Storage Write API aus. Die Storage Write API kombiniert die Streamingaufnahme und das Laden im Batch in einer einzigen Hochleistungs-API. Dieser Modus stellt eine "Genau einmal"-Semantik sicher.STORAGE_API_AT_LEAST_ONCE
. Dieser Modus verwendet auch die Storage Write API, stellt aber eine "Mindestens einmal"-Semantik bereit. Dieser Modus führt bei den meisten Pipelines zu einer geringeren Latenz. Doppelte Schreibvorgänge sind jedoch möglich.FILE_LOADS
. In diesem Modus schreibt der Connector die Eingabedaten in Stagingdateien in Cloud Storage. Anschließend wird ein Ladejob in BigQuery ausgeführt, um die Daten in BigQuery zu laden. Dieser Modus ist die Standardeinstellung für begrenztePCollections
, die am häufigsten in Batch-Pipelines vorkommen.STREAMING_INSERTS
. In diesem Modus verwendet der Connector die Legacy-Streaming-API. Dieser Modus ist der Standard für unbegrenztePCollections
, wird aber für neue Projekte nicht empfohlen.
Beachten Sie bei der Auswahl einer Schreibmethode folgende Punkte:
- Für Streamingjobs sollten Sie
STORAGE_WRITE_API
oderSTORAGE_API_AT_LEAST_ONCE
verwenden, da diese Modi direkt in BigQuery-Speicher schreiben, ohne Zwischen-Staging-Dateien zu verwenden. - Wenn Sie die Pipeline im "Mindestens einmal"-Streamingmodus ausführen, legen Sie den Schreibmodus auf
STORAGE_API_AT_LEAST_ONCE
fest. Diese Einstellung ist effizienter und entspricht der Semantik des „Mindestens einmal“-Streamingmodus. - Für Dateiladevorgänge und die Storage Write API gelten unterschiedliche Kontingente und Limits.
- Für Ladejobs wird entweder der freigegebene BigQuery-Slot-Pool oder reservierte Slots verwendet. Wenn Sie reservierte Slots verwenden möchten, führen Sie den Ladejob in einem Projekt mit einer Reservierungszuweisung vom Typ
PIPELINE
aus. Ladejobs sind kostenlos, wenn Sie den freigegebenen BigQuery-Slot-Pool verwenden. BigQuery gibt jedoch keine Garantien für verfügbare Kapazitäten des gemeinsamen Pools. Weitere Informationen finden Sie unter Einführung in Reservierungen.
Parallelität
Bei
FILE_LOADS
undSTORAGE_WRITE_API
in Streaming-Pipelines teilt der Connector die Daten in eine Reihe von Dateien oder Streams auf. Im Allgemeinen empfehlen wir,withAutoSharding
aufzurufen, um die automatische Fragmentierung zu aktivieren.Bei
FILE_LOADS
in Batchpipelines schreibt der Connector Daten in partitionierte Dateien, die dann parallel in BigQuery geladen werden.Bei
STORAGE_WRITE_API
in Batch-Pipelines erstellt jeder Worker einen oder mehrere Streams, die in BigQuery geschrieben werden. Die Anzahl der Streams wird durch die Gesamtzahl der Fragmente bestimmt.Für
STORAGE_API_AT_LEAST_ONCE
gibt es einen einzelnen Standardschreibstream. Mehrere Worker fügen diesem Stream Daten hinzu.
Leistung
Die folgende Tabelle enthält Leistungsmesswerte für verschiedene BigQuery-E/A-Leseoptionen. Die Arbeitslasten wurden auf einem e2-standard2
-Worker mit dem Apache Beam SDK 2.49.0 für Java ausgeführt. Runner v2 wurde nicht verwendet.
100 Mio. Datensätze | 1 KB | 1 Spalte | Durchsatz (Byte) | Durchsatz (Elemente) |
---|---|---|
Schreiben im Speicher | 55 Mbit/s | 54.000 Elemente pro Sekunde |
Avro Load | 78 Mbit/s | 77.000 Elemente pro Sekunde |
JSON-Ladeprogramm | 54 Mbit/s | 53.000 Elemente pro Sekunde |
Diese Messwerte basieren auf einfachen Batch-Pipelines. Sie dienen zum Vergleich der Leistung zwischen E/A-Anschlüssen und sind nicht unbedingt repräsentativ für reale Pipelines. Die Leistung der Dataflow-Pipeline ist komplex und eine Funktion des VM-Typs, der verarbeiteten Daten, der Leistung externer Quellen und Senken sowie des Nutzercodes. Die Messwerte basieren auf der Ausführung des Java SDK und sind nicht repräsentativ für die Leistungsmerkmale anderer Sprach-SDKs. Weitere Informationen finden Sie unter Beam E/A-Leistung.
Best Practices
In diesem Abschnitt werden Best Practices für das Schreiben von Daten in BigQuery aus Dataflow beschrieben.
Allgemeines
Für die Storage Write API gelten Kontingentlimits. Der Connector verarbeitet diese Limits für die meisten Pipelines. In einigen Fällen können jedoch die verfügbaren Storage Write API-Streams erschöpft werden. Dieses Problem kann beispielsweise in einer Pipeline auftreten, die automatische Fragmentierung und Autoskalierung mit einer großen Anzahl von Zielen verwendet, insbesondere bei lang laufenden Jobs mit stark variablen Arbeitslasten. Wenn dieses Problem auftritt, können Sie stattdessen
STORAGE_WRITE_API_AT_LEAST_ONCE
verwenden.Mit den Google Cloud-Messwerten können Sie die Nutzung Ihres Storage Write API-Kontingents im Blick behalten.
Bei Dateiuploads ist Avro in der Regel schneller als JSON. Wenn Sie Avro verwenden möchten, rufen Sie
withAvroFormatFunction
auf.Standardmäßig werden Ladejobs im selben Projekt wie der Dataflow-Job ausgeführt. Wenn Sie ein anderes Projekt angeben möchten, drücken Sie die Taste
withLoadJobProjectId
.Wenn Sie das Java SDK verwenden, sollten Sie eine Klasse erstellen, die das Schema der BigQuery-Tabelle darstellt. Rufen Sie dann
useBeamSchema
in Ihrer Pipeline auf, um automatisch zwischen Apache Beam-Row
- und BigQuery-TableRow
-Typen zu konvertieren. Ein Beispiel für eine Schemaklasse finden Sie unterExampleModel.java
.Wenn Sie Tabellen mit komplexen Schemas laden, die Tausende von Feldern enthalten, sollten Sie vielleicht
withMaxBytesPerPartition
aufrufen, um für jeden Ladejob eine kleinere maximale Größe festzulegen.
Streamingpipelines
Die folgenden Empfehlungen gelten für Streamingpipelines.
Für Streamingpipelines empfehlen wir die Verwendung der Storage Write API (
STORAGE_WRITE_API
oderSTORAGE_API_AT_LEAST_ONCE
).Eine Streamingpipeline kann Dateiladungen verwenden, dieser Ansatz hat jedoch Nachteile:
- Zum Schreiben der Dateien ist Windowing erforderlich. Sie können das globale Fenster nicht verwenden.
- BigQuery lädt Dateien nach dem Best-Effort-Prinzip, wenn der freigegebene Slot-Pool verwendet wird. Zwischen dem Schreiben eines Datensatzes und seiner Verfügbarkeit in BigQuery kann es zu einer erheblichen Verzögerung kommen.
- Wenn ein Ladejob fehlschlägt, z. B. aufgrund fehlerhafter Daten oder Schemaabweichungen, schlägt die gesamte Pipeline fehl.
Verwenden Sie nach Möglichkeit
STORAGE_WRITE_API_AT_LEAST_ONCE
. Dies kann dazu führen, dass doppelte Datensätze in BigQuery geschrieben werden, ist jedoch kostengünstiger und skalierbarer alsSTORAGE_WRITE_API
.Vermeiden Sie im Allgemeinen die Verwendung von
STREAMING_INSERTS
. Streaming-Insert-Anweisungen sind teurer als die Storage Write API und bieten auch keine gute Leistung.Durch die Fragmentierung von Daten kann die Leistung in Streaming-Pipelines verbessert werden. Für die meisten Pipelines ist die automatische Fragmentierung ein guter Ausgangspunkt. Sie können die Fragmentierung jedoch so optimieren:
- Rufen Sie für
STORAGE_WRITE_API
withNumStorageWriteApiStreams
auf, um die Anzahl der Schreibstreams festzulegen. - Rufen Sie für
FILE_LOADS
withNumFileShards
auf, um die Anzahl der Dateifragmente festzulegen.
- Rufen Sie für
Wenn Sie Streaming-Insert-Anweisungen verwenden, empfehlen wir,
retryTransientErrors
als Wiederholungsrichtlinie festzulegen.
Batchpipelines
Die folgenden Empfehlungen gelten für Batchpipelines.
Für die meisten großen Batchpipelines empfehlen wir, es zuerst mit
FILE_LOADS
zu versuchen. Eine Batchpipeline kannSTORAGE_WRITE_API
verwenden. Allerdings wird sie wahrscheinlich bei großen Datenmengen (mehr als 1.000 vCPUs) oder bei gleichzeitig ausgeführten Pipelines Kontingentlimits überschreiten. Apache Beam drosselt die maximale Anzahl von Schreibstreams fürSTORAGE_WRITE_API
-Batchjobs nicht. Dadurch erreicht der Job schließlich die BigQuery Storage API-Limits.Wenn Sie
FILE_LOADS
verwenden, können Sie entweder den freigegebenen BigQuery-Slot-Pool oder Ihren Pool reservierter Slots erschöpfen. Wenn diese Art von Fehlern auftritt, versuchen Sie die folgenden Ansätze:- Reduzieren Sie die maximale Anzahl von Workern oder die Worker-Größe für den Job.
- Erwerben Sie mehr reservierte Slots.
- Geeignete Methoden:
STORAGE_WRITE_API
Kleine bis mittelgroße Pipelines (< 1.000 vCPUs) können von
STORAGE_WRITE_API
profitieren. Bei diesen kleineren Jobs sollten SieSTORAGE_WRITE_API
verwenden, wenn Sie eine Warteschlange für unzustellbare Nachrichten benötigen oder wenn der freigegebene Slot-PoolFILE_LOADS
nicht ausreicht.Wenn Sie doppelte Daten tolerieren können, sollten Sie die Verwendung von
STORAGE_WRITE_API_AT_LEAST_ONCE
in Betracht ziehen. Dieser Modus kann dazu führen, dass doppelte Datensätze in BigQuery geschrieben werden, ist jedoch möglicherweise günstiger als die OptionSTORAGE_WRITE_API
.Unterschiedliche Schreibmodi können je nach den Eigenschaften Ihrer Pipeline unterschiedlich funktionieren. Experimentieren Sie, um den besten Schreibmodus für Ihre Arbeitslast zu ermitteln.
Fehler auf Zeilenebene behandeln
In diesem Abschnitt wird beschrieben, wie Sie mögliche Fehler auf Zeilenebene behandeln können, z. B. aufgrund falsch formatierter Eingabedaten oder wegen Schemaabweichungen.
Bei der Storage Write API werden alle Zeilen, die nicht geschrieben werden können, in eine separate PCollection
verschoben. Rufen Sie zum Abrufen dieser Sammlung Folgendes auf:
getFailedStorageApiInserts
auf dem WriteResult
-Objekt. Ein Beispiel für diesen Ansatz finden Sie unter Daten in BigQuery streamen.
Es empfiehlt sich, die Fehler zur späteren Verarbeitung an eine Dead-Letter-Warteschlange oder -Tabelle zu senden. Weitere Informationen zu diesem Muster finden Sie unter BigQueryIO
-Muster für unzustellbare Nachrichten.
Wenn bei FILE_LOADS
beim Laden der Daten ein Fehler auftritt, schlägt der Ladejob fehl und die Pipeline löst eine Laufzeitausnahme aus. Sie können den Fehler in den Dataflow-Logs oder im BigQuery-Jobverlauf sehen.
Der E/O-Connector gibt keine Informationen zu einzelnen fehlgeschlagenen Zeilen zurück.
Weitere Informationen zur Fehlerbehebung finden Sie unter BigQuery-Connector-Fehler.
Beispiele
Die folgenden Beispiele zeigen, wie Sie mit Dataflow in BigQuery schreiben.
In vorhandene Tabellen schreiben
Im folgenden Beispiel wird eine Batchpipeline erstellt, die einen PCollection<MyData>
in BigQuery schreibt, wobei MyData
ein benutzerdefinierter Datentyp ist.
Die Methode BigQueryIO.write()
gibt einen BigQueryIO.Write<T>
-Typ zurück, der zum Konfigurieren des Schreibvorgangs verwendet wird. Weitere Informationen finden Sie in der Apache Beam-Dokumentation unter In Tabellen schreiben. In diesem Codebeispiel wird in eine vorhandene Tabelle (CREATE_NEVER
) geschrieben und die neuen Zeilen werden an die Tabelle angehängt (WRITE_APPEND
).
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
In neue oder vorhandene Tabellen schreiben
Im folgenden Beispiel wird eine neue Tabelle erstellt, wenn die Zieltabelle nicht vorhanden ist. Dazu wird die Erstellungsanordnung auf CREATE_IF_NEEDED
gesetzt. Wenn Sie diese Option verwenden, müssen Sie ein Tabellenschema angeben. Der Connector verwendet dieses Schema, wenn neue Tabellen erstellt werden.
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Daten zu BigQuery streamen
Im folgenden Beispiel wird gezeigt, wie Sie Daten mit einer "Genau einmal"-Semantik streamen. Dazu setzen Sie den Schreibmodus auf STORAGE_WRITE_API
.
Nicht alle Streamingpipelines erfordern eine "Genau einmal"-Semantik. Beispielsweise können Sie Duplikate eventuell aus der Zieltabelle manuell entfernen. Wenn für Ihr Szenario doppelt vorhandene Datensätze akzeptabel sind, sollten Sie die "Mindestens einmal"-Semantik verwenden. Dazu setzen Sie die Schreibmethode auf STORAGE_API_AT_LEAST_ONCE
. Diese Methode ist in der Regel effizienter und führt bei den meisten Pipelines zu einer geringeren Latenz.
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Nächste Schritte
- Erfahren Sie mehr über den BigQuery-E/A-Connector in der Apache Beam-Dokumentation.
- Weitere Informationen zum Streamen von Daten mit der Storage Write API in BigQuery (Blogpost)