Die Vorlage „Bigtable-Änderungsstreams zu BigQuery“ ist eine Streamingpipeline, die Bigtable-Datenänderungseinträge streamt und mithilfe von Dataflow in BigQuery-Tabellen schreibt.
Mit einem Bigtable-Änderungsstream können Sie Datenmutationen auf Tabellenbasis abonnieren. Wenn Sie Änderungsstreams für Tabellen abonnieren, gelten die folgenden Einschränkungen:
- Es werden nur geänderte Zellen und Deskriptoren von Löschvorgängen zurückgegeben.
- Es wird nur der neue Wert einer geänderten Zelle zurückgegeben.
Wenn Datenänderungseinträge in BigQuery geschrieben werden, werden Zeilen möglicherweise in der falschen Reihenfolge eingefügt im Vergleich zur ursprünglichen Reihenfolge des Bigtable-Commit-Zeitstempels.
Änderungslogtabellenzeilen, die aufgrund eines persistenten Fehlers nicht in BigQuery geschrieben werden können, werden für eine manuelle Überprüfung oder weitere Verarbeitung durch den Nutzer dauerhaft in einem Warteschlangenverzeichnis für unzustellbare Nachrichten (Warteschlange für unverarbeitete Nachrichten) in Cloud Storage abgelegt.
Wenn die erforderliche BigQuery-Tabelle nicht vorhanden ist, wird sie von der Pipeline erstellt. Andernfalls wird eine vorhandene BigQuery-Tabelle verwendet. Das Schema vorhandener BigQuery-Tabellen muss die Spalten in der folgenden Tabelle enthalten.
Jede neue BigQuery-Zeile enthält einen Datenänderungseintrag, der vom Änderungsstream aus der entsprechenden Zeile in Ihrer Bigtable-Tabelle zurückgegeben wird.
Schema der BigQuery-Ausgabetabelle
Spaltenname | Typ | Nullwerte zulässig | Beschreibung |
---|---|---|---|
row_key |
STRING oder BYTES |
Nein | Der Zeilenschlüssel der geänderten Zeile. Wenn die Pipelineoption writeRowkeyAsBytes auf true gesetzt ist, muss die Spalte vom Typ BYTES sein. Verwenden Sie andernfalls den Typ STRING . |
mod_type |
STRING |
Nein | Der Typ der Zeilenmutation. Verwenden Sie einen der folgenden Werte: SET_CELL , DELETE_CELLS oder DELETE_FAMILY . |
column_family |
STRING |
Nein | Die Spaltenfamilie, die von der Zeilenmutation betroffen ist. |
column |
STRING |
Ja | Der Spaltenqualifizierer, der von der Zeilenmutation betroffen ist. Legen Sie für den Mutationstyp DELETE_FAMILY den Wert NULL fest. |
commit_timestamp |
TIMESTAMP |
Nein | Der Zeitpunkt, an dem Bigtable die Mutation anwendet. |
big_query_commit_timestamp |
TIMESTAMP |
Ja | Optional: Gibt an, wann BigQuery die Zeile in eine Ausgabetabelle schreibt. Das Feld wird nicht ausgefüllt, wenn der Spaltenname im Pipeline-Optionswert bigQueryChangelogTableFieldsToIgnore vorhanden ist. |
timestamp |
TIMESTAMP oder INT64 |
Ja | Der Zeitstempelwert der Zelle, die von der Mutation betroffen ist. Wenn die Pipelineoption writeNumericTimestamps auf true gesetzt ist, muss die Spalte vom Typ INT64 sein. Verwenden Sie andernfalls den Typ TIMESTAMP .
Legen Sie für die Mutationstypen DELETE_CELLS und DELETE_FAMILY den Wert NULL fest. |
timestamp_from |
TIMESTAMP oder INT64 |
Ja | Beschreibt den inclusive Beginn des Zeitstempelintervalls für alle Zellen, die durch die DELETE_CELLS -Mutation gelöscht wurden. Bei anderen Mutationstypen geben Sie NULL an. |
timestamp_to |
TIMESTAMP oder INT64 |
Ja | Beschreibt ein exklusives Ende des Zeitstempelintervalls für alle Zellen, die durch die DELETE_CELLS -Mutation gelöscht wurden. Bei anderen Mutationstypen geben Sie NULL an. |
is_gc |
BOOL |
Nein | Optional: Wenn die Mutation durch eine Richtlinie für die automatische Speicherbereinigung ausgelöst wird, geben Sie true ein.
In allen anderen Fällen setzen Sie false . Das Feld wird nicht ausgefüllt, wenn der Spaltenname im Pipeline-Optionswert bigQueryChangelogTableFieldsToIgnore vorhanden ist. |
source_instance |
STRING |
Nein | Optional: Der Name der Bigtable-Instanz, aus der die Mutation stammt. Das Feld wird nicht ausgefüllt, wenn der Spaltenname im Pipeline-Optionswert bigQueryChangelogTableFieldsToIgnore vorhanden ist. |
source_cluster |
STRING |
Nein | Optional: Der Name des Bigtable-Clusters, aus dem die Mutation stammt. Das Feld wird nicht ausgefüllt, wenn der Spaltenname im Pipeline-Optionswert bigQueryChangelogTableFieldsToIgnore vorhanden ist. |
source_table |
STRING |
Nein | Optional: Der Name der Bigtable-Tabelle, auf die sich die Mutation bezieht. Der Wert in dieser Spalte kann hilfreich sein, wenn mehrere Bigtable-Tabellen Änderungen zur selben BigQuery-Tabelle streamen. Das Feld wird nicht ausgefüllt, wenn der Spaltenname im Pipeline-Optionswert bigQueryChangelogTableFieldsToIgnore vorhanden ist. |
tiebreaker |
INT64 |
Nein | Optional: Wenn zwei Mutationen von verschiedenen Bigtable-Clustern gleichzeitig registriert werden, wird die Mutation mit dem höchsten tiebreaker -Wert auf die Quelltabelle angewendet. Mutationen mit niedrigeren tiebreaker -Werten werden verworfen. Das Feld wird nicht ausgefüllt, wenn der Spaltenname im Pipeline-Optionswert bigQueryChangelogTableFieldsToIgnore vorhanden ist. |
value |
STRING oder BYTES |
Ja | Der neue Wert, der durch die Mutation festgelegt wurde. Wenn die Pipelineoption writeValuesAsBytes auf true gesetzt ist, muss die Spalte vom Typ BYTES sein. Verwenden Sie andernfalls den Typ STRING . Der Wert wird für SET_CELL Mutationen festgelegt. Bei anderen Mutationstypen ist der Wert auf NULL festgelegt. |
Pipelineanforderungen
- Die angegebene Bigtable-Quellinstanz.
- Die angegebene Bigtable-Quelltabelle In der Tabelle müssen Änderungsstreams aktiviert sein.
- Das angegebene Bigtable-Anwendungsprofil.
- Das angegebene BigQuery-Ziel-Dataset.
Vorlagenparameter
Erforderliche Parameter
- bigQueryDataset: Der Dataset-Name der BigQuery-Zieltabelle.
- bigtableChangeStreamAppProfile: Die Bigtable-Anwendungsprofil-ID. Das Anwendungsprofil muss Single-Cluster-Routing verwenden und Transaktionen für einzelne Zeilen zulassen.
- bigtableReadInstanceId: Die Bigtable-Quellinstanz-ID.
- bigtableReadTableId: Die Bigtable-Quelltabellen-ID.
Optionale Parameter
- writeRowkeyAsBytes: Gibt an, ob Zeilenschlüssel als BigQuery-
BYTES
geschrieben werden sollen. Wenn dies auftrue
festgelegt ist, werden Zeilenschlüssel in die SpalteBYTES
geschrieben. Andernfalls werden Zeilenschlüssel in die SpalteSTRING
geschrieben. Die Standardeinstellung istfalse
. - writeValuesAsBytes: Wenn dieser Wert auf
true
festgelegt ist, werden Werte in eine Spalte vom Typ BYTES geschrieben, andernfalls in eine Spalte vom Typ STRING . Die Standardeinstellung ist:false
. - writeNumericTimestamps: Gibt an, ob der Bigtable-Zeitstempel als BigQuery INT64 geschrieben werden soll. Wenn dies auf
true
festgelegt ist, werden Werte in die INT64-Spalte geschrieben. Andernfalls werden Werte in die SpalteTIMESTAMP
geschrieben. Betroffene Spalten:timestamp
,timestamp_from
undtimestamp_to
. Die Standardeinstellung istfalse
. Wenn dieser Wert auftrue
gesetzt ist, wird die Zeit seit der Unix-Epoche (1. Januar 1970 um UTC) in Mikrosekunden gemessen. - bigQueryProjectId: Die Projekt-ID des BigQuery-Datasets. Der Standardwert ist das Projekt für den Dataflow-Job.
- bigQueryChangelogTableName: Name der BigQuery-Tabelle. Wenn keine Angabe erfolgt, wird der Wert
bigtableReadTableId + "_changelog"
verwendet. Die Standardeinstellung ist leer. - bigQueryChangelogTablePartitionGranularity: Gibt einen Detaillierungsgrad für die Partitionierung der Änderungslogtabelle an. Wenn dies festgelegt ist, wird die Tabelle partitioniert. Verwenden Sie einen der folgenden unterstützten Werte:
HOUR
,DAY
,MONTH
oderYEAR
. Standardmäßig ist die Tabelle nicht partitioniert. - bigQueryChangelogTablePartitionExpirationMs: Legt die Ablaufzeit der Änderungslog-Tabellenpartition in Millisekunden fest. Wenn dies auf
true
gesetzt ist, werden Partitionen gelöscht, die älter als die angegebene Anzahl an Millisekunden sind. Standardmäßig ist kein Ablaufdatum festgelegt. - bigQueryChangelogTableFieldsToIgnore: Eine durch Kommas getrennte Liste der Änderungslogspalten, die, wenn angegeben, nicht erstellt und ausgefüllt werden. Verwenden Sie einen der folgenden unterstützten Werte:
is_gc
,source_instance
,source_cluster
,source_table
,tiebreaker
oderbig_query_commit_timestamp
. Standardmäßig sind alle Spalten ausgefüllt. - dlqDirectory: Das Verzeichnis, das für die Warteschlange für unzustellbare Nachrichten verwendet werden soll. Datensätze, die nicht verarbeitet werden können, werden in diesem Verzeichnis gespeichert. Der Standardwert ist ein Verzeichnis unter dem temporären Speicherort des Dataflow-Jobs. In den meisten Fällen können Sie den Standardpfad verwenden.
- bigtableChangeStreamMetadataInstanceId: Die Metadateninstanz-ID des Bigtable-Änderungsstreams. Die Standardeinstellung ist leer.
- bigtableChangeStreamMetadataTableTableId: Die ID der Metadatentabelle des Bigtable-Änderungsstream-Connectors. Wenn nicht angegeben, wird während der Pipelineausführung automatisch eine Metadatentabelle für Bigtable-Änderungsstreams erstellt. Die Standardeinstellung ist leer.
- bigtableChangeStreamCharset: Der Zeichensatzname des Bigtable-Änderungsstreams. Standardmäßig ist dies auf UTF8 eingestellt.
- bigtableChangeStreamStartTimestamp: Der Startzeitstempel (https://tools.ietf.org/html/rfc3339) (einschließlich), der zum Lesen von Änderungsstreams verwendet wird. Beispiel:
2022-05-05T07:59:59Z
Standardmäßig ist der Zeitstempel für den Start der Pipeline festgelegt. - bigtableChangeStreamIgnoreColumnFamilies: Eine durch Kommas getrennte Liste von Änderungen an den Namen der Spaltenfamilien, die ignoriert werden sollen. Die Standardeinstellung ist leer.
- bigtableChangeStreamIgnoreColumns: Eine durch Kommas getrennte Liste von Änderungen an den Spaltennamen, die ignoriert werden sollen. Die Standardeinstellung ist leer.
- bigtableChangeStreamName: Ein eindeutiger Name für die Client-Pipeline. Damit wird die Verarbeitung ab dem Punkt fortgesetzt, an dem eine zuvor ausgeführte Pipeline beendet wurde. Standardmäßig wird ein automatisch generierter Name verwendet. Den verwendeten Wert finden Sie in den Dataflow-Joblogs.
- bigtableChangeStreamResume: Wenn dieser Wert auf
true
gesetzt ist, setzt eine neue Pipeline die Verarbeitung ab dem Punkt fort, an dem eine zuvor ausgeführte Pipeline mit demselbenbigtableChangeStreamName
-Wert gestoppt wurde. Wenn die Pipeline mit dem angegebenenbigtableChangeStreamName
-Wert noch nie ausgeführt wurde, wird keine neue Pipeline gestartet. Wenn dieser Wert auffalse
gesetzt ist, wird eine neue Pipeline gestartet. Wenn eine Pipeline mit demselbenbigtableChangeStreamName
-Wert bereits für die angegebene Quelle ausgeführt wurde, wird keine neue Pipeline gestartet. Die Standardeinstellung istfalse
. - bigtableReadProjectId: Die Bigtable-Projekt-ID. Der Standardwert ist das Projekt für den Dataflow-Job.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bigtable change streams to BigQuery templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ bigQueryDataset=BIGQUERY_DESTINATION_DATASET
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
REGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
BIGTABLE_INSTANCE_ID
: Ihre Bigtable-Instanz-ID.BIGTABLE_TABLE_ID
: Ihre Bigtable-Tabellen-ID.BIGTABLE_APPLICATION_PROFILE_ID
: Ihre Bigtable-Anwendungsprofil-ID.BIGQUERY_DESTINATION_DATASET
: Name des BigQuery-Ziel-Datasets
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_BigQuery", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "bigQueryDataset": "BIGQUERY_DESTINATION_DATASET" } } }
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
LOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
BIGTABLE_INSTANCE_ID
: Ihre Bigtable-Instanz-ID.BIGTABLE_TABLE_ID
: Ihre Bigtable-Tabellen-ID.BIGTABLE_APPLICATION_PROFILE_ID
: Ihre Bigtable-Anwendungsprofil-ID.BIGQUERY_DESTINATION_DATASET
: Name des BigQuery-Ziel-Datasets
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.