Spanner 변경 내역에서 BigQuery로 템플릿

Spanner change stream to BigQuery 템플릿은 Spanner 데이터 변경 레코드를 스트리밍하고 Dataflow Runner V2를 사용하여 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다.

Spanner 트랜잭션의 수정 여부와 관계없이 변경 내역이 감시하는 모든 열이 각 BigQuery 테이블 행에 포함됩니다. 감시 대상이 아닌 열은 BigQuery 행에 포함되지 않습니다. Dataflow 워터마크보다 낮은 Spanner 변경사항은 BigQuery 테이블에 적용되거나 재시도를 위해 데드 레터 큐에 저장됩니다. BigQuery 행이 저장되는 순서는 원본 Spanner 커밋 타임스탬프의 순서와 다릅니다.

필요한 BigQuery 테이블이 없으면 파이프라인이 테이블을 만듭니다. 그렇지 않으면 기존 BigQuery 테이블이 사용됩니다. 기존 BigQuery 테이블의 스키마에는 Spanner 테이블의 해당 추적 열과 추가 메타데이터 열이 ignoreFields 옵션으로 인해 명시적으로 무시되지 않아야 합니다. 다음 목록에서 메타데이터 필드에 대한 설명을 참조하세요. 각각의 새 BigQuery 행에는 변경 레코드의 타임스탬프에 있는 Spanner 테이블의 해당 행에서 변경 내역이 감시하는 모든 열이 포함됩니다.

다음 메타데이터 필드가 BigQuery 테이블에 추가됩니다. 이러한 필드에 대한 자세한 내용은 '변경 내역 파티션, 레코드, 쿼리'의 데이터 변경 레코드를 참조하세요.

  • _metadata_spanner_mod_type: Spanner 트랜잭션의 수정 유형(삽입, 업데이트 또는 삭제)입니다. 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_table_name: Spanner 테이블 이름입니다. 이 필드는 커넥터의 메타데이터 테이블 이름이 아닙니다.
  • _metadata_spanner_commit_timestamp: Spanner 커밋 타임스탬프로, 변경사항이 커밋되는 시간입니다. 이 값은 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_server_transaction_id: 변경사항이 커밋된 Spanner 트랜잭션을 나타내는 전역적으로 고유한 문자열입니다. 변경 내역 레코드를 처리할 때에만 이 값을 사용합니다. Spanner API의 트랜잭션 ID와는 상관 관계가 없습니다. 이 값은 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_record_sequence: Spanner 트랜잭션 내 레코드의 시퀀스 넘버입니다. 시퀀스 번호는 트랜잭션 내에서 고유하고 단조롭게(단, 반드시 연속적일 필요는 없음) 증가하도록 보장됩니다. 이 값은 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_is_last_record_in_transaction_in_partition: 레코드가 현재 파티션의 마지막 Spanner 트랜잭션 레코드인지 여부를 나타냅니다. 이 값은 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_number_of_records_in_transaction: 모든 변경 내역 파티션에서 Spanner 트랜잭션에 포함된 데이터 변경 레코드 수입니다. 이 값은 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_spanner_number_of_partitions_in_transaction: Spanner 트랜잭션의 데이터 변경 레코드를 반환하는 파티션 수입니다. 이 값은 변경 내역 데이터 변경 레코드에서 추출됩니다.
  • _metadata_big_query_commit_timestamp: BigQuery에 행이 삽입된 시점의 커밋 타임스탬프입니다. useStorageWriteApitrue이면 이 열은 파이프라인에 의해 변경 로그 테이블에 자동으로 생성되지 않습니다. 이 경우 필요한 경우 변경 로그 테이블에 이 열을 수동으로 추가해야 합니다.

이 템플릿을 사용할 때는 다음 세부정보에 유의하세요.

  • 이 템플릿을 사용하여 Spanner에서 BigQuery로 기존 테이블 또는 새 테이블의 새 열을 전파할 수 있습니다. 자세한 내용은 추적 테이블 또는 열 추가 처리를 참고하세요.
  • OLD_AND_NEW_VALUESNEW_VALUES 값 캡처 유형의 경우 데이터 변경 레코드에 UPDATE 변경사항이 있으면 템플릿은 변경되지 않았지만 감시된 열을 검색하기 위해 데이터 변경 레코드의 커밋 타임스탬프에서 Spanner에 대한 비활성 읽기를 수행해야 합니다. 비활성 읽기에 대해 데이터베이스 'version_retention_period'를 올바르게 구성해야 합니다. NEW_ROW 값 캡처 유형의 경우 데이터 변경 레코드가 UPDATE 요청에서 업데이트되지 않는 열을 포함하여 전체 새 행을 캡처하기 때문에 더 효율적이므로 템플릿이 비활성 읽기를 수행할 필요가 없습니다.
  • 네트워크 지연 시간 및 네트워크 전송 비용을 최소화하려면 Spanner 인스턴스 또는 BigQuery 테이블과 동일한 리전에서 Dataflow 작업을 실행합니다. 작업 리전 외부에 있는 소스, 싱크, 스테이징 파일 위치 또는 임시 파일 위치를 사용하면 데이터가 리전 간에서 전송될 수 있습니다. 자세한 내용은 Dataflow 리전을 참조하세요.
  • 이 템플릿은 모든 유효한 Spanner 데이터 유형을 지원하지만, BigQuery 유형이 Spanner 유형보다 더 정확한 경우 변환 중에 정밀도 손실이 발생할 수 있습니다. 구체적으로 설명하면 다음과 같습니다.
    • Spanner JSON 유형의 경우 객체 멤버의 순서는 사전순으로 정렬되지만 BigQuery JSON 유형은 보장되지 않습니다.
    • Spanner는 나노초 TIMESTAMP 유형만 지원하지만, BigQuery는 마이크로초 TIMESTAMP 유형만 지원합니다.
  • 이 템플릿은 정확히 한 번 모드에서 BigQuery Storage Write API 사용을 지원하지 않습니다.

변경 스트림, 변경 스트림 Dataflow 파이프라인 빌드 방법, 권장사항에 대해 자세히 알아보세요.

파이프라인 요구사항

  • 파이프라인을 실행하기 전에 Spanner 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 메타데이터 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 메타데이터 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 변경 내역이 있어야 합니다.
  • 파이프라인을 실행하기 전에 BigQuery 데이터 세트가 있어야 합니다.

추적 테이블 또는 열 추가 처리

이 섹션에서는 파이프라인이 실행되는 동안 추적 Spanner 테이블 및 열 추가를 처리하기 위한 권장사항을 설명합니다.

  • Spanner 변경 스트림 범위에 새 열을 추가하기 전에 먼저 BigQuery 변경 로그 테이블에 열을 추가합니다. 추가된 열은 일치하는 데이터 유형을 보유하고 NULLABLE이어야 합니다. Spanner에서 새 열 또는 테이블을 계속 만들기 전에 10분 이상 기다립니다. 기다리지 않고 새 열에 쓰면 데드 레터 큐 디렉터리에 잘못됨 오류 코드가 있는 처리되지 않은 레코드가 생성될 수 있습니다.
  • 새 테이블을 추가하려면 먼저 Spanner 데이터베이스에 테이블을 추가합니다. 파이프라인이 새 테이블의 레코드를 수신하면 BigQuery에 테이블이 자동으로 생성됩니다.
  • Spanner 데이터베이스에 새 열 또는 테이블을 추가한 후에는 새 열 또는 테이블이 아직 암시적으로 추적되지 않는 경우 변경 스트림을 변경하여 원하는 새 열 또는 테이블을 추적해야 합니다.
  • 이 템플릿은 BigQuery에서 테이블이나 열을 삭제하지 않습니다. 열이 Spanner 테이블에서 삭제되면 BigQuery에서 열을 수동으로 삭제하지 않는 한 Spanner 테이블에서 열이 삭제된 후에 생성된 레코드의 BigQuery 변경 로그 열에 null 값이 채워집니다.
  • 템플릿은 열 유형 업데이트를 지원하지 않습니다. Spanner는 STRING 열을 BYTES 열로 또는 BYTES 열을 STRING 열로 변경하는 것을 지원하지만 BigQuery에서는 기존 열의 데이터 유형을 수정하거나 동일한 열 이름을 다른 데이터 유형에 사용할 수 없습니다. Spanner에서 이름은 같지만 유형이 다른 열을 삭제하고 다시 만들면 데이터가 기존 BigQuery 열에 쓰일 수 있지만 유형은 변경되지 않습니다.
  • 이 템플릿은 열 모드 업데이트를 지원하지 않습니다. BigQuery에 복제된 메타데이터 열은 REQUIRED 모드로 설정됩니다. BigQuery로 복제된 다른 모든 열은 Spanner 테이블에서 NOT NULL로 정의되었는지와 관계없이 NULLABLE로 설정됩니다. BigQuery에서는 NULLABLE 열을 REQUIRED 모드로 업데이트할 수 없습니다.
  • 실행 중인 파이프라인에서는 변경 스트림의 값 캡처 유형 변경이 지원되지 않습니다.

템플릿 매개변수

필수 매개변수

  • spannerInstanceId: 변경 내역을 읽어올 Spanner 인스턴스입니다.
  • spannerDatabase: 변경 내역을 읽어올 Spanner 데이터베이스입니다.
  • spannerMetadataInstanceId: 변경 내역 커넥터 메타데이터 테이블에 사용할 Spanner 인스턴스입니다.
  • spannerMetadataDatabase: 변경 내역 커넥터 메타데이터 테이블에 사용할 Spanner 데이터베이스입니다.
  • spannerChangeStreamName: 읽어 올 Spanner 변경 내역의 이름입니다.
  • bigQueryDataset: 변경 내역 출력을 위한 BigQuery 데이터 세트입니다.

선택적 매개변수

  • spannerProjectId: 변경 내역을 읽어 올 프로젝트입니다. 또한 이 값은 변경 내역 커넥터 메타데이터 테이블이 생성되는 프로젝트이기도 합니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.
  • spannerDatabaseRole: 템플릿을 실행할 때 사용할 Spanner 데이터베이스 역할입니다. 이 매개변수는 템플릿을 실행하는 IAM 주 구성원이 세분화된 액세스 제어 사용자인 경우에만 필요합니다. 데이터베이스 역할에는 변경 내역에 대한 SELECT 권한과 변경 내역의 읽기 함수에 대한 EXECUTE 권한이 있어야 합니다. 자세한 내용은 변경 내역에 대한 세분화된 액세스 제어(https://cloud.google.com/spanner/docs/fgac-change-streams)를 참조하세요.
  • spannerMetadataTableName: 사용할 Spanner 변경 내역 커넥터 메타데이터 테이블 이름입니다. 제공하지 않으면 파이프라인 흐름 중에 Spanner 변경 내역 커넥터 메타데이터 테이블이 자동으로 생성됩니다. 기존 파이프라인을 업데이트할 때 이 매개변수를 제공해야 합니다. 그렇지 않으면 이 매개변수를 제공하지 마세요.
  • rpcPriority: Spanner 호출의 요청 우선순위입니다. 값은 HIGH, MEDIUM, LOW 값 중 하나여야 합니다. 기본값은 HIGH입니다.
  • spannerHost: 템플릿에서 호출할 Cloud Spanner 엔드포인트입니다. 테스트에만 사용됩니다. (예: https://batch-spanner.googleapis.com)
  • startTimestamp : 변경 내역을 읽는 데 사용할 시작 날짜/시간(https://datatracker.ietf.org/doc/html/rfc3339)입니다. Ex-2021-10-12T07:20:50.52Z. 기본값은 파이프라인이 시작되는 시점의 타임스탬프, 즉 현재 시간입니다.
  • endTimestamp : 변경 내역 Ex-2021-10-12T07:20:50.52Z를 읽는 데 사용할 종료 날짜/시간(https://datatracker.ietf.org/doc/html/rfc3339)입니다. 기본값은 미래의 무한대 시간입니다.
  • bigQueryProjectId: BigQuery 프로젝트입니다. 기본값은 Dataflow 작업의 프로젝트입니다.
  • bigQueryChangelogTableNameTemplate : 변경 로그를 포함하는 BigQuery 테이블 이름의 템플릿입니다. 기본값은 {_metadata_spanner_table_name}_changelog입니다.
  • deadLetterQueueDirectory : 처리되지 않은 레코드를 저장하는 경로입니다. 기본 경로는 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다. 보통 기본값으로 충분합니다.
  • dlqRetryMinutes: 데드 레터 큐 재시도 간격(분)입니다. 기본값은 10입니다.
  • ignoreFields : 무시할 쉼표로 구분된 필드 목록(대소문자 구분)입니다. 이러한 필드는 감시 테이블 필드이거나 파이프라인에서 추가한 메타데이터 필드일 수 있습니다. 무시된 필드는 BigQuery에 삽입되지 않습니다. _metadata_spanner_table_name 필드를 무시하면 bigQueryChangelogTableNameTemplate 매개변수도 무시됩니다. 기본값은 빈 값입니다.
  • disableDlqRetries: DLQ 재시도를 사용 중지할지 여부입니다. 기본값은 false입니다.
  • useStorageWriteApi : true인 경우 파이프라인은 BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)를 사용합니다. 기본값은 false입니다. 자세한 내용은 Storage Write API(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api) 사용을 참조하세요.
  • useStorageWriteApiAtLeastOnce : Storage Write API를 사용할 때 쓰기 시맨틱스를 지정합니다. 1회 이상 실행되는 시맨틱스(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)를 사용하려면, 이 매개변수를 true로 설정합니다. 1회만 실행되는 시맨틱스를 사용하려면 매개변수를 false로 설정합니다. 이 매개변수는 useStorageWriteApitrue인 경우에만 적용됩니다. 기본값은 false입니다.
  • numStorageWriteApiStreams : Storage Write API를 사용할 때 쓰기 스트림 수를 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec : Storage Write API를 사용할 때 트리거 빈도를 초 단위로 지정합니다. useStorageWriteApitrue이고 useStorageWriteApiAtLeastOncefalse이면 이 매개변수를 설정해야 합니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Spanner change streams to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • SPANNER_INSTANCE_ID: Spanner 인스턴스 ID입니다.
  • SPANNER_DATABASE: Spanner 데이터베이스입니다.
  • SPANNER_METADATA_INSTANCE_ID: Spanner 메타데이터 인스턴스 ID입니다.
  • SPANNER_METADATA_DATABASE: Spanner 메타데이터 데이터베이스입니다.
  • SPANNER_CHANGE_STREAM: Spanner 변경 내역입니다.
  • BIGQUERY_DATASET: 변경 내역 출력을 위한 BigQuery 데이터 세트입니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • SPANNER_INSTANCE_ID: Spanner 인스턴스 ID입니다.
  • SPANNER_DATABASE: Spanner 데이터베이스입니다.
  • SPANNER_METADATA_INSTANCE_ID: Spanner 메타데이터 인스턴스 ID입니다.
  • SPANNER_METADATA_DATABASE: Spanner 메타데이터 데이터베이스입니다.
  • SPANNER_CHANGE_STREAM: Spanner 변경 내역입니다.
  • BIGQUERY_DATASET: 변경 내역 출력을 위한 BigQuery 데이터 세트입니다.

다음 단계