변경 내역 연결을 Kafka로 빌드

이 페이지에서는 Kafka 커넥터를 사용하여 Spanner 변경 내역 데이터를 사용하고 전달하는 방법을 설명합니다.

핵심 개념

다음은 Kafka 커넥터의 핵심 개념을 설명합니다.

Debezium

Debezium은 변경 데이터 캡처를 위한 지연 시간이 짧은 데이터 스트리밍 플랫폼을 제공하는 오픈소스 프로젝트입니다.

Kafka 커넥터

Kafka 커넥터는 Spanner API를 통해 추상화를 제공하여 Spanner 변경 내역을 Kafka에 게시합니다. 이 커넥터를 사용하면 Spanner API를 직접 사용할 때 필요한 변경 내역 파티션 수명 주기를 관리할 필요가 없습니다.

Kafka 커넥터는 모든 데이터 변경 레코드 모드에 대해 변경 이벤트를 생성하고 변경 이벤트 레코드 다운스트림을 각 변경 내역 추적 테이블에 대한 별도의 Kafka 주제로 전송합니다. 데이터 변경 레코드 모드는 캡처된 단일 수정(삽입, 업데이트 또는 삭제)을 나타냅니다. 단일 데이터 변경 레코드에는 2개 이상의 모드가 포함될 수 있습니다.

Kafka 커넥터 출력

Kafka 커넥터는 변경 내역 레코드를 별도의 Kafka 주제로 직접 전달합니다. 출력 주제 이름은 connector_name.table_name이어야 합니다. 주제가 없으면 Kafka 커넥터가 해당 이름으로 주제를 자동으로 만듭니다.

주제 라우팅 변환을 구성하여 레코드를 지정한 주제로 다시 라우팅할 수도 있습니다. 주제 라우팅을 사용하려면 낮은 워터마크 기능을 사용 중지합니다.

레코드 순서 지정

레코드는 Kafka 주제의 기본 키별로 커밋 타임스탬프에 따라 정렬됩니다. 서로 다른 기본 키에 속하는 레코드는 순서 보장을 하지 않습니다. 기본 키가 동일한 레코드는 같은 Kafka 주제 파티션에 저장됩니다. 전체 트랜잭션을 처리하려면 데이터 변경 레코드 server_transaction_idnumber_of_records_in_transaction 필드를 사용하여 Spanner 트랜잭션을 조합하면 됩니다.

변경 이벤트

Kafka 커넥터는 각 INSERT, UPDATE, DELETE 작업에 대해 데이터 변경 이벤트를 생성합니다. 각 이벤트에는 변경된 행의 키와 값이 포함됩니다.

Kafka Connect 변환기를 사용하여 Protobuf, AVRO, JSON 또는 JSON Schemaless 형식으로 데이터 변경 이벤트를 생성할 수 있습니다. 스키마를 생성하는 Kafka Connect 변환기를 사용하면 이벤트에 키 및 값에 대한 별도의 스키마가 포함됩니다. 그렇지 않으면 이벤트에 키와 값만 포함됩니다.

키의 스키마는 변경되지 않습니다. 값의 스키마는 커넥터 시작 시간 이후 변경 내역이 추적한 모든 열의 병합입니다.

JSON 이벤트를 생성하도록 커넥터를 구성하면 출력 변경 이벤트에 5개 필드가 포함됩니다.

  • 첫 번째 schema 필드는 Spanner 키 스키마를 설명하는 Kafka Connect 스키마를 지정합니다.

  • 첫 번째 payload 필드에는 이전 schema 필드에 설명된 구조가 있으며 변경된 행의 키가 포함됩니다.

  • 두 번째 schema 필드는 변경된 행의 스키마를 설명하는 Kafka Connect 스키마를 지정합니다.

  • 두 번째 payload 필드에는 이전 schema 필드에 설명된 구조가 있으며 변경된 행의 실제 데이터가 포함됩니다.

  • source 필드는 이벤트의 소스 메타데이터를 설명하는 필수 필드입니다.

다음은 데이터 변경 이벤트의 예시입니다.

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": {
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": {
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c",
  "ts_ms": 1559033904863 //
}

낮은 워터마크

낮은 워터마크는 Kafka 커넥터에서 타임스탬프가 T보다 이전인 모든 이벤트를 스트리밍하고 Kafka 주제에 게시한 것으로 보장되는 시간 T를 설명합니다.

gcp.spanner.low-watermark.enabled 매개변수를 사용하여 Kafka 커넥터에서 낮은 워터마크를 사용 설정할 수 있습니다. 이 매개변수는 기본적으로 사용 중지됩니다. 낮은 워터마크가 사용 설정되면 변경 내역 데이터 변경 레코드의 low_watermark 필드가 Kafka 커넥터의 현재 낮은 워터마크 타임스탬프로 채워집니다.

생성되는 레코드가 없으면 Kafka 커넥터는 커넥터에서 감지된 Kafka 출력 주제로 주기적으로 워터마크 '하트비트'를 전송합니다.

이러한 워터마크 하트비트는 low_watermark 필드를 제외하고 비어 있는 레코드입니다. 그런 다음 낮은 워터마크를 사용하여 시간 기반 집계를 수행할 수 있습니다. 예를 들어 낮은 워터마크를 사용하여 기본 키에서 커밋 타임스탬프를 기준으로 이벤트를 정렬할 수 있습니다.

메타데이터 주제

Kafka 커넥터와 Kafka Connect 프레임워크는 커넥터 관련 정보를 저장하기 위해 여러 메타데이터 주제를 만듭니다. 이러한 메타데이터 주제의 구성 또는 콘텐츠를 수정하는 않는 것이 좋습니다.

다음은 메타데이터 주제입니다.

  • _consumer_offsets: Kafka가 자동으로 생성하는 주제입니다. Kafka 커넥터에서 생성된 소비자의 소비자 오프셋을 저장합니다.
  • _kafka-connect-offsets: Kafka Connect에서 자동으로 생성된 주제입니다. 커넥터 오프셋을 저장합니다.
  • _sync_topic_spanner_connector_connectorname: 커넥터가 자동으로 생성하는 주제입니다. 변경 내역 파티션과 관련된 메타데이터를 저장합니다.
  • _rebalancing_topic_spanner_connector_connectorname: 커넥터가 자동으로 생성하는 주제입니다. 커넥터 태스크 활성을 결정하는 데 사용됩니다.
  • _debezium-heartbeat.connectorname: Spanner 변경 내역 하트비트를 처리하는 데 사용되는 주제입니다.

Kafka 커넥터 런타임

다음은 Kafka 커넥터 런타임을 설명합니다.

확장성

Kafka 커넥터는 수평 확장이 가능하며 여러 Kafka Connect 작업자 간에 분산된 하나 이상의 태스크에서 실행됩니다.

메시지 전송 보장

Kafka 커넥터는 최소 1회 전송 보장을 지원합니다.

내결함성

Kafka 커넥터는 내결함성을 지원합니다. Kafka 커넥터는 변경사항을 읽고 이벤트를 생성하면서 각 변경 내역 파티션에 대해 처리된 마지막 커밋 타임스탬프를 기록합니다. 통신 실패, 네트워크 문제 또는 소프트웨어 오류 등의 이유로 Kafka 커넥터가 중지되면 Kafka 커넥터를 다시 시작하면 마지막에 중단된 지점부터 스트리밍 레코드가 계속 실행됩니다.

Kafka 커넥터는 Kafka 커넥터의 시작 타임스탬프에서 정보 스키마를 읽어 스키마 정보를 검색합니다. 기본적으로 Spanner는 버전 보관 기간(기본값: 1시간) 전에 읽기 타임스탬프에서 정보 스키마를 읽을 수 없습니다. 1시간 이전부터 커넥터를 시작하려면 데이터베이스의 버전 보관 기간을 늘려야 합니다.

Kafka 커넥터 설정

변경 스트림 만들기

변경 스트림을 만드는 방법은 변경 스트림 만들기를 참조하세요. 다음 단계를 계속하려면 변경 스트림이 구성된 Spanner 인스턴스가 필요합니다.

각 데이터 변경 이벤트에서 변경된 열과 변경되지 않은 열을 모두 반환하려면 값 캡처 유형 NEW_ROW를 사용합니다. 자세한 내용은 값 캡처 유형을 참조하세요.

Kafka 커넥터 JAR 설치

Zookeeper, KafkaKafka Connect가 설치된 상태에서 Kafka 커넥터를 배포하기 위해 남은 작업은 커넥터의 플러그인 보관 파일을 다운로드하고 JAR 파일을 Kafka Connect 환경으로 추출하고 JAR 파일이 포함된 디렉터리를 Kafka Connect의 plugin.path에 추가하는 것입니다. 그런 다음 Kafka Connect 프로세스를 다시 시작하여 새 JAR 파일을 선택해야 합니다.

변경할 수 없는 컨테이너를 사용하는 경우 Zookeeper, Kafka, Kafka Connect용 Debezium 컨테이너 이미지에서 이미지를 가져올 수 있습니다. Kafka Connect 이미지에는 Spanner 커넥터가 사전 설치되어 있습니다.

Debezium 기반 Kafka 커넥터 JAR을 설치하는 방법에 대한 자세한 내용은 Debezium 설치를 참조하세요.

Kafka 커넥터 구성

다음은 test-instance 인스턴스 및 프로젝트 test-projectusers 데이터베이스에서 changeStreamAll이라는 변경 내역에 연결되는 Kafka 커넥터의 구성 예시입니다.

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

이 구성에는 다음이 포함됩니다.

  • Kafka Connect 서비스에 등록할 때 커넥터의 이름입니다.

  • 이 Spanner 커넥터 클래스의 이름입니다.

  • 프로젝트 ID입니다.

  • Spanner 인스턴스 ID입니다.

  • Spanner 데이터베이스 ID입니다.

  • 변경 내역 이름입니다.

  • 서비스 계정 키의 JSON 객체입니다.

  • (선택사항) 사용할 Spanner 데이터베이스 역할입니다.

  • 최대 태스크 수입니다.

커넥터 속성의 전체 목록은 Kafka 커넥터 구성 속성을 참조하세요.

Kafka Connect에 커넥터 구성 추가

Spanner 커넥터 실행을 시작하려면 다음 안내를 따르세요.

  1. Spanner 커넥터의 구성을 만듭니다.

  2. Kafka Connect REST API를 사용하여 Kafka Connect 클러스터에 해당 커넥터 구성을 추가합니다.

POST 명령어를 사용하여 이 구성을 실행 중인 Kafka Connect 서비스에 전송할 수 있습니다. 기본적으로 Kafka Connect 서비스는 포트 8083에서 실행됩니다. 이 서비스는 구성을 기록하고 Spanner 데이터베이스에 연결하는 커넥터 태스크를 시작하고 변경 이벤트 레코드를 Kafka 주제로 스트리밍합니다.

다음은 POST 명령어의 예시입니다.

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

성공적인 응답 예시:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Kafka 커넥터 구성 업데이트

커넥터 구성을 업데이트하려면 동일한 커넥터 이름으로 실행 중인 Kafka Connect 서비스에 PUT 명령어를 보냅니다.

이전 섹션의 구성으로 커넥터가 실행된다고 가정해 보겠습니다. 다음은 PUT 명령어의 예시입니다.

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

성공적인 응답 예시:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Kafka 커넥터 중지

커넥터를 중지하려면 동일한 커넥터 이름을 사용하여 실행 중인 Kafka Connect 서비스에 DELETE 명령어를 보냅니다.

이전 섹션의 구성으로 커넥터가 실행된다고 가정해 보겠습니다. 다음은 DELETE 명령어의 예시입니다.

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

성공적인 응답 예시:

HTTP/1.1 204 No Content

Kafka 커넥터 모니터링

Kafka 커넥터는 표준 Kafka Connect 및 Debezium 측정항목 외에도 자체 측정항목을 내보냅니다.

  • MilliSecondsLowWatermark: 커넥터 태스크의 현재 하위 워터마크(밀리초)입니다. 낮은 워터마크는 커넥터ㅇ에서 타임스탬프가 T보다 이전인 모든 이벤트를 스트리밍한 것으로 보장되는 시간 T를 설명합니다.

  • MilliSecondsLowWatermarkLag: 현재 시간보다 이후인 워터마크 지연 시간(밀리초)입니다. 타임스탬프가 T보다 이전인 모든 이벤트를 스트리밍했습니다.

  • LatencyLowWatermark<Variant>MilliSeconds: 현재 시간보다 이후인 워터마크 지연 시간(밀리초)입니다. P50, P95, P99, 평균, 최소, 최대 변이가 제공됩니다.

  • LatencySpanner<Variant>MilliSeconds: Spanner-commit-timestamp-to-connector-read 지연 시간입니다. P50, P95, P99, 평균, 최소, 최대 변이가 제공됩니다.

  • LatencyReadToEmit<Variant>MilliSeconds: Spanner-read-timestamp-to-connector-emit 지연 시간입니다. P50, P95, P99, 평균, 최소, 최대 변이가 제공됩니다.

  • LatencyCommitToEmit<Variant>tMilliSeconds: Spanner-commit-timestamp-to-connector-emit 지연 시간입니다. P50, P95, P99, 평균, 최소, 최대 변이가 제공됩니다.

  • LatencyCommitToPublish<Variant>MilliSeconds: Spanner-commit-timestamp-to Kafka-publish-timestamp 지연 시간입니다. P50, P95, P99, 평균, 최소, 최대 변이가 제공됩니다.

  • NumberOfChangeStreamPartitionsDetected: 현재 커넥터 작업에서 감지한 총 파티션 수입니다.

  • NumberOfChangeStreamQueriesIssued: 현재 태스크에서 실행된 총 변경 내역 쿼리 수입니다.

  • NumberOfActiveChangeStreamQueries: 현재 커넥터 태스크에서 감지한 활성 변경 내역 쿼리 수입니다.

  • SpannerEventQueueCapacity: 변경 내역 쿼리에서 수신된 요소를 저장하는 큐인 StreamEventQueue의 총 용량입니다.

  • SpannerEventQueueCapacity: 남은 StreamEventQueue 용량입니다.

  • TaskStateChangeEventQueueCapacity: 커넥터에서 발생하는 이벤트를 저장하는 큐인 TaskStateChangeEventQueue의 총 용량입니다.

  • RemainingTaskStateChangeEventQueueCapacity: 남은 TaskStateChangeEventQueue 용량입니다.

  • NumberOfActiveChangeStreamQueries: 현재 커넥터 태스크에서 감지한 활성 변경 내역 쿼리 수입니다.

Kafka 커넥터 구성 속성

다음은 커넥터의 필수 구성 속성입니다.

  • name: 커넥터의 고유한 이름입니다. 같은 이름으로 다시 등록하려고 하면 실패합니다. 이 속성은 모든 Kafka Connect 커넥터에 필요합니다.

  • connector.class: 커넥터의 자바 클래스 이름입니다. Kafka 커넥터에 항상 io.debezium.connector.spanner.SpannerConnector 값을 사용합니다.

  • tasks.max: 이 커넥터에 생성되어야 하는 최대 태스크 수입니다.

  • gcp.spanner.project.id: 프로젝트 ID

  • gcp.spanner.instance.id: Spanner 인스턴스 ID입니다.

  • gcp.spanner.database.id: Spanner 데이터베이스 ID입니다.

  • gcp.spanner.change.stream: Spanner 변경 내역 이름입니다.

  • gcp.spanner.credentials.json: 서비스 계정 키 JSON 객체입니다.

  • gcp.spanner.credentials.path: 서비스 계정 키 JSON 객체의 파일 경로입니다. 위 필드가 제공되지 않은 경우 필수입니다.

  • gcp.spanner.database.role : 사용할 Spanner 데이터베이스 역할입니다. 이는 세분화된 액세스 제어로 변경 내역이 보호되는 경우에만 필요합니다. 데이터베이스 역할에는 변경 내역에 대한 SELECT 권한과 변경 내역의 읽기 함수에 대한 EXECUTE 권한이 있어야 합니다. 자세한 내용은 변경 내역에 대한 세분화된 액세스 제어를 참조하세요.

다음 고급 구성 속성에는 대부분의 상황에서 작동하는 기본값도 있으므로 커넥터 구성에 지정할 필요가 거의 없습니다.

  • gcp.spanner.low-watermark.enabled: 커넥터에 낮은 워터마크가 사용 설정되었는지 여부를 나타냅니다. 기본값은 false입니다.

  • gcp.spanner.low-watermark.update-period.ms: 낮은 워터마크가 업데이트되는 간격입니다. 기본값은 1,000ms입니다.

  • heartbeat.interval.ms: Spanner 하트비트 간격입니다. 기본값은 300,000(5분)입니다.

  • gcp.spanner.start.time: 커넥터 시작 시간입니다. 기본값은 현재 시간입니다.

  • gcp.spanner.end.time: 커넥터 종료 시간입니다. 기본값은 무한대입니다.

  • tables.exclude.list: 변경 이벤트를 제외할 테이블입니다. 기본값은 빈 값입니다.

  • tables.include.list: 변경 이벤트를 포함할 테이블입니다. 채워지지 않으면 모든 테이블이 포함됩니다. 기본값은 빈 값입니다.

  • gcp.spanner.stream.event.queue.capacity: Spanner 이벤트 큐 용량입니다. 기본값은 10000입니다.

  • connector.spanner.task.state.change.event.queue.capacity: 태스크 상태가 이벤트 큐 용량을 변경합니다. 기본값은 1000입니다.

  • connector.spanner.max.missed.heartbeats: 예외가 발생하기 전에 변경 내역 쿼리의 누락된 최대 하트비트 수입니다. 기본값은 10입니다.

  • scaler.monitor.enabled: 태스크 자동 확장이 사용 설정되었는지 여부를 나타냅니다. 기본값은 false입니다.

  • tasks.desired.partitions: 태스크당 선호하는 변경 내역 파티션 수입니다. 이 매개변수는 태스크 자동 확장에 필요합니다. 기본값은 2입니다.

  • tasks.min: 최소 태스크 수입니다. 이 매개변수는 태스크 자동 확장에 필요합니다. 기본값은 1입니다.

  • connector.spanner.sync.topic: 태스크 간 통신을 저장하는 데 사용되는 내부 커넥터 주제인 동기화 주제의 이름입니다. 사용자가 이름을 제공하지 않은 경우 기본값은 _sync_topic_spanner_connector_connectorname입니다.

  • connector.spanner.sync.poll.duration: 동기화 주제의 폴링 기간입니다. 기본값은 500밀리초입니다.

  • connector.spanner.sync.request.timeout.ms: 동기화 주제에 대한 요청 제한 시간입니다. 기본값은 5,000밀리초입니다.

  • connector.spanner.sync.delivery.timeout.ms: 동기화 주제에 게시하기 위한 제한 시간입니다. 기본값은 15,000밀리초입니다.

  • connector.spanner.sync.commit.offsets.interval.ms: 동기화 주제에 대해 오프셋이 커밋되는 간격입니다. 기본값은 60,000밀리초입니다.

  • connector.spanner.sync.publisher.wait.timeout: 동기화 주제에 메시지가 게시되는 간격입니다. 기본값은 5밀리초입니다.

  • connector.spanner.rebalancing.topic: 재균등화 주제의 이름입니다. 재균등화 주제는 태스크 활성을 확인하는 데 사용되는 내부 커넥터 주제입니다. 사용자가 이름을 제공하지 않은 경우 기본값은 _rebalancing_topic_spanner_connector_connectorname입니다.

  • connector.spanner.rebalancing.poll.duration: 재조정 주제의 폴링 기간입니다. 기본값은 5,000밀리초입니다.

  • connector.spanner.rebalancing.commit.offsets.timeout: 재균등화 주제의 오프셋을 커밋하는 데 사용되는 제한 시간입니다. 기본값은 5,000밀리초입니다.

  • connector.spanner.rebalancing.commit.offsets.interval.ms: 동기화 주제에 대해 오프셋이 커밋되는 간격입니다. 기본값은 60,000밀리초입니다.

  • connector.spanner.rebalancing.task.waiting.timeout: 재균등화 이벤트를 처리하기 전에 태스크가 대기하는 기간입니다. 기본값은 1,000밀리초입니다.

구성 가능한 커넥터 속성의 더 자세한 목록은 GitHub 저장소를 참조하세요.

제한사항