O modelo de captura de dados de alterações do MySQL para o BigQuery através do Debezium e do Pub/Sub é um pipeline de streaming que lê mensagens do Pub/Sub com dados de alterações de uma base de dados MySQL e escreve os registos no BigQuery. Um conetor do Debezium captura as alterações à base de dados do MySQL e publica os dados alterados no Pub/Sub. Em seguida, o modelo lê as mensagens do Pub/Sub e escreve-as no BigQuery.
Pode usar este modelo para sincronizar bases de dados MySQL e tabelas do BigQuery. O pipeline escreve os dados alterados numa tabela de preparação do BigQuery e atualiza intermitentemente uma tabela do BigQuery que replica a base de dados MySQL.
Requisitos do pipeline
- O conetor Debezium tem de ser implementado.
- As mensagens Pub/Sub têm de ser serializadas numa linha do Beam.
Parâmetros de modelos
Parâmetros obrigatórios
- inputSubscriptions: a lista separada por vírgulas de subscrições de entrada do Pub/Sub a partir das quais ler, no formato
<SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ...
. - changeLogDataset: o conjunto de dados do BigQuery para armazenar as tabelas de preparação, no formato <DATASET_NAME>.
- replicaDataset: a localização do conjunto de dados do BigQuery onde armazenar as tabelas de réplica, no formato <DATASET_NAME>.
Parâmetros opcionais
- inputTopics: lista separada por vírgulas de tópicos do PubSub para onde os dados de CDC estão a ser enviados.
- updateFrequencySecs: o intervalo no qual o pipeline atualiza a tabela do BigQuery que replica a base de dados MySQL.
- useSingleTopic: defina esta opção como
true
se configurar o conetor do Debezium para publicar todas as atualizações de tabelas num único tópico. A predefinição é: false. - useStorageWriteApi: se for verdadeiro, o pipeline usa a API Storage Write do BigQuery (https://cloud.google.com/bigquery/docs/write-api). O valor predefinido é
false
. Para mais informações, consulte a secção Usar a API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - useStorageWriteApiAtLeastOnce: quando usa a API Storage Write, especifica a semântica de escrita. Para usar a semântica pelo menos uma vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), defina este parâmetro como
true
. Para usar a semântica exatamente uma vez, defina o parâmetro comofalse
. Este parâmetro só se aplica quandouseStorageWriteApi
étrue
. O valor predefinido éfalse
. - numStorageWriteApiStreams: quando usa a API Storage Write, especifica o número de streams de escrita. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, tem de definir este parâmetro. A predefinição é: 0. - storageWriteApiTriggeringFrequencySec: quando usa a API Storage Write, especifica a frequência de acionamento, em segundos. Se
useStorageWriteApi
fortrue
euseStorageWriteApiAtLeastOnce
forfalse
, tem de definir este parâmetro.
Execute o modelo
Para executar este modelo, siga estes passos:
- Na sua máquina local, clone o repositório DataflowTemplates.
- Mude para o diretório
v2/cdc-parent
. - Certifique-se de que o conector Debezium está implementado.
- Com o Maven, execute o modelo do Dataflow:
mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \ --inputSubscriptions=SUBSCRIPTIONS \ --updateFrequencySecs=300 \ --changeLogDataset=CHANGELOG_DATASET \ --replicaDataset=REPLICA_DATASET \ --project=PROJECT_ID \ --region=REGION_NAME"
Substitua o seguinte:
PROJECT_ID
: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudSUBSCRIPTIONS
: a sua lista de nomes de subscrições do Pub/Sub separada por vírgulasCHANGELOG_DATASET
: o seu conjunto de dados do BigQuery para dados do registo de alteraçõesREPLICA_DATASET
: o seu conjunto de dados do BigQuery para tabelas de réplicas
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.