Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub(스트림) 템플릿

Change Data Capture from MySQL to BigQuery using Debezium and Pub/Sub 템플릿은 Pub/Sub 메시지와 MySQL의 변경 데이터를 읽고 BigQuery에 레코드를 작성하는 스트리밍 파이프라인입니다. Debezium 커넥터는 MySQL 데이터베이스의 변경사항을 캡처하고 변경된 데이터를 Pub/Sub에 게시합니다. 그런 다음 템플릿이 Pub/Sub 메시지를 읽고 BigQuery에 씁니다.

이 템플릿을 사용하여 MySQL 데이터베이스와 BigQuery 테이블을 동기화할 수 있습니다. 파이프라인은 변경된 데이터를 BigQuery 스테이징 테이블에 쓰고 MySQL 데이터베이스를 복제하는 BigQuery 테이블을 간헐적으로 업데이트합니다.

파이프라인 요구사항

  • Debezium 커넥터가 배포되어야 합니다.
  • Pub/Sub 메시지는 빔 행으로 직렬화되어야 합니다.

템플릿 매개변수

필수 매개변수

  • inputSubscriptions: 읽어올 Pub/Sub 입력 구독의 쉼표로 구분된 목록이며 <SUBSCRIPTION_NAME>,<SUBSCRIPTION_NAME>, ... 형식입니다.
  • changeLogDataset: 스테이징 테이블을 저장할 BigQuery 데이터 세트이며 <DATASET_NAME> 형식입니다.
  • replicaDataset: 복제 테이블을 저장할 BigQuery 데이터 세트의 위치이며 <DATASET_NAME> 형식입니다.

선택적 매개변수

  • inputTopics: CDC 데이터가 푸시되는 PubSub 주제의 쉼표로 구분된 목록입니다.
  • updateFrequencySecs: 파이프라인이 MySQL 데이터베이스를 복제하는 BigQuery 테이블을 업데이트하는 간격입니다.
  • useSingleTopic: 모든 테이블 업데이트를 단일 주제에 게시하도록 Debezium 커넥터를 구성하는 경우 true로 설정합니다. 기본값은 false입니다.
  • useStorageWriteApi: true인 경우 파이프라인은 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)를 사용합니다. 기본값은 false입니다. 자세한 내용은 Storage Write API(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api) 사용을 참조하세요.
  • useStorageWriteApiAtLeastOnce: Storage Write API를 사용할 때 쓰기 시맨틱스를 지정합니다. 1회 이상 실행되는 시맨틱스(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)를 사용하려면, 이 매개변수를 true로 설정합니다. 1회만 실행되는 시맨틱스를 사용하려면 매개변수를 false로 설정합니다. 이 매개변수는 useStorageWriteApitrue인 경우에만 적용됩니다. 기본값은 false입니다.
  • numStorageWriteApiStreams: Storage Write API를 사용할 때 쓰기 스트림 수를 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec: Storage Write API를 사용할 때 트리거 빈도를 초 단위로 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.

템플릿 실행

이 템플릿을 실행하려면 다음 단계를 따르세요.

  1. 로컬 머신에서 DataflowTemplates 저장소를 복제합니다.
  2. v2/cdc-parent 디렉터리로 변경합니다.
  3. Debezium 커넥터가 배포되어 있는지 확인합니다.
  4. Maven을 사용하여 Dataflow 템플릿을 실행합니다.
    mvn exec:java -pl cdc-change-applier -Dexec.args="--runner=DataflowRunner \
        --inputSubscriptions=SUBSCRIPTIONS \
        --updateFrequencySecs=300 \
        --changeLogDataset=CHANGELOG_DATASET \
        --replicaDataset=REPLICA_DATASET \
        --project=PROJECT_ID \
        --region=REGION_NAME"
      

    다음을 바꿉니다.

    • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
    • SUBSCRIPTIONS: 쉼표로 구분된 Pub/Sub 구독 이름
    • CHANGELOG_DATASET: 변경 로그 데이터의 BigQuery 데이터 세트
    • REPLICA_DATASET: 복제본 테이블의 BigQuery 데이터 세트

다음 단계