Captura de dados de alterações do MySQL para o BigQuery através do modelo Debezium e Pub/Sub (stream)

O modelo de captura de dados de alterações do MySQL para o BigQuery através do Debezium e do Pub/Sub é um pipeline de streaming que lê mensagens do Pub/Sub com dados de alterações de uma base de dados MySQL e escreve os registos no BigQuery. Um conetor do Debezium captura as alterações à base de dados do MySQL e publica os dados alterados no Pub/Sub. Em seguida, o modelo lê as mensagens do Pub/Sub e escreve-as no BigQuery.

Pode usar este modelo para sincronizar bases de dados MySQL e tabelas do BigQuery. O pipeline escreve os dados alterados numa tabela de preparação do BigQuery e atualiza intermitentemente uma tabela do BigQuery que replica a base de dados MySQL.

Requisitos do pipeline

Parâmetros de modelos

Parâmetros obrigatórios

  • inputSubscriptions: a lista separada por vírgulas de subscrições de entrada do Pub/Sub a partir das quais ler, no formato <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ....
  • changeLogDataset: o conjunto de dados do BigQuery para armazenar as tabelas de preparação, no formato <DATASET_NAME>.
  • replicaDataset: a localização do conjunto de dados do BigQuery onde armazenar as tabelas de réplica, no formato <DATASET_NAME>.

Parâmetros opcionais

  • inputTopics: lista separada por vírgulas de tópicos do PubSub para onde os dados de CDC estão a ser enviados.
  • updateFrequencySecs: o intervalo no qual o pipeline atualiza a tabela do BigQuery que replica a base de dados MySQL.
  • useSingleTopic: defina esta opção como true se configurar o conetor do Debezium para publicar todas as atualizações de tabelas num único tópico. A predefinição é: false.
  • useStorageWriteApi: se for verdadeiro, o pipeline usa a API Storage Write do BigQuery (https://cloud.google.com/bigquery/docs/write-api). O valor predefinido é false. Para mais informações, consulte a secção Usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce: quando usa a API Storage Write, especifica a semântica de escrita. Para usar a semântica pelo menos uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como true. Para usar a semântica exatamente uma vez, defina o parâmetro como false. Este parâmetro só se aplica quando useStorageWriteApi é true. O valor predefinido é false.
  • numStorageWriteApiStreams: quando usa a API Storage Write, especifica o número de streams de escrita. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, tem de definir este parâmetro. A predefinição é: 0.
  • storageWriteApiTriggeringFrequencySec: quando usa a API Storage Write, especifica a frequência de acionamento, em segundos. Se useStorageWriteApi for true e useStorageWriteApiAtLeastOnce for false, tem de definir este parâmetro.

Execute o modelo

Para executar este modelo, siga estes passos:

  1. Na sua máquina local, clone o repositório DataflowTemplates.
  2. Mude para o diretório v2/cdc-parent.
  3. Certifique-se de que o conector Debezium está implementado.
  4. Com o Maven, execute o modelo do Dataflow:
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
    • SUBSCRIPTIONS: a sua lista de nomes de subscrições do Pub/Sub separada por vírgulas
    • CHANGELOG_DATASET: o seu conjunto de dados do BigQuery para dados do registo de alterações
    • REPLICA_DATASET: o seu conjunto de dados do BigQuery para tabelas de réplicas

O que se segue?