Cambiar la captura de datos de MySQL a BigQuery con la plantilla de Debezium y Pub/Sub (Stream)

La plantilla de captura de datos de cambios de MySQL a BigQuery mediante Debezium y Pub/Sub es un flujo de procesamiento en streaming que lee mensajes de Pub/Sub con datos de cambios de una base de datos MySQL y escribe los registros en BigQuery. Un conector de Debezium captura los cambios en la base de datos MySQL y publica los datos modificados en Pub/Sub. La plantilla lee los mensajes de Pub/Sub y los escribe en BigQuery.

Puedes usar esta plantilla para sincronizar bases de datos MySQL y tablas de BigQuery. La pipeline escribe los datos modificados en una tabla de almacenamiento provisional de BigQuery y actualiza de forma intermitente una tabla de BigQuery que replica la base de datos MySQL.

Requisitos del flujo de procesamiento

Parámetros de plantilla

Parámetros obligatorios

  • inputSubscriptions lista separada por comas de suscripciones de entrada de Pub/Sub desde las que leer, con el formato <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ....
  • changeLogDataset el conjunto de datos de BigQuery en el que se almacenarán las tablas de almacenamiento provisional, con el formato <DATASET_NAME>.
  • replicaDataset ubicación del conjunto de datos de BigQuery en el que se almacenarán las tablas de réplica, con el formato <DATASET_NAME>.

Parámetros opcionales

  • inputTopics lista separada por comas de temas de Pub/Sub a los que se envían los datos de CDC.
  • updateFrequencySecs el intervalo con el que la canalización actualiza la tabla de BigQuery que replica la base de datos MySQL.
  • useSingleTopic asigna el valor true si configuras el conector Debezium para que publique todas las actualizaciones de la tabla en un solo tema. Valor predeterminado: false.
  • useStorageWriteApi si es true, la canalización usa la API Storage Write de BigQuery (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es false. Para obtener más información, consulta el artículo sobre cómo usar la API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce al usar la API Storage Write, especifica la semántica de escritura. Para usar la semántica de al menos una vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), asigna el valor true a este parámetro. Para usar la semántica de entrega única, asigna el valor false al parámetro. Este parámetro solo se aplica cuando useStorageWriteApi es true. El valor predeterminado es false.
  • numStorageWriteApiStreams cuando se usa la API Storage Write, especifica el número de flujos de escritura. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debe definir este parámetro. El valor predeterminado es 0.
  • storageWriteApiTriggeringFrequencySec cuando se usa la API Storage Write, especifica la frecuencia de activación en segundos. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debe definir este parámetro.

Ejecutar la plantilla

Para ejecutar esta plantilla, sigue estos pasos:

  1. En tu máquina local, clona el repositorio DataflowTemplates.
  2. Cambia al directorio v2/cdc-parent.
  3. Asegúrate de que el conector Debezium esté implementado.
  4. Con Maven, ejecuta la plantilla de Dataflow:
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
    • SUBSCRIPTIONS: lista de nombres de suscripciones de Pub/Sub separados por comas
    • CHANGELOG_DATASET: tu conjunto de datos de BigQuery para los datos del registro de cambios
    • REPLICA_DATASET: tu conjunto de datos de BigQuery para las tablas de réplica

Siguientes pasos