构建变更数据流与 Kafka 的连接

本页面介绍了如何使用 Kafka 连接器使用和转发 Spanner 变更数据流数据。

核心概念

下面介绍了 Kafka 连接器的核心概念。

去锕

Debezium 是一个开源项目,提供用于变更数据捕获的低延迟数据流平台。

Kafka 连接器

Kafka 连接器对 Spanner API 提供了抽象化,用于将 Spanner 变更数据流发布到 Kafka。借助此连接器,您无需管理变更数据流分区生命周期,而这在直接使用 Spanner API 时就需要管理。

Kafka 连接器会为每个数据变更记录 mod 生成一个更改事件,并针对每个变更数据流跟踪表将更改事件记录发送到下游单独的 Kafka 主题。数据更改记录 mod 表示捕获的单次修改(插入、更新或删除)。单个数据更改记录可以包含多个 mod。

Kafka 连接器输出

Kafka 连接器会将变更数据流记录直接转发到单独的 Kafka 主题。输出主题名称应为 connector_name.table_name。如果该主题不存在,Kafka 连接器会在该名称下自动创建一个主题。

您还可以配置主题路由转换,将记录重新路由到您指定的主题。如果要使用主题路由,请停用低水印功能。

记录排序

记录按 Kafka 主题中每个主键的提交时间戳进行排序。属于不同主键的记录不保证排序。具有相同主键的记录存储在同一 Kafka 主题分区中。如果要处理整个事务,您还可以使用数据变更记录server_transaction_idnumber_of_records_in_transaction 字段来组建 Spanner 事务。

更改事件

Kafka 连接器会为每个 INSERTUPDATEDELETE 操作生成一个数据更改事件。每个事件都包含已更改行的键和值。

您可以使用 Kafka Connect 转换器生成 ProtobufAVROJSONJSON Schemaless 格式的数据更改事件。如果您使用生成架构的 Kafka Connect 转换器,则事件会为键和值包含单独的架构。否则,事件将仅包含键和值。

键的架构从不改变。这些值的架构是自连接器启动以来变更数据流所跟踪的所有列的组合。

如果将连接器配置为生成 JSON 事件,则输出更改事件将包含五个字段:

  • 第一个 schema 字段指定描述 Spanner 键架构的 Kafka Connect 架构。

  • 第一个 payload 字段具有前一个 schema 字段描述的结构,并且包含发生更改的行的键。

  • 第二个 schema 字段指定 Kafka Connect 架构,用于描述已更改行的架构。

  • 第二个 payload 字段具有前面的 schema 字段描述的结构,并且包含已更改的行的实际数据。

  • source 字段是必填字段,用于说明事件的源元数据。

以下是数据更改事件的示例:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": {
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": {
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c",
  "ts_ms": 1559033904863 //
}

低水印

低水印描述了 Kafka 连接器保证已流式传输并发布到 Kafka 主题的所有时间戳 < T 的事件的时间 T。

您可以使用 gcp.spanner.low-watermark.enabled 参数在 Kafka 连接器中启用低水印。此参数默认处于停用状态。如果启用了低水印,则变更数据流数据变更记录中的 low_watermark 字段将填充 Kafka 连接器的当前低水印时间戳。

如果没有生成记录,Kafka 连接器会定期向连接器检测到的 Kafka 输出主题发送水印“心跳”。

这些水印检测信号是空记录(low_watermark 字段除外)。然后,您可以使用低水印执行基于时间的汇总。例如,您可以使用低水印,按主键的提交时间戳对事件进行排序。

元数据主题

Kafka 连接器和 Kafka Connect 框架会创建多个元数据主题,用于存储连接器相关信息。不建议修改这些元数据主题的配置或内容。

以下是元数据主题:

  • _consumer_offsets:Kafka 自动创建的主题。存储在 Kafka 连接器中创建的使用方的使用方偏移。
  • _kafka-connect-offsets:Kafka Connect 自动创建的主题。存储连接器的偏移量。
  • _sync_topic_spanner_connector_connectorname:连接器自动创建的主题。存储有关变更数据流分区的元数据。
  • _rebalancing_topic_spanner_connector_connectorname:连接器自动创建的主题。用于确定连接器任务的活动性。
  • _debezium-heartbeat.connectorname:用于处理 Spanner 变更数据流检测信号的主题。

Kafka 连接器运行时

下面介绍了 Kafka 连接器运行时。

可伸缩性

Kafka 连接器可水平伸缩,并可在分布在多个 Kafka Connect 工作器中的一项或多项任务上运行。

消息传送保证

Kafka 连接器支持至少传送一次保证。

容错性

Kafka 连接器可以容忍故障。当 Kafka 连接器读取更改并生成事件时,它会记录对每个更改流分区处理的上次提交时间戳。如果 Kafka 连接器因任何原因(包括通信故障、网络问题或软件故障)停止运行,则重启后,Kafka 连接器会继续将记录流式传输到上次停止的位置。

Kafka 连接器会读取 Kafka 连接器开始时间戳的信息架构,以检索架构信息。默认情况下,Spanner 无法在版本保留期限(默认为一小时)之前的读取时间戳处读取信息架构。如果要在过去一小时之前启动连接器,则必须增加数据库的版本保留期限。

设置 Kafka 连接器

创建变更流

如需详细了解如何创建变更流,请参阅创建变更流。如需继续执行后续步骤,需要一个配置了变更数据流的 Spanner 实例。

请注意,如果您希望在每个数据更改事件中都返回已更改的列和未更改的列,请使用值捕获类型 NEW_ROW。如需了解详情,请参阅值捕获类型

安装 Kafka 连接器 JAR

安装 ZookeeperKafkaKafka Connect 后,部署 Kafka 连接器的其余任务是下载连接器的插件归档,将 JAR 文件提取到 Kafka Connect 环境,并将包含 JAR 文件的目录添加到 Kafka Connect 的 plugin.path 中。然后,您需要重启 Kafka Connect 进程以选择新的 JAR 文件。

如果您使用的是不可变容器,则可以从 Debezium 的容器映像中针对 Zookeeper、Kafka 和 Kafka Connect 拉取映像。Kafka Connect 映像预安装了 Spanner 连接器。

如需详细了解如何安装基于 Debezium 的 Kafka 连接器 JAR,请参阅安装 Debezium

配置 Kafka 连接器

以下 Kafka 连接器的配置示例会连接到实例 test-instance 和项目 test-project 中数据库 users 中名为 changeStreamAll 的变更数据流。

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

此配置包含以下内容:

  • 注册 Kafka Connect 服务时的连接器名称。

  • 此 Spanner 连接器类的名称。

  • 项目 ID。

  • Spanner 实例 ID。

  • Spanner 数据库 ID。

  • 变更数据流的名称。

  • 服务帐号密钥的 JSON 对象。

  • (可选)要使用的 Spanner 数据库角色。

  • 任务数量上限。

如需查看连接器属性的完整列表,请参阅 Kafka 连接器配置属性

将连接器配置添加到 Kafka Connect

如需开始运行 Spanner 连接器,请执行以下操作:

  1. 为 Spanner 连接器创建配置。

  2. 使用 Kafka Connect REST API 将该连接器配置添加到 Kafka Connect 集群。

您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。默认情况下,Kafka Connect 服务在端口 8083 上运行。该服务会记录配置并启动连接器任务,该任务连接到 Spanner 数据库并将更改事件记录流式传输到 Kafka 主题。

以下是一个 POST 命令示例:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

成功响应示例:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

更新 Kafka 连接器配置

如需更新连接器配置,请向正在运行的具有相同连接器名称的 Kafka Connect 服务发送 PUT 命令。

假设我们有一个使用上一部分中的配置运行的连接器。以下是一个 PUT 命令示例:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

成功响应示例:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

停止 Kafka 连接器

如需停止连接器,请向正在运行的具有相同连接器名称的 Kafka Connect 服务发送 DELETE 命令。

假设我们有一个使用上一部分中的配置运行的连接器。以下是一个 DELETE 命令示例:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

成功响应示例:

HTTP/1.1 204 No Content

监控 Kafka 连接器

除了标准的 Kafka Connect 和 Debezium 指标之外,Kafka 连接器还会导出自己的指标:

  • MilliSecondsLowWatermark:连接器任务的当前低水印(以毫秒为单位)。低水印表示时间 T,此时连接器保证已流式传输出时间戳小于 T 的所有事件

  • MilliSecondsLowWatermarkLag:低水印相对于当前时间的延迟时间(以毫秒为单位)。流式传输了时间戳 < T 的所有事件

  • LatencyLowWatermark<Variant>MilliSeconds:低水印当前时间之后的延迟(以毫秒为单位)。我们提供了 P50、P95、P99、Average、Min 和 Max 变体。

  • LatencySpanner<Variant>MilliSeconds:Spanner-commit-timestamp-to-connector-read 延迟时间。提供 P50、P95、P99、平均水平、最小值、最大值等规格。

  • LatencyReadToEmit<Variant>MilliSeconds:Spanner-read-timestamp-to-connector-emit 延迟时间。我们提供了 P50、P95、P99、Average、Min 和 Max 变体。

  • LatencyCommitToEmit<Variant>tMilliSeconds:Spanner-commit-timestamp-to-connector-emit 延迟时间。我们提供了 P50、P95、P99、Average、Min 和 Max 变体。

  • LatencyCommitToPublish<Variant>MilliSeconds:Spanner-commit-timestamp-to Kafka-publish-timestamp 延迟时间。提供 P50、P95、P99、平均水平、最小值、最大值等规格。

  • NumberOfChangeStreamPartitionsDetected:当前连接器任务检测到的分区总数。

  • NumberOfChangeStreamQueriesIssued:当前任务发出的变更数据流查询总数。

  • NumberOfActiveChangeStreamQueries:当前连接器任务检测到的变更数据流查询的活跃数量。

  • SpannerEventQueueCapacityStreamEventQueue 的总容量,该队列用于存储从变更数据流查询接收的元素。

  • SpannerEventQueueCapacity:剩余的 StreamEventQueue 容量。

  • TaskStateChangeEventQueueCapacityTaskStateChangeEventQueue(用于存储连接器中发生的事件的队列)的总容量。

  • RemainingTaskStateChangeEventQueueCapacity:剩余的 TaskStateChangeEventQueue 容量。

  • NumberOfActiveChangeStreamQueries:当前连接器任务检测到的变更数据流查询的活跃数量。

Kafka 连接器配置属性

以下是连接器必需的配置属性:

  • name:连接器的唯一名称。尝试使用相同名称重新注册会导致失败。所有 Kafka Connect 连接器都需要此属性。

  • connector.class:连接器的 Java 类的名称。始终为 Kafka 连接器使用 io.debezium.connector.spanner.SpannerConnector 值。

  • tasks.max:应为此连接器创建的任务数上限。

  • gcp.spanner.project.id:项目 ID

  • gcp.spanner.instance.id:Spanner 实例 ID

  • gcp.spanner.database.id:Spanner 数据库 ID

  • gcp.spanner.change.stream:Spanner 变更数据流名称

  • gcp.spanner.credentials.json:服务帐号密钥 JSON 对象。

  • gcp.spanner.credentials.path:服务帐号密钥 JSON 对象的文件路径。如果未提供上述字段,则为必填字段。

  • gcp.spanner.database.role:要使用的 Spanner 数据库角色。仅当变更数据流受精细访问权限控制时才需要。数据库角色必须具有变更数据流的 SELECT 权限,以及变更数据流读取函数的 EXECUTE 权限。如需了解详情,请参阅更改流的精细访问权限控制

以下高级配置属性具有适合大多数情况的默认值,因此很少需要在连接器的配置中指定:

  • gcp.spanner.low-watermark.enabled:用于指明连接器是否启用了低水印。默认值为 false。

  • gcp.spanner.low-watermark.update-period.ms:低水印的更新时间间隔。默认值为 1000 毫秒。

  • heartbeat.interval.ms:Spanner 检测信号间隔。默认值为 300,000(五分钟)。

  • gcp.spanner.start.time:连接器的开始时间。默认设置为当前时间。

  • gcp.spanner.end.time:连接器结束时间。默认值为无穷大。

  • tables.exclude.list:要为其排除更改事件的表。默认值为空。

  • tables.include.list:要包含更改事件的表。如果未填充,则会包含所有表。默认值为空。

  • gcp.spanner.stream.event.queue.capacity:Spanner 事件队列容量。默认值为 10,000。

  • connector.spanner.task.state.change.event.queue.capacity:任务状态更改事件队列容量。默认值为 1000。

  • connector.spanner.max.missed.heartbeats:在抛出异常之前,变更流查询丢失的检测信号次数上限。默认值为 10。

  • scaler.monitor.enabled:指示是否已启用任务自动扩缩。默认值为 false。

  • tasks.desired.partitions:每个任务的首选变更数据流分区数量。任务自动扩缩需要使用此参数。默认值为 2。

  • tasks.min:任务数量下限。任务自动扩缩需要使用此参数。默认为 1。

  • connector.spanner.sync.topic:同步主题的名称,它是用于存储任务间通信的内部连接器主题。如果用户未提供姓名,则默认为 _sync_topic_spanner_connector_connectorname

  • connector.spanner.sync.poll.duration:同步主题的轮询时长。默认值为 500 毫秒。

  • connector.spanner.sync.request.timeout.ms:向同步主题发送的请求的超时时间。默认值为 5,000 毫秒。

  • connector.spanner.sync.delivery.timeout.ms:发布到同步主题的超时时间。默认值为 15000 毫秒。

  • connector.spanner.sync.commit.offsets.interval.ms:为同步主题提交偏移量的时间间隔。默认值为 60000 毫秒。

  • connector.spanner.sync.publisher.wait.timeout:消息发布到同步主题的时间间隔。默认值为 5 毫秒。

  • connector.spanner.rebalancing.topic:再平衡主题的名称。再平衡主题是用于确定任务活跃性的内部连接器主题。如果用户未提供姓名,则默认为 _rebalancing_topic_spanner_connector_connectorname

  • connector.spanner.rebalancing.poll.duration:再平衡主题的轮询时长。默认值为 5,000 毫秒。

  • connector.spanner.rebalancing.commit.offsets.timeout:为再平衡主题提交偏移量的超时。默认值为 5,000 毫秒。

  • connector.spanner.rebalancing.commit.offsets.interval.ms:为同步主题提交偏移量的时间间隔。默认值为 60000 毫秒。

  • connector.spanner.rebalancing.task.waiting.timeout:任务在处理再平衡事件之前等待的时长。默认值为 1,000 毫秒。

如需更详细的可配置连接器属性列表,请参阅 GitHub 代码库

限制