이 페이지에서는 다음과 같은 변경 내역 속성에 대해 자세히 설명합니다.
- 분할 기반 파티셔닝 모델
- 변경 내역 레코드의 형식 및 콘텐츠
- 이러한 레코드를 쿼리하는 데 사용되는 로우 레벨 구문
- 쿼리 워크플로의 예시
이 페이지의 정보는 Spanner API를 사용해 변경 내역을 직접 쿼리하는 작업과 관련되어 있습니다. Dataflow를 사용하여 변경 내역 데이터를 읽는 애플리케이션은 여기에 설명된 데이터 모델로 직접 작업할 필요가 없습니다.
변경 내역에 대한 내용을 폭넓게 다룬 입문 가이드는 변경 내역 개요를 참조하세요.
변경 내역 파티션
변경 내역으로 감시되는 테이블에 변경사항이 발생하면 Spanner가 데이터 변경과 동일한 트랜잭션으로 데이터베이스에 해당 변경 내역 레코드를 동기식으로 작성합니다. 이를 통해 트랜잭션이 성공하면 Spanner도 변경사항을 성공적으로 캡처하고 지속하게 됩니다. 내부적으로 Spanner는 변경 내역 레코드와 데이터 변경사항을 같은 위치에 배치하여 동일한 서버에서 이를 처리함으로써 쓰기 오버헤드를 최소화합니다.
특정 분할에 대한 DML의 일부로 Spanner는 동일한 트랜잭션에서 해당하는 변경 내역 데이터 분할에 쓰기를 추가합니다. 이러한 코로케이션으로 인해 변경 내역이 제공 리소스 간에 추가 조정을 더하지 않아 트랜잭션 커밋 오버헤드가 최소화됩니다.
Spanner는 데이터베이스 부하 및 크기를 기준으로 동적으로 분할 및 병합을 수행하고 여러 제공 리소스 간에 분할을 분산하여 확장됩니다.
변경 내역 쓰기 및 읽기를 확장할 수 있도록 Spanner는 내부 변경 내역 스토리지를 데이터베이스 데이터와 함께 분할하고 병합하여 핫스팟을 자동으로 방지합니다. 데이터베이스 쓰기가 확장될 때 거의 실시간으로 변경 내역 레코드를 읽도록 지원하기 위해 Spanner API는 변경 내역 파티션을 사용하여 변경 내역을 동시에 쿼리할 수 있도록 설계되었습니다. 변경 내역 파티션은 변경 내역 레코드가 포함된 변경 내역 데이터 분할에 매핑됩니다. 시간이 지나면서 변경 내역의 파티션이 동적으로 변경되며 Spanner가 데이터베이스 데이터를 동적으로 분할하고 병합하는 방식과 연관됩니다.
변경 내역 파티션에는 특정 기간의 변경할 수 없는 키 범위에 대한 레코드가 포함됩니다. 변경 내역 파티션은 하나 이상의 변경 내역 파티션으로 분할하거나 다른 변경 내역 파티션과 병합할 수 있습니다. 이러한 분할 또는 병합 이벤트가 발생하면 다음 기간의 변경할 수 없는 각 키 범위의 변경사항을 캡처하기 위해 하위 파티션이 생성됩니다. 데이터 변경 레코드 외에도 변경 내역 쿼리는 하위 파티션 레코드를 반환하여 쿼리해야 하는 새 변경 내역 파티션을 리더에 알리고 최근에 쓰기가 발생한 적이 없다면 전달 진행 상태를 나타내는 하트비트 레코드도 반환합니다.
특정 변경 내역 파티션을 쿼리할 때는 변경 레코드가 커밋 타임스탬프 순서로 반환됩니다. 각 변경 레코드는 정확히 한 번 반환됩니다. 변경 내역 파티션에서 변경 레코드 순서는 보장되지 않습니다. 특정 기본 키의 변경 레코드는 특정 시간 범위에 하나의 파티션에서만 반환됩니다.
커밋 타임스탬프 순서로 특정 키의 변경사항을 처리하려면 상위-하위 파티션 계보로 인해 모든 상위 파티션의 레코드가 처리된 후에 하위 파티션에서 반환된 레코드를 처리해야 합니다.
변경 내역 읽기 함수 및 쿼리 구문
GoogleSQL
ExecuteStreamingSql
API를 사용하여 변경 내역을 쿼리합니다. Spanner는 변경 내역과 함께 특수 읽기 함수를 자동으로 만듭니다. 읽기 함수는 변경 내역 레코드에 대한 액세스 권한을 제공합니다. 읽기 함수 이름 지정 규칙은 READ_change_stream_name
입니다.
변경 내역 SingersNameStream
이 데이터베이스에 있다고 가정하면 GoogleSQL의 쿼리 구문은 다음과 같습니다.
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
읽기 함수에서는 다음 인수를 허용합니다.
인수 이름 | 유형 | 필수 여부 | 설명 |
---|---|---|---|
start_timestamp |
TIMESTAMP |
필수 | commit_timestamp 가 start_timestamp 보다 크거나 같은 레코드를 반환하도록 지정합니다. 값이 변경 내역 보관 기간 내에 있어야 하며 현재 시간보다 작거나 같아야 하고 변경 내역 생성 타임스탬프보다 크거나 같아야 합니다. |
end_timestamp |
TIMESTAMP |
선택사항(기본값: NULL ) |
commit_timestamp 가 end_timestamp 보다 작거나 같은 레코드를 반환하도록 지정합니다. 값이 변경 내역 보관 기간 내에 있어야 하며 start_timestamp 보다 크거나 같아야 합니다. end_timestamp 까지 모든 ChangeRecord가 반환되거나 사용자가 연결을 종료하면 쿼리가 완료됩니다. NULL 이 반환되거나 지정되지 않으면 모든 ChangeRecord가 반환되거나 사용자가 연결을 종료할 때까지 쿼리가 실행됩니다. |
partition_token |
STRING |
선택사항(기본값: NULL ) |
하위 파티션 레코드의 콘텐츠를 기반으로 쿼리할 변경 내역 파티션을 지정합니다. NULL 이거나 지정되지 않은 경우 리더가 변경 내역을 처음 쿼리하고 쿼리할 특정 파티션 토큰을 가져오지 않았음을 의미합니다. |
heartbeat_milliseconds |
INT64 |
필수 | 이 파티션에 커밋된 트랜잭션이 없는 경우 하트비트 ChangeRecord가 반환되는 빈도를 결정합니다.
값은 1,000 (1초)에서 300,000 (5분) 사이여야 합니다. |
read_options |
ARRAY |
선택사항(기본값: NULL ) |
나중에 사용할 수 있도록 예약된 추가 읽기 옵션입니다. 현재 유일하게 허용되는 값은 NULL 입니다. |
다음 예시와 같이 읽기 함수 쿼리 텍스트를 빌드하고 매개변수를 바인딩하는 편의 메서드를 사용하는 것이 좋습니다.
자바
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
ExecuteStreamingSql
API를 사용하여 변경 내역을 쿼리합니다.
Spanner는 변경 내역과 함께 특수 읽기 함수를 자동으로 만듭니다. 읽기 함수는 변경 내역 레코드에 대한 액세스 권한을 제공합니다. 읽기 함수 이름 지정 규칙은 spanner.read_json_change_stream_name
입니다.
변경 내역 SingersNameStream
이 데이터베이스에 있다고 가정하면 PostgreSQL의 쿼리 구문은 다음과 같습니다.
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
읽기 함수에서는 다음 인수를 허용합니다.
인수 이름 | 유형 | 필수 여부 | 설명 |
---|---|---|---|
start_timestamp |
timestamp with time zone |
필수 | commit_timestamp 가 start_timestamp 보다 크거나 같은 변경 레코드를 반환하도록 지정합니다. 값이 변경 내역 보관 기간 내에 있어야 하며 현재 시간보다 작거나 같아야 하고 변경 내역 생성 타임스탬프보다 크거나 같아야 합니다. |
end_timestamp |
timestamp with timezone |
선택사항(기본값: NULL ) |
commit_timestamp 가 end_timestamp 보다 작거나 같은 변경 레코드를 반환하도록 지정합니다. 값이 변경 내역 보관 기간 내에 있어야 하며 start_timestamp 보다 크거나 같아야 합니다.
end_timestamp 까지 모든 변경 레코드가 반환되거나 사용자가 연결을 종료하면 쿼리가 완료됩니다.
NULL 인 경우 모든 변경 레코드가 반환되거나 사용자가 연결을 종료할 때까지 쿼리가 실행됩니다. |
partition_token |
text |
선택사항(기본값: NULL ) |
하위 파티션 레코드의 콘텐츠를 기반으로 쿼리할 변경 내역 파티션을 지정합니다. NULL 이거나 지정되지 않은 경우 리더가 변경 내역을 처음 쿼리하고 쿼리할 특정 파티션 토큰을 가져오지 않았음을 의미합니다. |
heartbeat_milliseconds |
bigint |
필수 | 이 파티션에 커밋된 트랜잭션이 없는 경우 하트비트 ChangeRecord가 반환되는 빈도를 결정합니다.
값은 1,000 (1초)에서 300,000 (5분) 사이여야 합니다. |
null |
null |
필수 | 나중에 사용하기 위해 예약됨 |
다음 예시와 같이 읽기 함수의 텍스트를 빌드하고 매개변수를 바인딩하는 편의 메서드를 사용하는 것이 좋습니다.
자바
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
변경 내역 레코드 형식
GoogleSQL
변경 내역 읽기 함수에서 ARRAY<STRUCT<...>>
유형의 단일 ChangeRecord
열을 반환합니다. 각 열에서 이 배열은 항상 단일 요소를 포함합니다.
배열 요소 유형은 다음과 같습니다.
STRUCT <
data_change_record ARRAY<STRUCT<...>>,
heartbeat_record ARRAY<STRUCT<...>>,
child_partitions_record ARRAY<STRUCT<...>>
>
이 구조체에는 data_change_record
, heartbeat_record
, child_partitions_record
의 3개 필드가 있으며 각 필드의 유형은 ARRAY<STRUCT<...>>
입니다. 변경 내역 읽기 함수에서 반환하는 모든 행의 이 3가지 필드 중 하나에만 값이 포함되며 나머지 두 개는 비어 있거나 NULL
입니다. 이러한 배열 필드에는 요소가 최대 하나 포함됩니다.
다음 섹션에서는 이러한 3가지 레코드 유형을 각각 살펴봅니다.
PostgreSQL
변경 내역 읽기 함수는 다음 구조를 사용하여 JSON
유형의 단일 ChangeRecord
열을 반환합니다.
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
이 객체에 가능한 세 가지 키는 data_change_record
, heartbeat_record
, child_partitions_record
이며 해당 값 유형은 JSON
입니다.
변경 내역 읽기 함수에서 반환하는 행에는 이 3가지 키 중 하나만 있습니다.
다음 섹션에서는 이러한 3가지 레코드 유형을 각각 살펴봅니다.
데이터 변경 레코드
데이터 변경 레코드에는 동일한 트랜잭션에 대해 한 변경 내역 파티션에서 같은 커밋 타임스탬프로 커밋된 동일한 수정 유형(삽입, 업데이트 또는 삭제)이 있는 테이블 변경사항 집합이 포함됩니다. 여러 변경 내역 파티션에서 동일한 트랜잭션에 대해 여러 데이터 변경 레코드가 반환될 수 있습니다.
모든 데이터 변경 레코드에는 내역 레코드에서 변경 내역의 순서를 결정하는 commit_timestamp
, server_transaction_id
, record_sequence
필드가 있습니다. 이 3가지 필드면 변경사항의 순서를 도출하고 외부 일관성을 제공하는 데 충분합니다.
여러 트랜잭션이 겹치지 않는 데이터를 이용하는 경우 동일한 커밋 타임스탬프를 가질 수 있습니다. server_transaction_id
필드를 사용하면 동일한 트랜잭션 내에서 실행된 (변경 내역 파티션의 잠재적) 변경사항 집합을 구분할 수 있습니다. record_sequence
및 number_of_records_in_transaction
필드와 페어링하면 특정 트랜잭션의 모든 레코드도 버퍼링하고 정렬할 수 있습니다.
데이터 변경 레코드의 필드에는 다음이 포함됩니다.
GoogleSQL
필드 | 유형 | 설명 |
---|---|---|
commit_timestamp |
TIMESTAMP |
변경사항이 커밋된 타임스탬프입니다. |
record_sequence |
STRING |
트랜잭션 내 레코드의 시퀀스 수입니다. 시퀀스 번호는 트랜잭션 내에서 고유하고 단조롭게(단, 연속적일 필요는 없음) 증가하도록 보장됩니다. 같은 server_transaction_id 에 대한 레코드를 record_sequence 별로 정렬하여 트랜잭션 내 변경사항 순서를 재구성합니다.
이 순서는 성능 향상을 위해 Spanner에서 최적화될 수 있으며 사용자가 제공한 원래 순서와 항상 일치하지 않을 수 있습니다. |
server_transaction_id |
STRING |
변경사항이 커밋된 트랜잭션을 나타내는 전역적으로 고유한 문자열입니다. 이 값은 변경 내역 레코드를 처리할 때에만 사용해야 하며 Spanner API의 트랜잭션 ID와는 상관 관계가 없습니다. |
is_last_record_in_transaction_in_partition |
BOOL |
현재 파티션의 트랜잭션에 대한 마지막 레코드인지 여부를 나타냅니다. |
table_name |
STRING |
변경사항의 영향을 받은 테이블의 이름입니다. |
value_capture_type |
STRING |
이 변경사항이 캡처되었을 때 변경 내역 구성에 지정된 값 캡처 유형을 설명합니다. 값 캡처 유형은 |
column_types |
ARRAY<STRUCT< |
열 이름, 열 유형, 기본 키인지 여부, 스키마에 정의된 열의 위치(`ordinal_position`)입니다. 스키마의 테이블 첫 번째 열의 서수 위치는 '1'입니다. 열 유형이 배열 열에서 중첩될 수 있습니다. 형식은 Spanner API 참조에 설명된 유형 구조와 일치합니다. |
mods |
ARRAY<STRUCT< |
기본 키 값, 이전 값, 변경되거나 추적된 열의 새 값을 포함하여 적용된 변경사항을 설명합니다.
이전 값과 새 값의 가용성과 내용은 구성된 value_capture_type에 따라 다릅니다. new_values 및 old_values 필드에는 키 이외의 열만 포함됩니다. |
mod_type |
STRING |
변경사항의 유형을 설명합니다. INSERT , UPDATE 또는 DELETE 중 하나입니다. |
number_of_records_in_transaction |
INT64 |
모든 변경 내역 파티션에서 이 트랜잭션에 포함된 데이터 변경 레코드 수입니다. |
number_of_partitions_in_transaction |
INT64 |
이 트랜잭션의 데이터 변경 레코드를 반환할 파티션 수입니다. |
transaction_tag |
STRING |
이 트랜잭션과 연결된 트랜잭션 태그입니다. |
is_system_transaction |
BOOL |
트랜잭션이 시스템 트랜잭션인지 여부를 나타냅니다. |
PostgreSQL
필드 | 유형 | 설명 |
---|---|---|
commit_timestamp |
STRING |
변경사항이 커밋된 타임스탬프입니다. |
record_sequence |
STRING |
트랜잭션 내 레코드의 시퀀스 수입니다. 시퀀스 번호는 트랜잭션 내에서 고유하고 단조롭게(단, 연속적일 필요는 없음) 증가하도록 보장됩니다. 동일한 `server_transaction_id`에 대한 레코드를 `record_sequence`별로 정렬하여 트랜잭션 내의 변경사항 순서를 재구성합니다. |
server_transaction_id |
STRING |
변경사항이 커밋된 트랜잭션을 나타내는 전역적으로 고유한 문자열입니다. 이 값은 변경 내역 레코드를 처리할 때에만 사용해야 하며 Spanner API의 트랜잭션 ID와는 상관 관계가 없습니다. |
is_last_record_in_transaction_in_partition |
BOOLEAN |
현재 파티션의 트랜잭션에 대한 마지막 레코드인지 여부를 나타냅니다. |
table_name |
STRING |
변경사항의 영향을 받은 테이블의 이름입니다. |
value_capture_type |
STRING |
이 변경사항이 캡처되었을 때 변경 내역 구성에 지정된 값 캡처 유형을 설명합니다. 값 캡처 유형은 |
column_types |
[ { "name": <STRING>, "type": { "code": <STRING> }, "is_primary_key": <BOOLEAN>, "ordinal_position": <NUMBER> }, ... ] |
열 이름, 열 유형, 기본 키인지 여부, 스키마에 정의된 열의 위치(`ordinal_position`)입니다. 스키마의 테이블 첫 번째 열의 서수 위치는 '1'입니다. 열 유형이 배열 열에서 중첩될 수 있습니다. 형식은 Spanner API 참조에 설명된 유형 구조와 일치합니다. |
mods |
[ { "keys": {<STRING> : <STRING>}, "new_values": { <STRING> : <VALUE-TYPE>, [...] }, "old_values": { <STRING> : <VALUE-TYPE>, [...] }, }, [...] ] |
기본 키 값, 이전 값, 변경되거나 추적된 열의 새 값을 포함하여 적용된 변경사항을 설명합니다. 이전 값과 새 값의 가용성과 내용은 구성된 value_capture_type에 따라 다릅니다. new_values 및 old_values 필드에는 키 이외의 열만 포함됩니다.
|
mod_type |
STRING |
변경사항의 유형을 설명합니다. INSERT , UPDATE 또는 DELETE 중 하나입니다. |
number_of_records_in_transaction |
INT64 |
모든 변경 내역 파티션에서 이 트랜잭션에 포함된 데이터 변경 레코드 수입니다. |
number_of_partitions_in_transaction |
NUMBER |
이 트랜잭션의 데이터 변경 레코드를 반환할 파티션 수입니다. |
transaction_tag |
STRING |
이 트랜잭션과 연결된 트랜잭션 태그입니다. |
is_system_transaction |
BOOLEAN |
트랜잭션이 시스템 트랜잭션인지 여부를 나타냅니다. |
다음은 예시 데이터 변경 레코드 쌍입니다. 두 계정 간에 전송되는 단일 트랜잭션을 설명합니다. 두 계정은 별도의 변경 내역 파티션에 있습니다.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
다음 데이터 변경 레코드는 값 캡처 유형이 "NEW_VALUES"
인 레코드의 예시입니다. 새 값만 채워집니다.
"LastUpdate"
열만 수정되었으므로 해당 열만 반환되었습니다.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
다음 데이터 변경 레코드는 값 캡처 유형이 "NEW_ROW"
인 레코드의 예시입니다. "LastUpdate"
열만 수정되었지만 추적된 모든 열이 반환됩니다.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
다음 데이터 변경 레코드는 값 캡처 유형이 "NEW_ROW_AND_OLD_VALUES"
인 레코드의 예시입니다. "LastUpdate"
열만 수정되었지만 추적된 모든 열이 반환됩니다. 이 값 캡처 유형은 LastUpdate
의 새 값과 이전 값을 캡처합니다.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
하트비트 레코드
하드비트 레코드가 반환되면 하트비트 레코드의 timestamp
보다 작거나 같은 commit_timestamp
를 가진 모든 변경사항이 반환되었다는 의미입니다. 이 파티션의 향후 데이터 레코드는 하트비트 레코드에서 반환된 것보다 높은 커밋 타임스탬프를 가져야 합니다. 파티션에 기록된 데이터 변경사항이 없을 때 하트비트 레코드가 반환됩니다. 파티션에 기록된 데이터 변경사항이 있는 경우 heartbeat_record.timestamp
대신 data_change_record.commit_timestamp
를 사용하면 리더가 파티션 읽기를 진행 중임을 알 수 있습니다.
파티션에 반환된 하트비트 레코드를 사용해서 모든 파티션에서 리더를 동기화할 수 있습니다. 모든 리더가 특정 타임스탬프 A
보다 크거나 같은 하트비트를 수신했거나 타임스탬프 A
보다 크거나 같은 데이터 또는 하위 파티션 레코드를 수신하면 리더가 타임스탬프 A
또는 그 이전에 커밋된 모든 레코드가 수신되었음을 알 수 있습니다. 예를 들어 타임스탬프별로 교차 파티션 레코드를 정렬하고 server_transaction_id
로 그룹화하여 버퍼링된 레코드의 처리를 시작할 수 있습니다.
하트비트 레코드에는 하나의 필드만 포함됩니다.
GoogleSQL
필드 | 유형 | 설명 |
---|---|---|
timestamp |
TIMESTAMP |
하트비트 레코드의 타임스탬프입니다. |
PostgreSQL
필드 | 유형 | 설명 |
---|---|---|
timestamp |
STRING |
하트비트 레코드의 타임스탬프입니다. |
타임스탬프가 이 레코드의 타임스탬프보다 작거나 같은 모든 레코드가 반환되었음을 알리는 하트비트 레코드의 예시입니다.
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
하위 파티션 레코드
하위 파티션 레코드는 파티션 토큰, 상위 파티션의 토큰, 하위 파티션에 변경 레코드가 포함된 가장 오래된 타임스탬프를 나타내는 start_timestamp
등 하위 파티션에 대한 정보를 반환합니다. 커밋 타임스탬프가 child_partitions_record.start_timestamp
직전인 레코드는 현재 파티션에서 반환됩니다. 이 파티션의 모든 하위 파티션 레코드를 반환한 후 이 쿼리는 성공 상태를 반환해 이 파티션에 대한 모든 레코드가 반환되었음을 나타냅니다.
하위 파티션 레코드의 필드에는 다음이 포함됩니다.
GoogleSQL
필드 | 유형 | 설명 |
---|---|---|
start_timestamp |
TIMESTAMP |
이 하위 파티션 레코드의 하위 파티션에서 반환된 데이터 변경 레코드의 커밋 타임스탬프는 start_timestamp 보다 크거나 같습니다. 하위 파티션을 쿼리할 때는 쿼리에서 하위 파티션 토큰과 child_partitions_token.start_timestamp 이상의 start_timestamp 를 지정해야 합니다. 파티션에서 반환된 모든 하위 파티션 레코드는 동일한 start_timestamp 를 가지며 타임스탬프는 항상 쿼리에 지정된 start_timestamp 와 end_timestamp 사이에 속합니다. |
record_sequence |
STRING |
특정 파티션에 동일한 start_timestamp로 반환된 하위 파티션 레코드가 여러 개 있는 경우 하위 파티션 레코드의 순서를 정의하는 데 사용할 수 있는 단조롭게 증가하는 시퀀스 번호입니다. 파티션 토큰인 start_timestamp 와 record_sequence 는 하위 파티션 레코드를 고유하게 식별합니다. |
child_partitions |
ARRAY<STRUCT< |
하위 파티션 집합 및 관련 정보를 반환합니다. 여기에는 쿼리의 하위 파티션을 식별하는 데 사용되는 파티션 토큰 문자열과 해당 상위 파티션의 토큰이 포함됩니다. |
PostgreSQL
필드 | 유형 | 설명 |
---|---|---|
start_timestamp |
STRING |
이 하위 파티션 레코드의 하위 파티션에서 반환된 데이터 변경 레코드의 커밋 타임스탬프는 start_timestamp 보다 크거나 같습니다. 하위 파티션을 쿼리할 때는 쿼리에서 하위 파티션 토큰과 child_partitions_token.start_timestamp 이상의 start_timestamp 를 지정해야 합니다. 파티션에서 반환된 모든 하위 파티션 레코드는 동일한 start_timestamp 를 가지며 타임스탬프는 항상 쿼리에 지정된 start_timestamp 와 end_timestamp 사이에 속합니다.
|
record_sequence |
STRING |
특정 파티션에 동일한 start_timestamp로 반환된 하위 파티션 레코드가 여러 개 있는 경우 하위 파티션 레코드의 순서를 정의하는 데 사용할 수 있는 단조롭게 증가하는 시퀀스 번호입니다. 파티션 토큰인 start_timestamp 와 record_sequence 는 하위 파티션 레코드를 고유하게 식별합니다. |
child_partitions |
[ { "token": <STRING>, "parent_partition_tokens": [<STRING>], }, [...] ] |
하위 파티션 배열 및 관련 정보를 반환합니다. 여기에는 쿼리의 하위 파티션을 식별하는 데 사용되는 파티션 토큰 문자열과 해당 상위 파티션의 토큰이 포함됩니다. |
다음은 하위 파티션 레코드 예시입니다.
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
변경 내역 쿼리 워크플로
일회용 읽기 전용 트랜잭션 및 강력한 타임스탬프 경계와 함께 ExecuteStreamingSql
API를 사용하여 변경 내역 쿼리를 실행합니다. 변경 내역 읽기 함수를 사용하면 원하는 기간에 start_timestamp
및 end_timestamp
를 지정할 수 있습니다. 강력한 읽기 전용 타임스탬프 경계를 사용하여 보관 기간 내의 모든 변경 레코드에 액세스할 수 있습니다.
다른 모든 TransactionOptions
는 변경 내역 쿼리에 유효하지 않습니다. 또한 TransactionOptions.read_only.return_read_timestamp
가 true로 설정되면 유효한 읽기 타임스탬프 대신 트랜잭션을 설명하는 Transaction
메시지에서 특수 값 kint64max - 1
이 반환됩니다. 이 특수 값은 삭제해야 하며 이후 쿼리에 사용해서는 안 됩니다.
각 변경 내역 쿼리는 각각 데이터 변경 레코드, 하트비트 레코드 또는 하위 파티션 레코드가 포함된 행을 원하는 만큼 반환할 수 있습니다. 요청에 기한을 설정할 필요가 없습니다.
예를 들면 다음과 같습니다.
스트리밍 쿼리 워크플로는 partition_token
을 NULL
로 지정하여 첫 번째 변경 내역 쿼리를 실행하는 것으로 시작됩니다. 쿼리는 변경 내역에 대한 읽기 함수, 관심 있는 시작 및 종료 타임스탬프, 하트비트 간격을 지정해야 합니다. end_timestamp
가 NULL
이면 쿼리에서 파티션이 종료될 때까지 데이터 변경사항을 계속 반환합니다.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
하위 파티션 레코드가 반환될 때까지 이 쿼리의 데이터 레코드를 처리합니다. 아래 예시에서는 하위 파티션 레코드 2개와 파티션 토큰 3개가 반환된 후에 쿼리가 종료됩니다. 특정 쿼리의 하위 파티션 레코드는 항상 동일한 start_timestamp
를 공유합니다.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
2022-05-01T09:00:01Z
후에 향후 변경사항을 처리하려면 새 쿼리 3개를 만들어 동시에 실행합니다. 세 쿼리는 상위 요소와 동일한 키 범위의 향후 데이터 변경사항을 반환합니다. 항상 동일한 하위 파티션 레코드에서 start_timestamp
를 start_timestamp
로 설정하고 동일한 end_timestamp
및 하트비트 간격을 사용하여 모든 쿼리에서 레코드를 일관되게 처리합니다.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
잠시 후 child_token_2
에 대한 쿼리가 다른 하위 파티션 레코드를 반환한 후에 완료됩니다. 이 레코드는 새 파티션이 2022-05-01T09:30:15Z
부터 child_token_2
및 child_token_3
의 향후 변경사항을 처리할 것임을 나타냅니다. 둘 다 새 child_token_4
의 상위 파티션이므로 child_token_3
에 대한 쿼리에서 정확하게 동일한 레코드를 반환합니다.
특정 키의 데이터 레코드를 엄격한 순서대로 처리하기 위해서는 모든 상위 요소(이 경우에는child_token_2
및child_token_3
)가 완료된 후에 child_token_4
에 대한 쿼리가 시작되어야 합니다. 각 하위 파티션 토큰마다 하나의 쿼리만 만듭니다. 쿼리 워크플로 설계에서 상위 요소 하나를 대기하도록 지정하고 child_token_4
에 쿼리를 예약해야 합니다.
child partitions record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
GitHub에서 Apache Beam SpannerIO Dataflow 커넥터의 변경 내역 레코드 처리 및 파싱 예시를 찾아보세요.