Vorlage "Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub (Stream)"

Die Vorlage "Change Data Capture von MySQL für BigQuery mit Debezium und Pub/Sub" ist eine Streamingpipeline, die Pub/Sub-Nachrichten mit Änderungsdaten aus einer MySQL-Datenbank liest und die Datensätze in BigQuery schreibt. Ein Debezium-Connector erfasst Änderungen an der MySQL-Datenbank und veröffentlicht die geänderten Daten in Pub/Sub. Die Vorlage liest dann die Pub/Sub-Nachrichten und schreibt sie in BigQuery.

Über diese Vorlage können Sie MySQL-Datenbanken und BigQuery-Tabellen miteinander synchronisieren. Die Pipeline schreibt die geänderten Daten in eine BigQuery-Staging-Tabelle und aktualisiert in regelmäßigen Abständen eine BigQuery-Tabelle zu Replikation der MySQL-Datenbank.

Pipelineanforderungen

  • Der Debezium-Connector muss bereitgestellt sein.
  • Die Pub/Sub-Nachrichten müssen in einer Beam Row serialisiert sein.

Vorlagenparameter

Erforderliche Parameter

  • inputSubscriptions: Die durch Kommas getrennte Liste der Pub/Sub-Eingabeabos, aus denen gelesen werden soll, im Format <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ....
  • changeLogDataset: Das BigQuery-Dataset, in dem die Staging-Tabellen gespeichert werden sollen, im Format <DATASET_NAME>.
  • replicaDataset: Der Speicherort des BigQuery-Datasets, in dem die Replikattabellen gespeichert werden sollen, im Format <DATASET_NAME>.

Optionale Parameter

  • inputTopics: Durch Kommas getrennte Liste von PubSub-Themen, an die CDC-Daten gesendet werden.
  • updateFrequencySecs: Das Intervall, in dem die Pipeline die BigQuery-Tabelle zur Replikation der MySQL-Datenbank aktualisiert.
  • useSingleTopic: Legen Sie diesen Wert auf true fest, wenn Sie den Debezium-Connector so konfigurieren, dass alle Tabellenaktualisierungen in einem einzigen Thema veröffentlicht werden. Die Standardeinstellung ist "false".
  • useStorageWriteApi: Wenn „true“, verwendet die Pipeline die BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Der Standardwert ist false. Weitere Informationen finden Sie unter „Storage Write API verwenden“ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: Gibt bei Verwendung der Storage Write API die Schreibsemantik an. Wenn Sie die "Mindestens einmal"-Semantik verwenden möchten (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), legen Sie diesen Parameter auf true fest. Wenn Sie die "Genau einmal"-Semantik verwenden möchten, legen Sie den Parameter auf false fest. Dieser Parameter gilt nur, wenn useStorageWriteApi true ist. Der Standardwert ist false.
  • numStorageWriteApiStreams: Gibt bei Verwendung der Storage Write API die Anzahl der Schreibstreams an. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen. Die Standardeinstellung ist 0.
  • storageWriteApiTriggeringFrequencySec: Wenn Sie die Storage Write API verwenden, wird die Triggerhäufigkeit in Sekunden angegeben. Wenn useStorageWriteApi true und useStorageWriteApiAtLeastOnce false ist, müssen Sie diesen Parameter festlegen.

Führen Sie die Vorlage aus.

Führen Sie die folgenden Schritte aus, um diese Vorlage auszuführen:

  1. Klonen Sie das DataflowTemplates-Repository auf Ihren lokalen Computer.
  2. Wechseln Sie zum Verzeichnis v2/cdc-parent.
  3. Achten Sie darauf, dass der Debezium-Connector bereitgestellt ist.
  4. Führen Sie mit Maven die Dataflow-Vorlage aus.
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    Ersetzen Sie Folgendes:

    • PROJECT_ID: die Google Cloud -Projekt-ID, in der Sie den Dataflow-Job ausführen möchten
    • SUBSCRIPTIONS: Ihre durch Kommas getrennte Liste von Pub/Sub-Abonamen.
    • CHANGELOG_DATASET: Ihr BigQuery-Dataset für Änderungslogdaten.
    • REPLICA_DATASET: Ihr BigQuery-Dataset für Replikattabellen.

Nächste Schritte