Plantilla de captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub (transmisión)

La plantilla Captura de datos modificados desde MySQL hasta BigQuery mediante Debezium y Pub/Sub es una canalización de transmisión que lee mensajes de Pub/Sub con datos de modificación desde una base de datos de MySQL y escribe los registros en BigQuery. Un conector Debezium captura las modificaciones en la base de datos de MySQL y publica los datos modificados en Pub/Sub. Luego, la plantilla lee los mensajes de Pub/Sub y los escribe en BigQuery.

Puedes usar esta plantilla para sincronizar las bases de datos de MySQL y las tablas de BigQuery. La canalización escribe los datos modificados en una tabla de etapa de pruebas de BigQuery y actualiza de forma intermitente una tabla de BigQuery que replica la base de datos de MySQL.

Requisitos de la canalización

Parámetros de la plantilla

Parámetros obligatorios

  • inputSubscriptions: Es la lista separada por comas de suscripciones de entrada de Pub/Sub desde las que se va a leer, en el formato <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ....
  • changeLogDataset: Es el conjunto de datos de BigQuery en el que se almacenarán las tablas de etapa de pruebas, en el formato <DATASET_NAME>.
  • replicaDataset: Es la ubicación del conjunto de datos de BigQuery en la que se almacenarán las tablas de réplica, en el formato <DATASET_NAME>.

Parámetros opcionales

  • inputTopics: Es una lista separada por comas de temas de Pub/Sub a los que se envían los datos de CDC.
  • updateFrequencySecs: Es el intervalo en el que la canalización actualiza la tabla de BigQuery y replica la base de datos de MySQL.
  • useSingleTopic: Establece esto como "true" si configuraste el conector Debezium para publicar todas las actualizaciones de la tabla en un solo tema. La configuración predeterminada es "false".
  • useStorageWriteApi: Si es verdadero, la canalización usa la API de BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es false. Para obtener más información, consulta Usa la API de Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: Cuando usas la API de Storage Write, se especifica la semántica de escritura. Para usar una semántica de al menos una vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), configura este parámetro en true. Para usar una semántica de una y solo una vez, configura el parámetro en false. Este parámetro se aplica solo cuando useStorageWriteApi es true. El valor predeterminado es false.
  • numStorageWriteApiStreams: Cuando usas la API de Storage Write, se especifica la cantidad de transmisiones de escritura. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro. La configuración predeterminada es 0.
  • storageWriteApiTriggeringFrequencySec: cuando se usa la API de Storage Write, se especifica la frecuencia de activación en segundos. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro.

Ejecuta la plantilla

Para ejecutar esta plantilla, sigue estos pasos:

  1. En tu máquina local, clona el repositorio DataflowTemplates.
  2. Cambia al directorio v2/cdc-parent.
  3. Asegúrate de que el conector Debezium esté implementado.
  4. Mediante Maven, ejecuta la plantilla de Dataflow:
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
    • SUBSCRIPTIONS: la lista separada por comas de los nombres de suscripción a Pub/Sub
    • CHANGELOG_DATASET: el conjunto de datos de BigQuery para los datos de registro de cambios
    • REPLICA_DATASET: el conjunto de datos de BigQuery para tablas de réplica

¿Qué sigue?