스트림 관리

개요

이 섹션에서는 Datastream API를 사용하여 다음 작업을 수행하는 방법을 알아봅니다.

  • 스트림 검증 및 생성
  • 스트림 정보 가져오기
  • 스트림에서 오류 검색
  • 스트림을 시작, 일시중지, 재개, 수정하여 스트림 업데이트
  • 스트림 삭제

Datastream API를 사용하는 방법에는 두 가지가 있습니다. REST API를 호출하거나 gcloud 명령줄 인터페이스 (CLI)를 사용할 수 있습니다.

gcloud을 사용하여 DataStream 스트림을 관리하는 방법에 대한 고급 정보를 보려면 여기를 클릭하세요.

스트림 검증

스트림을 만들기 전에 유효성을 검사할 수 있습니다. 이렇게 하면 스트림이 성공적으로 실행되고 모든 유효성 검사가 통과되도록 할 수 있습니다.

스트림 검사의 유효성 검사는 다음과 같습니다.

  • DataStream에서 데이터를 스트리밍할 수 있도록 소스가 올바르게 구성되었는지 여부입니다.
  • 스트림을 소스와 대상 모두에 연결할 수 있는지 여부입니다.
  • 스트림의 엔드 투 엔드 구성

다음 코드는 소스 Oracle 데이터베이스에서 Cloud Storage의 대상 버킷으로 데이터를 전송하는 데 사용되는 스트림의 유효성을 검사하는 요청을 보여줍니다.

REST

POST "https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]&validate_only=true"
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "backfillAll": {}
}

&validate_only=true 값은 스트림만 확인한다는 것을 나타냅니다. 만드는 것이 아니라 또한 이 요청의 경우 전체 URL을 따옴표로 묶습니다. 이렇게 하면 Datastream이 스트림을 검증하기 위해 &validate_only=true 값을 선택합니다.

이 요청을 실행하면 데이터스트림이 소스와 대상에 대해 실행하는 유효성 검사와 확인 통과 또는 실패 여부가 표시됩니다. 통과하지 못한 유효성 검사의 경우 실패한 이유와 문제를 해결하기 위해 취해야 할 조치에 대한 정보가 표시됩니다.

올바르게 수정한 후 요청을 다시 실행하여 모든 유효성 검사가 통과하도록 하세요.

스트림 만들기

다음 코드는 소스 Oracle 데이터베이스에서 Cloud Storage의 대상 버킷으로 데이터를 전송하는 데 사용되는 스트림을 만드는 요청을 보여줍니다.

REST

POST https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams?streamId=[stream-id]
{
  "displayName": "[display name]",
  "sourceConfig": {
    "sourceConnectionProfileName": "[connectionProfileName]",
    "oracleSourceConfig": {
      "allowlist": {},
      "rejectlist": {}
    }
  }
  "destinationConfig": {
    "destinationConnectionProfileName": "[connectionProfileName]",
    "gcsDestinationConfig": {
      "path": "[filePrefix]",
      "avroFileFormat": "{}"
      "fileRotationMb": MBytes
      "fileRotationInterval": seconds
    }
  },
  "backfillAll": {}
}

예를 들어 다음은 schema1에서 모든 테이블과 schema3에서 두 특정 테이블인 tableA와 tableC를 가져오는 요청입니다.

이벤트는 Cloud Storage의 버킷에 Avro 형식으로 기록되며, 100MB 또는 30초마다 새 파일이 생성됩니다 (기본값은 50MB 및 60초).

backfillAll 매개변수는 이전 백필과 연결되어 있습니다. 이 매개변수를 빈 사전 ({})으로 설정하면 DataStream이 백필합니다.

  • 이전 데이터 및 소스 데이터베이스에서 대상으로의 지속적인 변경 데이터
  • 소스에서 대상까지의 스키마 및 테이블
POST https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
        streams/myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "backfillAll": {}
}

스트림에 대한 정보 가져오기

다음 코드는 스트림에 대한 정보 검색 요청을 보여줍니다. 이러한 정보에는 다음이 포함됩니다.

  • Datastream에서 인식되는 스트림의 이름
  • 스트림의 사용자 친화적인 이름 (표시 이름)
  • 스트림이 생성 및 업데이트된 날짜 및 시간 스탬프
  • 스트림과 연결된 소스 및 대상 연결 프로필에 대한 정보입니다.
  • 스트림의 상태

REST

GET https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]

예를 들면 다음과 같습니다.

GET https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

응답은 다음과 같이 표시됩니다.

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "allowlist": {
        "oracleSchemas": [
          {
            "schemaName": "schema1",
            "oracleTables": []
          },
          {
            "schemaName": "schema3",
            "oracleTables": [
              { "tableName": "tableA" },
              { "tableName": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": "{}",
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "backfillAll": {}
}

gcloud

gcloud를 사용하여 스트림에 대한 정보를 검색하는 방법을 자세히 알아보려면 여기를 클릭하세요.

스트림 나열

다음 코드는 모든 스트림에 대한 정보 검색 요청을 보여줍니다.

REST

GET https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams

gcloud

gcloud를 사용하여 모든 스트림에 대한 정보를 검색하는 방법을 자세히 알아보려면 여기를 클릭하세요.

스트림에서 오류 검색

다음 코드는 스트림이 제대로 작동하지 않는 것과 관련된 오류 목록 가져오기 요청을 보여줍니다.

REST

GET https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]:fetchStreamErrors

예를 들면 다음과 같습니다.

GET https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream:fetchStreamErrors

스트림 업데이트

스트림 시작

다음 코드는 스트림 시작 요청을 보여줍니다.

요청에 updateMask 매개변수를 사용하면 지정한 필드만 요청 본문에 포함되어야 합니다.

이 예시에서 지정된 필드는 스트림 상태 (또는 상태)를 나타내는 state 필드입니다. 스트림을 시작하면 상태가 CREATED에서 RUNNING로 변경됩니다.

REST

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

예를 들면 다음과 같습니다.

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

스트림 일시중지

다음 코드는 실행 중인 스트림을 일시중지하는 요청을 보여줍니다.

이 예시에서 updateMask 매개변수에 지정된 필드는 state 필드입니다. 스트림을 일시중지하면 상태를 RUNNING에서 PAUSED로 변경합니다.

REST

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "PAUSED"
}

예를 들면 다음과 같습니다.

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "PAUSED"
}

스트림 재개

다음 코드는 일시중지된 스트림을 재개하는 요청을 보여줍니다.

이 예시에서 updateMask 매개변수에 지정된 필드는 state 필드입니다. 스트림을 재개하면 상태를 PAUSED에서 RUNNING로 변경합니다.

REST

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=state
{
  "state": "RUNNING"
}

예를 들면 다음과 같습니다.

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=state
{
  "state": "RUNNING"
}

스트림 수정

다음 코드는 75MB 또는 45초마다 파일을 순환하도록 스트림의 파일 회전 구성 업데이트 요청을 보여줍니다.

이 예시에서 updateMask 매개변수에 지정된 필드에는 각각 destinationConfig.gcsDestinationConfig.fileRotationMb 플래그와 destinationConfig.gcsDestinationConfig.fileRotationInterval 플래그로 표시되는 fileRotationMbfileRotationInterval 필드가 포함됩니다.

REST

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

예를 들면 다음과 같습니다.

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

다음 코드는 Datastream이 Cloud Storage에 쓰는 파일 경로에 통합 유형 스키마 파일을 포함하도록 요청을 보여줍니다. 그 결과 Datastream은 JSON 데이터 파일과 Avro 스키마 파일이라는 두 가지 파일을 작성합니다.

이 예시에서 지정된 필드는 destinationConfig.gcsDestinationConfig.jsonFileFormat 플래그로 표시되는 jsonFileFormat 필드입니다.

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

예를 들면 다음과 같습니다.

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

다음 코드는 데이터 데이터베이스에서 진행 중인 데이터 변경과 더불어 소스 데이터베이스에서 대상으로의 기존 데이터 복제 요청을 보여줍니다.

코드의 oracleExcludedObjects 섹션에는 대상으로 백필되지 않도록 제한된 테이블과 스키마가 표시됩니다.

이 예시에서는 schema3의 tableA를 제외한 모든 테이블과 스키마가 백필됩니다.

PATCH https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

예를 들면 다음과 같습니다.

PATCH https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schemaName": "schema3",
          "oracleTables": [
            {
              "tableName": "tableA"
            }
          ]
        }
      ]
    }
  }
}

스트림 삭제

다음 코드는 스트림 삭제 요청을 보여줍니다.

REST

DELETE https://datastream.googleapis.com/v1alpha1/projects/[project-id]/locations/
[location]/streams/[stream-id]

예를 들면 다음과 같습니다.

DELETE https://datastream.googleapis.com/v1alpha1/projects/myProjectId1/locations/
us-central1/streams/myOracleCdcStream

gcloud

gcloud을 사용하여 스트림을 삭제하는 방법을 자세히 알아보려면 여기를 클릭하세요.