Spanner 변경 내역에서 BigQuery로 템플릿

Spanner change stream to BigQuery 템플릿은 Spanner 데이터 변경 레코드를 스트리밍하고 Dataflow Runner V2를 사용하여 BigQuery 테이블에 쓰는 스트리밍 파이프라인입니다.

Spanner 트랜잭션의 수정 여부와 관계없이 변경 내역이 감시하는 모든 열이 각 BigQuery 테이블 행에 포함됩니다. 감시 대상이 아닌 열은 BigQuery 행에 포함되지 않습니다. Dataflow 워터마크보다 낮은 Spanner 변경사항은 BigQuery 테이블에 적용되거나 재시도를 위해 데드 레터 큐에 저장됩니다. BigQuery 행이 저장되는 순서는 원본 Spanner 커밋 타임스탬프의 순서와 다릅니다.

필요한 BigQuery 테이블이 없으면 파이프라인이 테이블을 만듭니다. 그렇지 않으면 기존 BigQuery 테이블이 사용됩니다. 기존 BigQuery 테이블의 스키마에는 Spanner 테이블의 해당 추적 열과 추가 메타데이터 열이 ignoreFields 옵션으로 인해 명시적으로 무시되지 않아야 합니다. 다음 목록에서 메타데이터 필드에 대한 설명을 참조하세요. 각각의 새 BigQuery 행에는 변경 레코드의 타임스탬프에 있는 Spanner 테이블의 해당 행에서 변경 내역이 감시하는 모든 열이 포함됩니다.

다음 메타데이터 필드가 BigQuery 테이블에 추가됩니다. 이러한 필드에 대한 자세한 내용은 '변경 내역 파티션, 레코드, 쿼리'의 데이터 변경 레코드를 참조하세요.

이 템플릿을 사용할 때는 다음 세부정보에 유의하세요.

  • 이 템플릿은 스키마 변경사항을 Spanner에서 BigQuery로 전파하지 않습니다. Spanner에서 스키마 변경을 수행하면 파이프라인이 중단될 가능성이 높으므로 스키마 변경 후 파이프라인을 다시 만들어야 할 수 있습니다.
  • OLD_AND_NEW_VALUESNEW_VALUES 값 캡처 유형의 경우 데이터 변경 레코드에 UPDATE 변경사항이 있으면 템플릿은 변경되지 않았지만 감시된 열을 검색하기 위해 데이터 변경 레코드의 커밋 타임스탬프에서 Spanner에 대한 비활성 읽기를 수행해야 합니다. 비활성 읽기에 대해 데이터베이스 'version_retention_period'를 올바르게 구성해야 합니다. NEW_ROW 값 캡처 유형의 경우 데이터 변경 레코드가 UPDATE 요청에서 업데이트되지 않는 열을 포함하여 전체 새 행을 캡처하기 때문에 더 효율적이므로 템플릿이 비활성 읽기를 수행할 필요가 없습니다.
  • 네트워크 지연 시간 및 네트워크 전송 비용을 최소화하려면 Spanner 인스턴스 또는 BigQuery 테이블과 동일한 리전에서 Dataflow 작업을 실행합니다. 작업 리전 외부에 있는 소스, 싱크, 스테이징 파일 위치 또는 임시 파일 위치를 사용하면 데이터가 리전 간에서 전송될 수 있습니다. 자세한 내용은 Dataflow 리전을 참조하세요.
  • 이 템플릿은 모든 유효한 Spanner 데이터 유형을 지원하지만, BigQuery 유형이 Spanner 유형보다 더 정확한 경우 변환 중에 정밀도 손실이 발생할 수 있습니다. 구체적으로 설명하면 다음과 같습니다.
    • Spanner JSON 유형의 경우 객체 멤버의 순서는 사전순으로 정렬되지만 BigQuery JSON 유형은 보장되지 않습니다.
    • Spanner는 나노초 TIMESTAMP 유형만 지원하지만, BigQuery는 마이크로초 TIMESTAMP 유형만 지원합니다.
  • 이 템플릿은 정확히 한 번 모드에서 BigQuery Storage Write API 사용을 지원하지 않습니다.

변경 스트림, 변경 스트림 Dataflow 파이프라인 빌드 방법, 권장사항에 대해 자세히 알아보세요.

파이프라인 요구사항

  • 파이프라인을 실행하기 전에 Spanner 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 메타데이터 인스턴스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 메타데이터 데이터베이스가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Spanner 변경 내역이 있어야 합니다.
  • 파이프라인을 실행하기 전에 BigQuery 데이터 세트가 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • spannerInstanceId: 변경 내역을 읽어올 Spanner 인스턴스입니다.
  • spannerDatabase: 변경 내역을 읽어올 Spanner 데이터베이스입니다.
  • spannerMetadataInstanceId: 변경 내역 커넥터 메타데이터 테이블에 사용할 Spanner 인스턴스입니다.
  • spannerMetadataDatabase: 변경 내역 커넥터 메타데이터 테이블에 사용할 Spanner 데이터베이스입니다. 데이터베이스의 모든 테이블을 추적하는 변경 내역의 경우 메타데이터 테이블을 개별 데이터베이스에 배치하는 것이 좋습니다.
  • spannerChangeStreamName: 읽어 올 Spanner 변경 내역의 이름입니다.
  • bigQueryDataset: 변경 내역 출력을 위한 BigQuery 데이터 세트입니다. dataSetName과 전체 dataSetId(예: bigQueryProjectId.dataSetName) 모두가 허용됩니다.

선택적 매개변수

  • spannerProjectId: 변경 내역을 읽어 올 프로젝트입니다. 이 매개변수의 기본값은 Dataflow 파이프라인이 실행되는 프로젝트입니다.
  • spannerDatabaseRole: 사용자가 변경 내역을 읽는 동안 데이터베이스 역할을 맡습니다. 데이터베이스 역할에는 변경 내역에서 읽는 데 필요한 권한이 있어야 합니다. 데이터베이스 역할을 지정하지 않으면 사용자에게 데이터베이스에서 읽는 데 필요한 IAM 권한이 있어야 합니다.
  • spannerMetadataTableName: 사용할 Cloud Spanner 변경 내역 커넥터 메타데이터 테이블 이름입니다. 제공하지 않으면 파이프라인 흐름 중에 Cloud Spanner 변경 내역 커넥터 메타데이터 테이블이 자동으로 생성됩니다. 기존 파이프라인을 업데이트할 때는 이 매개변수를 반드시 제공해야 하며, 이외의 경우에는 제공해서는 안 됩니다.
  • rpcPriority: Cloud Spanner 호출의 요청 우선순위입니다. 값은 [HIGH,MEDIUM,LOW] 중 하나여야 합니다. 기본값은 HIGH입니다.
  • spannerHost: 템플릿에서 호출할 Cloud Spanner 엔드포인트입니다. 테스트에만 사용됩니다. (예: https://batch-spanner.googleapis.com)
  • startTimestamp: 변경 내역을 읽는 데 사용할 시작 DateTime(https://tools.ietf.org/html/rfc3339)입니다. 예를 들면 2022-05-05T07:59:59Z입니다. 기본값은 파이프라인이 시작되는 시점의 타임스탬프입니다.
  • endTimestamp: 변경 내역을 읽는 데 사용할 종료 DateTime(https://tools.ietf.org/html/rfc3339)입니다. 예를 들면 2022-05-05T07:59:59Z입니다. 기본값은 미래의 무한대 시간입니다.
  • bigQueryProjectId: BigQuery 프로젝트입니다. 기본값은 Dataflow 작업의 프로젝트입니다.
  • bigQueryChangelogTableNameTemplate: 변경 로그를 포함하는 BigQuery 테이블 이름의 템플릿입니다. 기본값은 {_metadata_spanner_table_name}_changelog입니다.
  • deadLetterQueueDirectory: 메시지를 처리할 수 없는 이유와 함께 처리되지 않은 모든 레코드가 저장되는 파일 경로입니다. 기본값은 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다. 대부분의 상황에서는 기본값이면 충분합니다.
  • dlqRetryMinutes: 데드 레터 큐 재시도 간격(분)입니다. 기본값은 10입니다.
  • ignoreFields: 무시될 쉼표로 구분된 필드 목록(대소문자 구분)입니다. 추적된 테이블의 필드 또는 _metadata_spanner_mod_type, _metadata_spanner_table_name, _metadata_spanner_commit_timestamp, _metadata_spanner_server_transaction_id, _metadata_spanner_record_sequence, _metadata_spanner_is_last_record_in_transaction_in_partition, _metadata_spanner_number_of_records_in_transaction, _metadata_spanner_number_of_partitions_in_transaction, _metadata_big_query_commit_timestamp와 같은 메타데이터 필드일 수 있습니다. 기본값은 빈 값입니다.
  • disableDlqRetries: DLQ 재시도를 사용 중지할지 여부입니다. 기본값은 false입니다.
  • useStorageWriteApi: true이면 파이프라인은 BigQuery에 데이터를 쓸 때 Storage Write API를 사용합니다(https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api 참조). 기본값은 false입니다. 정확히 한 번 모드에서 Storage Write API를 사용할 때는 'BigQuery Storage Write API의 스트림 수' 및 'BigQuery Storage Write API의 트리거 빈도(초)'와 같은 매개변수를 설정해야 합니다. Dataflow 적어도 한 번 모드를 사용 설정하거나 useStorageWriteApiAtLeastOnce 매개변수를 true로 설정하면 스트림 수나 트리거 빈도를 설정할 필요가 없습니다.
  • useStorageWriteApiAtLeastOnce: 이 매개변수는 'BigQuery Storage Write API 사용'이 설정된 경우에만 적용됩니다. 사용 설정하면 Storage Write API에 최소 1회의 시맨틱스가 사용되고 그렇지 않은 경우 정확히 한 번의 시맨틱스가 사용됩니다. 기본값은 false입니다.
  • numStorageWriteApiStreams: 스트림 수는 BigQueryIO Write 변환의 동시 로드를 정의하며 파이프라인에서 사용될 Storage Write API의 스트림 수와 대략 일치합니다. 권장 값은 https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api를 참조하세요. 기본값은 0입니다.
  • storageWriteApiTriggeringFrequencySec: 트리거 빈도는 BigQuery에서 쿼리하는 데이터가 표시되는 속도를 결정합니다. 권장 값은 https://cloud.google.com/blog/products/data-analytics/streaming-data-into-bigquery-using-storage-write-api를 참조하세요.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Spanner change streams to BigQuery template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Spanner_Change_Streams_to_BigQuery \
    --region REGION_NAME \
    --parameters \
spannerInstanceId=SPANNER_INSTANCE_ID,\
spannerDatabase=SPANNER_DATABASE,\
spannerMetadataInstanceId=SPANNER_METADATA_INSTANCE_ID,\
spannerMetadataDatabase=SPANNER_METADATA_DATABASE,\
spannerChangeStreamName=SPANNER_CHANGE_STREAM,\
bigQueryDataset=BIGQUERY_DATASET

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • SPANNER_INSTANCE_ID: Spanner 인스턴스 ID입니다.
  • SPANNER_DATABASE: Spanner 데이터베이스입니다.
  • SPANNER_METADATA_INSTANCE_ID: Spanner 메타데이터 인스턴스 ID입니다.
  • SPANNER_METADATA_DATABASE: Spanner 메타데이터 데이터베이스입니다.
  • SPANNER_CHANGE_STREAM: Spanner 변경 내역입니다.
  • BIGQUERY_DATASET: 변경 내역 출력을 위한 BigQuery 데이터 세트입니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "spannerInstanceId": "SPANNER_INSTANCE_ID",
          "spannerDatabase": "SPANNER_DATABASE",
          "spannerMetadataInstanceId": "SPANNER_METADATA_INSTANCE_ID",
          "spannerMetadataDatabase": "SPANNER_METADATA_DATABASE",
          "spannerChangeStreamName": "SPANNER_CHANGE_STREAM",
          "bigQueryDataset": "BIGQUERY_DATASET"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Spanner_Change_Streams_to_BigQuery",
   }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • SPANNER_INSTANCE_ID: Spanner 인스턴스 ID입니다.
  • SPANNER_DATABASE: Spanner 데이터베이스입니다.
  • SPANNER_METADATA_INSTANCE_ID: Spanner 메타데이터 인스턴스 ID입니다.
  • SPANNER_METADATA_DATABASE: Spanner 메타데이터 데이터베이스입니다.
  • SPANNER_CHANGE_STREAM: Spanner 변경 내역입니다.
  • BIGQUERY_DATASET: 변경 내역 출력을 위한 BigQuery 데이터 세트입니다.

다음 단계