Die Vorlage "Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub" ist eine Streamingpipeline, die Pub/Sub-Nachrichten mit Änderungsdaten aus einer MySQL-Datenbank liest und die Datensätze in BigQuery schreibt. Ein Debezium-Connector erfasst Änderungen an der MySQL-Datenbank und veröffentlicht die geänderten Daten in Pub/Sub. Die Vorlage liest dann die Pub/Sub-Nachrichten und schreibt sie in BigQuery.
Über diese Vorlage können Sie MySQL-Datenbanken und BigQuery-Tabellen miteinander synchronisieren. Die Pipeline schreibt die geänderten Daten in eine BigQuery-Staging-Tabelle und aktualisiert in regelmäßigen Abständen eine BigQuery-Tabelle zu Replikation der MySQL-Datenbank.
Pipelineanforderungen
- Der Debezium-Connector muss bereitgestellt sein.
- Die Pub/Sub-Nachrichten müssen in einer Beam Row serialisiert sein.
Vorlagenparameter
Erforderliche Parameter
- inputSubscriptions: Die durch Kommas getrennte Liste der Pub/Sub-Eingabeabos, aus denen gelesen werden soll, im Format
<SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ...
. - changeLogDataset: Das BigQuery-Dataset, in dem die Staging-Tabellen gespeichert werden sollen, im Format <DATASET_NAME>.
- replicaDataset: Der Speicherort des BigQuery-Datasets, in dem die Replikattabellen gespeichert werden sollen, im Format <DATASET_NAME>.
Optionale Parameter
- inputTopics: Durch Kommas getrennte Liste von PubSub-Themen, an die CDC-Daten gesendet werden.
- updateFrequencySecs: Das Intervall, in dem die Pipeline die BigQuery-Tabelle zur Replikation der MySQL-Datenbank aktualisiert.
- useSingleTopic: Legen Sie diesen Wert auf
true
fest, wenn Sie den Debezium-Connector so konfigurieren, dass alle Tabellenaktualisierungen in einem einzigen Thema veröffentlicht werden. Die Standardeinstellung ist "false". - useStorageWriteApi: Wenn „true“, verwendet die Pipeline die BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Der Standardwert ist
false
. Weitere Informationen finden Sie unter „Storage Write API verwenden“ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - useStorageWriteApiAtLeastOnce: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie die "Mindestens einmal"-Semantik verwenden möchten (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), legen Sie diesen Parameter auf
true
fest. Wenn Sie die "Genau einmal"-Semantik verwenden möchten, legen Sie den Parameter auffalse
fest. Dieser Parameter gilt nur, wennuseStorageWriteApi
true
ist. Der Standardwert istfalse
. - numStorageWriteApiStreams: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn
useStorageWriteApi
true
unduseStorageWriteApiAtLeastOnce
false
ist, müssen Sie diesen Parameter festlegen. Die Standardeinstellung ist 0. - storageWriteApiTriggeringFrequencySec: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn
useStorageWriteApi
true
unduseStorageWriteApiAtLeastOnce
false
ist, müssen Sie diesen Parameter festlegen.
Führen Sie die Vorlage aus.
Führen Sie die folgenden Schritte aus, um diese Vorlage auszuführen:
- Klonen Sie das DataflowTemplates-Repository auf Ihren lokalen Computer.
- Wechseln Sie zum Verzeichnis
v2/cdc-parent
. - Achten Sie darauf, dass der Debezium-Connector bereitgestellt ist.
- Führen Sie mit Maven die Dataflow-Vorlage aus.
mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \ --inputSubscriptions=SUBSCRIPTIONS \ --updateFrequencySecs=300 \ --changeLogDataset=CHANGELOG_DATASET \ --replicaDataset=REPLICA_DATASET \ --project=PROJECT_ID \ --region=REGION_NAME"
Ersetzen Sie Folgendes:
PROJECT_ID
: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchtenSUBSCRIPTIONS
: Ihre durch Kommas getrennte Liste von Pub/Sub-Abonamen.CHANGELOG_DATASET
: Ihr BigQuery-Dataset für Änderungslogdaten.REPLICA_DATASET
: Ihr BigQuery-Dataset für Replikattabellen.
Nächste Schritte
- Dataflow-Vorlagen
- Sehen Sie sich die Liste der von Google bereitgestellten Vorlagen an.