构建与 Kafka 的变更数据流连接

本页介绍了如何使用 Kafka 连接器来使用和转发 Spanner 更改数据流数据。

核心概念

以下是 Kafka 连接器的核心概念。

Debezium

Debezium 是一个开源项目,可为更改数据捕获提供低延迟数据流式传输平台。

Kafka 连接器

Kafka 连接器提供了一个 Spanner API 抽象,用于将 Spanner 更改数据流发布到 Kafka。借助此连接器,您无需管理更改流分区生命周期,而当您直接使用 Spanner API 时,则必须管理更改流分区生命周期。

Kafka 连接器会针对每个数据更改记录 mod 生成更改事件,并针对每个更改流跟踪表将更改事件记录发送到下游的单独 Kafka 主题。数据更改记录 mod 表示捕获的单个修改(插入、更新或删除)。单个数据更改记录可以包含多个 mod。

Kafka 连接器输出

Kafka 连接器会将更改流记录直接转发到单独的 Kafka 主题。输出主题名称应为 connector_name.table_name。如果该主题不存在,Kafka 连接器会自动使用该名称创建一个主题。

您还可以配置主题路由转换,将记录重新路由到您指定的主题。如果您想使用主题路由,请停用低水位功能。

记录排序

记录按 Kafka 主题中每个主键的提交时间戳排序。属于不同主键的记录没有排序保证。具有相同主键的记录会存储在同一 Kafka 主题分区中。如果您想处理整个事务,还可以使用数据更改记录server_transaction_idnumber_of_records_in_transaction 字段来组装 Spanner 事务。

更改事件

Kafka 连接器会为每个 INSERTUPDATEDELETE 操作生成数据更改事件。每个事件都包含已更改行的键和值。

您可以使用 Kafka Connect 转换器ProtobufAVROJSONJSON Schemaless 格式生成数据更改事件。如果您使用的是生成架构的 Kafka Connect 转换器,则事件会包含键和值的单独架构。否则,事件只包含键和值。

密钥的架构永远不会更改。值的架构是连接器启动时间以来更改流跟踪的所有列的集合。

如果您将连接器配置为生成 JSON 事件,输出更改事件将包含以下五个字段:

  • 第一个 schema 字段指定了用于描述 Spanner 键架构的 Kafka Connect 架构。

  • 第一个 payload 字段具有上一个 schema 字段所述的结构,并包含所更改行的键。

  • 第二个 schema 字段指定了 Kafka Connect 架构,用于描述已更改行的架构。

  • 第二个 payload 字段具有上一个 schema 字段所述的结构,并且包含已更改行的实际数据。

  • source 字段是必填字段,用于描述事件的来源元数据。

以下是数据更改事件的示例:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

低水印

低水位描述了 Kafka 连接器保证已流式传输并发布到 Kafka 主题的所有事件的时间 T(时间戳 < T)。

您可以使用 gcp.spanner.low-watermark.enabled 参数在 Kafka 连接器中启用低水位标记。此参数在默认情况下处于停用状态。如果启用了低水印,则变更数据流数据更改记录中的 low_watermark 字段会填充 Kafka 连接器的当前低水印时间戳。

如果未生成任何记录,Kafka 连接器会向连接器检测到的 Kafka 输出主题发送定期的水印“心跳”。

这些水印心跳记录除 low_watermark 字段外均为空。然后,您可以使用低水位执行基于时间的汇总。例如,您可以使用低水位标记按主键的提交时间戳对事件进行排序。

元数据主题

Kafka 连接器以及 Kafka Connect 框架会创建多个元数据主题来存储与连接器相关的信息。建议不要修改这些元数据主题的配置或内容。

以下是元数据主题:

  • _consumer_offsets:由 Kafka 自动创建的主题。存储在 Kafka 连接器中创建的使用方的使用方偏移量。
  • _kafka-connect-offsets:由 Kafka Connect 自动创建的主题。存储连接器偏移量。
  • _sync_topic_spanner_connector_connectorname:连接器自动创建的主题。存储与变更数据流分区相关的元数据。
  • _rebalancing_topic_spanner_connector_connectorname:连接器自动创建的主题。用于确定连接器任务是否活跃。
  • _debezium-heartbeat.connectorname:用于处理 Spanner 变更数据流心跳的主题。

Kafka 连接器运行时

以下内容介绍了 Kafka 连接器运行时。

可伸缩性

Kafka 连接器可横向扩展,并在多个 Kafka Connect 工作器之间分散的一个或多个任务上运行。

消息传送保证

Kafka 连接器支持“至少传送一次”保证。

容错性

Kafka 连接器可以容错。当 Kafka 连接器读取更改并生成事件时,它会记录为每个更改流分区处理的上次提交时间戳。如果 Kafka 连接器因任何原因(包括通信故障、网络问题或软件故障)而停止,则在重启后,Kafka 连接器会从上次停止的位置继续流式传输记录。

Kafka 连接器会在 Kafka 连接器的开始时间戳读取信息架构,以检索架构信息。默认情况下,Spanner 无法在版本保留期限(默认为 1 小时)之前的时间戳读取信息架构。如果您想从过去 1 小时之前开始启动连接器,则必须延长数据库的版本保留期限。

设置 Kafka 连接器

创建变更流

如需详细了解如何创建变更数据流,请参阅创建变更数据流。如需继续执行后续步骤,您需要有一个已配置变更数据流的 Spanner 实例。

请注意,如果您希望在每次数据更改事件中返回已更改和未更改的列,请使用值捕获类型 NEW_ROW。如需了解详情,请参阅值捕获类型

安装 Kafka 连接器 JAR

安装 ZookeeperKafkaKafka Connect 后,部署 Kafka 连接器的剩余任务包括下载连接器的插件归档文件、将 JAR 文件提取到 Kafka Connect 环境中,并将包含 JAR 文件的目录添加到 Kafka Connect 的 plugin.path。然后,您需要重启 Kafka Connect 进程,以提取新的 JAR 文件。

如果您使用的是不可变容器,则可以从 Debezium 的容器映像中拉取 Zookeeper、Kafka 和 Kafka Connect 的映像。Kafka Connect 映像预安装了 Spanner 连接器。

如需详细了解如何安装基于 Debezium 的 Kafka 连接器 JAR,请参阅安装 Debezium

配置 Kafka 连接器

以下是连接到实例 test-instance 和项目 test-project 中数据库 users 中名为 changeStreamAll 的更改流的 Kafka 连接器的配置示例。

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

此配置包含以下内容:

  • 在向 Kafka Connect 服务注册时连接器的名称。

  • 此 Spanner 连接器类的名称。

  • 项目 ID。

  • Spanner 实例 ID。

  • Spanner 数据库 ID。

  • 变更数据流名称。

  • 服务账号密钥的 JSON 对象。

  • (可选)要使用的 Spanner 数据库角色。

  • 任务数上限。

如需查看连接器属性的完整列表,请参阅 Kafka 连接器配置属性

将连接器配置添加到 Kafka Connect

如需开始运行 Spanner 连接器,请执行以下操作:

  1. 为 Spanner 连接器创建配置。

  2. 使用 Kafka Connect REST API 将该连接器配置添加到 Kafka Connect 集群。

您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。默认情况下,Kafka Connect 服务在端口 8083 上运行。该服务会记录配置并启动连接到 Spanner 数据库并将更改事件记录流式传输到 Kafka 主题的连接器任务。

以下是一个示例 POST 命令:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

成功响应示例:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

更新 Kafka 连接器配置

如需更新连接器配置,请向运行的 Kafka Connect 服务发送具有相同连接器名称的 PUT 命令。

假设我们有一个使用上一部分中的配置运行的连接器。以下是一个示例 PUT 命令:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

成功响应示例:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

停止 Kafka 连接器

如需停止连接器,请向具有相同连接器名称的正在运行的 Kafka Connect 服务发送 DELETE 命令。

假设我们有一个使用上一部分中的配置运行的连接器。以下是一个示例 DELETE 命令:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

成功响应示例:

HTTP/1.1 204 No Content

监控 Kafka 连接器

除了标准 Kafka Connect 和 Debezium 指标之外,Kafka 连接器还会导出自己的指标:

  • MilliSecondsLowWatermark:连接器任务的当前低水位(以毫秒为单位)。低水印描述了连接器保证已流式传输出时间戳小于 T 的所有事件的时间 T

  • MilliSecondsLowWatermarkLag:低水位与当前时间的延迟(以毫秒为单位)。流式传输了时间戳小于 T 的所有事件

  • LatencyLowWatermark<Variant>MilliSeconds:低水位与当前时间之间的延迟(以毫秒为单位)。提供 P50、P95、P99、平均值、最小值和最大值变体。

  • LatencySpanner<Variant>MilliSeconds:从 Spanner 提交时间戳到连接器读取的延迟时间。提供 P50、P95、P99、平均值、最小值和最大值变体。

  • LatencyReadToEmit<Variant>MilliSeconds:从 Spanner 读取时间戳到连接器发出延迟时间。提供 P50、P95、P99、平均值、最小值和最大值变体。

  • LatencyCommitToEmit<Variant>tMilliSeconds:Spanner 提交时间戳到连接器发射延迟时间。提供 P50、P95、P99、平均值、最小值和最大值变体。

  • LatencyCommitToPublish<Variant>MilliSeconds:Spanner 提交时间戳到 Kafka 发布时间戳的延迟时间。提供 P50、P95、P99、平均值、最小值和最大值变体。

  • NumberOfChangeStreamPartitionsDetected:当前连接器任务检测到的分区总数。

  • NumberOfChangeStreamQueriesIssued:当前任务发出的变更数据流查询的总数。

  • NumberOfActiveChangeStreamQueries:当前连接器任务检测到的有效更改数据流查询数量。

  • SpannerEventQueueCapacityStreamEventQueue 的总容量,该队列用于存储从更改流查询收到的元素。

  • SpannerEventQueueCapacity:剩余的 StreamEventQueue 容量。

  • TaskStateChangeEventQueueCapacityTaskStateChangeEventQueue 的总容量,该队列用于存储在连接器中发生的事件。

  • RemainingTaskStateChangeEventQueueCapacity:剩余的 TaskStateChangeEventQueue 容量。

  • NumberOfActiveChangeStreamQueries:当前连接器任务检测到的有效更改数据流查询数量。

Kafka 连接器配置属性

以下是连接器的必需配置属性:

  • name:连接器的唯一名称。尝试使用相同名称重新注册会导致失败。所有 Kafka Connect 连接器都需要此属性。

  • connector.class:连接器的 Java 类的名称。始终为 Kafka 连接器使用 io.debezium.connector.spanner.SpannerConnector 值。

  • tasks.max:应为此连接器创建的任务数量上限。

  • gcp.spanner.project.id:项目 ID

  • gcp.spanner.instance.id:Spanner 实例 ID

  • gcp.spanner.database.id:Spanner 数据库 ID

  • gcp.spanner.change.stream:Spanner 变更数据流名称

  • gcp.spanner.credentials.json:服务账号密钥 JSON 对象。

  • gcp.spanner.credentials.path:服务账号密钥 JSON 对象的文件路径。如果未提供上述字段,则此字段为必填字段。

  • gcp.spanner.database.role:要使用的 Spanner 数据库角色。仅当使用精细访问权限控制来保护更改流时,才需要此参数。数据库角色必须拥有变更数据流的 SELECT 特权和变更数据流的读取函数的 EXECUTE 特权。如需了解详情,请参阅变更数据流的精细访问权限控制

以下高级配置属性具有适用于大多数情况的默认值,因此很少需要在连接器的配置中指定:

  • gcp.spanner.low-watermark.enabled:指示是否为连接器启用了低水位标记。默认值为 false。

  • gcp.spanner.low-watermark.update-period.ms:更新低水位的时间间隔。默认值为 1000 毫秒。

  • heartbeat.interval.ms:Spanner 心跳间隔。默认值为 300000(5 分钟)。

  • gcp.spanner.start.time:连接器的开始时间。默认为当前时间。

  • gcp.spanner.end.time:连接器结束时间。默认值为无穷大。

  • tables.exclude.list:要排除更改事件的表。默认值为空。

  • tables.include.list:要包含更改事件的表。如果未填充,则包含所有表。默认值为空。

  • gcp.spanner.stream.event.queue.capacity:Spanner 事件队列容量。默认值为 10000。

  • connector.spanner.task.state.change.event.queue.capacity:任务状态更改事件队列容量。默认值为 1000。

  • connector.spanner.max.missed.heartbeats:更改流查询错过的检测信号的数量上限(超过此数量后,系统会抛出异常)。默认值为 10。

  • scaler.monitor.enabled:指示是否启用了任务自动扩缩功能。默认值为 false。

  • tasks.desired.partitions:每个任务的首选变更数据流分区数量。此参数是任务自动扩缩所必需的。默认值为 2。

  • tasks.min:任务数量下限。此参数是任务自动扩缩所必需的。默认为 1。

  • connector.spanner.sync.topic:同步主题的名称,这是一个内部连接器主题,用于存储任务之间的通信。如果用户未提供名称,则默认为 _sync_topic_spanner_connector_connectorname

  • connector.spanner.sync.poll.duration:同步主题的轮询时长。默认值为 500 毫秒。

  • connector.spanner.sync.request.timeout.ms:对同步主题的请求超时时间。默认值为 5000 毫秒。

  • connector.spanner.sync.delivery.timeout.ms:向同步主题发布内容的超时时间。默认值为 15,000 毫秒。

  • connector.spanner.sync.commit.offsets.interval.ms:为同步主题提交偏移的时间间隔。默认值为 60000 毫秒。

  • connector.spanner.sync.publisher.wait.timeout:向同步主题发布消息的时间间隔。默认为 5 毫秒。

  • connector.spanner.rebalancing.topic:重新平衡主题的名称。重新平衡主题是一个内部连接器主题,用于确定任务是否处于活动状态。如果用户未提供名称,则默认为 _rebalancing_topic_spanner_connector_connectorname

  • connector.spanner.rebalancing.poll.duration:重新平衡主题的轮询时长。默认值为 5000 毫秒。

  • connector.spanner.rebalancing.commit.offsets.timeout:提交重新平衡主题的偏移量的超时时间。默认值为 5000 毫秒。

  • connector.spanner.rebalancing.commit.offsets.interval.ms:为同步主题提交偏移的时间间隔。默认值为 60000 毫秒。

  • connector.spanner.rebalancing.task.waiting.timeout:任务在处理重新平衡事件之前等待的时长。默认值为 1000 毫秒。

如需查看可配置连接器属性的更详细列表,请参阅 GitHub 代码库

限制