Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub template (Stream)
The Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub template is a streaming pipeline that reads Pub/Sub messages with change data from a MySQL database and writes the records to BigQuery. A Debezium connector captures changes to the MySQL database and publishes the changed data to Pub/Sub. The template then reads the Pub/Sub messages and writes them to BigQuery.

You can use this template to sync MySQL databases and BigQuery tables. The pipeline writes the changed data to a BigQuery staging table and intermittently updates a BigQuery table replicating the MySQL database.
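Before launching the pipeline, the Pub/Sub topics that the Debezium connector publishes to, and the subscriptions the pipeline reads from, must exist. As a minimal sketch, assuming hypothetical names `mysql-cdc-topic` and `mysql-cdc-subscription`, you might create them with gcloud:

```bash
# Names below are placeholders; match them to your Debezium connector
# configuration.
gcloud pubsub topics create mysql-cdc-topic

# The pipeline reads from subscriptions, so attach one to the topic.
gcloud pubsub subscriptions create mysql-cdc-subscription \
    --topic=mysql-cdc-topic
```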
Pipeline requirements

- The Debezium connector must be [deployed](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/v2/cdc-parent#deploying-the-connector).
- The Pub/Sub messages must be serialized in a [Beam Row](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/values/Row.html).

Template parameters

Required parameters

- **inputSubscriptions**: The comma-separated list of Pub/Sub input subscriptions to read from, in the format `<SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ...`.
- **changeLogDataset**: The BigQuery dataset to store the staging tables in, in the format `<DATASET_NAME>`.
- **replicaDataset**: The location of the BigQuery dataset to store the replica tables in, in the format `<DATASET_NAME>`.
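Both datasets must exist before the pipeline runs. A minimal sketch using the `bq` CLI, assuming hypothetical dataset names `cdc_changelog` and `cdc_replica` (replace `PROJECT_ID` and the location with your own values):

```bash
# Create the staging (changelog) dataset and the replica dataset.
# Dataset names and location here are placeholders.
bq mk --dataset --location=US PROJECT_ID:cdc_changelog
bq mk --dataset --location=US PROJECT_ID:cdc_replica
```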
Optional parameters
- **inputTopics**: Comma-separated list of Pub/Sub topics to which CDC data is being pushed.
- **updateFrequencySecs**: The interval at which the pipeline updates the BigQuery table replicating the MySQL database.
- **useSingleTopic**: Set this to `true` if you configure your Debezium connector to publish all table updates to a single topic. Defaults to: false.
- **useStorageWriteApi**: If `true`, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is `false`. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
- **useStorageWriteApiAtLeastOnce**: When using the Storage Write API, specifies the write semantics. To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to `true`. To use exactly-once semantics, set the parameter to `false`. This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`.
- **numStorageWriteApiStreams**: When using the Storage Write API, specifies the number of write streams. If `useStorageWriteApi` is `true` and `useStorageWriteApiAtLeastOnce` is `false`, then you must set this parameter. Defaults to: 0.
- **storageWriteApiTriggeringFrequencySec**: When using the Storage Write API, specifies the triggering frequency, in seconds. If `useStorageWriteApi` is `true` and `useStorageWriteApiAtLeastOnce` is `false`, then you must set this parameter.
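These options are passed as pipeline arguments when launching the template (see "Run the template" below). As an illustrative sketch, to opt in to the Storage Write API with exactly-once semantics you might collect flags like the following; the stream count and frequency values are placeholders, not tuned recommendations:

```bash
# Illustrative Storage Write API options for exactly-once writes.
# When useStorageWriteApiAtLeastOnce is false, both
# numStorageWriteApiStreams and storageWriteApiTriggeringFrequencySec
# must be set.
STORAGE_WRITE_ARGS="--useStorageWriteApi=true \
--useStorageWriteApiAtLeastOnce=false \
--numStorageWriteApiStreams=3 \
--storageWriteApiTriggeringFrequencySec=10"
```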
[[["Es fácil de entender","easyToUnderstand","thumb-up"],["Me ofreció una solución al problema","solvedMyProblem","thumb-up"],["Otro","otherUp","thumb-up"]],[["Es difícil de entender","hardToUnderstand","thumb-down"],["La información o el código de muestra no son correctos","incorrectInformationOrSampleCode","thumb-down"],["Me faltan las muestras o la información que necesito","missingTheInformationSamplesINeed","thumb-down"],["Problema de traducción","translationIssue","thumb-down"],["Otro","otherDown","thumb-down"]],["Última actualización: 2025-09-10 (UTC)."],[[["\u003cp\u003eThis template streams change data from a MySQL database to BigQuery using Debezium, which captures changes and publishes them to Pub/Sub, and then the pipeline reads these messages and writes them to BigQuery.\u003c/p\u003e\n"],["\u003cp\u003eThe pipeline synchronizes MySQL databases with BigQuery tables by writing changed data to a staging table and intermittently updating a BigQuery table that mirrors the MySQL database.\u003c/p\u003e\n"],["\u003cp\u003eTo run the pipeline, a Debezium connector must be deployed, and Pub/Sub messages must be serialized in Beam Row format.\u003c/p\u003e\n"],["\u003cp\u003eRequired parameters for running the template include a comma-separated list of Pub/Sub input subscriptions, a BigQuery dataset for staging tables, and a BigQuery dataset for replica tables.\u003c/p\u003e\n"],["\u003cp\u003eThe pipeline supports the optional use of the BigQuery Storage Write API, offering both at-least-once and exactly-once semantics, along with configurable options for stream numbers and triggering frequency.\u003c/p\u003e\n"]]],[],null,["The Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub\ntemplate is a streaming pipeline that reads Pub/Sub messages with change data from\na MySQL database and writes the records to BigQuery. A Debezium connector captures\nchanges to the MySQL database and publishes the changed data to Pub/Sub. The\ntemplate then reads the Pub/Sub messages and writes them to BigQuery.\n\nYou can use this template to sync MySQL databases and BigQuery tables. The\npipeline writes the changed data to a BigQuery staging table and intermittently\nupdates a BigQuery table replicating the MySQL database.\n\nPipeline requirements\n\n- The Debezium connector must be [deployed](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/master/v2/cdc-parent#deploying-the-connector).\n- The Pub/Sub messages must be serialized in a [Beam Row](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/values/Row.html).\n\nTemplate parameters\n\nRequired parameters\n\n- **inputSubscriptions** : The comma-separated list of Pub/Sub input subscriptions to read from, in the format `\u003cSUBSCRIPTION_NAME\u003e,\u003cSUBSCRIPTION_NAME\u003e, ...`.\n- **changeLogDataset**: The BigQuery dataset to store the staging tables in, in the format \\\u003cDATASET_NAME\\\u003e.\n- **replicaDataset**: The location of the BigQuery dataset to store the replica tables in, in the format \\\u003cDATASET_NAME\\\u003e.\n\nOptional parameters\n\n- **inputTopics**: Comma-separated list of PubSub topics to where CDC data is being pushed.\n- **updateFrequencySecs**: The interval at which the pipeline updates the BigQuery table replicating the MySQL database.\n- **useSingleTopic** : Set this to `true` if you configure your Debezium connector to publish all table updates to a single topic. 
What's next

- Learn about [Dataflow templates](/dataflow/docs/concepts/dataflow-templates).
- See the list of [Google-provided templates](/dataflow/docs/guides/templates/provided-templates).