In diesem Dokument wird gezeigt, wie Sie mit dem BigQuery-E/A-Connector von Apache Beam Daten aus Dataflow in BigQuery schreiben.
Der BigQuery-E/A-Connector ist im Apache Beam SDK verfügbar. Wir empfehlen die Verwendung der aktuellsten SDK-Version. Weitere Informationen finden Sie unter Apache Beam 2.x SDKs.
Für Python steht außerdem eine sprachübergreifende Unterstützung zur Verfügung.
Überblick
Der BigQuery-E/A-Connector unterstützt folgende Methoden zum Schreiben in BigQuery:
STORAGE_WRITE_API
. In diesem Modus führt der Connector direkte Schreibvorgänge in BigQuery-Speicher über die BigQuery Storage Write API aus. Die Storage Write API kombiniert die Streamingaufnahme und das Laden im Batch in einer einzigen Hochleistungs-API. Dieser Modus stellt eine "Genau einmal"-Semantik sicher.STORAGE_API_AT_LEAST_ONCE
. Dieser Modus verwendet auch die Storage Write API, stellt aber eine "Mindestens einmal"-Semantik bereit. Dieser Modus führt bei den meisten Pipelines zu einer geringeren Latenz. Doppelte Schreibvorgänge sind jedoch möglich.FILE_LOADS
. In diesem Modus schreibt der Connector die Eingabedaten in Staging-Dateien in Cloud Storage. Anschließend wird ein BigQuery-Ladejob ausgeführt, um die Daten in BigQuery zu laden. Der Modus ist die Standardeinstellung für begrenztePCollections
, die am häufigsten in Batchpipelines gefunden werden.STREAMING_INSERTS
. In diesem Modus verwendet der Connector die Legacy-Streaming API. Dieser Modus ist die Standardeinstellung für unbegrenztePCollections
, wird jedoch für neue Projekte nicht empfohlen.
Beachten Sie bei der Auswahl einer Schreibmethode folgende Punkte:
- Für Streamingjobs sollten Sie
STORAGE_WRITE_API
oderSTORAGE_API_AT_LEAST_ONCE
verwenden, da diese Modi direkt in BigQuery-Speicher schreiben, ohne Zwischen-Staging-Dateien zu verwenden. - Wenn Sie die Pipeline im "Mindestens einmal"-Streamingmodus ausführen, legen Sie den Schreibmodus auf
STORAGE_API_AT_LEAST_ONCE
fest. Diese Einstellung ist effizienter und entspricht der Semantik des "Mindestens einmal"-Streamingmodus. - Für Dateiladevorgänge und die Storage Write API gelten unterschiedliche Kontingente und Limits.
- Ladejobs verwenden entweder den freigegebenen BigQuery-Slot-Pool oder reservierte Slots. Führen Sie den Ladejob in einem Projekt mit einer Reservierungszuweisung vom Typ
PIPELINE
aus, um reservierte Slots zu verwenden. Ladejobs sind kostenlos, wenn Sie den freigegebenen BigQuery-Slot-Pool verwenden. BigQuery gibt jedoch keine Garantien für verfügbare Kapazitäten des gemeinsamen Pools. Weitere Informationen finden Sie unter Einführung in Reservierungen.
Parallelität
Für
FILE_LOADS
undSTORAGE_WRITE_API
in Streamingpipelines teilt der Connector die Daten in eine Reihe von Dateien oder Streams. Im Allgemeinen empfehlen wir,withAutoSharding
aufzurufen, um die automatische Fragmentierung zu aktivieren.Bei
FILE_LOADS
in Batchpipelines schreibt der Connector Daten in partitionierte Dateien. Diese werden dann parallel in BigQuery geladen.Bei
STORAGE_WRITE_API
in Batchpipelines erstellt jeder Worker einen oder mehrere Streams zum Schreiben in BigQuery, die durch die Gesamtzahl der Shards bestimmt werden.Für
STORAGE_API_AT_LEAST_ONCE
gibt es einen einzelnen Standardschreibstream. An diesen Stream werden mehrere Worker angehängt.
Leistung
Die folgende Tabelle enthält Leistungsmesswerte verschiedener BigQuery E/A-Leseoptionen. Die Arbeitslasten wurden auf einem e2-standard2
-Worker mit dem Apache Beam SDK 2.49.0 für Java ausgeführt. Sie haben nicht Runner v2 verwendet.
100 Mio. Datensätze | 1 KB | 1 Spalte | Durchsatz (Byte) | Durchsatz (Elemente) |
---|---|---|
Schreiben im Speicher | 55 Mbit/s | 54.000 Elemente pro Sekunde |
Avro-Last | 78 Mbit/s | 77.000 Elemente pro Sekunde |
Json-Load | 54 Mbit/s | 53.000 Elemente pro Sekunde |
Diese Messwerte basieren auf einfachen Batchpipelines. Sie sollen die Leistung von E/A-Connectors vergleichen und sind nicht unbedingt repräsentativ für reale Pipelines. Die Leistung der Dataflow-Pipeline ist komplex und eine Funktion des VM-Typs, der verarbeiteten Daten, der Leistung externer Quellen und Senken sowie des Nutzercodes. Messwerte basieren auf der Ausführung des Java SDK und sind nicht repräsentativ für die Leistungsmerkmale anderer Sprach-SDKs. Weitere Informationen finden Sie unter Leistung von Beam IO.
Best Practices
In diesem Abschnitt werden Best Practices für das Schreiben von Daten in BigQuery aus Dataflow beschrieben.
Allgemeines
Für die Storage Write API gelten Kontingentlimits. Der Connector verarbeitet diese Limits für die meisten Pipelines. Einige Szenarien können jedoch die verfügbaren Storage Write API-Streams ausschöpfen. Dieses Problem kann beispielsweise in einer Pipeline auftreten, die automatische Fragmentierung und Autoscaling mit einer großen Anzahl von Zielen verwendet, insbesondere bei lang andauernden Jobs mit sehr variablen Arbeitslasten. Wenn dieses Problem auftritt, sollten Sie
STORAGE_WRITE_API_AT_LEAST_ONCE
verwenden. Dadurch wird das Problem vermieden.Verwenden Sie Google Cloud-Messwerte, um die Kontingentnutzung der Storage Write API zu überwachen.
Bei Verwendung von Dateiladevorgängen übertrifft Avro normalerweise JSON. Rufen Sie
withAvroFormatFunction
auf, um Avro zu verwenden.Standardmäßig werden Ladejobs im selben Projekt wie der Dataflow-Job ausgeführt. Wenn Sie ein anderes Projekt angeben möchten, rufen Sie
withLoadJobProjectId
auf.Wenn Sie das Java SDK verwenden, sollten Sie eine Klasse erstellen, die das Schema der BigQuery-Tabelle darstellt. Rufen Sie dann
useBeamSchema
in der Pipeline auf, um automatisch zwischen Apache Beam-Row
- und BigQuery-TableRow
-Typen zu konvertieren. Ein Beispiel für eine Schemaklasse finden Sie unterExampleModel.java
.Wenn Sie Tabellen mit komplexen Schemas laden, die Tausende von Feldern enthalten, sollten Sie vielleicht
withMaxBytesPerPartition
aufrufen, um für jeden Ladejob eine kleinere maximale Größe festzulegen.
Streamingpipelines
Die folgenden Empfehlungen gelten für Streamingpipelines.
Für Streamingpipelines empfehlen wir die Verwendung der Storage Write API (
STORAGE_WRITE_API
oderSTORAGE_API_AT_LEAST_ONCE
).Eine Streaming-Pipeline kann Dateiladevorgänge verwenden. Dieser Ansatz hat jedoch Nachteile:
- Es ist Windowing erforderlich, um die Dateien zu schreiben. Das globale Fenster kann nicht verwendet werden.
- BigQuery lädt Dateien auf Best-Effort-Basis, wenn der freigegebene Slot-Pool verwendet wird. Zwischen dem Schreiben eines Eintrags und dessen Verfügbarkeit in BigQuery kann es zu einer erheblichen Verzögerung kommen.
- Wenn ein Ladejob fehlschlägt, z. B. aufgrund fehlerhafter Daten oder Schemaabweichungen, schlägt die gesamte Pipeline fehl.
Verwenden Sie nach Möglichkeit
STORAGE_WRITE_API_AT_LEAST_ONCE
. Dies kann dazu führen, dass doppelte Datensätze in BigQuery geschrieben werden. Dieser Vorgang ist jedoch kostengünstiger und skalierbarer alsSTORAGE_WRITE_API
.Vermeiden Sie im Allgemeinen die Verwendung von
STREAMING_INSERTS
. Streaming-Insert-Anweisungen sind teurer als die Storage Write API und bieten auch keine gute Leistung.Die Datenfragmentierung kann die Leistung in Streamingpipelines verbessern. Bei den meisten Pipelines ist die automatische Fragmentierung ein guter Ausgangspunkt. Sie können die Fragmentierung jedoch so optimieren:
- Rufen Sie für
STORAGE_WRITE_API
withNumStorageWriteApiStreams
auf, um die Anzahl der Schreibstreams festzulegen. - Rufen Sie für
FILE_LOADS
withNumFileShards
auf, um die Anzahl der Dateifragmente festzulegen.
- Rufen Sie für
Wenn Sie Streaming-Insert-Anweisungen verwenden, empfehlen wir,
retryTransientErrors
als Wiederholungsrichtlinie festzulegen.
Batchpipelines
Die folgenden Empfehlungen gelten für Batchpipelines.
Für die meisten großen Batchpipelines empfehlen wir, es zuerst mit
FILE_LOADS
zu versuchen. Eine Batchpipeline kannSTORAGE_WRITE_API
verwenden. Allerdings wird sie wahrscheinlich bei großen Datenmengen (mehr als 1.000 vCPUs) oder bei gleichzeitig ausgeführten Pipelines Kontingentlimits überschreiten. Apache Beam drosselt die maximale Anzahl von Schreibstreams fürSTORAGE_WRITE_API
-Batchjobs nicht. Dadurch erreicht der Job schließlich die BigQuery Storage API-Limits.Wenn Sie
FILE_LOADS
verwenden, können Sie entweder den freigegebenen BigQuery-Slot-Pool oder Ihren Pool reservierter Slots erschöpfen. Wenn diese Art von Fehlern auftritt, versuchen Sie die folgenden Ansätze:- Reduzieren Sie die maximale Anzahl von Workern oder die Worker-Größe für den Job.
- Erwerben Sie mehr reservierte Slots.
- Geeignete Methoden:
STORAGE_WRITE_API
Kleine bis mittelgroße Pipelines (< 1.000 vCPUs) können von
STORAGE_WRITE_API
profitieren. Bei diesen kleineren Jobs sollten SieSTORAGE_WRITE_API
verwenden, wenn Sie eine Warteschlange für unzustellbare Nachrichten benötigen oder wenn der freigegebene Slot-PoolFILE_LOADS
nicht ausreicht.Wenn Sie doppelte Daten tolerieren können, sollten Sie die Verwendung von
STORAGE_WRITE_API_AT_LEAST_ONCE
in Betracht ziehen. Dieser Modus kann dazu führen, dass doppelte Datensätze in BigQuery geschrieben werden, ist jedoch möglicherweise günstiger als die OptionSTORAGE_WRITE_API
.Unterschiedliche Schreibmodi können je nach den Eigenschaften Ihrer Pipeline unterschiedlich funktionieren. Experimentieren Sie, um den besten Schreibmodus für Ihre Arbeitslast zu ermitteln.
Fehler auf Zeilenebene behandeln
In diesem Abschnitt wird beschrieben, wie Sie mögliche Fehler auf Zeilenebene behandeln können, z. B. aufgrund falsch formatierter Eingabedaten oder wegen Schemaabweichungen.
Für die Storage Write API werden alle Zeilen, die nicht geschrieben werden können, in einen separaten PCollection
geschrieben. Rufen Sie getFailedStorageApiInserts
für das Objekt WriteResult
auf, um diese Sammlung abzurufen. Ein Beispiel für diesen Ansatz finden Sie unter Daten in BigQuery streamen.
Es wird empfohlen, die Fehler zur späteren Verarbeitung an eine Warteschlange oder Tabelle für unzustellbare Nachrichten zu senden. Weitere Informationen zu diesem Muster finden Sie unter BigQueryIO
-Dead-Letter-Muster.
Wenn für FILE_LOADS
ein Fehler beim Laden der Daten auftritt, schlägt der Ladejob fehl und die Pipeline löst eine Laufzeitausnahme aus. Sie können den Fehler in den Dataflow-Logs oder im BigQuery-Jobverlauf aufrufen.
Der E/A-Connector gibt keine Informationen zu einzelnen fehlgeschlagenen Zeilen zurück.
Weitere Informationen zur Fehlerbehebung finden Sie unter BigQuery-Connector-Fehler.
Beispiele
Die folgenden Beispiele zeigen, wie Sie mit Dataflow in BigQuery schreiben.
In vorhandene Tabellen schreiben
Im folgenden Beispiel wird eine Batchpipeline erstellt, die einen PCollection<MyData>
in BigQuery schreibt, wobei MyData
ein benutzerdefinierter Datentyp ist.
Die Methode BigQueryIO.write()
gibt einen BigQueryIO.Write<T>
-Typ zurück, der zum Konfigurieren des Schreibvorgangs verwendet wird. Weitere Informationen finden Sie in der Apache Beam-Dokumentation unter In Tabellen schreiben. In diesem Codebeispiel wird eine vorhandene Tabelle (CREATE_NEVER
) geschrieben. Die neuen Zeilen werden an die Tabelle angehängt (WRITE_APPEND
).
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
In neue oder vorhandene Tabellen schreiben
Im folgenden Beispiel wird eine neue Tabelle erstellt, wenn die Zieltabelle nicht vorhanden ist. Dazu wird die Erstellungsanordnung auf CREATE_IF_NEEDED
gesetzt. Wenn Sie diese Option verwenden, müssen Sie ein Tabellenschema angeben. Der Connector verwendet dieses Schema, wenn neue Tabellen erstellt werden.
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Daten zu BigQuery streamen
Im folgenden Beispiel wird gezeigt, wie Sie Daten mit einer "Genau einmal"-Semantik streamen. Dazu setzen Sie den Schreibmodus auf STORAGE_WRITE_API
.
Nicht alle Streamingpipelines erfordern eine "Genau einmal"-Semantik. Beispielsweise können Sie Duplikate eventuell aus der Zieltabelle manuell entfernen. Wenn für Ihr Szenario doppelt vorhandene Datensätze akzeptabel sind, sollten Sie die "Mindestens einmal"-Semantik verwenden. Dazu setzen Sie die Schreibmethode auf STORAGE_API_AT_LEAST_ONCE
. Diese Methode ist im Allgemeinen effizienter und führt bei den meisten Pipelines zu einer geringeren Latenz.
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Nächste Schritte
- Erfahren Sie mehr über den BigQuery-E/A-Connector in der Apache Beam-Dokumentation.
- Lesen Sie den Artikel Daten mit der Storage Write API in BigQuery streamen (Blogpost).