Modèle de capture de données modifiées de MySQL vers BigQuery à l'aide de Debezium et Pub/Sub (Flux)
Restez organisé à l'aide des collections
Enregistrez et classez les contenus selon vos préférences.
Le modèle Capture de données modifiées de MySQL vers BigQuery à l'aide de Debezium et Pub/Sub est un pipeline de streaming qui lit les messages Pub/Sub avec des données modifiées provenant d'une base de données MySQL et écrit les enregistrements dans BigQuery. Un connecteur Debezium enregistre les modifications apportées à la base de données MySQL et publie les données modifiées dans Pub/Sub. Le modèle lit ensuite les messages Pub/Sub et les écrit dans BigQuery.
Vous pouvez utiliser ce modèle pour synchroniser des bases de données MySQL et des tables BigQuery. Le pipeline écrit les données modifiées dans une table de préproduction BigQuery et met à jour une table BigQuery par intermittence en répliquant la base de données MySQL.
Les messages Pub/Sub doivent être sérialisés dans une classe Beam Row.
Paramètres de modèle
Paramètres obligatoires
inputSubscriptions : liste des abonnements en entrée Pub/Sub à lire séparés par une virgule, au format <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ....
changeLogDataset : ensemble de données BigQuery dans lequel stocker les tables de préproduction, au format <DATASET_NAME>.
replicaDataset : emplacement de l'ensemble de données BigQuery dans lequel stocker les tables répliquées, au format <DATASET_NAME>.
Paramètres facultatifs
inputTopics : liste des sujets Pub/Sub séparés par une virgule vers lesquels les données CDC sont transférées.
updateFrequencySecs : intervalle auquel le pipeline met à jour la table BigQuery en répliquant la base de données MySQL.
useSingleTopic : définissez cette valeur sur true si vous configurez votre connecteur Debezium pour publier toutes les mises à jour de tables dans un seul sujet. La valeur par défaut est "false".
useStorageWriteApiAtLeastOnce : spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), définissez ce paramètre sur true. Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur false. Ce paramètre ne s'applique que lorsque la valeur de useStorageWriteApi est définie sur true. La valeur par défaut est false.
numStorageWriteApiStreams : spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre. La valeur par défaut est 0.
storageWriteApiTriggeringFrequencySec : spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre.
Sauf indication contraire, le contenu de cette page est régi par une licence Creative Commons Attribution 4.0, et les échantillons de code sont régis par une licence Apache 2.0. Pour en savoir plus, consultez les Règles du site Google Developers. Java est une marque déposée d'Oracle et/ou de ses sociétés affiliées.
Dernière mise à jour le 2025/09/04 (UTC).
[[["Facile à comprendre","easyToUnderstand","thumb-up"],["J'ai pu résoudre mon problème","solvedMyProblem","thumb-up"],["Autre","otherUp","thumb-up"]],[["Difficile à comprendre","hardToUnderstand","thumb-down"],["Informations ou exemple de code incorrects","incorrectInformationOrSampleCode","thumb-down"],["Il n'y a pas l'information/les exemples dont j'ai besoin","missingTheInformationSamplesINeed","thumb-down"],["Problème de traduction","translationIssue","thumb-down"],["Autre","otherDown","thumb-down"]],["Dernière mise à jour le 2025/09/04 (UTC)."],[[["\u003cp\u003eThis template streams change data from a MySQL database to BigQuery using Debezium, which captures changes and publishes them to Pub/Sub, and then the pipeline reads these messages and writes them to BigQuery.\u003c/p\u003e\n"],["\u003cp\u003eThe pipeline synchronizes MySQL databases with BigQuery tables by writing changed data to a staging table and intermittently updating a BigQuery table that mirrors the MySQL database.\u003c/p\u003e\n"],["\u003cp\u003eTo run the pipeline, a Debezium connector must be deployed, and Pub/Sub messages must be serialized in Beam Row format.\u003c/p\u003e\n"],["\u003cp\u003eRequired parameters for running the template include a comma-separated list of Pub/Sub input subscriptions, a BigQuery dataset for staging tables, and a BigQuery dataset for replica tables.\u003c/p\u003e\n"],["\u003cp\u003eThe pipeline supports the optional use of the BigQuery Storage Write API, offering both at-least-once and exactly-once semantics, along with configurable options for stream numbers and triggering frequency.\u003c/p\u003e\n"]]],[],null,["# Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub (Stream) template\n\nThe Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub\ntemplate is a streaming pipeline that reads Pub/Sub messages with change data from\na MySQL database and writes the records to BigQuery. A Debezium connector captures\nchanges to the MySQL database and publishes the changed data to Pub/Sub. The\ntemplate then reads the Pub/Sub messages and writes them to BigQuery.\n\nYou can use this template to sync MySQL databases and BigQuery tables. The\npipeline writes the changed data to a BigQuery staging table and intermittently\nupdates a BigQuery table replicating the MySQL database.\n\nPipeline requirements\n---------------------\n\n- The Debezium connector must be [deployed](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/v2/cdc-parent#deploying-the-connector).\n- The Pub/Sub messages must be serialized in a [Beam Row](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/values/Row.html).\n\nTemplate parameters\n-------------------\n\n### Required parameters\n\n- **inputSubscriptions** : The comma-separated list of Pub/Sub input subscriptions to read from, in the format `\u003cSUBSCRIPTION_NAME\u003e,\u003cSUBSCRIPTION_NAME\u003e, ...`.\n- **changeLogDataset**: The BigQuery dataset to store the staging tables in, in the format \\\u003cDATASET_NAME\\\u003e.\n- **replicaDataset**: The location of the BigQuery dataset to store the replica tables in, in the format \\\u003cDATASET_NAME\\\u003e.\n\n### Optional parameters\n\n- **inputTopics**: Comma-separated list of PubSub topics to where CDC data is being pushed.\n- **updateFrequencySecs**: The interval at which the pipeline updates the BigQuery table replicating the MySQL database.\n- **useSingleTopic** : Set this to `true` if you configure your Debezium connector to publish all table updates to a single topic. Defaults to: false.\n- **useStorageWriteApi** : If true, the pipeline uses the BigQuery Storage Write API (\u003chttps://cloud.google.com/bigquery/docs/write-api\u003e). The default value is `false`. For more information, see Using the Storage Write API (\u003chttps://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api\u003e).\n- **useStorageWriteApiAtLeastOnce** : When using the Storage Write API, specifies the write semantics. To use at-least once semantics (\u003chttps://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics\u003e), set this parameter to `true`. To use exactly-once semantics, set the parameter to `false`. This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.\n- **numStorageWriteApiStreams** : When using the Storage Write API, specifies the number of write streams. If `useStorageWriteApi` is `true` and `useStorageWriteApiAtLeastOnce` is `false`, then you must set this parameter. Defaults to: 0.\n- **storageWriteApiTriggeringFrequencySec** : When using the Storage Write API, specifies the triggering frequency, in seconds. If `useStorageWriteApi` is `true` and `useStorageWriteApiAtLeastOnce` is `false`, then you must set this parameter.\n\nRun the template\n----------------\n\nTo run this template, perform the following steps:\n\n1. On your local machine, clone the [DataflowTemplates repository](https://github.com/GoogleCloudPlatform/DataflowTemplates).\n2. Change to the `v2/cdc-parent` directory.\n3. Ensure that the Debezium connector is [deployed](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/v2/cdc-parent#deploying-the-connector).\n4. Using Maven, run the Dataflow template: \n\n ```bash\n mvn exec:java -pl cdc-change-applier -Dexec.args=\"--runner=DataflowRunner \\\n --inputSubscriptions=\u003cvar translate=\"no\"\u003eSUBSCRIPTIONS\u003c/var\u003e \\\n --updateFrequencySecs=300 \\\n --changeLogDataset=\u003cvar translate=\"no\"\u003eCHANGELOG_DATASET\u003c/var\u003e \\\n --replicaDataset=\u003cvar translate=\"no\"\u003eREPLICA_DATASET\u003c/var\u003e \\\n --project=\u003cvar translate=\"no\"\u003ePROJECT_ID\u003c/var\u003e \\\n --region=REGION_NAME\"\n \n ```\n\n Replace the following:\n - \u003cvar translate=\"no\"\u003ePROJECT_ID\u003c/var\u003e: the Google Cloud project ID where you want to run the Dataflow job\n - \u003cvar translate=\"no\"\u003eSUBSCRIPTIONS\u003c/var\u003e: your comma-separated list of Pub/Sub subscription names\n - \u003cvar translate=\"no\"\u003eCHANGELOG_DATASET\u003c/var\u003e: your BigQuery dataset for changelog data\n - \u003cvar translate=\"no\"\u003eREPLICA_DATASET\u003c/var\u003e: your BigQuery dataset for replica tables\n\nWhat's next\n-----------\n\n- Learn about [Dataflow templates](/dataflow/docs/concepts/dataflow-templates).\n- See the list of [Google-provided templates](/dataflow/docs/guides/templates/provided-templates).\n\n\u003cbr /\u003e\n\n\u003cbr /\u003e"]]