Modèle de capture de données modifiées de MySQL vers BigQuery à l'aide de Debezium et Pub/Sub (Flux)

Le modèle Capture de données modifiées de MySQL vers BigQuery à l'aide de Debezium et Pub/Sub est un pipeline de streaming qui lit les messages Pub/Sub avec des données modifiées provenant d'une base de données MySQL et écrit les enregistrements dans BigQuery. Un connecteur Debezium enregistre les modifications apportées à la base de données MySQL et publie les données modifiées dans Pub/Sub. Le modèle lit ensuite les messages Pub/Sub et les écrit dans BigQuery.

Vous pouvez utiliser ce modèle pour synchroniser des bases de données MySQL et des tables BigQuery. Le pipeline écrit les données modifiées dans une table de préproduction BigQuery et met à jour une table BigQuery par intermittence en répliquant la base de données MySQL.

Conditions requises pour ce pipeline

  • Le connecteur Debezium doit être déployé.
  • Les messages Pub/Sub doivent être sérialisés dans une classe Beam Row.

Paramètres de modèle

Paramètres obligatoires

  • inputSubscriptions : liste des abonnements en entrée Pub/Sub à lire séparés par une virgule, au format <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ....
  • changeLogDataset : ensemble de données BigQuery dans lequel stocker les tables de préproduction, au format <DATASET_NAME>.
  • replicaDataset : emplacement de l'ensemble de données BigQuery dans lequel stocker les tables dupliquées, au format <DATASET_NAME>.

Paramètres facultatifs

  • inputTopics : liste des sujets Pub/Sub séparés par une virgule vers lesquels les données CDC sont transférées.
  • updateFrequencySecs : intervalle auquel le pipeline met à jour la table BigQuery en répliquant la base de données MySQL.
  • useSingleTopic : définissez cette valeur sur "true" si vous avez configuré votre connecteur Debezium pour publier toutes les mises à jour de tables dans un seul sujet. La valeur par défaut est "false".
  • useStorageWriteApi : si cette valeur est définie sur "true", le pipeline utilise l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). La valeur par défaut est false. Pour en savoir plus, consultez la page "Utiliser l'API Storage Write" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce : spécifie la sémantique d'écriture, lorsque vous utilisez l'API Storage Write. Pour utiliser la sémantique de type "au moins une fois" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), définissez ce paramètre sur true. Pour utiliser la sémantique de type "exactement une fois", définissez le paramètre sur false. Ce paramètre ne s'applique que lorsque la valeur de useStorageWriteApi est définie sur true. La valeur par défaut est false.
  • numStorageWriteApiStreams : spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre. La valeur par défaut est 0.
  • storageWriteApiTriggeringFrequencySec : spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre.

Exécuter le modèle

Pour exécuter ce modèle, procédez comme suit :

  1. Sur votre ordinateur local, clonez le dépôt DataflowTemplates.
  2. Accédez au répertoire v2/cdc-parent.
  3. Assurez-vous que le connecteur Debezium est déployé.
  4. Exécutez le modèle Dataflow à l'aide de Maven :
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
    • SUBSCRIPTIONS : liste des noms de vos abonnements Pub/Sub, séparés par une virgule
    • CHANGELOG_DATASET : ensemble de données BigQuery pour les données du journal des modifications
    • REPLICA_DATASET : ensemble de données BigQuery pour les tables dupliquées

Étapes suivantes