Die Vorlage "Bigtable-Änderungsstreams für Pub/Sub" ist eine Streamingpipeline, die Bigtable-Datenänderungsdatensätze streamt und sie mithilfe von Dataflow in einem Pub/Sub-Thema veröffentlicht.
Mit einem Bigtable-Änderungsstream können Sie Datenmutationen auf Tabellenbasis abonnieren. Wenn Sie Änderungsstreams für Tabellen abonnieren, gelten die folgenden Einschränkungen:
- Es werden nur geänderte Zellen und Deskriptoren von Löschvorgängen zurückgegeben.
- Es wird nur der neue Wert einer geänderten Zelle zurückgegeben.
Wenn Datensätze zur Datenänderung in einem Pub/Sub-Thema veröffentlicht werden, könnten Nachrichten in der falschen Reihenfolge eingefügt werden im Vergleich zur ursprünglichen Reihenfolge des Bigtable-Commits.
Bigtable-Datenänderungseinträge, die nicht in Pub/Sub-Themen veröffentlicht werden können, werden vorübergehend in einem Verzeichnis für Dead-Letter-Warteschlangen (unverarbeitete Nachrichtenwarteschlange) in Cloud Storage abgelegt. Nach der maximalen Anzahl fehlgeschlagener Wiederholungsversuche werden diese Einträge für eine manuelle Überprüfung oder weitere Verarbeitung durch den Nutzer auf unbestimmte Zeit im selben Warteschlangenverzeichnis für unzustellbare Nachrichten abgelegt.
Für die Pipeline muss das Pub/Sub-Zielthema vorhanden sein. Das Zielthema kann so konfiguriert sein, dass Nachrichten mithilfe eines Schemas validiert werden. Wenn ein Pub/Sub-Thema ein Schema angibt, wird die Pipeline nur gestartet, wenn das Schema gültig ist. Verwenden Sie je nach Schematyp eine der folgenden Schemadefinitionen für das Zielthema:
Log-Puffer
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
Verwenden Sie das folgende Protobuf-Schema mit JSON
-Nachrichtencodierung:
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangelogEntryMessageText"; message ChangelogEntryText{ required string rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional string column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional string value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Jede neue Pub/Sub-Nachricht enthält einen Eintrag aus einem Datensatz zu Datenänderungen aus der entsprechenden Zeile in Ihrer Bigtable-Tabelle. Die Pub/Sub-Vorlage vereinfacht die Einträge in jedem Datenänderungsdatensatz zu einzelnen Änderungen auf Zellenebene.
Beschreibung der Pub/Sub-Ausgabenachricht
Feldname | Beschreibung |
---|---|
rowKey |
Der Zeilenschlüssel der geänderten Zeile. Er kommt in Form eines Byte-Arrays an. Wenn die JSON-Nachrichtencodierung konfiguriert ist, werden Zeilenschlüssel als Strings zurückgegeben. Wenn useBase64Rowkeys angegeben ist, sind Zeilenschlüssel Base64-codiert. Andernfalls wird ein durch bigtableChangeStreamCharset angegebenes Zeichensatz verwendet, um Zeilenschlüsselbyte in einen String zu decodieren. |
modType |
Der Typ der Zeilenmutation. Verwenden Sie einen der folgenden Werte: SET_CELL , DELETE_CELLS oder DELETE_FAMILY . |
columnFamily |
Die Spaltenfamilie, die von der Zeilenmutation betroffen ist. |
column |
Der Spaltenqualifizierer, der von der Zeilenmutation betroffen ist. Für den Mutationstyp DELETE_FAMILY ist das Spaltenfeld nicht festgelegt. Er kommt in Form eines Byte-Arrays an. Wenn die JSON-Nachrichtencodierung konfiguriert ist, werden Spalten als Strings zurückgegeben. Wenn useBase64ColumnQualifier angegeben ist, ist das Spaltenfeld Base64-codiert. Andernfalls wird ein durch bigtableChangeStreamCharset angegebenes Zeichensatz verwendet, um Zeilenschlüsselbyte in einen String zu decodieren. |
commitTimestamp |
Der Zeitpunkt, an dem Bigtable die Mutation anwendet. Die Zeit wird in Mikrosekunden seit der Unix-Epoche gemessen (1. Januar 1970 um UTC). |
timestamp |
Der Zeitstempelwert der Zelle, die von der Mutation betroffen ist. Für die Mutationstypen DELETE_CELLS und DELETE_FAMILY ist kein Zeitstempel festgelegt. Die Zeit wird in Mikrosekunden seit der Unix-Epoche gemessen (1. Januar 1970 um UTC). |
timestampFrom |
Beschreibt einen inklusiven Beginn des Zeitstempelintervalls für alle Zellen, die durch die Mutation DELETE_CELLS gelöscht wurden. Bei anderen Mutationstypen ist timestampFrom nicht festgelegt. Die Zeit wird in Mikrosekunden seit der Unix-Epoche gemessen (1. Januar 1970 um UTC). |
timestampTo |
Beschreibt ein exklusives Ende des Zeitstempelintervalls für alle Zellen, die durch die Mutation DELETE_CELLS gelöscht wurden. Bei anderen Mutationstypen ist timestampTo nicht festgelegt. |
isGC |
Ein boolescher Wert, mit dem angegeben wird, ob die Mutation von einem Speicherbereinigungsmechanismus von Bigtable generiert wird. |
tieBreaker |
Wenn zwei Mutationen von verschiedenen Bigtable-Clustern gleichzeitig registriert werden, wird die Mutation mit dem höchsten tiebreaker -Wert auf die Quelltabelle angewendet. Mutationen mit niedrigeren tiebreaker -Werten werden verworfen. |
value |
Der von der Mutation festgelegte neue Wert. Sofern die Pipelineoption stripValues nicht festgelegt ist, wird der Wert für SET_CELL -Mutationen festgelegt. Bei anderen Mutationstypen ist der Wert nicht festgelegt. Er kommt in Form eines Byte-Arrays an. Wenn die JSON-Nachrichtencodierung konfiguriert ist, werden Werte als Strings zurückgegeben.
Wenn useBase64Values angegeben ist, ist der Wert Base64-codiert. Andernfalls wird ein durch bigtableChangeStreamCharset angegebenes Zeichensatz verwendet, um Wertbyte in einen String zu decodieren. |
sourceInstance |
Der Name der Bigtable-Instanz, die die Mutation registriert hat. Kann vorkommen, wenn mehrere Pipelines Änderungen aus verschiedenen Instanzen zum selben Pub/Sub-Thema streamen. |
sourceCluster |
Der Name des Bigtable-Clusters, der die Mutation registriert hat. Kann verwendet werden, wenn mehrere Pipelines Änderungen von verschiedenen Instanzen zum selben Pub/Sub-Thema streamen. |
sourceTable |
Der Name der Bigtable-Tabelle, die die Mutation erhalten hat. Kann verwendet werden, wenn mehrere Pipelines Änderungen von verschiedenen Tabellen zum selben Pub/Sub-Thema streamen. |
Pipelineanforderungen
- Die angegebene Bigtable-Quellinstanz.
- Die angegebene Bigtable-Quelltabelle In der Tabelle müssen Änderungsstreams aktiviert sein.
- Das angegebene Bigtable-Anwendungsprofil.
- Das angegebene Pub/Sub-Thema muss vorhanden sein.
Vorlagenparameter
Erforderliche Parameter
- pubSubTopic : Der Name des Pub/Sub-Zielthemas.
- bigtableChangeStreamAppProfile : Die Bigtable-Anwendungsprofil-ID. Das Anwendungsprofil muss Single-Cluster-Routing verwenden und Transaktionen für einzelne Zeilen zulassen.
- bigtableReadInstanceId: Die Bigtable-Quellinstanz-ID.
- bigtableReadTableId: Die Bigtable-Quelltabellen-ID.
Optionale Parameter
- messageEncoding : Die Codierung der Nachrichten, die im Pub/Sub-Thema veröffentlicht werden sollen. Wenn das Schema des Zielthemas konfiguriert ist, wird die Nachrichtencodierung durch die Themeneinstellungen bestimmt. Die folgenden Werte werden unterstützt:
BINARY
undJSON
. Die Standardeinstellung istJSON
. - messageFormat : Die Codierung der Nachrichten, die im Pub/Sub-Thema veröffentlicht werden sollen. Wenn das Schema des Zielthemas konfiguriert ist, wird die Nachrichtencodierung durch die Themeneinstellungen bestimmt. Die folgenden Werte werden unterstützt:
AVRO
,PROTOCOL_BUFFERS
undJSON
. Der Standardwert istJSON
. Wenn das FormatJSON
verwendet wird, sind die Zeilen- und Wertfelder der Nachricht Strings, deren Inhalt durch die PipelineoptionenuseBase64Rowkeys
,useBase64ColumnQualifiers
,useBase64Values
undbigtableChangeStreamCharset
bestimmt wird. - stripValues : Wenn dieser Wert auf „true“ gesetzt ist, werden die SET_CELL-Mutationen ohne neue Werte zurückgegeben. Die Standardeinstellung ist "false". Dieser Parameter ist nützlich, wenn Sie keinen neuen Wert haben müssen, der als Cache-Entwertung bezeichnet wird, oder wenn die Werte extrem groß sind und die Größenbeschränkungen für Pub/Sub-Nachrichten überschreiten.
- dlqDirectory : Das Verzeichnis für die Warteschlange für unzustellbare Nachrichten. Datensätze, die nicht verarbeitet werden können, werden in diesem Verzeichnis gespeichert. Die Standardeinstellung ist ein Verzeichnis unter dem temporären Dataflow-Job. In den meisten Fällen können Sie den Standardpfad verwenden.
- dlqRetryMinutes: Die Anzahl der Minuten zwischen DLQ-Wiederholungen (Dead Letter Queue). Die Standardeinstellung ist
10
. - dlqMaxRetries : Die maximale Anzahl von Wiederholungsversuchen für unzustellbare Nachrichten. Die Standardeinstellung ist
5
. - useBase64Rowkeys : Wird mit der JSON-Nachrichtencodierung verwendet. Wenn das Feld
rowKey
auftrue
gesetzt ist, ist es ein Base64-codierter String. Andernfalls wirdrowKey
durch die Decodierung von Byte in einen String mitbigtableChangeStreamCharset
erzeugt. Die Standardeinstellung istfalse
. - pubSubProjectId : Die Bigtable-Projekt-ID. Der Standardwert ist das Projekt für den Dataflow-Job.
- useBase64ColumnQualifiers : Wird mit der JSON-Nachrichtencodierung verwendet. Wenn das Feld
column
auftrue
gesetzt ist, ist es ein Base64-codierter String. Andernfalls wird die Spalte mithilfe vonbigtableChangeStreamCharset
erstellt, um Byte in einen String zu decodieren. Die Standardeinstellung istfalse
. - useBase64Values : Wird mit JSON-Nachrichtencodierung verwendet. Wenn
true
festgelegt ist, ist das Wertfeld ein Base64-codierter String. Andernfalls wird der Wert mithilfe vonbigtableChangeStreamCharset
erzeugt, um Byte in einen String zu decodieren. Die Standardeinstellung istfalse
. - disableDlqRetries: Gibt an, ob Wiederholungsversuche für den DLQ deaktiviert werden sollen. Die Standardeinstellung ist "false".
- bigtableChangeStreamMetadataInstanceId : Die Metadateninstanz-ID des Bigtable-Änderungsstreams. Die Standardeinstellung ist leer.
- bigtableChangeStreamMetadataTableTableId: Die ID der Metadatentabelle des Bigtable-Änderungsstream-Connectors. Wenn nicht angegeben, wird während der Pipelineausführung automatisch eine Metadatentabelle für einen Bigtable-Änderungsstream-Connector erstellt. Die Standardeinstellung ist leer.
- bigtableChangeStreamCharset: Der Zeichensatzname des Bigtable-Änderungsstreams. Standardmäßig ist dies auf UTF8 eingestellt.
- bigtableChangeStreamStartTimestamp: Der Startzeitstempel (https://tools.ietf.org/html/rfc3339) (einschließlich), der zum Lesen von Änderungsstreams verwendet wird. Beispiel:
2022-05-05T07:59:59Z
Die Standardeinstellung ist der Zeitstempel der Pipeline-Startzeit. - bigtableChangeStreamIgnoreColumnFamilies: Eine durch Kommas getrennte Liste von Änderungen an den Namen der Spaltenfamilien, die ignoriert werden sollen. Die Standardeinstellung ist leer.
- bigtableChangeStreamIgnoreColumns: Eine durch Kommas getrennte Liste von Änderungen der Spaltennamen, die ignoriert werden sollen. Die Standardeinstellung ist leer.
- bigtableChangeStreamName: Ein eindeutiger Name für die Client-Pipeline. Ermöglicht die Fortsetzung der Verarbeitung ab dem Punkt, an dem eine zuvor ausgeführte Pipeline beendet wurde. Standardmäßig wird ein automatisch generierter Name verwendet. Den verwendeten Wert finden Sie in den Dataflow-Joblogs.
- bigtableChangeStreamResume: Wenn dieser Wert auf
true
gesetzt ist, setzt eine neue Pipeline die Verarbeitung ab dem Punkt fort, an dem eine zuvor ausgeführte Pipeline mit demselbenbigtableChangeStreamName
-Wert gestoppt wurde. Wenn die Pipeline mit dem angegebenenbigtableChangeStreamName
-Wert noch nie ausgeführt wurde, wird keine neue Pipeline gestartet. Wennfalse
festgelegt ist, wird eine neue Pipeline gestartet. Wenn für die angegebene Quelle bereits eine Pipeline mit demselbenbigtableChangeStreamName
-Wert ausgeführt wurde, wird keine neue Pipeline gestartet. Die Standardeinstellung istfalse
. - bigtableReadProjectId: Die Bigtable-Projekt-ID. Der Standardwert ist das Projekt für den Dataflow-Job.
Führen Sie die Vorlage aus.
Console
- Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf. Zur Seite "Job aus Vorlage erstellen“
- Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
- Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist
us-central1
.Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.
- Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Bigtable change streams to Pub/Sub templateaus.
- Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
- Klicken Sie auf Job ausführen.
gcloud
Führen Sie die Vorlage in der Shell oder im Terminal aus:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ pubSubTopic=PUBSUB_TOPIC
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
REGION_NAME
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
BIGTABLE_INSTANCE_ID
: Ihre Bigtable-Instanz-ID.BIGTABLE_TABLE_ID
: Ihre Bigtable-Tabellen-ID.BIGTABLE_APPLICATION_PROFILE_ID
: Ihre Bigtable-Anwendungsprofil-ID.PUBSUB_TOPIC
: der Name des Pub/Sub-Zielthemas
API
Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "pubSubTopic": "PUBSUB_TOPIC" } } }
Ersetzen Sie dabei Folgendes:
PROJECT_ID
: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchtenJOB_NAME
: ein eindeutiger Jobname Ihrer WahlVERSION
: Die Version der Vorlage, die Sie verwenden möchtenSie können die folgenden Werte verwenden:
latest
zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/- Den Versionsnamen wie
2023-09-12-00_RC00
, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
LOCATION
: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.us-central1
BIGTABLE_INSTANCE_ID
: Ihre Bigtable-Instanz-ID.BIGTABLE_TABLE_ID
: Ihre Bigtable-Tabellen-ID.BIGTABLE_APPLICATION_PROFILE_ID
: Ihre Bigtable-Anwendungsprofil-ID.PUBSUB_TOPIC
: der Name des Pub/Sub-Zielthemas
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.