In diesem Dokument wird beschrieben, wie Sie mit dem BigQuery-E/A-Connector von Apache Beam Daten aus BigQuery in Dataflow lesen.
Übersicht
Der BigQuery-E/A-Connector unterstützt zwei Optionen zum Lesen aus BigQuery:
- Direkte Tabellenlesevorgänge Diese Option ist die schnellste, da die BigQuery Storage Read API verwendet wird.
- Exportjob Bei dieser Option führt BigQuery einen Exportjob aus, mit dem die Tabellendaten in Cloud Storage geschrieben werden. Der Connector liest dann die exportierten Daten aus Cloud Storage. Diese Option ist weniger effizient, da der Exportschritt erforderlich ist.
Exportjobs sind die Standardoption. Wenn Sie direkte Lesevorgänge angeben möchten, rufen Sie withMethod(Method.DIRECT_READ)
auf.
Der Connector serialisiert die Tabellendaten in eine PCollection
. Jedes Element in PCollection
stellt eine einzelne Tabellenzeile dar. Der Connector unterstützt die folgenden Serialisierungsmethoden:
- Daten als Avro-formatierte Einträge lesen Bei dieser Methode geben Sie eine Funktion an, die die Avro-Einträge in einen benutzerdefinierten Datentyp parst.
- Daten als
TableRow
-Objekte lesen Diese Methode ist praktisch, da kein benutzerdefinierter Datentyp erforderlich ist. Die Leistung ist jedoch in der Regel geringer als beim Lesen von Avro-formatierten Datensätzen.
Parallelität
Die Parallelität bei diesem Connector hängt von der Lesemethode ab:
Direkte Lesevorgänge: Der E/A-Connector generiert eine dynamische Anzahl von Streams basierend auf der Größe der Exportanfrage. Diese Streams werden parallel direkt aus BigQuery gelesen.
Exportjobs: BigQuery bestimmt, wie viele Dateien in Cloud Storage geschrieben werden. Die Anzahl der Dateien hängt von der Abfrage und der Datenmenge ab. Der E/A-Connector liest die exportierten Dateien parallel.
Leistung
Die folgende Tabelle enthält Leistungsmesswerte für verschiedene BigQuery-E/A-Leseoptionen. Die Arbeitslasten wurden auf einem e2-standard2
-Worker mit dem Apache Beam SDK 2.49.0 für Java ausgeführt. Runner v2 wurde nicht verwendet.
100 Mio. Datensätze | 1 KB | 1 Spalte | Durchsatz (Byte) | Durchsatz (Elemente) |
---|---|---|
Speicherlesevorgänge | 120 Mbit/s | 88.000 Elemente pro Sekunde |
Avro Export | 105 Mbit/s | 78.000 Elemente pro Sekunde |
JSON-Export | 110 Mbit/s | 81.000 Elemente pro Sekunde |
Diese Messwerte basieren auf einfachen Batch-Pipelines. Sie dienen zum Vergleich der Leistung zwischen E/A-Anschlüssen und sind nicht unbedingt repräsentativ für reale Pipelines. Die Leistung der Dataflow-Pipeline ist komplex und eine Funktion des VM-Typs, der verarbeiteten Daten, der Leistung externer Quellen und Senken sowie des Nutzercodes. Die Messwerte basieren auf der Ausführung des Java SDK und sind nicht repräsentativ für die Leistungsmerkmale anderer Sprach-SDKs. Weitere Informationen finden Sie unter Beam E/A-Leistung.
Best Practices
Im Allgemeinen empfehlen wir die direkte Tabellenlese (
Method.DIRECT_READ
). Die Storage Read API eignet sich besser für Datenpipelines als Exportjobs, da der Zwischenschritt zum Exportieren von Daten entfällt.Wenn Sie direkte Lesevorgänge verwenden, werden Ihnen die Kosten für die Nutzung der Storage Read API in Rechnung gestellt. Weitere Informationen finden Sie auf der Seite BigQuery-Preise unter Preise für die Datenextraktion.
Für Exportjobs fallen keine zusätzlichen Kosten an. Für Exportjobs gelten jedoch Einschränkungen. Bei der Übertragung großer Datenmengen, bei der Aktualität an erster Stelle steht und die Kosten angepasst werden können, werden direkte Lesevorgänge empfohlen.
Für die Storage Read API gelten Kontingentlimits. Verwenden Sie Google Cloud-Messwerte, um Ihre Kontingentnutzung zu überwachen.
Wenn Sie die Storage Read API verwenden, werden in den Protokollen möglicherweise Fehlermeldungen zum Ablauf des Leasingzeitraums und zur Zeitüberschreitung der Sitzung angezeigt, z. B.:
DEADLINE_EXCEEDED
Server Unresponsive
StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session
`
Diese Fehler können auftreten, wenn ein Vorgang länger als das Zeitlimit dauert, in der Regel in Pipelines, die länger als 6 Stunden laufen. Verwenden Sie stattdessen Dateiexporte, um dieses Problem zu vermeiden.
Wenn Sie das Java SDK verwenden, sollten Sie eine Klasse erstellen, die das Schema der BigQuery-Tabelle darstellt. Rufen Sie dann
useBeamSchema
in Ihrer Pipeline auf, um automatisch zwischen Apache Beam-Row
- und BigQuery-TableRow
-Typen zu konvertieren. Ein Beispiel für eine Schemaklasse finden Sie unterExampleModel.java
.
Beispiele
In den Codebeispielen in diesem Abschnitt werden direkte Tabellenlesevorgänge verwendet.
Wenn Sie stattdessen einen Exportjob verwenden möchten, lassen Sie den Aufruf von withMethod
aus oder geben Sie Method.EXPORT
an. Legen Sie dann mit der --tempLocation
Pipeline-Option einen Cloud Storage-Bucket für die exportierten Dateien fest.
In diesen Codebeispielen wird davon ausgegangen, dass die Quelltabelle die folgenden Spalten enthält:
name
(string)age
(integer)
Als JSON-Schemadatei angegeben:
[
{"name":"user_name","type":"STRING","mode":"REQUIRED"},
{"name":"age","type":"INTEGER","mode":"REQUIRED"}
]
Avro-formatierte Datensätze lesen
Verwenden Sie die Methode read(SerializableFunction)
, um BigQuery-Daten in Avro-formatierte Datensätze einzulesen. Diese Methode nimmt eine anwendungsdefinierte Funktion an, die SchemaAndRecord
-Objekte analysiert und einen benutzerdefinierten Datentyp zurückgibt. Die Ausgabe des Connectors ist ein PCollection
Ihres benutzerdefinierten Datentyps.
Der folgende Code liest ein PCollection<MyData>
aus einer BigQuery-Tabelle, wobei MyData
eine anwendungsdefinierte Klasse ist.
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Die read
-Methode nimmt eine SerializableFunction<SchemaAndRecord, T>
-Schnittstelle an, die eine Funktion zum Konvertieren von Avro-Datensätzen in eine benutzerdefinierte Datenklasse definiert. Im vorherigen Codebeispiel wird diese Konvertierungsfunktion durch die Methode MyData.apply
implementiert. Die Beispielfunktion analysiert die Felder name
und age
aus dem Avro-Datensatz und gibt eine MyData
-Instanz zurück.
Um anzugeben, welche BigQuery-Tabelle gelesen werden soll, rufen Sie die Methode from
auf, wie im vorherigen Beispiel gezeigt. Weitere Informationen finden Sie unter Tabellennamen in der Dokumentation zum BigQuery-E/A-Connector.
Lesen von TableRow
-Objekten
Mit der Methode readTableRows
werden BigQuery-Daten in eine PCollection
von TableRow
-Objekten gelesen. Jedes TableRow
ist eine Zuordnung von Schlüssel/Wert-Paaren, die eine einzelne Zeile von Tabellendaten enthält. Geben Sie die BigQuery-Tabelle an, die gelesen werden soll, indem Sie die Methode from
aufrufen.
Mit dem folgenden Code wird PCollection<TableRows>
aus einer BigQuery-Tabelle gelesen.
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Außerdem wird gezeigt, wie auf die Werte aus dem TableRow
-Wörterbuch zugegriffen wird.
Ganzzahlwerte werden als Strings codiert, um dem exportierten JSON-Format von BigQuery zu entsprechen.
Spaltenprojektion und -filterung
Wenn Sie direkte Lesevorgänge (Method.DIRECT_READ
) verwenden, können Sie die Lesevorgänge effizienter gestalten, indem Sie die Menge der Daten reduzieren, die aus BigQuery gelesen und über das Netzwerk gesendet werden.
- Spaltenprojektion: Rufen Sie
withSelectedFields
auf, um eine Teilmenge von Spalten aus der Tabelle zu lesen. Dies ermöglicht effiziente Lesevorgänge, wenn Tabellen viele Spalten enthalten. - Zeilenfilterung: Rufen Sie
withRowRestriction
auf, um ein Prädikat anzugeben, mit dem Daten auf der Serverseite gefiltert werden.
Filterprädikate müssen deterministisch sein und Aggregationen werden nicht unterstützt.
Im folgenden Beispiel werden die Spalten "user_name"
und "age"
projiziert und Zeilen herausgefiltert, die nicht mit dem Prädikat "age > 18"
übereinstimmen.
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Aus einem Abfrageergebnis lesen
In den vorherigen Beispielen wurde gezeigt, wie Zeilen aus einer Tabelle gelesen werden. Sie können auch das Ergebnis einer SQL-Abfrage lesen, indem Sie fromQuery
aufrufen. Bei diesem Ansatz wird ein Teil der Rechenarbeit in BigQuery verlagert. Sie können diese Methode auch verwenden, um Daten aus einer BigQuery-Ansicht oder einer materialisierten Ansicht abzurufen, indem Sie eine Abfrage für die Ansicht ausführen.
Im folgenden Beispiel wird eine Abfrage für ein öffentliches BigQuery-Dataset ausgeführt und die Ergebnisse werden gelesen. Nachdem die Pipeline ausgeführt wurde, können Sie den Abfragejob im BigQuery-Jobverlauf sehen.
Java
Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.
Nächste Schritte
- Lesen Sie die Dokumentation zum Bigtable-E/A-Connector.
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.