Die Vorlage „Spanner-Änderungsstreams für BigQuery“ ist eine Streamingpipeline, die Datenänderungsdatensätze von Spanner streamt und sie mithilfe von Dataflow Runner V2 in BigQuery-Tabellen schreibt.
Alle Spalten zum Beobachten von Änderungsstreams sind in jeder BigQuery-Tabellenzeile enthalten, unabhängig davon, ob sie durch eine Spanner-Transaktion geändert werden. Nicht beobachtete Spalten sind nicht in der BigQuery-Zeile enthalten. Alle Spanner-Änderungen, die kleiner als das Dataflow-Wasserzeichen sind, werden entweder erfolgreich auf die BigQuery-Tabellen angewendet oder in der Dead-Letter-Warteschlange zur Wiederholung gespeichert. BigQuery-Zeilen werden im Vergleich zur ursprünglichen Reihenfolge des Spanner Commit-Zeitstempels in der falschen Reihenfolge eingefügt.
Wenn die erforderlichen BigQuery-Tabellen nicht vorhanden sind, werden sie von der Pipeline erstellt. Andernfalls werden vorhandene BigQuery-Tabellen verwendet. Das Schema vorhandener BigQuery-Tabellen muss die entsprechenden nachverfolgten Spalten der Spanner-Tabellen und alle zusätzlichen Metadatenspalten enthalten, die nicht explizit von der Option ignoreFields
ignoriert werden.
Eine Beschreibung der Metadatenfelder finden Sie in der folgenden Liste.
Jede neue BigQuery-Zeile enthält alle Spalten, die vom Änderungsstream aus der entsprechenden Zeile in Ihrer Spanner-Tabelle zum Zeitstempel des Änderungsdatensatzes beobachtet werden.
Die folgenden Metadatenfelder werden zu BigQuery-Tabellen hinzugefügt: Weitere Informationen zu diesen Feldern finden Sie unter Datenänderungsdatensätze in „Änderungsstream-Partitionen, -Datensätze und -Abfragen”.
_metadata_spanner_mod_type
: der Änderungstyp (Einfügen, Aktualisieren oder Löschen) der Spanner-Transaktion. Extrahiert aus dem Änderungsstream-Datenänderungsdatensatz._metadata_spanner_table_name
: der Name der Spanner-Tabelle. Dieses Feld ist nicht der Name der Metadatentabelle des Connectors._metadata_spanner_commit_timestamp
: Der Commit-Zeitstempel von Spanner ist die Zeit, zu der eine Änderung per Commit festgeschrieben wird. Dieser Wert wird aus dem Änderungsstream-Datensatz extrahiert._metadata_spanner_server_transaction_id
: Ein global eindeutiger String, der die Spanner-Transaktion darstellt, in der die Änderung per Commit übergeben wurde. Verwenden Sie diesen Wert nur im Zusammenhang mit der Verarbeitung von Änderungsstream-Datensätzen. Sie ist nicht mit der Transaktions-ID in der Spanner API korreliert. Dieser Wert wird aus dem Änderungsstream-Datensatz extrahiert._metadata_spanner_record_sequence
: Die Sequenznummer für den Datensatz innerhalb der Spanner-Transaktion. Sequenznummern sind innerhalb einer Transaktion eindeutig und kontinuierlich ansteigend (aber nicht unbedingt fortlaufend). Dieser Wert wird aus dem Änderungsstream-Datensatz extrahiert._metadata_spanner_is_last_record_in_transaction_in_partition
: Gibt an, ob der Datensatz der letzte Datensatz für eine Spanner-Transaktion in der aktuellen Partition ist. Dieser Wert wird aus dem Änderungsstream-Datensatz extrahiert._metadata_spanner_number_of_records_in_transaction
: Die Anzahl der Datenänderungs-Datensätze, die Teil der Spanner-Transaktion in allen Änderungsstream-Partitionen sind. Dieser Wert wird aus dem Änderungsstream-Datensatz extrahiert._metadata_spanner_number_of_partitions_in_transaction
: Die Anzahl der Partitionen, die Datenänderungsdatensätze für die Spanner-Transaktion zurückgeben. Dieser Wert wird aus dem Änderungsstream-Datensatz extrahiert._metadata_big_query_commit_timestamp
: Der Commit-Zeitstempel, wenn die Zeile in BigQuery eingefügt wird. Wenn füruseStorageWriteApi
der Werttrue
festgelegt ist, wird diese Spalte nicht automatisch von der Pipeline in der Änderungslogtabelle erstellt. In diesem Fall müssen Sie diese Spalte bei Bedarf manuell in der Tabelle des Änderungslogs hinzufügen.
Beachten Sie bei der Verwendung dieser Vorlage die folgenden Details:
- Mit dieser Vorlage können Sie neue Spalten aus vorhandenen Tabellen oder neue Tabellen von Spanner nach BigQuery übertragen. Weitere Informationen finden Sie unter Tracking-Tabellen oder ‑Spalten hinzufügen.
- Bei den Werterfassungstypen
OLD_AND_NEW_VALUES
undNEW_VALUES
muss die Vorlage, wenn der Datensatz eine UPDATE-Änderung enthält, einen veralteten Lesevorgang in Spanner zum Commit-Zeitstempel des Datensatzes durchführen, um die unveränderten, aber überwachten Spalten abzurufen. Prüfen Sie, ob Sie die „version_retention_period“ in Ihrer Datenbank richtig konfiguriert haben, um veraltete Daten lesen zu können. Für den WerterfassungstypNEW_ROW
ist die Vorlage effizienter, da der Datensatz zur Datenänderung die vollständige neue Zeile erfasst, einschließlich Spalten, die in UPDATE-Anfragen nicht aktualisiert werden, und die Vorlage keinen veralteten Lesevorgang ausführen muss. - Führen Sie den Dataflow-Job in derselben Region wie Ihre Spanner-Instanz oder BigQuery-Tabellen aus, um die Netzwerklatenz und die Netzwerktransportkosten zu minimieren. Wenn Sie Quellen und Senken sowie Speicherorte für Staging-Dateien und temporäre Dateien verwenden, die sich außerhalb der Region Ihres Jobs befinden, werden Ihre Daten möglicherweise regionenübergreifend gesendet. Weitere Informationen finden Sie unter Dataflow-Regionen.
- Diese Vorlage unterstützt alle gültigen Spanner-Datentypen. Wenn der BigQuery-Typ jedoch genauer ist als der Spanner-Typ, kann während der Transformation ein Genauigkeitsverlust auftreten. Insbesondere:
- Im Fall von JSON-Typen in Spanner wird die Reihenfolge der Mitglieder eines Objekts lexikografisch angeordnet. Es gibt jedoch keine Garantie dafür.
- Spanner unterstützt den TIMESTAMP-Typ „Nanosekunden”, aber BigQuery unterstützt nur den TIMESTAMP-Typ „Mikrosekunden”.
- Diese Vorlage unterstützt die Verwendung der BigQuery Storage Write API im „Genau einmal“-Modus ni ht.
Weitere Informationen zu Änderungsstreams, zum Erstellen von Dataflow-Pipelines für Änderungsstreams und Best Practices
Pipelineanforderungen
- Die Spanner-Instanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
- Die Spanner-Datenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
- Die Spanner-Metadateninstanz muss vorhanden sein, bevor Sie die Pipeline ausführen.
- Die Spanner-Metadatendatenbank muss vorhanden sein, bevor Sie die Pipeline ausführen.
- Der Spanner-Änderungsstream muss vorhanden sein, bevor Sie die Pipeline ausführen.
- Das BigQuery-Dataset muss vorhanden sein, bevor Sie die Pipeline ausführen.
Tracking-Tabellen oder ‑Spalten hinzufügen
In diesem Abschnitt werden Best Practices für das Hinzufügen von Tracking-Spanner-Tabellen und ‑Spalten während der Ausführung der Pipeline beschrieben.
- Bevor Sie dem Bereich eines Spanner-Änderungsstreams eine neue Spalte hinzufügen, fügen Sie die Spalte zuerst der BigQuery-Änderungstabelle hinzu. Die hinzugefügte Spalte muss denselben Datentyp haben und
NULLABLE
sein. Warten Sie mindestens 10 Minuten, bevor Sie mit dem Erstellen der neuen Spalte oder Tabelle in Spanner fortfahren. Wenn Sie ohne Warten in die neue Spalte schreiben, kann das zu einem unverarbeiteten Datensatz mit dem Fehlercode ungültig im Verzeichnis der Dead-Letter-Warteschlange führen. - Wenn Sie eine neue Tabelle hinzufügen möchten, fügen Sie sie zuerst der Spanner-Datenbank hinzu. Die Tabelle wird automatisch in BigQuery erstellt, wenn die Pipeline einen Datensatz für die neue Tabelle empfängt.
- Nachdem Sie die neuen Spalten oder Tabellen in die Spanner-Datenbank eingefügt haben, müssen Sie Ihren Änderungsstream ändern, um die gewünschten neuen Spalten oder Tabellen zu erfassen, sofern sie nicht bereits implizit erfasst werden.
- Die Vorlage löscht weder Tabellen noch Spalten in BigQuery. Wird eine Spalte aus der Spanner-Tabelle gelöscht, werden in die Spalten des BigQuery-Änderungsprotokolls für Datensätze, die nach dem Löschen der Spalten aus der Spanner-Tabelle generiert wurden, Nullwerte eingefügt, es sei denn, Sie entfernen die Spalte manuell aus BigQuery.
- Die Vorlage unterstützt keine Aktualisierungen des Spaltentyps. Spanner unterstützt zwar das Ändern einer
STRING
-Spalte in eineBYTES
-Spalte oder einerBYTES
-Spalte in eineSTRING
-Spalte, aber Sie können weder den Datentyp einer vorhandenen Spalte ändern noch denselben Spaltennamen mit unterschiedlichen Datentypen in BigQuery verwenden. Wenn Sie eine Spalte mit demselben Namen, aber einem anderen Typ in Spanner löschen und neu erstellen, werden die Daten möglicherweise in die vorhandene BigQuery-Spalte geschrieben, der Typ bleibt dabei jedoch unverändert. - Diese Vorlage unterstützt keine Aktualisierungen des Spaltenmodus. Metadatenspalten, die in BigQuery repliziert werden, sind auf den
REQUIRED
-Modus festgelegt. Alle anderen Spalten, die in BigQuery repliziert werden, werden aufNULLABLE
gesetzt, unabhängig davon, ob sie in der Spanner-Tabelle alsNOT NULL
definiert sind. Sie können dieNULLABLE
-Spalten nicht auf denREQUIRED
-Modus in BigQuery aktualisieren. - Das Ändern des Werterfassungstyps eines Änderungsstreams wird für laufende Pipelines nicht unterstützt.
Vorlagenparameter
Erforderliche Parameter
- spannerInstanceId : Die Spanner-Instanz, aus der Änderungsstreams gelesen werden sollen.
- spannerDatabase : Die Spanner-Datenbank, aus der Änderungsstreams gelesen werden sollen.
- spannerMetadataInstanceId : Die Spanner-Instanz, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
- spannerMetadataDatabase : Die Spanner-Datenbank, die für die Metadatentabelle des Connectors für Änderungsstreams verwendet werden soll.
- spannerChangeStreamName : Der Name des Spanner-Änderungsstreams, aus dem gelesen werden soll.
- bigQueryDataset: Das BigQuery-Dataset für die Ausgabe der Änderungsstreams.
Optionale Parameter
- spannerProjectId : Das Projekt, aus dem Änderungsstreams gelesen werden. Dieser Wert ist auch das Projekt, in dem die Metadatentabelle des Änderungsstream-Connectors erstellt wird. Der Standardwert für diesen Parameter ist das Projekt, in dem die Dataflow-Pipeline ausgeführt wird.
- spannerDatabaseRole : Die Spanner-Datenbankrolle, die beim Ausführen der Vorlage verwendet werden soll. Dieser Parameter ist nur erforderlich, wenn das IAM-Hauptkonto, das die Vorlage ausführt, ein Nutzer für die Zugriffssteuerung ist. Die Datenbankrolle muss die Berechtigung SELECT für den Änderungsstream und die Berechtigung EXECUTE für die Lesefunktion des Änderungsstreams haben. Weitere Informationen finden Sie unter „Detaillierte Zugriffssteuerung für Änderungsstreams“ (https://cloud.google.com/spanner/docs/fgac-change-streams).
- spannerMetadataTableName : Der Name der zu verwendenden Connector-Metadatentabelle für Spanner-Änderungsstreams. Wenn nicht angegeben, wird während des Pipelineablaufs automatisch eine Metadatentabelle für Spanner-Änderungsstreams erstellt. Sie müssen diesen Parameter beim Aktualisieren einer vorhandenen Pipeline angeben. Geben Sie andernfalls diesen Parameter nicht an.
- rpcPriority : Die Anfragepriorität für Spanner-Aufrufe. Der Wert muss einer der folgenden Werte sein:
HIGH
,MEDIUM
oderLOW
. Der Standardwert istHIGH
. - spannerHost : Der Cloud Spanner-Endpunkt, der in der Vorlage aufgerufen werden soll. Wird nur zum Testen verwendet. (Beispiel: https://batch-spanner.googleapis.com).
- startTimestamp : Die Start-DateTime (https://datatracker.ietf.org/doc/html/rfc3339) (einschließlich), die zum Lesen von Änderungsstreams verwendet wird. Ex-2021-10-12T07:20:50.52Z. Die Standardeinstellung ist der Zeitstempel für den Start der Pipeline, d. h. die aktuelle Zeit.
- endTimestamp : Die End-DateTime (https://datatracker.ietf.org/doc/html/rfc3339) (einschließlich), die zum Lesen von Änderungsstreams verwendet wird.Ex-2021-10-12T07:20:50.52Z. Die Standardeinstellung ist eine unendliche Zeit in der Zukunft.
- bigQueryProjectId: Das BigQuery-Projekt. Der Standardwert ist das Projekt für den Dataflow-Job.
- bigQueryChangelogTableNameTemplate : Die Vorlage für den Namen der BigQuery-Tabelle, die das Änderungslog enthält. Die Standardeinstellung ist {_metadata_spanner_table_name}_changelog.
- deadLetterQueueDirectory : Der Pfad zum Speichern nicht verarbeiteter Datensätze. Der Standardpfad ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs. Der Standardwert ist in der Regel ausreichend.
- dlqRetryMinutes: Die Anzahl der Minuten zwischen DLQ-Wiederholungen (Dead Letter Queue). Der Standardwert ist 10.
- ignoreFields : Eine durch Kommas getrennte Liste von Feldern (Groß- und Kleinschreibung wird berücksichtigt) wird ignoriert. Diese Felder können Felder überwachter Tabellen oder Metadatenfelder sein, die von der Pipeline hinzugefügt werden. Ignorierte Felder werden nicht in BigQuery eingefügt. Wenn Sie das Feld "_metadata_spanner_table_name" ignorieren, wird auch der Parameter "bigQueryChangelogTableNameTemplate" ignoriert. Die Standardeinstellung ist leer.
- disableDlqRetries: Gibt an, ob Wiederholungsversuche für den DLQ deaktiviert werden sollen. Die Standardeinstellung ist "false".
- useStorageWriteApi: Wenn "true", verwendet die Pipeline die BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Der Standardwert ist
false
. Weitere Informationen finden Sie unter „Storage Write API verwenden“ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - useStorageWriteApiAtLeastOnce: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie die "Mindestens einmal"-Semantik verwenden möchten (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), legen Sie diesen Parameter auf
true
fest. Wenn Sie die "Genau einmal"-Semantik verwenden möchten, legen Sie den Parameter auffalse
fest. Dieser Parameter gilt nur, wennuseStorageWriteApi
true
ist. Der Standardwert istfalse
. - numStorageWriteApiStreams: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn
useStorageWriteApi
true
unduseStorageWriteApiAtLeastOnce
false
ist, müssen Sie diesen Parameter festlegen. Die Standardeinstellung ist 0. - storageWriteApiTriggeringFrequencySec: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn
useStorageWriteApi
true
unduseStorageWriteApiAtLeastOnce
false
ist, müssen Sie diesen Parameter festlegen.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Cloud Spanner change streams to BigQuery templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \ --region REGION_NAME \ --parameters \ spannerInstanceId=SPANNER_INSTANCE_ID,\ spannerDatabase=SPANNER_DATABASE,\ spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\ spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\ spannerChangeStreamName=SPANNER_CHANGE_STREAM,\ bigQueryDataset=BIGQUERY_DATASET
Ersetzen Sie dabei Folgendes:
JOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
REGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
SPANNER_INSTANCE_ID
: Spanner-Instanz-IDSPANNER_DATABASE
: Spanner-DatenbankSPANNER_METADATA_INSTANCE_ID
: Spanner-Metadateninstanz-IDSPANNER_METADATA_DATABASE
: Spanner-MetadatendatenbankSPANNER_CHANGE_STREAM
: Spanner-ÄnderungsstreamBIGQUERY_DATASET
: Das BigQuery-Dataset für die Ausgabe der Änderungsstreams.
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "spannerInstanceId": "SPANNER_INSTANCE_ID", "spannerDatabase": "SPANNER_DATABASE", "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID", "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE", "spannerChangeStreamName": "SPANNER_CHANGE_STREAM", "bigQueryDataset": "BIGQUERY_DATASET" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery", } }
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
LOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
SPANNER_INSTANCE_ID
: Spanner-Instanz-IDSPANNER_DATABASE
: Spanner-DatenbankSPANNER_METADATA_INSTANCE_ID
: Spanner-Metadateninstanz-IDSPANNER_METADATA_DATABASE
: Spanner-MetadatendatenbankSPANNER_CHANGE_STREAM
: Spanner-ÄnderungsstreamBIGQUERY_DATASET
: Das BigQuery-Dataset für die Ausgabe der Änderungsstreams.
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.