Modelo de captura de dados alterados do MySQL para BigQuery usando o Debezium e o Pub/Sub (Stream)

O modelo da captura de dados de alteração do MySQL para o BigQuery usando o Debezium e o Pub/Sub é um pipeline de streaming que lê mensagens Pub/Sub com dados de alteração de um banco de dados MySQL e grava os registros no BigQuery. Um conector do Debezium captura alterações no banco de dados MySQL e publica os dados alterados no Pub/Sub. O modelo lê as mensagens do Pub/Sub e as grava no BigQuery.

É possível usar esse modelo para sincronizar bancos de dados MySQL e tabelas do BigQuery. O pipeline grava os dados alterados em uma tabela de preparo do BigQuery e atualiza intermitentemente uma tabela do BigQuery que replica o banco de dados MySQL.

Requisitos de pipeline

  • O conector do Debezium precisa ser implantado.
  • As mensagens do Pub/Sub precisam ser serializadas em uma Linha enviada.

Parâmetros do modelo

Parâmetros obrigatórios

  • inputSubscriptions : a lista separada por vírgulas de assinaturas de entrada do Pub/Sub para leitura, no formato <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ....
  • changeLogDataset : o conjunto de dados do BigQuery para armazenar as tabelas de preparo, no formato <DATASET_NAME>.
  • replicaDataset : o local do conjunto de dados do BigQuery para armazenar as tabelas de réplica, no formato <DATASET_NAME>.

Parâmetros opcionais

  • inputTopics : lista separada por vírgulas de tópicos do PubSub para onde os dados do CDC estão sendo enviados.
  • updateFrequencySecs : o intervalo em que o pipeline atualiza a tabela do BigQuery replicando o banco de dados MySQL.
  • useSingleTopic : defina como verdadeiro se você configurou seu conector Debezium para publicar todas as atualizações de tabela em um único tópico. O padrão é: falso.
  • useStorageWriteApi : se verdadeiro, o pipeline usa a API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). O valor padrão é false. Para mais informações, consulte Como usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce : ao usar a API Storage Write, especifica a semântica de gravação. Para usar pelo menos uma semântica (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina esse parâmetro como true. Para usar semântica exatamente uma vez, defina o parâmetro como false. Esse parâmetro se aplica apenas quando useStorageWriteApi é true. O valor padrão é false.
  • numStorageWriteApiStreams : ao usar a API Storage Write, especifica o número de fluxos de gravação. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, será necessário definir esse parâmetro. Padrão: 0.
  • storageWriteApiTriggeringFrequencySec: ao usar a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, você precisará definir esse parâmetro.

Executar o modelo

Para executar esse modelo, siga estas etapas:

  1. Na máquina local, clone o repositório DataflowTemplates.
  2. Altere para o diretório v2/cdc-parent.
  3. Certifique-se de que o conector do Debezium esteja implantado.
  4. Usando o Maven, execute o modelo do Dataflow:
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    Substitua:

    • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
    • SUBSCRIPTIONS: sua lista separada por vírgulas de nomes de assinatura do Pub/Sub
    • CHANGELOG_DATASET: seu conjunto de dados do BigQuery para dados do registro de alterações
    • REPLICA_DATASET: seu conjunto de dados do BigQuery para tabelas replicadas

A seguir