스트림 관리

개요

이 섹션에서는 Datastream API를 사용하여 다음을 수행하는 방법을 알아봅니다.

  • 스트림 검증 및 만들기
  • 스트림 및 스트림 객체에 대한 정보 가져오기
  • 스트림을 시작, 일시중지, 다시 시작, 수정하고 스트림 객체에 대해 백필 시작 및 중지를 통해 스트림 업데이트
  • 스트림 삭제

Datastream API는 두 가지 방법으로 사용될 수 있습니다. REST API 호출을 수행하거나 gcloud 명령줄 도구(CLI)를 사용할 수 있습니다.

gcloud을 사용하여 Datastream 스트림을 관리하는 방법에 대한 대략적인 정보를 보려면 여기를 클릭하세요.

스트림 검증

스트림을 만들기 전에 검증할 수 있습니다. 이렇게 하면 스트림이 성공적으로 실행되고 모든 유효성 검사가 통과하도록 보장할 수 있습니다.

스트림 검증 시 다음을 확인합니다.

  • Datastream이 데이터를 스트림하도록 소스가 올바르게 구성되었는지 확인합니다.
  • 스트림이 소스 및 대상에 모두 연결할 수 있는지 확인합니다.
  • 스트림의 엔드 투 엔드 구성을 확인합니다.

다음 코드는 소스 Oracle 데이터베이스에서 Cloud Storage의 대상 버킷으로 데이터를 전송하는 데 사용되는 스트림의 유효성을 검사하는 요청을 보여줍니다.

REST

POST "https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]&validate_only=true"
{
  "displayName": "[display name]",
  "sourceConfig": {  "customerManagedEncryptionKey": "projects/datastream-con-ronteller/locations/us-central1/keyRings/ringy/cryptoKeys/nicekey",
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}

&validate_only=true 값은 스트림 검증만 수행하고 만들지 않음을 나타냅니다. 또한 이 요청의 경우 전체 URL을 따옴표 안에 넣습니다. 이렇게 하면 Datastream이 &validate_only=true 값을 선택하여 스트림을 검증합니다.

이 요청을 수행하면 소스 및 대상에서 Datastream이 실행하는 유효성 검사와 함께 검사의 통과 또는 실패가 표시됩니다. 통과하지 못한 검증 검사의 경우 실패한 이유와 문제 해결을 위한 수행 작업과 관련된 정보가 표시됩니다.

예를 들어 Datastream이 소스에서 대상으로 스트리밍되는 데이터를 암호화하는 데 사용할 고객 관리 암호화 키(CMEK)가 있다고 가정해보세요.

스트림 검증 과정에서 Datastream은 키가 존재하고 Datastream에 키를 사용할 권한이 있는지 확인합니다.

위 조건 중 하나라도 충족되지 않으면 스트림을 검증할 때 다음과 같은 오류 메시지가 반환됩니다.

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

이 문제를 해결하려면 입력한 키가 있는지, DataStream 서비스 계정에 키에 대한 cloudkms.cryptoKeys.get 권한이 있는지 확인하세요.

적절한 수정 조치를 취한 후 다시 요청을 수행하여 모든 검증 검사가 통과하는지 확인합니다. 위의 예시에서 CMEK_VALIDATE_PERMISSIONS 확인은 더 이상 오류 메시지를 반환하지 않지만 PASSED 상태입니다.

스트림 만들기

다음 코드는 소스 Oracle 데이터베이스에서 Cloud Storage의 대상 버킷으로 데이터를 전송하는 데 사용되는 스트림을 만드는 요청을 보여줍니다.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}

예를 들어 다음은 schema1의 모든 테이블과 schema3의 두 가지 특정 테이블인 tableA와 tableC를 가져오는 요청입니다.

이벤트는 Cloud Storage의 Avro 형식으로 버킷에 작성되며, 100MB 또는 30초마다(기본값인 50MB 및 60초보다 우선됨) 새 파일이 생성됩니다.

Google Cloud에 저장된 모든 데이터는 암호화된 자체 데이터에 사용되는 강화 키 관리 시스템을 사용하여 저장 상태에서 암호화됩니다. 이러한 키 관리 시스템은 엄격한 키 액세스 제어 및 감사 기능을 제공하며 AES-256 암호화 표준을 사용하여 사용자 저장 데이터를 암호화합니다. 설정, 구성 또는 관리가 필요하지 않습니다. Google Cloud의 기본 저장 데이터 암호화는 암호화 자료의 규정 준수 또는 지역과 관련된 특정 요구사항이 없는 사용자에게 가장 적합합니다.

Google Cloud 프로젝트 내에서 저장 데이터를 암호화하는 데 사용되는 키를 더 세밀하게 제어해야 하는 경우 DataStream은 Cloud Key Management Service(KMS)에서 관리하는 암호화 키를 사용하여 데이터를 보호하는 기능을 제공합니다. 이러한 암호화 키를 고객 관리 암호화 키(CMEK)라고 합니다. CMEK로 Datastream의 데이터를 보호하는 경우 CMEK를 제어할 수 있습니다.

customerManagedEncryptionKey 매개변수는 Datastream이 소스에서 대상으로 스트리밍되는 데이터를 암호화하는 데 사용할 수 있는 CMEK를 포함하는 것과 연결됩니다. CMEK는 [customer-managed-encryption-key] 자리표시자로 표시됩니다.

[ring] 자리표시자는 CMEK의 키링을 나타냅니다. 키링은 특정 Google Cloud 위치의 키를 구성하고 키 그룹의 액세스 제어를 관리할 수 있게 해줍니다. 키링 이름은 Google Cloud 프로젝트 전체에서 고유할 필요는 없지만 지정된 위치 내에서 고유해야 합니다. 키링에 대한 자세한 내용은 Cloud KMS 리소스를 참조하세요.

스트림을 만드는 과정에서 Datastream은 CMEK가 존재하고 Datastream에 키를 사용할 권한이 있는지 확인합니다. 이러한 확인에 대한 자세한 내용은 스트림 검증을 참조하세요.

CMEK 대신 Google Cloud의 내부 키 관리 시스템을 사용하여 데이터를 암호화하려면 API 요청에 customerManagedEncryptionKey 매개변수와 값을 포함하지 마세요.

backfillAll 매개변수는 이전 백필과 연결됩니다. 이 매개변수를 빈 사전({})으로 설정하면 Datastream이 다음 항목을 백필합니다.

  • 지속적인 데이터 변경사항 외에도 소스 데이터베이스에서 대상으로의 내역 데이터
  • 소스에서 대상으로의 스키마 및 테이블
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
        streams/myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}

스트림 정보 가져오기

다음 코드는 스트림 정보를 검색하기 위한 요청을 보여줍니다. 이러한 정보에는 다음이 포함됩니다.

  • Datastream에서 인식되는 스트림 이름
  • 스트림의 사용자 친화적인 이름(표시 이름)
  • 스트림이 만들어지고 업데이트된 날짜 및 시간 스탬프
  • 스트림과 연결된 소스 및 대상 연결 프로필 정보
  • 스트림의 상태

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]

예를 들면 다음과 같습니다.

GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

응답이 다음과 같이 표시됩니다.

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/[project-id]/locations/[location]/
  keyRings/[ring]/cryptoKeys/[customer-managed-encryption-key]",
  "backfillAll": {}
}

gcloud

gcloud를 사용하여 스트림 정보를 검색하는 방법에 대한 자세한 내용은 여기를 클릭하세요.

스트림 나열

다음 코드는 모든 스트림의 정보를 검색하기 위한 요청을 보여줍니다.

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams

gcloud

gcloud를 사용하여 모든 스트림의 정보를 검색하는 방법에 대한 자세한 내용은 여기를 클릭하세요.

스트림의 객체 나열

다음 코드는 스트림의 모든 객체에 대한 정보를 검색하기 위한 요청을 보여줍니다.

REST

GET https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects

예를 들면 다음과 같습니다.

GET https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myMySQLCdcStream/objects

반환되는 객체 목록은 다음과 비슷하게 표시됩니다.

REST

{
  "streamObjects": [
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/[project-id]/locations/[location]/streams/[stream-id]/
      objects/[object-id]",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

스트림 업데이트

스트림 시작

다음 코드는 스트림 시작 요청을 보여줍니다.

요청에서 updateMask 매개변수를 사용하여 사용자가 지정하는 필드만 요청 본문에 포함해야 합니다.

이 예시에서 지정된 필드는 스트림 상태를 나타내는 state 필드입니다. 스트림을 시작하면 상태가 CREATED에서 RUNNING으로 변경됩니다.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

예를 들면 다음과 같습니다.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

스트림 일시중지

다음 코드는 실행 중인 스트림을 일시중지하는 요청을 보여줍니다.

이 예시의 경우 updateMask 매개변수에 지정된 필드는 state 필드입니다. 스트림을 일시중지하면 상태가 RUNNING에서 PAUSED로 변경됩니다.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "PAUSED"
}

예를 들면 다음과 같습니다.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "PAUSED"
}

스트림 재개

다음 코드는 일시중지된 스트림을 재개하는 요청을 보여줍니다.

이 예시의 경우 updateMask 매개변수에 지정된 필드는 state 필드입니다. 스트림을 재개하면 상태가 PAUSED에서 RUNNING으로 변경됩니다.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

예를 들면 다음과 같습니다.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

스트림 수정

다음 코드는 75MB 또는 45초마다 파일을 순환하도록 스트림의 파일 순환 구성을 업데이트하는 요청을 보여줍니다.

이 예시의 경우 updateMask 매개변수에 지정된 필드에는 destinationConfig.gcsDestinationConfig.fileRotationMbdestinationConfig.gcsDestinationConfig.fileRotationInterval 플래그로 각각 표시되는 fileRotationMbfileRotationInterval 필드가 포함됩니다.

REST

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

예를 들면 다음과 같습니다.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

다음 코드는 Datastream이 Cloud Storage에 쓰는 파일의 경로에 통합 유형 스키마 파일을 포함하도록 하는 요청을 보여줍니다. 따라서 Datastream은 JSON 데이터 파일과 Avro 스키마 파일 두 가지 파일을 기록합니다.

이 예시의 경우 지정된 필드는 jsonFileFormat 필드이며 destinationConfig.gcsDestinationConfig.jsonFileFormat 플래그로 표시됩니다.

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

예를 들면 다음과 같습니다.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

다음 코드는 소스 데이터베이스에서 대상으로 진행 중인 데이터 변경 외에 Datastream이 기존 데이터를 복제하도록 하는 요청을 보여줍니다.

코드의 oracleExcludedObjects 섹션에는 대상 위치로의 백필이 제한된 테이블 및 스키마가 표시됩니다.

이 예시의 경우 schema3의 tableA를 제외한 모든 테이블과 스키마가 백필됩니다.

PATCH https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

예를 들면 다음과 같습니다.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

스트림 객체의 백필 시작

Datastream의 스트림은 이전 데이터를 백필하고 진행 중인 변경사항을 대상으로 스트림할 수 있습니다. 진행 중인 변경사항은 항상 소스에서 대상으로 스트림됩니다. 하지만 이전 데이터를 스트림할지 여부를 지정할 수 있습니다.

이전 데이터를 소스에서 대상으로 스트림하려면 backfillAll 매개변수를 사용합니다.

또한 Datastream을 사용하면 특정 데이터베이스 테이블의 이전 데이터만 스트리밍할 수 있습니다. 이렇게 하려면 backfillAll 매개변수를 사용하고 이전 데이터가 필요하지 않은 테이블을 제외합니다.

진행 중인 변경사항만 대상으로 스트림하려면 backfillNone 매개변수를 사용합니다. 그런 다음 Datastream이 소스에서 모든 기존 데이터의 스냅샷을 스트림하도록 하려면 이 데이터가 포함된 객체의 백필을 수동으로 시작해야 합니다.

소스와 대상 사이에 데이터가 동기화되지 않은 경우에도 객체 백필을 시작해야 합니다. 예를 들어 사용자가 대상에서 데이터를 의도치 않게 삭제하여 데이터가 손실될 수 있습니다. 이 경우 객체 백필을 시작하면 모든 데이터가 한 번에 대상으로 스트리밍되기 때문에 객체 백필이 '재설정 메커니즘'과 같이 사용됩니다. 그 결과 소스와 대상 사이에 데이터가 동기화됩니다.

스트림 객체에 대한 백필을 시작하려면 먼저 객체 정보를 검색해야 합니다.

각 객체에는 객체를 고유하게 식별하는 [object-id]가 있습니다. [object-id]를 사용하여 스트림의 백필을 시작합니다.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects/[object-id]:startBackfillJob

스트림 객체의 백필 중지

스트림 객체의 백필을 시작한 후 객체의 백필을 중지할 수 있습니다. 예를 들어 사용자가 데이터베이스 스키마를 수정하면 스키마 또는 데이터가 손상될 수 있습니다. 이러한 스키마 또는 데이터가 대상으로 스트리밍되지 않아야 하므로 객체에 대해 백필을 중지합니다.

부하 분산을 위해 객체의 백필을 중지할 수도 있습니다. Datastream은 여러 백필을 병렬로 실행할 수 있습니다. 그러면 소스에 추가 부하가 발생할 수 있습니다. 이러한 부하가 상당히 크면 각 객체의 백필을 중지하고 객체 하나씩 백필을 시작합니다.

스트림 객체의 백필을 중지하려면 먼저 스트림의 모든 객체에 대한 정보를 검색하도록 요청해야 합니다. 반환된 각 객체에는 객체를 고유하게 식별하는 [object-id]가 있습니다. 스트림의 백필을 중지하려면 [object-id]를 사용합니다.

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]/objects/[object-id]:stopBackfillJob

스트림 삭제

다음 코드는 스트림 삭제 요청을 보여줍니다.

REST

DELETE https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]

예를 들면 다음과 같습니다.

DELETE https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

gcloud

gcloud를 사용하여 스트림을 삭제하는 방법에 대한 자세한 내용은 여기를 클릭하세요.