스트림 관리

개요

이 섹션에서는 다음을 위해 Datastream API를 사용하는 방법을 알아봅니다.

  • 스트림 만들기
  • 스트림 및 스트림 객체에 대한 정보 가져오기
  • 스트림 시작, 일시중지, 다시 시작, 수정, 스트림 객체에 대한 백필 시작 및 중지로 스트림 업데이트
  • 영구적으로 실패한 스트림 복구
  • Oracle 스트림의 대용량 객체 스트리밍 사용 설정
  • 스트림 삭제

Datastream API는 두 가지 방법으로 사용될 수 있습니다. REST API 호출을 수행하거나 Google Cloud CLI(CLI)를 사용할 수 있습니다.

gcloud를 사용해서 Datastream 스트림을 관리하는 방법에 대한 개요는 gcloud Datastream 스트림을 참조하세요.

스트림 만들기

이 섹션에서는 소스에서 대상으로 데이터를 전송하는 데 사용하는 스트림을 만드는 방법을 알아봅니다. 다음 예시는 포괄적이지는 않지만 Datastream의 특정 기능을 강조합니다. 특정 사용 사례를 해결하려면 다음 예시를 Datastream API 참고 문서와 함께 사용하세요.

이 섹션에서는 다음 사용 사례를 설명합니다.

예시 1: BigQuery로 특정 객체 스트리밍

이 예시에서는 다음을 수행하는 방법을 알아봅니다.

  • MySQL에서 BigQuery로 스트리밍
  • 스트림에 객체 집합 포함
  • 스트림의 쓰기 모드를 추가 전용으로 정의
  • 스트림에 포함된 모든 객체 백필

다음은 schema1의 모든 테이블과 schema2의 두 가지 특정 테이블인 tableAtableC 테이블을 가져오는 요청입니다. 이벤트가 BigQuery의 데이터 세트에 기록됩니다.

요청에 customerManagedEncryptionKey 매개변수가 포함되지 않으므로 CMEK 대신 Google Cloud 내부 키 관리 시스템을 사용해서 데이터를 암호화합니다.

이전 백필(또는 스냅샷) 수행과 연결된 backfillAll 매개변수는 빈 딕셔너리({})로 설정됩니다. 즉, Datastream이 스트림에 포함된 모든 테이블의 이전 데이터를 백필합니다.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

gcloud를 사용해서 스트림을 만드는 방법은 Google Cloud SDK 문서를 참조하세요.

예시 2: PostgreSQL 소스를 사용하여 스트림에서 특정 객체 제외

이 예시에서는 다음을 수행하는 방법을 알아봅니다.

  • PostgreSQL에서 BigQuery로 스트리밍
  • 스트림에서 객체 제외
  • 백필에서 객체 제외

다음 코드는 소스 PostgreSQL 데이터베이스에서 BigQuery로 데이터를 전송하는 데 사용되는 스트림을 만들기 위한 요청을 보여줍니다. 소스 PostgreSQL 데이터베이스에서 스트림을 만들 때는 요청에 PostgreSQL과 관련된 두 가지 추가 필드를 지정해야 합니다.

  • replicationSlot: 복제 슬롯은 복제용으로 PostgreSQL 데이터베이스를 구성하기 위한 기본 요건입니다. 각 스트림에 대해 복제 슬롯을 만들어야 합니다.
  • publication: 게시는 변경사항을 복제하려는 테이블 그룹입니다. 스트림을 시작하려면 먼저 게시 이름이 데이터베이스에 있어야 합니다. 게시에는 최소한 스트림의 includeObjects 목록에 지정된 테이블이 포함되어야 합니다.

이전 백필(또는 스냅샷) 수행과 관련된 backfillAll 매개변수는 테이블 하나를 제외하도록 설정됩니다.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

gcloud를 사용해서 스트림을 만드는 방법은 Google Cloud SDK 문서를 참조하세요.

예시 3: 스트림의 추가 전용 쓰기 모드 지정

BigQuery로 스트리밍할 때 쓰기 모드를 merge 또는 appendOnly로 정의할 수 있습니다. 자세한 내용은 쓰기 모드 구성을 참조하세요.

스트림을 만드는 요청에 쓰기 모드를 지정하지 않으면 기본 merge 모드가 사용됩니다.

다음 요청에서는 MySQL to BigQuery 스트림을 만들 때 appendOnly 모드를 정의하는 방법을 보여줍니다.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

gcloud를 사용해서 스트림을 만드는 방법은 Google Cloud SDK 문서를 참조하세요.

예시 4: Cloud Storage 대상으로 스트리밍

이 예시에서는 다음을 수행하는 방법을 알아봅니다.

  • Oracle에서 Cloud Storage로 스트리밍
  • 스트림에 포함할 객체 집합 정의
  • 저장 데이터 암호화를 위해 CMEK 정의

다음 요청은 Cloud Storage의 버킷에 이벤트를 쓰는 스트림을 만드는 방법을 보여줍니다.

이 요청 예시에서는 이벤트가 JSON 출력 형식으로 작성되고 새 파일이 100MB 또는 30초마다 생성됩니다(기본값 50MB 및 60초 재정의).

JSON 형식의 경우 다음을 수행할 수 있습니다.

  • 파일 경로에 통합 유형 스키마 파일을 포함합니다. 따라서 Datastream이 Cloud Storage에 JSON 데이터 파일 및 Avro 스키마 파일을 기록합니다. 스키마 파일에는 .schema 확장자와 데이터 파일과 동일한 이름이 사용됩니다.

  • gzip 압축으로 Datastream이 Cloud Storage에 기록되는 파일을 압축할 수 있게 해줍니다.

이 요청은 backfillNone 매개변수를 사용하여 백필 없이 진행 중인 변경사항만 대상에 스트리밍되도록 지정합니다.

이 요청은 Google Cloud 프로젝트 내 저장 데이터를 암호화하는 데 사용되는 키를 제어할 수 있는 고객 관리 암호화 키 매개변수를 지정합니다. 이 매개변수는 Datastream이 소스에서 대상으로 스트리밍되는 데이터를 암호화하기 위해 사용하는 CMEK를 참조합니다. 또한 CMEK의 키링을 지정합니다.

키링에 대한 자세한 내용은 Cloud KMS 리소스를 참조하세요. 암호화 키를 사용하여 데이터를 보호하는 방법은 Cloud Key Management Service(KMS)를 참조하세요.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

gcloud를 사용해서 스트림을 만드는 방법은 Google Cloud SDK 문서를 참조하세요.

스트림 정의 검증

스트림을 만들기 전에 대상을 검증할 수 있습니다. 이렇게 하면 모든 검증 검사가 통과하고 스트림이 생성되었을 때 성공적으로 실행되도록 보장할 수 있습니다.

스트림 검증 시 다음을 확인합니다.

  • Datastream이 데이터를 스트림하도록 소스가 올바르게 구성되었는지 확인합니다.
  • 스트림이 소스 및 대상에 모두 연결할 수 있는지 확인합니다.
  • 스트림의 엔드 투 엔드 구성을 확인합니다.

스트림을 검증하려면 요청 본문 앞의 URL에 &validate_only=true를 추가합니다.

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

이 요청을 수행하면 검사 성공 또는 실패 여부와 함께 Datastream이 소스 및 대상에 대해 실행하는 검증 검사가 표시됩니다. 통과하지 못한 검증 검사의 경우 실패한 이유와 문제 해결을 위한 수행 작업과 관련된 정보가 표시됩니다.

예를 들어 Datastream이 소스에서 대상으로 스트리밍되는 데이터를 암호화하는 데 사용할 고객 관리 암호화 키(CMEK)가 있다고 가정해보세요. 스트림을 검증하는 동안 Datastream은 키가 존재하고 Datastream에 키 사용 권한이 있는지 확인합니다. 이러한 조건 중 하나라도 충족되지 않으면 스트림을 검증할 때 다음 오류 메시지가 반환됩니다.

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

이 문제를 해결하려면 제공한 키가 존재하는지 그리고 Datastream 서비스 계정에 해당 키에 대한 cloudkms.cryptoKeys.get 권한이 있는지 확인합니다.

적절한 수정 조치를 취한 후 다시 요청을 수행하여 모든 유효성 검사가 통과하는지 확인합니다. 위 예시에서 CMEK_VALIDATE_PERMISSIONS 검사는 더 이상 오류 메시지를 반환하지 않으며 상태가 PASSED가 됩니다.

스트림 정보 가져오기

다음 코드는 스트림 정보를 검색하기 위한 요청을 보여줍니다. 이러한 정보에는 다음이 포함됩니다.

  • 스트림 이름(고유 식별자)
  • 스트림의 사용자 친화적인 이름(표시 이름)
  • 스트림이 생성되고 마지막으로 업데이트된 시간의 타임스탬프
  • 스트림과 연결된 소스 및 대상 연결 프로필 관련 정보
  • 스트림 상태

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

응답이 다음과 같이 표시됩니다.

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

gcloud를 사용해서 스트림 정보를 가져오는 방법을 보려면 여기를 클릭하세요.

스트림 나열

다음 코드는 지정된 프로젝트 및 위치에서 모든 스트림 목록을 검색하는 요청을 보여줍니다.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

gcloud를 사용하여 모든 스트림에 대한 정보를 검색하는 방법을 보려면 여기를 클릭하세요.

스트림의 객체 나열

다음 코드는 스트림의 모든 객체에 대한 정보를 검색하는 요청을 보여줍니다.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

gcloud를 사용하여 스트림의 모든 객체에 대한 정보를 검색하는 방법을 보려면 여기를 클릭하세요.

반환되는 객체 목록은 다음과 비슷해야 합니다.

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

gcloud를 사용하여 스트림의 객체를 나열하는 방법을 자세히 알아보려면 여기를 클릭하세요.

스트림 시작

다음 코드는 스트림 시작 요청을 보여줍니다.

요청에서 updateMask 매개변수를 사용하여 사용자가 지정하는 필드만 요청 본문에 포함해야 합니다. 스트림을 시작하려면 state 필드의 값을 CREATED에서 RUNNING으로 변경합니다.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

gcloud를 사용하여 스트림을 시작하는 방법을 보려면 여기를 클릭하세요.

스트림 일시중지

다음 코드는 실행 중인 스트림을 일시중지하는 요청을 보여줍니다.

이 예시의 경우 updateMask 매개변수에 지정된 필드는 state 필드입니다. 스트림을 일시중지하면 상태가 RUNNING에서 PAUSED로 변경됩니다.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

gcloud를 사용하여 스트림을 일시중지하는 방법을 보려면 여기를 클릭하세요.

스트림 재개

다음 코드는 일시중지된 스트림을 재개하는 요청을 보여줍니다.

이 예시의 경우 updateMask 매개변수에 지정된 필드는 state 필드입니다. 스트림을 재개하면 상태가 PAUSED에서 RUNNING으로 변경됩니다.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

gcloud를 사용해서 스트림을 재개하는 방법을 보려면 여기를 클릭하세요.

스트림 복구

RunStream 메서드를 사용하여 영구적으로 실패한 스트림을 복구할 수 있습니다. 각 소스 데이터베이스 유형에는 가능한 스트림 복구 작업에 대한 자체 정의가 있습니다. 자세한 내용은 스트림 복구를 참조하세요.

MySQL 또는 Oracle 소스의 스트림 복구

다음 코드 샘플은 다양한 로그 파일 위치에서 MySQL 또는 Oracle 소스의 스트림 복구 요청을 보여줍니다.

REST

현재 위치에서 스트림을 복구하세요. 이것은 기본 옵션입니다.

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

사용 가능한 다음 위치에서 스트림을 복구합니다.

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

가장 최근 위치에서 스트림 복구:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

특정 위치에서 스트림 복구(MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

다음을 바꿉니다.

  • NAME_OF_THE_LOG_FILE: 스트림을 복구할 로그 파일의 이름
  • POSITION: 스트림을 복구하려는 로그 파일에서의 위치. 값을 제공하지 않으면 Datastream이 파일 상단의 스트림을 복구옵니다.

예를 들면 다음과 같습니다.

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

특정 위치에서 스트림 복구(Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn을 스트림을 복구할 재실행 로그 파일의 시스템 변경 번호(SCN)로 바꿉니다. 필수 입력란입니다.

예를 들면 다음과 같습니다.

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

사용 가능한 복구 옵션에 대한 자세한 내용은 스트림 복구를 참조하세요.

gcloud

gcloud를 사용한 스트림 복구는 지원되지 않습니다.

PostgreSQL 소스의 스트림 복구

다음 코드 샘플은 PostgreSQL 소스의 스트림 복구 요청을 보여줍니다. 복구 중에 스트림은 스트림에 구성된 복제 슬롯의 첫 번째 로그 시퀀스 번호(LSN)에서 읽기 시작합니다.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

예를 들면 다음과 같습니다.

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

복제 슬롯을 변경하려면 먼저 스트림을 새 복제 슬롯 이름으로 업데이트합니다.

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

gcloud를 사용한 스트림 복구는 지원되지 않습니다.

SQL Server 소스의 스트림 복구

다음 코드 샘플은 SQL Server 소스의 스트림을 복구하기 위한 요청 예시를 보여줍니다.

REST

사용 가능한 첫 번째 위치에서 스트림을 복구합니다.

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

예를 들면 다음과 같습니다.

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

원하는 로그 시퀀스 번호로부터 스트림을 복구합니다.

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

예를 들면 다음과 같습니다.

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

gcloud를 사용한 스트림 복구는 지원되지 않습니다.

특정 위치에서 스트림 시작 또는 재개

MySQL 및 Oracle 소스에 대해 특정 위치에서 스트림을 시작하거나 일시중지된 스트림을 재개할 수 있습니다. 이는 외부 도구를 사용하여 백필을 수행하거나 지정한 위치에서 CDC를 시작하려는 경우에 유용할 수 있습니다. MySQL 소스의 경우 바이너리 로그 위치를 나타내야 하며 Oracle 소스의 경우 재실행 로그 파일의 시스템 변경 번호(SCN)를 표기해야 합니다.

다음 코드는 특정 위치의 이미 만들어진 스트림을 시작하거나 재개하기 위한 요청을 보여줍니다.

특정 바이너리 로그 위치(MySQL)에서 스트림을 시작하거나 재개합니다.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

다음을 바꿉니다.

  • NAME_OF_THE_LOG_FILE: 스트림을 시작할 로그 파일의 이름
  • POSITION: 스트림을 시작할 로그 파일에서의 위치. 값을 제공하지 않으면 Datastream이 파일 상단에서 읽기 시작합니다.

예를 들면 다음과 같습니다.

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

gcloud를 사용하여 특정 위치에서 스트림을 시작하거나 재개하는 것은 지원되지 않습니다. gcloud를 사용하여 스트림을 시작하거나 재개하는 방법에 대한 자세한 내용은 Cloud SDK 문서를 참조하세요.

재실행 로그 파일(Oracle)에서 특정 시스템 변경 번호에서 스트림을 시작하거나 재개합니다.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
scn을 스트림을 시작할 재실행 로그 파일의 시스템 변경 번호(SCN)로 바꿉니다. 필수 입력란입니다.

예를 들면 다음과 같습니다.

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

gcloud를 사용하여 특정 위치에서 스트림을 시작하거나 재개하는 것은 지원되지 않습니다. gcloud를 사용하여 스트림을 시작하는 방법에 대한 자세한 내용은 Cloud SDK 문서를 참조하세요.

스트림 수정

다음 코드는 75MB 또는 45초마다 파일을 순환하도록 스트림의 파일 순환 구성을 업데이트하는 요청을 보여줍니다.

이 예시의 경우 updateMask 매개변수에 지정된 필드에는 destinationConfig.gcsDestinationConfig.fileRotationMbdestinationConfig.gcsDestinationConfig.fileRotationInterval 플래그로 각각 표시되는 fileRotationMbfileRotationInterval 필드가 포함됩니다.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

다음 코드는 Datastream이 Cloud Storage에 쓰는 파일의 경로에 통합 유형 스키마 파일을 포함하도록 하는 요청을 보여줍니다. 따라서 Datastream은 JSON 데이터 파일과 Avro 스키마 파일 두 가지 파일을 기록합니다.

이 예시의 경우 지정된 필드는 jsonFileFormat 필드이며 destinationConfig.gcsDestinationConfig.jsonFileFormat 플래그로 표시됩니다.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

다음 코드는 소스 데이터베이스에서 대상으로 진행 중인 데이터 변경 외에 Datastream이 기존 데이터를 복제하도록 하는 요청을 보여줍니다.

코드의 oracleExcludedObjects 섹션에는 대상 위치로의 백필이 제한된 테이블 및 스키마가 표시됩니다.

이 예시의 경우 schema3의 tableA를 제외한 모든 테이블과 스키마가 백필됩니다.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

gcloud를 사용하여 스트림을 수정하는 방법을 보려면 여기를 클릭하세요.

스트림 객체에 대한 백필 시작

Datastream의 스트림은 이전 데이터를 백필하고 진행 중인 변경사항을 대상으로 스트림할 수 있습니다. 진행 중인 변경사항은 항상 소스에서 대상으로 스트림됩니다. 하지만 이전 데이터를 스트림할지 여부를 지정할 수 있습니다.

이전 데이터를 소스에서 대상으로 스트림하려면 backfillAll 매개변수를 사용합니다.

Datastream에서는 또한 특정 데이터베이스 테이블에 대해서만 이전 데이터를 스트리밍할 수 있습니다. 이렇게 하려면 backfillAll 매개변수를 사용하고 이전 데이터가 필요하지 않은 테이블을 제외합니다.

진행 중인 변경사항만 대상으로 스트림하려면 backfillNone 매개변수를 사용합니다. 그런 다음 Datastream이 소스에서 모든 기존 데이터의 스냅샷을 스트림하도록 하려면 이 데이터가 포함된 객체의 백필을 수동으로 시작해야 합니다.

소스와 대상 사이에 데이터가 동기화되지 않은 경우에도 객체 백필을 시작해야 합니다. 예를 들어 사용자가 대상에서 데이터를 의도치 않게 삭제하여 데이터가 손실될 수 있습니다. 이 경우 객체 백필을 시작하면 모든 데이터가 한 번에 대상으로 스트리밍되기 때문에 객체 백필이 '재설정 메커니즘'과 같이 사용됩니다. 그 결과 소스와 대상 사이에 데이터가 동기화됩니다.

스트림 객체에 대한 백필을 시작하려면 먼저 객체 정보를 검색해야 합니다.

각 객체에는 객체를 고유하게 식별하는 OBJECT_ID가 포함됩니다. OBJECT_ID를 사용해서 스트림의 백필을 시작합니다.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

gcloud를 사용하여 스트림 객체에 대해 백필을 시작하는 방법에 대한 자세한 내용은 Google Cloud SDK 문서를 참조하세요.

스트림 객체에 대한 백필 중지

스트림 객체에 대한 백필을 시작한 후에는 객체에 대해 백필을 중지할 수 있습니다. 예를 들어 사용자가 데이터베이스 스키마를 수정하면 스키마 또는 데이터가 손상될 수 있습니다. 이러한 스키마 또는 데이터가 대상으로 스트리밍되지 않아야 하므로 객체에 대해 백필을 중지합니다.

또한 부하 분산 목적으로 객체 백필을 중지할 수도 있습니다. Datastream은 여러 백필을 병렬로 실행할 수 있습니다. 그러면 소스에 추가 부하가 발생할 수 있습니다. 이러한 부하가 상당히 크면 각 객체의 백필을 중지하고 하나씩 객체에 대해 백필을 시작합니다.

스트림의 객체에 대해 백필을 중지할 수 있으려면 먼저 스트림의 모든 객체에 대해 정보를 검색하도록 요청을 수행해야 합니다. 반환된 각 객체에는 객체를 고유하게 식별하는 OBJECT_ID가 포함됩니다. OBJECT_ID를 사용해서 스트림의 백필을 중지합니다.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

gcloud를 사용하여 스트림 객체에 대해 백필을 중지하는 방법을 보려면 여기를 클릭하세요.

최대 동시 CDC 태스크 수 변경

다음 코드는 MySQL 스트림의 최대 동시 변경 데이터 캡처(CDC) 태스크 수를 7로 설정하는 방법을 보여줍니다.

이 예시의 경우 updateMask 매개변수에 지정된 필드는 maxConcurrentCdcTasks 필드입니다. 값을 7로 설정하면 최대 동시 CDC 태스크 수가 이전 값에서 7로 변경됩니다. 0에서 50(포함) 사이의 값을 사용할 수 있습니다. 값을 정의하지 않거나 0으로 정의하면 스트림에 대해 시스템 기본값인 5개 태스크가 설정됩니다.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

gcloud 사용에 대한 자세한 내용을 보려면 여기를 클릭하세요.

최대 동시 백필 태스크 수 변경

다음 코드는 MySQL 스트림의 최대 동시 백필 태스크 수를 25로 설정하는 방법을 보여줍니다.

이 예시의 경우 updateMask 매개변수에 지정된 필드는 maxConcurrentBackfillTasks 필드입니다. 값을 25로 설정하면 최대 동시 백필 태스크 수가 이전 값에서 25로 변경됩니다. 0에서 50(포함) 사이의 값을 사용할 수 있습니다. 값을 정의하지 않거나 0으로 정의하면 스트림에 대해 시스템 기본값인 16개 태스크가 설정됩니다.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

gcloud 사용에 대한 자세한 내용을 보려면 여기를 클릭하세요.

Oracle 소스의 대용량 객체 스트리밍 사용 설정

Oracle 소스를 포함한 스트림에 대해 대용량 바이너리 객체(BLOB), 대용량 문자 객체(CLOB), 대용량 국가별 문자 객체(NCLOB)와 같은 대용량 객체의 스트리밍을 사용 설정할 수 있습니다. streamLargeObjects 플래그를 사용하면 신규 및 기존 스트림 모두에 대용량 객체를 포함할 수 있습니다. 플래그가 스트림 수준에 설정된 경우 대용량 객체 데이터 유형의 열을 지정할 필요가 없습니다.

다음 예시는 대용량 객체를 스트리밍할 수 있게 해주는 스트리밍을 만드는 방법을 보여줍니다.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

gcloud를 사용해서 스트림을 업데이트하는 방법은 Google Cloud SDK 문서를 참조하세요.

스트림 삭제

다음 코드는 스트림 삭제 요청을 보여줍니다.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

gcloud를 사용해서 스트림을 삭제하는 방법을 보려면 여기를 클릭하세요.

다음 단계