构建与 Kafka 的变更数据流连接

本页面介绍了如何使用 Kafka 连接器来 和转发 Spanner 变更流数据。

核心概念

下文介绍了 Kafka 连接器的核心概念。

Debezium

Debezium 是一个开源项目,旨在提供 用于变更数据捕获的低延迟数据流平台。

Kafka 连接器

Kafka 连接器提供了一个 Spanner API 抽象,用于将 Spanner 更改数据流发布到 Kafka。借助此连接器 则不必管理变更数据流分区生命周期, 在直接使用 Spanner API 时,您需要执行此操作。

Kafka 连接器会针对每个数据更改记录 mod 生成更改事件,并针对每个更改流跟踪表将更改事件记录发送到下游的单独 Kafka 主题。数据更改记录 mod 表示捕获的单个修改(插入、更新或删除)。单个数据更改记录可以包含多个 mod。

Kafka 连接器输出

Kafka 连接器直接转发变更数据流记录 一个单独的 Kafka 主题。输出主题名称应为 connector_name.table_name。如果该主题不存在,Kafka 连接器会自动使用该名称创建一个主题。

您还可以配置主题路由转换,将记录重新路由到您指定的主题。如果您要使用主题路由,请停用低水印功能。

记录排序

记录按每个主键的提交时间戳进行排序 Kafka 主题属于不同主键的记录不包含 排序保证具有相同主键的记录会存储在同一 Kafka 主题分区中。如果您想处理整个事务,还可以使用数据更改记录server_transaction_idnumber_of_records_in_transaction 字段来组装 Spanner 事务。

更改事件

Kafka 连接器为每个 INSERTUPDATE、 和 DELETE 运算。每个事件都包含已更改行的键和值。

您可以使用 Kafka Connect 转换器ProtobufAVROJSONJSON Schemaless 格式生成数据更改事件。如果您使用 生成架构的 Kafka Connect 转换器,事件包含 键和值使用单独的架构否则,事件只包含键和值。

键的架构永远不会更改。这些值的架构是 更改流跟踪的所有列的并集 。

如果您将连接器配置为生成 JSON 事件,输出更改事件将包含以下五个字段:

  • 第一个 schema 字段指定描述 Spanner 密钥架构的 Kafka Connect 架构。

  • 第一个 payload 字段具有前面的 schema 字段描述的结构,并且包含已更改行的键。

  • 第二个 schema 字段指定 Kafka Connect 架构,用于描述已更改行的架构。

  • 第二个 payload 字段具有前面的 schema 字段描述的结构,它包含发生更改的行的实际数据。

  • source 字段是必填字段,用于描述事件的来源元数据。

以下是数据更改事件的示例:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

低水印

低水位描述了 Kafka 连接器保证已流式传输并发布到 Kafka 主题的所有事件的时间 T(时间戳 < T)。

您可以使用 gcp.spanner.low-watermark.enabled 参数在 Kafka 连接器中启用低水位标记。此参数已停用 默认情况。如果启用了低水印,则变更数据流数据更改记录中的 low_watermark 字段会填充 Kafka 连接器的当前低水印时间戳。

如果没有生成任何记录,Kafka 连接器会定期发送 “心跳”水印到连接器检测到的 Kafka 输出主题。

这些水印检测信号是除 low_watermark 字段。然后,您可以使用低水位执行基于时间的汇总。例如,您可以使用低水位按提交对事件进行排序 时间戳。

元数据主题

Kafka 连接器和 Kafka Connect 框架可以创建 元数据主题来存储连接器相关信息。建议不要修改这些元数据主题的配置或内容。

以下是元数据主题:

  • _consumer_offsets:由 Kafka 自动创建的主题。存储在 Kafka 连接器中创建的使用方的使用方偏移量。
  • _kafka-connect-offsets:由 Kafka Connect 自动创建的主题。存储连接器偏移量。
  • _sync_topic_spanner_connector_connectorname:连接器自动创建的主题。存储有关变更数据流分区的元数据。
  • _rebalancing_topic_spanner_connector_connectorname:连接器自动创建的主题。用于确定连接器任务是否活跃。
  • _debezium-heartbeat.connectorname:用于处理 Spanner 变更数据流心跳的主题。

Kafka 连接器运行时

以下内容介绍了 Kafka 连接器运行时。

可扩缩性

Kafka 连接器可横向扩展,并在多个 Kafka Connect 工作器之间分散的一个或多个任务上运行。

消息传送保证

Kafka 连接器支持“至少传送一次”保证。

容错性

Kafka 连接器可容忍故障。当 Kafka 连接器读取更改和 生成事件时,它会记录为每个更改处理的上次提交时间戳 流分区。如果 Kafka 连接器因任何原因(包括 通信故障、网络问题或软件故障) Kafka 连接器会从上次中断的位置继续流式传输记录。

Kafka 连接器在启动时读取信息架构 时间戳以检索架构信息。默认情况下,Spanner 无法在版本保留期限(默认为 1 小时)之前的时间戳读取信息架构。如果您想从以下时间之前启动连接器: 您必须延长数据库的版本保留期限 。

设置 Kafka 连接器

创建变更流

如需详细了解如何创建变更数据流,请参阅创建变更数据流。如需继续执行后续步骤,需要一个配置了变更数据流的 Spanner 实例。

请注意,如果您希望在每一列都返回已更改的列和未更改的列 数据更改事件,请使用值捕获类型 NEW_ROW。如需了解详情,请参阅值捕获类型

安装 Kafka 连接器 JAR

安装 ZookeeperKafkaKafka Connect 后,部署 Kafka 连接器的剩余任务包括下载连接器的插件归档文件、将 JAR 文件提取到 Kafka Connect 环境中,并将包含 JAR 文件的目录添加到 Kafka Connect 的 plugin.path。然后,您需要重启 Kafka Connect 进程,以提取新的 JAR 文件。

如果您使用的是不可变容器,则可以从 Debezium 的容器映像中拉取 Zookeeper、Kafka 和 Kafka Connect 的映像。Kafka Connect 映像预安装了 Spanner 连接器。

如需详细了解如何安装基于 Debezium 的 Kafka 连接器 JAR,请参阅安装 Debezium

配置 Kafka 连接器

以下是 Kafka 连接器的配置示例 该 API 会连接到名为 changeStreamAll 的变更数据流(位于 实例“test-instance”和项目“test-project”中的数据库“users”。

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

此配置包含以下内容:

  • 注册到 Kafka Connect 服务时的连接器名称。

  • 此 Spanner 连接器类的名称。

  • 项目 ID。

  • Spanner 实例 ID。

  • Spanner 数据库 ID。

  • 变更数据流名称。

  • 服务账号密钥的 JSON 对象。

  • (可选)要使用的 Spanner 数据库角色。

  • 任务数量上限。

如需查看连接器属性的完整列表,请参阅 Kafka 连接器配置属性

将连接器配置添加到 Kafka Connect

如需开始运行 Spanner 连接器,请执行以下操作:

  1. 为 Spanner 连接器创建配置。

  2. 使用 Kafka Connect REST API 添加 连接到 Kafka Connect 集群。

您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。默认情况下,Kafka Connect 服务在端口 8083 上运行。服务记录配置并启动连接器任务 (用于连接到 Spanner 数据库并将变更事件记录流式传输到 Kafka 主题。

以下是 POST 命令示例:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

成功响应示例:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

更新 Kafka 连接器配置

要更新连接器配置,请向正在运行的PUT 具有相同连接器名称的 Kafka Connect 服务。

假设我们有一个使用上一部分中的配置运行的连接器。以下是 PUT 命令示例:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

成功响应示例:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

停止 Kafka 连接器

如需停止连接器,请向正在运行的DELETE 具有相同连接器名称的 Kafka Connect 服务。

假设我们有一个连接器,正在以 上一节。以下是 DELETE 命令示例:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

成功响应示例:

HTTP/1.1 204 No Content

监控 Kafka 连接器

除了标准 Kafka Connect 和 Debezium 指标之外,Kafka 连接器还会导出自己的指标:

  • MilliSecondsLowWatermark:连接器任务的当前低水位(以毫秒为单位)。通过 低水位标识的时间 T,在该时间 T,连接器保证具有 流式传输所有时间戳 < 的事件周四

  • MilliSecondsLowWatermarkLag:低水位与当前时间的延迟(以毫秒为单位)。流式传输了时间戳小于 T 的所有事件

  • LatencyLowWatermark<Variant>MilliSeconds:低水位相对于当前时间的延迟 (以毫秒为单位)。提供 P50、P95、P99、平均值、最小值和最大值变体。

  • LatencySpanner<Variant>MilliSeconds:从 Spanner 提交时间戳到连接器读取的延迟时间。提供 P50、P95、P99、平均值、最小值和最大值变体。

  • LatencyReadToEmit<Variant>MilliSeconds:从 Spanner 读取时间戳到连接器发出延迟时间。提供 P50、P95、P99、平均值、最小值和最大值变体。

  • LatencyCommitToEmit<Variant>tMilliSeconds:Spanner 提交时间戳到连接器发射延迟时间。提供 P50、P95、P99、平均值、最小值和最大值变体。

  • LatencyCommitToPublish<Variant>MilliSeconds:Spanner-commit-timestamp-to Kafka-publish-timestamp 延迟时间。同时提供 P50、P95、P99、平均、最小和最大变体。

  • NumberOfChangeStreamPartitionsDetected:分区总数 当前连接器任务检测到的任务。

  • NumberOfChangeStreamQueriesIssued:变更数据流查询的总数 当前任务发出的所有任务。

  • NumberOfActiveChangeStreamQueries:当前连接器任务检测到的有效更改数据流查询数量。

  • SpannerEventQueueCapacityStreamEventQueue 的总容量,该队列用于存储从更改流查询收到的元素。

  • SpannerEventQueueCapacity:剩余的 StreamEventQueue 容量。

  • TaskStateChangeEventQueueCapacityTaskStateChangeEventQueue 的总容量,该队列用于存储连接器中发生的事件。

  • RemainingTaskStateChangeEventQueueCapacity:剩余的 TaskStateChangeEventQueue 容量。

  • NumberOfActiveChangeStreamQueries:当前连接器任务检测到的有效变更数据流查询数量。

Kafka 连接器配置属性

以下是连接器的必需配置属性:

  • name:连接器的唯一名称。尝试使用同一名称重新注册时会导致失败。所有 Kafka Connect 连接器都需要此属性。

  • connector.class:连接器的 Java 类的名称。请始终为 Kafka 连接器使用 io.debezium.connector.spanner.SpannerConnector 值。

  • tasks.max:应为此连接器创建的任务数量上限。

  • gcp.spanner.project.id:项目 ID

  • gcp.spanner.instance.id:Spanner 实例 ID

  • gcp.spanner.database.id:Spanner 数据库 ID

  • gcp.spanner.change.stream:Spanner 变更数据流名称

  • gcp.spanner.credentials.json:服务账号密钥 JSON 对象。

  • gcp.spanner.credentials.path:指向服务账号密钥 JSON 对象的文件路径。如果未提供上述字段,则此字段为必填字段。

  • gcp.spanner.database.role:拥有此角色的 Spanner 数据库角色 。仅当使用精细访问权限控制来保护更改流时,才需要此参数。此数据库角色必须拥有以下项目的 SELECT 权限: 变更数据流以及对变更数据流读取操作的 EXECUTE 权限 函数。如需了解详情,请参阅变更数据流的精细访问权限控制

以下高级配置属性具有适用于大多数情况的默认值,因此很少需要在连接器的配置中指定:

  • gcp.spanner.low-watermark.enabled:指示是否为连接器启用了低水位标记。默认值为 false。

  • gcp.spanner.low-watermark.update-period.ms:低水印的更新时间间隔。默认值为 1000 毫秒。

  • heartbeat.interval.ms:Spanner 心跳间隔。默认值为 300000(5 分钟)。

  • gcp.spanner.start.time:连接器的开始时间。默认值为当前时间。

  • gcp.spanner.end.time:连接器结束时间。默认值为无穷大。

  • tables.exclude.list:要排除更改事件的表。默认值为空。

  • tables.include.list:包含更改事件的表。如果未填充,则包含所有表。默认值为空。

  • gcp.spanner.stream.event.queue.capacity:Spanner 事件队列容量。默认值为 10,000。

  • connector.spanner.task.state.change.event.queue.capacity:任务状态更改事件队列容量。默认值为 1000。

  • connector.spanner.max.missed.heartbeats:更改流查询错过的检测信号的最大数量(超过此数量后,系统会抛出异常)。默认值为 10。

  • scaler.monitor.enabled:指示是否已启用任务自动扩缩。默认值为 false。

  • tasks.desired.partitions:每个任务的变更数据流分区的首选数量。任务自动扩缩需要使用此参数。默认值为 2。

  • tasks.min:任务数量下限。任务自动扩缩需要使用此参数。默认为 1。

  • connector.spanner.sync.topic:同步主题的名称,这是一个内部连接器主题,用于存储任务之间的通信。如果用户未提供名称,则默认为 _sync_topic_spanner_connector_connectorname

  • connector.spanner.sync.poll.duration:同步主题的轮询时长。默认为 500 毫秒。

  • connector.spanner.sync.request.timeout.ms:对同步主题的请求超时时间。默认值为 5000 毫秒。

  • connector.spanner.sync.delivery.timeout.ms:向同步主题发布内容的超时时间。默认值为 15,000 毫秒。

  • connector.spanner.sync.commit.offsets.interval.ms:为同步主题提交偏移量的时间间隔。默认值为 60000 毫秒。

  • connector.spanner.sync.publisher.wait.timeout:向同步主题发布消息的时间间隔。默认值为 5 毫秒。

  • connector.spanner.rebalancing.topic:再平衡主题的名称。重新平衡主题是一个内部连接器主题,用于确定任务是否处于活动状态。如果用户未提供名称,则默认为 _rebalancing_topic_spanner_connector_connectorname

  • connector.spanner.rebalancing.poll.duration:重新平衡主题的轮询时长。默认值为 5000 毫秒。

  • connector.spanner.rebalancing.commit.offsets.timeout:为再平衡主题提交偏移量的超时时间。默认值为 5000 毫秒。

  • connector.spanner.rebalancing.commit.offsets.interval.ms:为同步主题提交偏移的时间间隔。默认值为 60,000 毫秒。

  • connector.spanner.rebalancing.task.waiting.timeout:任务在处理重新平衡事件之前等待的时长。默认值为 1000 毫秒。

如需查看可配置连接器属性的更详细列表,请参阅 GitHub 代码库

限制