构建与 Kafka 的变更数据流连接

本页面介绍了如何使用 Kafka 连接器来 和转发 Spanner 变更流数据。

核心概念

下文介绍了 Kafka 连接器的核心概念。

Debezium

Debezium 是一个开源项目,旨在提供 用于变更数据捕获的低延迟数据流平台。

Kafka 连接器

Kafka 连接器在 Spanner API 上提供抽象, 将 Spanner 变更数据流发布到 Kafka。借助此连接器 则不必管理变更数据流分区生命周期, 在直接使用 Spanner API 时,您必须执行此操作。

Kafka 连接器为每个数据变更记录 mod 生成一个更改事件,并发送 对于每个跟踪变更数据流的表,将变更事件记录下行到单独的 Kafka 主题中。 数据变更记录 mod 表示捕获的单次修改(插入、更新或删除)。单个数据变更记录可以包含多个 mod。

Kafka 连接器输出

Kafka 连接器直接转发变更数据流记录 一个单独的 Kafka 主题。输出主题名称应为 connector_name.table_name。 如果该主题不存在,Kafka 连接器会自动以该名称创建主题。

您还可以配置主题路由转换,以将记录重新路由到您指定的主题。如果您要使用主题路由,请停用低水印功能。

记录排序

记录按每个主键的提交时间戳进行排序 Kafka 主题属于不同主键的记录不包含 排序保证具有相同主键的记录存储在 同一 Kafka 主题分区。如果您想处理整个交易 还使用数据变更记录server_transaction_idnumber_of_records_in_transaction 字段添加到 组建 Spanner 事务。

更改事件

Kafka 连接器为每个 INSERTUPDATE、 和 DELETE 运算。每个事件都包含所更改行对应的键和值。

您可以使用 Kafka Connect 转换器ProtobufAVROJSONJSON Schemaless 格式。如果您使用 生成架构的 Kafka Connect 转换器,事件包含 键和值使用单独的架构否则,该事件仅包含 键和值

键的架构永远不会更改。这些值的架构是 更改流跟踪的所有列的并集 。

如果您将连接器配置为生成 JSON 事件,则输出更改事件 包含五个字段:

  • 第一个 schema 字段指定描述 Spanner 密钥架构的 Kafka Connect 架构。

  • 第一个 payload 字段具有前面的 schema 字段描述的结构,并且包含已更改行的键。

  • 第二个 schema 字段指定 Kafka Connect 架构,用于描述已更改行的架构。

  • 第二个 payload 字段具有前面的 schema 字段描述的结构,它包含发生更改的行的实际数据。

  • source 字段是必填字段,用于描述事件的来源元数据。

以下是数据更改事件的示例:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

低水印

低水印描述时间 T,在该时间 Kafka 连接器保证已流式传输并发布到 Kafka 主题,且时间戳 <的

您可以使用 gcp.spanner.low-watermark.enabled 参数。此参数已停用 默认情况。如果启用了低水位,变更数据流数据中的 low_watermark 字段 使用 Kafka 连接器的当前低水位填充变更记录 时间戳。

如果没有生成任何记录,Kafka 连接器会定期发送 “心跳”水印到连接器检测到的 Kafka 输出主题。

这些水印检测信号是除 low_watermark 字段。然后,您可以使用低水位执行基于时间的汇总。 例如,您可以使用低水位按提交对事件进行排序 时间戳。

元数据主题

Kafka 连接器和 Kafka Connect 框架可以创建 元数据主题来存储连接器相关信息。不建议 修改这些元数据主题的配置或内容。

以下是元数据主题:

  • _consumer_offsets:Kafka 自动创建的主题。 存储在 Kafka 连接器中创建的使用方的使用方偏移。
  • _kafka-connect-offsets:Kafka Connect 自动创建的主题。存储连接器偏移量。
  • _sync_topic_spanner_connector_connectorname:连接器自动创建的主题。存储有关变更数据流分区的元数据。
  • _rebalancing_topic_spanner_connector_connectorname:连接器自动创建的主题。用于确定连接器任务的活跃性。
  • _debezium-heartbeat.connectorname:用于处理 Spanner 变更数据流检测信号的主题。

Kafka 连接器运行时

下面介绍了 Kafka 连接器运行时。

可扩缩性

Kafka 连接器可横向伸缩,并在分布在多个 Kafka Connect 工作器中的一个或多个任务上运行。

消息传送保证

Kafka 连接器支持保证至少传送一次。

容错性

Kafka 连接器可容忍故障。当 Kafka 连接器读取更改和 生成事件时,它会记录为每个更改处理的上次提交时间戳 流分区。如果 Kafka 连接器因任何原因(包括 通信故障、网络问题或软件故障) Kafka 连接器会从上次中断的位置继续流式传输记录。

Kafka 连接器在启动时读取信息架构 时间戳以检索架构信息。默认情况下,Spanner 无法 在读取时间戳之前读取信息架构, 版本保留期限, 默认为 1 小时。如果您想从以下时间之前启动连接器: 您必须延长数据库的版本保留期限 。

设置 Kafka 连接器

创建变更流

如需详细了解如何创建变更数据流,请参阅创建变更数据流。如需继续执行后续步骤,需要一个配置了变更数据流的 Spanner 实例。

请注意,如果您希望在每一列都返回已更改的列和未更改的列 数据更改事件,请使用值捕获类型 NEW_ROW。如需了解详情,请参阅值捕获类型

安装 Kafka 连接器 JAR

安装 ZookeeperKafkaKafka Connect 后,部署 Kafka 连接器的剩余任务是下载 连接器的插件归档,将 JAR 文件提取到 Kafka Connect 环境中,然后将 目录(包含 JAR 文件)复制到 Kafka Connect 的 plugin.path。 然后,您需要重启 Kafka Connect 进程以获取新的 JAR 文件。

如果您使用的是不可变容器,则可以从以下位置拉取映像: Debezium 的容器映像,用于 Zookeeper、Kafka 和 Kafka Connect。Kafka Connect 映像包含 预安装了 Spanner 连接器。

如需详细了解如何安装基于 Debezium 的 Kafka 连接器 JAR,请参阅 安装 Debezium

配置 Kafka 连接器

以下是 Kafka 连接器的配置示例 该 API 连接到名为 changeStreamAll 的变更数据流(位于 实例“test-instance”和项目“test-project”中的数据库“users”。

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

此配置包含以下内容:

  • 注册到 Kafka Connect 服务时的连接器名称。

  • 此 Spanner 连接器类的名称。

  • 项目 ID。

  • Spanner 实例 ID。

  • Spanner 数据库 ID。

  • 变更数据流的名称。

  • 服务账号密钥的 JSON 对象。

  • (可选)要使用的 Spanner 数据库角色。

  • 任务数量上限。

如需查看连接器属性的完整列表,请参阅 Kafka 连接器配置属性

将连接器配置添加到 Kafka Connect

如需开始运行 Spanner 连接器,请执行以下操作:

  1. 为 Spanner 连接器创建配置。

  2. 使用 Kafka Connect REST API 添加 连接到 Kafka Connect 集群。

您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。默认情况下,Kafka Connect 服务在端口 8083 上运行。 服务会记录配置并启动连接器任务 (用于连接到 Spanner 数据库并将变更事件记录流式传输到 Kafka 主题。

以下是 POST 命令示例:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

成功响应示例:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

更新 Kafka 连接器配置

要更新连接器配置,请向正在运行的PUT 具有相同连接器名称的 Kafka Connect 服务。

假设我们有一个连接器,正在以 上一节。以下是 PUT 命令示例:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

成功响应示例:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

停止 Kafka 连接器

如需停止连接器,请向正在运行的DELETE 具有相同连接器名称的 Kafka Connect 服务。

假设我们有一个连接器,正在以 上一节。以下是 DELETE 命令示例:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

成功响应示例:

HTTP/1.1 204 No Content

监控 Kafka 连接器

除了标准 Kafka Connect 和 Debezium 指标之外,Kafka 连接器还会导出自己的指标:

  • MilliSecondsLowWatermark:连接器任务的当前低水位(以毫秒为单位)。通过 低水位标识的时间 T,在该时间 T,连接器保证具有 流式传输所有时间戳 < 的事件T

  • MilliSecondsLowWatermarkLag:低水位相对于当前时间的延迟(以毫秒为单位)。 流式传输所有时间戳 < 的事件T

  • LatencyLowWatermark<Variant>MilliSeconds:低水位相对于当前时间的延迟 (以毫秒为单位)。同时提供 P50、P95、P99、平均、最小和最大变体。

  • LatencySpanner<Variant>MilliSeconds:Spanner-commit-timestamp-to-connector-read 延迟时间。 同时提供 P50、P95、P99、平均、最小和最大变体。

  • LatencyReadToEmit<Variant>MilliSeconds:Spanner-read-timestamp-to-connector-emit 延迟时间。 同时提供 P50、P95、P99、平均、最小和最大变体。

  • LatencyCommitToEmit<Variant>tMilliSeconds:Spanner-commit-timestamp-to-connector-emit 延迟时间。 同时提供 P50、P95、P99、平均、最小和最大变体。

  • LatencyCommitToPublish<Variant>MilliSeconds:Spanner-commit-timestamp-to Kafka-publish-timestamp 延迟时间。同时提供 P50、P95、P99、平均、最小和最大变体。

  • NumberOfChangeStreamPartitionsDetected:分区总数 当前连接器任务检测到的任务。

  • NumberOfChangeStreamQueriesIssued:变更数据流查询的总数 当前任务发出的输出。

  • NumberOfActiveChangeStreamQueries:变更数据流的活跃数量 当前连接器任务检测到的查询。

  • SpannerEventQueueCapacityStreamEventQueue 的总容量,这是一个队列,用于存储从变更数据流查询接收的元素。

  • SpannerEventQueueCapacity:剩余的 StreamEventQueue 容量。

  • TaskStateChangeEventQueueCapacityTaskStateChangeEventQueue 的总容量,这是一个用于存储连接器中发生的事件的队列。

  • RemainingTaskStateChangeEventQueueCapacity:剩余的 TaskStateChangeEventQueue 容量。

  • NumberOfActiveChangeStreamQueries:变更数据流的活跃数量 当前连接器任务检测到的查询。

Kafka 连接器配置属性

以下是连接器必需的配置属性:

  • name:连接器的唯一名称。尝试使用同一名称重新注册时会导致失败。所有 Kafka Connect 连接器都需要此属性。

  • connector.class:连接器的 Java 类的名称。请始终为 Kafka 连接器使用 io.debezium.connector.spanner.SpannerConnector 值。

  • tasks.max:应为此连接器创建的任务数上限。

  • gcp.spanner.project.id:项目 ID

  • gcp.spanner.instance.id:Spanner 实例 ID

  • gcp.spanner.database.id:Spanner 数据库 ID

  • gcp.spanner.change.stream:Spanner 变更数据流名称

  • gcp.spanner.credentials.json:服务账号密钥 JSON 对象。

  • gcp.spanner.credentials.path:指向服务账号密钥 JSON 对象的文件路径。如果未提供上述字段,则为必填。

  • gcp.spanner.database.role:拥有此角色的 Spanner 数据库角色 。仅当变更数据流受到 精细的访问权限控制此数据库角色必须拥有以下项目的 SELECT 权限: 变更数据流以及对变更数据流读取操作的 EXECUTE 权限 函数。有关详情,请参阅对变更的精细访问权限控制 数据流

以下高级配置属性具有适用于大多数情况的默认值 因此很少需要在连接器的配置中指定:

  • gcp.spanner.low-watermark.enabled:指示是否为连接器启用了低水印。默认值为 false。

  • gcp.spanner.low-watermark.update-period.ms:低水印的更新时间间隔。默认值为 1000 毫秒。

  • heartbeat.interval.ms:Spanner 检测信号间隔。默认值为 300000(五分钟)。

  • gcp.spanner.start.time:连接器开始时间。默认值为当前时间。

  • gcp.spanner.end.time:连接器结束时间。默认值为无穷大。

  • tables.exclude.list:要排除更改事件的表。默认值为空。

  • tables.include.list:包含更改事件的表。如果未填充,则包含所有表。默认值为空。

  • gcp.spanner.stream.event.queue.capacity:Spanner 事件队列容量。默认值为 10,000。

  • connector.spanner.task.state.change.event.queue.capacity:任务状态更改事件队列容量。默认值为 1000。

  • connector.spanner.max.missed.heartbeats:在抛出异常之前变更数据流查询错过的检测信号数量上限。默认值为 10。

  • scaler.monitor.enabled:指示是否已启用任务自动扩缩。默认值为 false。

  • tasks.desired.partitions:每个任务的变更数据流分区的首选数量。任务自动扩缩需要使用此参数。默认值为 2。

  • tasks.min:最小任务数量。任务自动扩缩需要使用此参数。默认为 1。

  • connector.spanner.sync.topic:同步主题的名称,这是一个内部连接器主题,用于存储任务之间的通信。如果用户未提供姓名,则默认为 _sync_topic_spanner_connector_connectorname

  • connector.spanner.sync.poll.duration:同步主题的轮询时长。默认值为 500 毫秒。

  • connector.spanner.sync.request.timeout.ms:向同步主题发送请求的超时。默认值为 5000 毫秒。

  • connector.spanner.sync.delivery.timeout.ms:发布到同步主题的超时时间。默认值为 15,000 毫秒。

  • connector.spanner.sync.commit.offsets.interval.ms:为同步主题提交偏移量的时间间隔。默认值为 60000 毫秒。

  • connector.spanner.sync.publisher.wait.timeout:向同步主题发布消息的时间间隔。默认值为 5 毫秒。

  • connector.spanner.rebalancing.topic:再平衡主题的名称。再平衡主题是一个内部连接器主题,用于确定任务是否活跃。如果用户未提供姓名,则默认为 _rebalancing_topic_spanner_connector_connectorname

  • connector.spanner.rebalancing.poll.duration:再平衡主题的轮询时长。默认值为 5000 毫秒。

  • connector.spanner.rebalancing.commit.offsets.timeout:为再平衡主题提交偏移量的超时时间。默认值为 5000 毫秒。

  • connector.spanner.rebalancing.commit.offsets.interval.ms:为同步主题提交偏移量的时间间隔。默认值为 60000 毫秒。

  • connector.spanner.rebalancing.task.waiting.timeout:任务在处理再平衡事件前等待的时长。默认值为 1000 毫秒。

有关可配置连接器属性的更详细的列表,请参阅 GitHub 代码库

限制