Datastream to Spanner 템플릿

Datastream to Spanner 템플릿은 Cloud Storage 버킷에서 Datastream 이벤트를 읽고 Spanner 데이터베이스에 쓰는 스트리밍 파이프라인입니다. 데이터를 Datastream 소스에서 Spanner로 마이그레이션합니다.

마이그레이션에 필요한 모든 테이블은 템플릿을 실행하기 전에 대상 Spanner 데이터베이스에 있어야 합니다. 따라서 데이터 마이그레이션 전에 소스 데이터베이스에서 대상 Spanner로 스키마 마이그레이션을 완료해야 합니다. 마이그레이션하기 전에 데이터가 테이블에 있을 수 있습니다. 이 템플릿은 Datastream 스키마 변경사항을 Spanner 데이터베이스에 전파하지 않습니다.

데이터 일관성은 모든 데이터가 Spanner에 기록될 때 마이그레이션 종료 시에만 보장됩니다. Spanner에 쓰인 각 레코드의 순서 정보를 저장하도록 이 템플릿은 Spanner 데이터베이스의 각 테이블에 추가 테이블 (섀도 테이블이라고 함)을 만듭니다. 이 테이블은 마이그레이션 종료 시 일관성을 보장하는 데 사용됩니다. 섀도 테이블은 마이그레이션 후에 삭제되지 않으며 마이그레이션 종료 시 유효성 검사 목적으로 사용될 수 있습니다.

스키마 불일치, 잘못된 형식의 JSON 파일 또는 변환 실행으로 발생하는 오류와 같은 작업 중에 발생하는 모든 오류는 오류 큐에 기록됩니다. 오류 큐는 오류와 함께 오류가 발생한 모든 Datastream 이벤트를 텍스트 형식으로 저장하는 Cloud Storage 폴더입니다. 오류는 일시적이거나 영구적일 수 있으며 오류 큐의 적절한 Cloud Storage 폴더에 저장됩니다. 일시적인 오류는 자동으로 재시도되지만 영구 오류는 그렇지 않습니다. 영구적인 오류가 발생할 경우 변경 이벤트를 수정하고 템플릿이 실행되는 동안 재시도 가능한 버킷으로 이동할 수 있습니다.

파이프라인 요구사항

  • 실행 중 또는 시작되지 않음 상태의 Datastream 스트림
  • Datastream 이벤트가 복제되는 Cloud Storage 버킷
  • 기존 테이블이 있는 Spanner 데이터베이스 이러한 테이블은 비어 있거나 데이터를 포함할 수 있습니다.

템플릿 매개변수

필수 매개변수

  • instanceId: 변경사항이 복제된 Spanner 인스턴스입니다.
  • databaseId: 변경사항이 복제된 Spanner 데이터베이스입니다.
  • streamName: 스키마 정보와 소스 유형을 폴링할 스트림의 이름이나 템플릿입니다.

선택적 매개변수

  • inputFilePattern: 복제할 Datastream 파일이 포함된 Cloud Storage 파일 위치입니다. 일반적으로 이는 스트림의 루트 경로입니다. 이 기능에 대한 지원이 중지되었습니다.
  • inputFileFormat: Datastream에서 생성한 출력 파일의 형식입니다. 예를 들면 avro,json입니다. 기본값은 avro입니다.
  • sessionFilePath: HarbourBridge의 매핑 정보가 포함된 Cloud Storage의 세션 파일 경로입니다.
  • projectId: Spanner 프로젝트 ID입니다.
  • spannerHost: 템플릿에서 호출할 Cloud Spanner 엔드포인트입니다. 예를 들면 https://batch-spanner.googleapis.com입니다. 기본값은 https://batch-spanner.googleapis.com입니다.
  • gcsPubSubSubscription: Cloud Storage 알림 정책에 사용 중인 Pub/Sub 구독입니다. 이름에는 projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME> 형식을 사용합니다.
  • shadowTablePrefix: 섀도우 테이블의 이름을 지정하는 데 사용되는 접두사입니다. 기본값: shadow_
  • shouldCreateShadowTables: 이 플래그는 Cloud Spanner 데이터베이스에 섀도 테이블을 만들어야 하는지 여부를 나타냅니다. 기본값은 true입니다.
  • rfcStartDateTime: Cloud Storage에서 가져오는 데 사용되는 시작 DateTime입니다 (https://tools.ietf.org/html/rfc3339). 기본값은 1970-01-01T00:00:00.00Z입니다.
  • fileReadConcurrency: 읽을 동시 DataStream 파일 수입니다. 기본값은 30입니다.
  • deadLetterQueueDirectory: 오류 큐 출력을 저장할 때 사용되는 파일 경로입니다. 기본 파일 경로는 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다.
  • dlqRetryMinutes: 데드 레터 큐 재시도 간격(분)입니다. 기본값은 10입니다.
  • dlqMaxRetryCount: DLQ를 통해 임시 오류를 재시도할 수 있는 최대 횟수입니다. 기본값은 500입니다.
  • dataStreamRootUrl: Datastream API 루트 URL입니다. 기본값은 https://datastream.googleapis.com/입니다.
  • datastreamSourceType: Datastream이 연결되는 소스 데이터베이스의 유형입니다. 예: mysql/oracle 실제로 실행 중인 Datastream이 없는 경우 테스트할 때 설정해야 합니다.
  • roundJsonDecimals: 이 플래그가 설정되면 json 열의 소수점 값을 정밀도 손실 없이 저장할 수 있는 숫자로 반올림합니다. 기본값은 false입니다.
  • runMode: 일반 모드인지 또는 retryDLQ 모드인지 여부에 관계없는 실행 모드 유형입니다. 기본값은 regular입니다.
  • transformationContextFilePath: 이전 중에 실행된 변환에 사용된 데이터를 채우는 데 사용되는 Cloud Storage의 변환 컨텍스트 파일 경로입니다. 예: 행이 이전된 데이터베이스를 식별하기 위한 샤드 ID와 데이터베이스 이름
  • directoryWatchDurationInMinutes: 파이프라인이 GCS에서 디렉터리를 계속 폴링해야 하는 시간입니다. Datastreamoutput 파일은 이벤트의 타임스탬프를 분 단위로 그룹화하여 보여주는 디렉터리 구조로 정렬됩니다. 이 매개변수는 소스 데이터베이스에서 발생한 이벤트와 Datastream에서 GCS에 쓰는 동일한 이벤트 간에 발생할 수 있는 최대 지연 시간과 거의 같습니다. 99.9백분위수 = 10분 기본값은 10입니다.
  • spannerPriority: Cloud Spanner 호출의 요청 우선순위입니다. 값은 [HIGH,MEDIUM,LOW] 중 하나여야 합니다. 기본값은 HIGH입니다.
  • dlqGcsPubSubSubscription: 일반 모드에서 실행할 때 DLQ 재시도 디렉터리의 Cloud Storage 알림 정책에 사용되는 Pub/Sub 구독입니다. 이름에는 projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME> 형식을 사용합니다. 이 옵션을 설정하면 deadLetterQueueDirectory 및 dlqRetryMinutes가 무시됩니다.
  • transformationJarPath: 전방 이전에서 레코드를 처리하기 위한 커스텀 변환 로직이 포함된 파일의 Cloud Storage에 있는 커스텀 JAR 파일 위치입니다. 기본값은 빈 값입니다.
  • transformationClassName: 커스텀 변환 로직이 있는 정규화된 클래스 이름입니다. transformationJarPath가 지정된 경우 필수 필드입니다. 기본값은 빈 값입니다.
  • transformationCustomParameters: 커스텀 변환 클래스에 전달할 커스텀 매개변수가 포함된 문자열입니다. 기본값은 빈 값입니다.
  • filteredEventsDirectory: 맞춤 변환을 통해 필터링된 이벤트를 저장하는 파일 경로입니다. 기본값은 Dataflow 작업의 임시 위치 아래에 있는 디렉터리입니다. 대부분의 상황에서는 기본값이면 충분합니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Datastream to Spanner template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • GCS_FILE_PATH: Datastream 이벤트를 저장하는 데 사용되는 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • CLOUDSPANNER_INSTANCE: Spanner 인스턴스
  • CLOUDSPANNER_DATABASE: Spanner 데이터베이스
  • DLQ: 오류 큐 디렉터리의 Cloud Storage 경로

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • GCS_FILE_PATH: Datastream 이벤트를 저장하는 데 사용되는 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • CLOUDSPANNER_INSTANCE: Spanner 인스턴스
  • CLOUDSPANNER_DATABASE: Spanner 데이터베이스
  • DLQ: 오류 큐 디렉터리의 Cloud Storage 경로

다음 단계