Datastream to MySQL 또는 Datastream to PostgreSQL(스트림) 템플릿

Datastream to SQL 템플릿은 Datastream 데이터를 읽고 모든 MySQL 또는 PostgreSQL 데이터베이스에 복제하는 스트리밍 파이프라인입니다. 이 템플릿은 Pub/Sub 알림을 사용하여 Cloud Storage에서 데이터를 읽고 이 데이터를 SQL 복제본 테이블에 복제합니다.

이 템플릿은 데이터 정의 언어(DDL)를 지원하지 않으며 모든 테이블이 이미 데이터베이스에 있어야 합니다. 복제는 Dataflow 스테이트풀(Stateful) 변환을 사용하여 비활성 데이터를 필터링하고 비순차적인 데이터의 일관성을 보장합니다. 예를 들어 행의 최신 버전이 이미 통과되면 해당 행의 최신 도착 버전은 무시됩니다. 실행되는 DML은 소스를 대상 데이터로 완벽하게 복제하려고 합니다. 실행된 DML 문은 다음 규칙을 따릅니다.

  • 기본 키가 존재하는 경우 삽입 및 업데이트 작업에서 upsert 구문을 사용합니다(즉, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • 기본 키가 있으면 삭제가 삭제 DML으로 복제됩니다.
  • 기본 키가 없으면 삽입 작업과 업데이트 작업 모두 테이블에 삽입됩니다.
  • 기본 키가 없으면 삭제는 무시됩니다.

Oracle to Postgres 유틸리티를 사용하는 경우 기본 키가 없으면 SQL에서 ROWID를 기본 키로 추가합니다.

파이프라인 요구사항

  • 데이터 복제가 가능하거나 이미 복제하고 있는 Datastream 스트림
  • Datastream 데이터에 Cloud Storage Pub/Sub 알림이 사용 설정되어 있습니다.
  • PostgreSQL 데이터베이스에 필요한 스키마가 입력되었습니다.
  • Dataflow 작업자와 PostgreSQL 사이에 네트워크 액세스가 설정됩니다.

템플릿 매개변수

매개변수 설명
inputFilePattern Cloud Storage에서 복제할 Datastream 파일의 파일 위치입니다. 일반적으로 이 파일 위치는 스트림의 루트 경로입니다.
gcsPubSubSubscription Datastream 파일 알림을 사용하는 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
inputFileFormat Datastream에서 생성한 출력 파일의 형식입니다. 예를 들면 avro,json입니다. 기본값은 avro입니다.
databaseHost 연결할 SQL 호스트입니다.
databaseUser 복제의 모든 테이블에 쓰는 데 필요한 모든 권한이 있는 SQL 사용자입니다.
databasePassword 지정된 SQL 사용자의 비밀번호입니다.
databasePort (선택사항) 연결할 SQL 데이터베이스 포트입니다. 기본값은 5432입니다.
databaseName (선택사항) 연결할 SQL 데이터베이스의 이름입니다. 기본값은 postgres입니다.
streamName (선택사항) 스키마 정보를 폴링할 스트림의 이름이나 템플릿입니다. 기본값은 {_metadata_stream}입니다.

템플릿 실행

콘솔

  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Cloud Datastream to SQL template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

gcloud

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: Datastream 데이터의 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • GCS_SUBSCRIPTION_NAME: 변경된 파일을 읽을 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
  • DATABASE_HOST: SQL 호스트 IP입니다.
  • DATABASE_USER: SQL 사용자입니다.
  • DATABASE_PASSWORD: SQL 비밀번호입니다.

API

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: Datastream 데이터의 Cloud Storage 경로입니다. 예를 들면 gs://bucket/path/to/data/입니다.
  • GCS_SUBSCRIPTION_NAME: 변경된 파일을 읽을 Pub/Sub 구독입니다. 예를 들면 projects/my-project-id/subscriptions/my-subscription-id입니다.
  • DATABASE_HOST: SQL 호스트 IP입니다.
  • DATABASE_USER: SQL 사용자입니다.
  • DATABASE_PASSWORD: SQL 비밀번호입니다.

다음 단계