이 페이지에서는 Kafka 커넥터를 사용하여 Spanner 변경 내역 데이터를 사용하고 전달하는 방법을 설명합니다.
핵심 개념
다음은 Kafka 커넥터의 핵심 개념을 설명합니다.
Debezium
Debezium은 변경 데이터 캡처를 위한 지연 시간이 짧은 데이터 스트리밍 플랫폼을 제공하는 오픈소스 프로젝트입니다.
Kafka 커넥터
Kafka 커넥터는 Spanner API를 통해 추상화를 제공하여 Spanner 변경 내역을 Kafka에 게시합니다. 이 커넥터를 사용하면 Spanner API를 직접 사용할 때 필요한 변경 내역 파티션 수명 주기를 관리할 필요가 없습니다.
Kafka 커넥터는 모든 데이터 변경 레코드 모드에 대해 변경 이벤트를 생성하고 변경 이벤트 레코드 다운스트림을 각 변경 내역 추적 테이블에 대한 별도의 Kafka 주제로 전송합니다. 데이터 변경 레코드 모드는 캡처된 단일 수정(삽입, 업데이트 또는 삭제)을 나타냅니다. 단일 데이터 변경 레코드에는 mod가 두 개 이상 포함될 수 있습니다.
Kafka 커넥터 출력
Kafka 커넥터는 변경 내역 레코드를 별도의 Kafka 주제로 직접 전달합니다. 출력 주제 이름은 connector_name
.table_name
이어야 합니다. 주제가 없으면 Kafka 커넥터가 해당 이름으로 주제를 자동으로 만듭니다.
주제 라우팅 변환을 구성하여 레코드를 지정된 주제로 다시 라우팅할 수도 있습니다. 주제 라우팅을 사용하려면 낮은 워터마크 기능을 사용 중지합니다.
레코드 순서
레코드는 Kafka 주제의 기본 키별 커밋 타임스탬프별로 정렬됩니다. 서로 다른 기본 키에 속한 레코드는 순서를 보장하지 않습니다. 기본 키가 동일한 레코드는 같은 Kafka 주제 파티션에 저장됩니다. 전체 트랜잭션을 처리하려면 데이터 변경 레코드 server_transaction_id
및 number_of_records_in_transaction
필드를 사용하여 Spanner 트랜잭션을 조합하면 됩니다.
변경 이벤트
Kafka 커넥터는 각 INSERT
, UPDATE
, DELETE
작업마다 데이터 변경 이벤트를 생성합니다. 각 이벤트에는 변경된 행의 키와 값이 포함됩니다.
Kafka Connect 변환기를 사용하여 Protobuf
, AVRO
, JSON
또는 JSON Schemaless
형식의 데이터 변경 이벤트를 생성할 수 있습니다. 스키마를 생성하는 Kafka Connect 변환기를 사용하면 이벤트에 키와 값의 스키마가 별도로 포함됩니다. 그렇지 않으면 이벤트에 키와 값만 포함됩니다.
키의 스키마는 변경되지 않습니다. 값의 스키마는 커넥터 시작 시간 이후 변경 내역이 추적한 모든 열의 병합입니다.
JSON 이벤트가 생성되도록 커넥터를 구성하면 출력 변경 이벤트에는 필드 5개가 포함됩니다.
첫 번째
schema
필드는 Spanner 키 스키마를 설명하는 Kafka Connect 스키마를 지정합니다.첫 번째
payload
필드에는 이전schema
필드에 설명된 구조가 있으며 변경된 행의 키가 포함됩니다.두 번째
schema
필드는 변경된 행의 스키마를 설명하는 Kafka Connect 스키마를 지정합니다.두 번째
payload
필드에는 이전schema
필드에 설명된 구조가 있으며 변경된 행의 실제 데이터가 포함됩니다.source
필드는 이벤트의 소스 메타데이터를 설명하는 필수 필드입니다.
다음은 데이터 변경 이벤트 예시입니다.
{ // The schema for the Spanner key. "schema": { "type": "struct", "name": "customers.Key", "optional": false, "fields": [ { "type": "int64", "optional": "false" "field": "false" } ] }, // The value of the Spanner key. "payload": { "id": "1" }, // The schema for the payload, which contains the before and after values // of the changed row. The schema for the payload contains all the // columns that the change stream has tracked since the connector start // time. "schema": { "type": "struct", "fields": [ { // The schema for the before values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": true, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "before" }, { // The schema for the after values of the changed row. "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "id" }, { "type": "string", "optional": false, "field": "first_name" } ], "optional": true, "name": "customers.Value", "field": "after" }, { // The schema for the source metadata for the event. "type": "struct", "fields": [ { "type": "string", "optional": false, "field": "version" }, { "type": "string", "optional": false, "field": "connector" }, { "type": "string", "optional": false, "field": "name" }, { "type": "int64", "optional": false, "field": "ts_ms" }, { "type": "boolean", "optional": true, "default": false, "field": "snapshot" }, { "type": "string", "optional": false, "field": "db" }, { "type": "string", "optional": false, "field": "sequence" }, { "type": "string", "optional": false, "field": "project_id" }, { "type": "string", "optional": false, "field": "instance_id" }, { "type": "string", "optional": false, "field": "database_id" }, { "type": "string", "optional": false, "field": "change_stream_name" }, { "type": "string", "optional": true, "field": "table" } { "type": "string", "optional": true, "field": "server_transaction_id" } { "type": "int64", "optional": true, "field": "low_watermark" } { "type": "int64", "optional": true, "field": "read_at_timestamp" } { "type": "int64", "optional": true, "field": "number_of_records_in_transaction" } { "type": "string", "optional": true, "field": "transaction_tag" } { "type": "boolean", "optional": true, "field": "system_transaction" } { "type": "string", "optional": true, "field": "value_capture_type" } { "type": "string", "optional": true, "field": "partition_token" } { "type": "int32", "optional": true, "field": "mod_number" } { "type": "boolean", "optional": true, "field": "is_last_record_in_transaction_in_partition" } { "type": "int64", "optional": true, "field": "number_of_partitions_in_transaction" } ], "optional": false, "name": "io.debezium.connector.spanner.Source", "field": "source" }, ] { "type": "string", "optional": false, "field": "op" }, { "type": "int64", "optional": true, "field": "ts_ms" } ], "optional": false, "name": "connector_name.customers.Envelope" }, "payload": { // The values of the row before the event. "before": null, // The values of the row after the event. "after": { "id": 1, "first_name": "Anne", } }, // The source metadata. "source": { "version": "{debezium-version}", "connector": "spanner", "name": "spanner_connector", "ts_ms": 1670955531785, "snapshot": "false", "db": "database", "sequence": "1", "project_id": "project", "instance_id": "instance", "database_id": "database", "change_stream_name": "change_stream", "table": "customers", "server_transaction_id": "transaction_id", "low_watermark": 1670955471635, "read_at_timestamp": 1670955531791, "number_records_in_transaction": 2, "transaction_tag": "", "system_transaction": false, "value_capture_type": "OLD_AND_NEW_VALUES", "partition_token": "partition_token", "mod_number": 0, "is_last_record_in_transaction_in_partition": true, "number_of_partitions_in_transaction": 1 }, "op": "c", "ts_ms": 1559033904863 // }
낮은 워터마크
낮은 워터마크는 Kafka 커넥터에서 타임스탬프가 T보다 이전인 모든 이벤트를 스트리밍하고 Kafka 주제에 게시한 것으로 보장되는 시간 T를 설명합니다.
gcp.spanner.low-watermark.enabled
파라미터를 사용하여 Kafka 커넥터에서 낮은 워터마크를 사용 설정할 수 있습니다. 이 매개변수는 기본적으로 사용 중지됩니다. 낮은 워터마크가 사용 설정되면 변경 내역 데이터 변경 레코드의 low_watermark
필드가 Kafka 커넥터의 현재 낮은 워터마크 타임스탬프로 채워집니다.
생성되는 레코드가 없으면 Kafka 커넥터는 주기적인 워터마크 '하트비트'를 커넥터에서 감지한 Kafka 출력 주제에 전송합니다.
이러한 워터마크 하트비트는 비어 있는 레코드입니다(low_watermark
필드 제외). 그런 다음 낮은 워터마크를 사용하여 시간 기반 집계를 수행할 수 있습니다.
예를 들어 낮은 워터마크를 사용하여 기본 키 전체에서 커밋 타임스탬프별로 이벤트를 정렬할 수 있습니다.
메타데이터 주제
Kafka 커넥터와 Kafka Connect 프레임워크는 여러 메타데이터 주제를 만들어 커넥터 관련 정보를 저장합니다. 이러한 메타데이터 주제의 구성 또는 콘텐츠를 수정하는 않는 것이 좋습니다.
다음은 메타데이터 주제입니다.
_consumer_offsets
: Kafka에서 자동으로 만든 주제입니다. Kafka 커넥터에서 생성된 소비자의 소비자 오프셋을 저장합니다._kafka-connect-offsets
: Kafka Connect에서 자동으로 만든 주제입니다. 커넥터 오프셋을 저장합니다._sync_topic_spanner_connector_connectorname
: 커넥터에서 자동으로 만든 주제입니다. 변경 내역 파티션에 대한 메타데이터를 저장합니다._rebalancing_topic_spanner_connector_connectorname
: 커넥터에서 자동으로 만든 주제입니다. 커넥터 태스크 활성 상태를 확인하는 데 사용됩니다._debezium-heartbeat.connectorname
: Spanner 변경 내역 하트비트를 처리하는 데 사용되는 주제입니다.
Kafka 커넥터 런타임
다음에서는 Kafka 커넥터 런타임을 설명합니다.
확장성
Kafka 커넥터는 수평 확장이 가능하며 여러 Kafka Connect 작업자 간에 분산된 태스크 하나 이상에서 실행됩니다.
메시지 전송 보장
Kafka 커넥터는 최소 1회 이상 전송을 보장합니다.
내결함성
Kafka 커넥터는 오류를 허용합니다. Kafka 커넥터는 변경사항을 읽고 이벤트를 생성하면서 각 변경 내역 파티션에 대해 처리된 마지막 커밋 타임스탬프를 기록합니다. 어떤 이유로든(예: 통신 실패, 네트워크 문제 또는 소프트웨어 오류) Kafka 커넥터가 중지되면 다시 시작 시 Kafka 커넥터는 마지막으로 중단된 지점부터 레코드 계속 스트리밍합니다.
Kafka 커넥터는 Kafka 커넥터의 시작 타임스탬프에서 정보 스키마를 읽어 스키마 정보를 검색합니다. 기본적으로 Spanner는 버전 보관 기간(기본값: 1시간) 전에 읽기 타임스탬프에서 정보 스키마를 읽을 수 없습니다. 1시간 이전부터 커넥터를 시작하려면 데이터베이스의 버전 보관 기간을 늘려야 합니다.
Kafka 커넥터 설정
변경 스트림 만들기
변경 스트림을 만드는 방법은 변경 스트림 만들기를 참조하세요. 다음 단계를 계속하려면 변경 스트림이 구성된 Spanner 인스턴스가 필요합니다.
각 데이터 변경 이벤트에서 변경된 열과 변경되지 않은 열 모두를 반환하려면 값 캡처 유형 NEW_ROW
를 사용합니다. 자세한 내용은 값 캡처 유형을 참조하세요.
Kafka 커넥터 JAR 설치
Zookeeper, Kafka 및 Kafka Connect가 설치된 상태에서 Kafka 커넥터를 배포하기 위해 남은 작업은 커넥터의 플러그인 보관 파일을 다운로드하고 JAR 파일을 Kafka Connect 환경으로 추출하고 JAR 파일이 포함된 디렉터리를 Kafka Connect의 plugin.path
에 추가하는 것입니다.
그런 다음 Kafka Connect 프로세스를 다시 시작하여 새 JAR 파일을 선택해야 합니다.
변경할 수 없는 컨테이너를 사용하는 경우 Zookeeper, Kafka, Kafka Connect용 Debezium 컨테이너 이미지에서 이미지를 가져올 수 있습니다. Kafka Connect 이미지에는 Spanner 커넥터가 사전 설치되어 있습니다.
Debezium 기반 Kafka 커넥터 JAR을 설치하는 방법에 대한 자세한 내용은 Debezium 설치를 참조하세요.
Kafka 커넥터 구성
다음은 test-instance
인스턴스 및 프로젝트 test-project
의 users
데이터베이스에서 changeStreamAll
이라는 변경 내역에 연결되는 Kafka 커넥터의 구성 예시입니다.
"name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{"client_id": user@example.com}", "gcp.spanner.database.role": "cdc-role", "tasks.max": "10" }
이 구성에는 다음이 포함됩니다.
Kafka Connect 서비스에 등록된 커넥터의 이름입니다.
이 Spanner 커넥터 클래스 이름입니다.
프로젝트 ID입니다.
Spanner 인스턴스 ID입니다.
Spanner 데이터베이스 ID입니다.
변경 내역 이름입니다.
서비스 계정 키의 JSON 객체입니다.
(선택사항) 사용할 Spanner 데이터베이스 역할입니다.
최대 태스크 수입니다.
커넥터 속성의 전체 목록은 Kafka 커넥터 구성 속성을 참조하세요.
Kafka Connect에 커넥터 구성 추가
Spanner 커넥터를 실행하려면 다음 안내를 따르세요.
Spanner 커넥터 구성을 만듭니다.
Kafka Connect REST API를 사용하여 해당 커넥터 구성을 Kafka Connect 클러스터에 추가합니다.
POST
명령어를 사용하여 이 구성을 실행 중인 Kafka Connect 서비스로 전송할 수 있습니다. 기본적으로 Kafka Connect 서비스는 포트 8083
에서 실행됩니다.
서비스는 구성을 기록하고 Spanner 데이터베이스에 연결하고 변경 이벤트 레코드를 Kafka 주제로 스트리밍하는 커넥터 태스크를 시작합니다.
다음은 POST
명령어 예시입니다.
POST /connectors HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "name": "spanner-connector" "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" } }
성공적인 응답의 예시는 다음과 같습니다.
HTTP/1.1 201 Created Content-Type: application/json { "name": "spanner-connector", "config": { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }, "tasks": [ { "connector": "spanner-connector", "task": 1 }, { "connector": "spanner-connector", "task": 2 }, { "connector": "spanner-connector", "task": 3 } ] }
Kafka 커넥터 구성 업데이트
커넥터 구성을 업데이트하려면 PUT
명령어를 커넥터 이름이 같은 실행 중인 Kafka Connect 서비스로 전송합니다.
이전 섹션의 구성으로 실행 중인 커넥터가 있다고 가정해 보겠습니다. 다음은 PUT
명령어 예시입니다.
PUT /connectors/spanner-connector/config HTTP/1.1 Host: http://localhost:8083 Accept: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
성공적인 응답의 예시는 다음과 같습니다.
HTTP/1.1 200 OK Content-Type: application/json { "connector.class": "io.debezium.connector.spanner.SpannerConnector", "tasks.max": "10", "gcp.spanner.project.id": "test-project", "gcp.spanner.instance.id": "test-instance", "gcp.spanner.database.id": "users", "gcp.spanner.change.stream": "changeStreamAll", "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }", "heartbeat.interval.ms": "100", "tasks.max": "10" }
Kafka 커넥터 중지
커넥터를 중지하려면 DELETE
명령어를 커넥터 이름이 같은 실행 중인 Kafka Connect 서비스로 전송합니다.
이전 섹션의 구성으로 실행 중인 커넥터가 있다고 가정해 보겠습니다. 다음은 DELETE
명령어 예시입니다.
DELETE /connectors/spanner-connector HTTP/1.1 Host: http://localhost:8083
성공적인 응답의 예시는 다음과 같습니다.
HTTP/1.1 204 No Content
Kafka 커넥터 모니터링
Kafka 커넥터는 표준 Kafka Connect 및 Debezium 측정항목 외에도 자체 측정항목을 내보냅니다.
MilliSecondsLowWatermark
: 커넥터 태스크의 현재 낮은 워터마크(밀리초)입니다. 낮은 워터마크는 커넥터ㅇ에서 타임스탬프가 T보다 이전인 모든 이벤트를 스트리밍한 것으로 보장되는 시간 T를 설명합니다.MilliSecondsLowWatermarkLag
: 현재 시간보다 이후인 워터마크 지연 시간(밀리초)입니다. 타임스탬프가 T보다 이전인 모든 이벤트를 스트리밍했습니다.LatencyLowWatermark<Variant>MilliSeconds
: 현재 시간보다 이후인 워터마크 지연 시간(밀리초)입니다. P50, P95, P99, 평균, 최솟값, 최댓값 옵션이 제공됩니다.LatencySpanner<Variant>MilliSeconds
: Spanner-commit-timestamp-to-connector-read 지연 시간입니다. P50, P95, P99, 평균, 최솟값, 최댓값 옵션이 제공됩니다.LatencyReadToEmit<Variant>MilliSeconds
: Spanner-read-timestamp-to-connector-emit 지연 시간입니다. P50, P95, P99, 평균, 최솟값, 최댓값 옵션이 제공됩니다.LatencyCommitToEmit<Variant>tMilliSeconds
: Spanner-commit-timestamp-to-connector-emit 지연 시간입니다. P50, P95, P99, 평균, 최솟값, 최댓값 옵션이 제공됩니다.LatencyCommitToPublish<Variant>MilliSeconds
: Spanner-commit-timestamp-to Kafka-publish-timestamp 지연 시간입니다. P50, P95, P99, 평균, 최솟값, 최댓값 옵션이 제공됩니다.NumberOfChangeStreamPartitionsDetected
: 현재 커넥터 태스크에서 감지한 파티션 총개수입니다.NumberOfChangeStreamQueriesIssued
: 현재 태스크에서 실행된 총 변경 내역 쿼리 수입니다.NumberOfActiveChangeStreamQueries
: 현재 커넥터 태스크에서 감지한 활성 변경 내역 쿼리 수입니다.SpannerEventQueueCapacity
: 변경 내역 쿼리에서 수신된 요소를 저장하는 큐인StreamEventQueue
의 총 용량입니다.SpannerEventQueueCapacity
: 남은StreamEventQueue
용량입니다.TaskStateChangeEventQueueCapacity
: 커넥터에서 발생하는 이벤트를 저장하는 큐인TaskStateChangeEventQueue
의 총 용량입니다.RemainingTaskStateChangeEventQueueCapacity
: 남은TaskStateChangeEventQueue
용량입니다.NumberOfActiveChangeStreamQueries
: 현재 커넥터 태스크에서 감지한 활성 변경 내역 쿼리 수입니다.
Kafka 커넥터 구성 속성
다음은 커넥터의 필수 구성 속성입니다.
name
: 커넥터의 고유한 이름입니다. 같은 이름으로 다시 등록하려고 하면 실패합니다. 이 속성은 모든 Kafka Connect 커넥터에 필요합니다.connector.class
: 커넥터의 Java 클래스 이름입니다. Kafka 커넥터에는 항상io.debezium.connector.spanner.SpannerConnector
값을 사용합니다.tasks.max
: 이 커넥터에 대해 만들어야 하는 최대 태스크 수입니다.gcp.spanner.project.id
: 프로젝트 IDgcp.spanner.instance.id
: Spanner 인스턴스 ID입니다.gcp.spanner.database.id
: Spanner 데이터베이스 ID입니다.gcp.spanner.change.stream
: Spanner 변경 내역 이름입니다.gcp.spanner.credentials.json
: 서비스 계정 키 JSON 객체입니다.gcp.spanner.credentials.path
: 서비스 계정 키 JSON 객체의 파일 경로입니다. 위 필드가 제공되지 않은 경우 필수입니다.gcp.spanner.database.role
: 사용할 Spanner 데이터베이스 역할입니다. 이는 세분화된 액세스 제어로 변경 내역이 보호되는 경우에만 필요합니다. 데이터베이스 역할에는 변경 내역에 대한SELECT
권한과 변경 내역의 읽기 함수에 대한EXECUTE
권한이 있어야 합니다. 자세한 내용은 변경 내역에 대해 세분화된 액세스 제어를 참조하세요.
다음 고급 구성 속성에는 대부분의 상황에서 작동하는 기본값이 있으므로 커넥터 구성에서 고급 구성 속성을 지정할 필요가 거의 없습니다.
gcp.spanner.low-watermark.enabled
: 커넥터에 낮은 워터마크가 사용 설정되어 있는지 여부를 나타냅니다. 기본값은 false입니다.gcp.spanner.low-watermark.update-period.ms
: 낮은 워터마크가 업데이트되는 간격입니다. 기본값은 1,000밀리초입니다.heartbeat.interval.ms
: Spanner 하트비트 간격입니다. 기본값은 300,000(5분)입니다.gcp.spanner.start.time
: 커넥터 시작 시간입니다. 기본값은 현재 시간입니다.gcp.spanner.end.time
: 커넥터 종료 시간입니다. 기본값은 무한대입니다.tables.exclude.list
: 변경 이벤트를 제외할 테이블입니다. 기본값은 빈 값입니다.tables.include.list
: 변경 이벤트를 포함할 테이블입니다. 채우지 않으면 모든 테이블이 포함됩니다. 기본값은 빈 값입니다.gcp.spanner.stream.event.queue.capacity
: Spanner 이벤트 큐 용량입니다. 기본값은 10000입니다.connector.spanner.task.state.change.event.queue.capacity
: 태스크 상태 변경 이벤트 큐 용량입니다. 기본값은 1000입니다.connector.spanner.max.missed.heartbeats
: 예외가 발생하기 전에 변경 내역 쿼리의 누락된 최대 하트비트 수입니다. 기본값은 10입니다.scaler.monitor.enabled
: 태스크 자동 확장이 사용 설정되어 있는지 여부를 나타냅니다. 기본값은 false입니다.tasks.desired.partitions
: 태스크당 선호하는 변경 내역 파티션 수입니다. 이 파라미터는 태스크 자동 확장에 필요합니다. 기본값은 2입니다.tasks.min
: 최소 태스크 수입니다. 이 파라미터는 태스크 자동 확장에 필요합니다. 기본값은 1입니다.connector.spanner.sync.topic
: 동기화 주제 이름입니다. 동기화 주제는 태스크 간의 통신을 저장하는 데 사용되는 내부 커넥터 주제입니다. 사용자가 이름을 제공하지 않은 경우 기본값은_sync_topic_spanner_connector_connectorname
입니다.connector.spanner.sync.poll.duration
: 동기화 주제의 폴링 기간입니다. 기본값은 500밀리초입니다.connector.spanner.sync.request.timeout.ms
: 동기화 주제에 대한 요청 제한 시간입니다. 기본값은 5,000밀리초입니다.connector.spanner.sync.delivery.timeout.ms
: 동기화 주제에 게시할 제한 시간입니다. 기본값은 15,000밀리초입니다.connector.spanner.sync.commit.offsets.interval.ms
: 오프셋이 동기화 주제에 커밋되는 간격입니다. 기본값은 60,000밀리초입니다.connector.spanner.sync.publisher.wait.timeout
: 메시지가 동기화 주제에 게시되는 간격입니다. 기본값은 5밀리초입니다.connector.spanner.rebalancing.topic
: 재균형 조정 주제 이름입니다. 재균형 조정 주제는 태스크 활성 상태를 확인하는 데 사용되는 내부 커넥터 주제입니다. 사용자가 이름을 제공하지 않은 경우 기본값은_rebalancing_topic_spanner_connector_connectorname
입니다.connector.spanner.rebalancing.poll.duration
: 재균형 조정 주제의 폴링 기간입니다. 기본값은 5,000밀리초입니다.connector.spanner.rebalancing.commit.offsets.timeout
: 재균형 조정 주제의 오프셋 커밋 제한 시간입니다. 기본값은 5,000밀리초입니다.connector.spanner.rebalancing.commit.offsets.interval.ms
: 오프셋이 동기화 주제에 커밋되는 간격입니다. 기본값은 60,000밀리초입니다.connector.spanner.rebalancing.task.waiting.timeout
: 재균형 조정 이벤트를 처리하기 전에 태스크가 대기하는 시간입니다. 기본값은 1,000밀리초입니다.
구성 가능한 커넥터 속성의 자세한 목록은 GitHub 저장소를 참조하세요.
제한사항
이 커넥터는 스냅샷 이벤트를 스트리밍할 수 없습니다.
커넥터에서 워터마킹이 사용 설정되면 Debezium 주제 라우팅 변환을 구성할 수 없습니다.