构建与 Kafka 的变更数据流连接

本页面介绍了如何使用 Kafka 连接器来处理和转发 Spanner 变更数据流数据。

核心概念

下文介绍了 Kafka 连接器的核心概念。

德贝兹乌姆

Debezium 是一个开源项目,提供用于变更数据捕获的低延迟数据流平台。

Kafka 连接器

Kafka 连接器通过 Spanner API 提供抽象,以将 Spanner 变更数据流发布到 Kafka。借助此连接器,您不必管理变更数据流分区生命周期,这在直接使用 Spanner API 时是必需的。

Kafka 连接器为每个数据变更记录 mod 生成一个更改事件,并将更改事件记录发送到下游的每个变更数据流跟踪表的单独 Kafka 主题中。数据变更记录 mod 表示捕获的单次修改(插入、更新或删除)。单个数据变更记录可以包含多个 mod。

Kafka 连接器输出

Kafka 连接器将变更数据流记录直接转发到单独的 Kafka 主题。输出主题名称应为 connector_name.table_name。如果主题不存在,Kafka 连接器会自动在该名称下创建一个主题。

您还可以配置主题路由转换,以将记录重新路由到您指定的主题。如果您要使用主题路由,请停用低水印功能。

记录排序

记录按 Kafka 主题中每个主键的提交时间戳进行排序。属于不同主键的记录没有顺序保证。具有相同主键的记录存储在同一 Kafka 主题分区中。如果要处理整个事务,还可以使用数据变更记录server_transaction_idnumber_of_records_in_transaction 字段来组建 Spanner 事务。

更改事件

Kafka 连接器为每个 INSERTUPDATEDELETE 操作生成数据更改事件。每个事件都包含所更改行对应的键和值。

您可以使用 Kafka Connect 转换器生成 ProtobufAVROJSONJSON Schemaless 格式的数据更改事件。如果您使用可生成架构的 Kafka Connect 转换器,则事件包含键和值的单独架构。否则,事件将仅包含键和值。

键的架构永远不会更改。这些值的架构是自连接器启动以来变更数据流跟踪的所有列的组合。

如果您将连接器配置为生成 JSON 事件,则输出更改事件将包含五个字段:

  • 第一个 schema 字段指定描述 Spanner 密钥架构的 Kafka Connect 架构。

  • 第一个 payload 字段具有前面的 schema 字段描述的结构,并且包含已更改行的键。

  • 第二个 schema 字段指定 Kafka Connect 架构,用于描述已更改行的架构。

  • 第二个 payload 字段具有前面的 schema 字段描述的结构,它包含发生更改的行的实际数据。

  • source 字段是必填字段,用于描述事件的来源元数据。

以下是数据更改事件的示例:

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

低水印

低水印描述时间 T,在该时间 Kafka 连接器保证已流式传输并发布到 Kafka 主题且时间戳小于 T 的所有事件。

您可以使用 gcp.spanner.low-watermark.enabled 参数在 Kafka 连接器中启用低水印。默认情况下,此参数处于停用状态。如果启用了低水位,变更数据流数据更改记录中的 low_watermark 字段会填充 Kafka 连接器的当前低水位时间戳。

如果没有生成任何记录,Kafka 连接器会定期向连接器检测到的 Kafka 输出主题发送“检测信号”。

这些水印检测信号是除 low_watermark 字段外的空记录。然后,您可以使用低水位执行基于时间的汇总。例如,您可以使用低水印按主键的提交时间戳对事件进行排序。

元数据主题

Kafka 连接器以及 Kafka Connect 框架会创建多个元数据主题来存储连接器相关信息。建议不要修改这些元数据主题的配置或内容。

以下是元数据主题:

  • _consumer_offsets:Kafka 自动创建的主题。 存储在 Kafka 连接器中创建的使用方的使用方偏移。
  • _kafka-connect-offsets:Kafka Connect 自动创建的主题。存储连接器偏移量。
  • _sync_topic_spanner_connector_connectorname:连接器自动创建的主题。存储有关变更数据流分区的元数据。
  • _rebalancing_topic_spanner_connector_connectorname:连接器自动创建的主题。用于确定连接器任务的活跃性。
  • _debezium-heartbeat.connectorname:用于处理 Spanner 变更数据流检测信号的主题。

Kafka 连接器运行时

下面介绍了 Kafka 连接器运行时。

可伸缩性

Kafka 连接器可横向伸缩,并在分布在多个 Kafka Connect 工作器中的一个或多个任务上运行。

消息传送保证

Kafka 连接器支持保证至少传送一次。

容错性

Kafka 连接器可容忍故障。当 Kafka 连接器读取更改并生成事件时,它会记录为每个更改流分区处理的上次提交时间戳。如果 Kafka 连接器因任何原因(包括通信故障、网络问题或软件故障)停止运行,则重启后,Kafka 连接器会继续从上次中断的地方流式传输记录。

Kafka 连接器读取 Kafka 连接器起始时间戳的信息架构以检索架构信息。默认情况下,Spanner 无法在版本保留期限(默认为 1 小时)之前的读取时间戳处读取信息架构。如果要在过去一小时之前启动连接器,则必须延长数据库的版本保留期限。

设置 Kafka 连接器

创建变更流

如需详细了解如何创建变更数据流,请参阅创建变更数据流。如需继续执行后续步骤,需要一个配置了变更数据流的 Spanner 实例。

请注意,如果您希望在每个数据更改事件发生时都返回已更改的列和未更改的列,请使用值捕获类型 NEW_ROW。如需了解详情,请参阅值捕获类型

安装 Kafka 连接器 JAR

安装 ZookeeperKafkaKafka Connect 后,部署 Kafka 连接器的剩余任务是下载连接器的插件归档文件,将 JAR 文件提取到 Kafka Connect 环境中,并将包含 JAR 文件的目录添加到 Kafka Connect 的 plugin.path 中。然后,您需要重启 Kafka Connect 进程以获取新的 JAR 文件。

如果您使用的是不可变容器,则可以从适用于 Zookeeper、Kafka 和 Kafka Connect 的 Debezium 容器映像拉取映像。Kafka Connect 映像预安装了 Spanner 连接器。

如需详细了解如何安装基于 Debezium 的 Kafka 连接器 JAR,请参阅安装 Debezium

配置 Kafka 连接器

以下是 Kafka 连接器的配置示例,该连接器连接到实例 test-instance 和项目 test-project 的数据库 users 中名为 changeStreamAll 的变更数据流。

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

此配置包含以下内容:

  • 注册到 Kafka Connect 服务时的连接器名称。

  • 此 Spanner 连接器类的名称。

  • 项目 ID。

  • Spanner 实例 ID。

  • Spanner 数据库 ID。

  • 变更数据流的名称。

  • 服务帐号密钥的 JSON 对象。

  • (可选)要使用的 Spanner 数据库角色。

  • 任务数量上限。

如需查看连接器属性的完整列表,请参阅 Kafka 连接器配置属性

将连接器配置添加到 Kafka Connect

如需开始运行 Spanner 连接器,请执行以下操作:

  1. 为 Spanner 连接器创建配置。

  2. 使用 Kafka Connect REST API 将该连接器配置添加到您的 Kafka Connect 集群。

您可以使用 POST 命令将此配置发送到正在运行的 Kafka Connect 服务。默认情况下,Kafka Connect 服务在端口 8083 上运行。该服务会记录配置并启动连接到 Spanner 数据库的连接器任务,并将更改事件记录流式传输到 Kafka 主题。

以下是 POST 命令示例:

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

成功响应示例:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

更新 Kafka 连接器配置

要更新连接器配置,请向正在运行的 Kafka Connect 服务发送具有相同名称的 PUT 命令。

假设我们有一个使用上一部分中的配置运行的连接器。以下是 PUT 命令示例:

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

成功响应示例:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

停止 Kafka 连接器

如需停止该连接器,请向正在运行的 Kafka Connect 服务发送一条具有相同连接器名称的 DELETE 命令。

假设我们有一个使用上一部分中的配置运行的连接器。以下是 DELETE 命令示例:

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

成功响应示例:

HTTP/1.1 204 No Content

监控 Kafka 连接器

除了标准 Kafka Connect 和 Debezium 指标之外,Kafka 连接器还会导出自己的指标:

  • MilliSecondsLowWatermark:连接器任务的当前低水位(以毫秒为单位)。低水印描述了时间 T,在该时间点 T 保证连接器已流式传输时间戳 < T 的所有事件

  • MilliSecondsLowWatermarkLag:低水印相对于当前时间的延迟(以毫秒为单位)。流式传输了时间戳 < T 的所有事件

  • LatencyLowWatermark<Variant>MilliSeconds:低水位相对于当前时间的延迟(以毫秒为单位)。同时提供 P50、P95、P99、平均、最小和最大变体。

  • LatencySpanner<Variant>MilliSeconds:Spanner-commit-timestamp-to-connector-read 延迟时间。同时提供 P50、P95、P99、平均、最小和最大变体。

  • LatencyReadToEmit<Variant>MilliSeconds:Spanner-read-timestamp-to-connector-emit 延迟时间。同时提供 P50、P95、P99、平均、最小和最大变体。

  • LatencyCommitToEmit<Variant>tMilliSeconds:Spanner-commit-timestamp-to-connector-emit 延迟时间。同时提供 P50、P95、P99、平均、最小和最大变体。

  • LatencyCommitToPublish<Variant>MilliSeconds:Spanner-commit-timestamp-to Kafka-publish-timestamp 延迟时间。同时提供 P50、P95、P99、平均、最小和最大变体。

  • NumberOfChangeStreamPartitionsDetected:当前连接器任务检测到的分区总数。

  • NumberOfChangeStreamQueriesIssued:当前任务发出的变更数据流查询总数。

  • NumberOfActiveChangeStreamQueries:当前连接器任务检测到的变更数据流查询的有效数量。

  • SpannerEventQueueCapacityStreamEventQueue 的总容量,这是一个队列,用于存储从变更数据流查询接收的元素。

  • SpannerEventQueueCapacity:剩余的 StreamEventQueue 容量。

  • TaskStateChangeEventQueueCapacityTaskStateChangeEventQueue 的总容量,这是一个用于存储连接器中发生的事件的队列。

  • RemainingTaskStateChangeEventQueueCapacity:剩余的 TaskStateChangeEventQueue 容量。

  • NumberOfActiveChangeStreamQueries:当前连接器任务检测到的变更数据流查询的有效数量。

Kafka 连接器配置属性

以下是连接器必需的配置属性:

  • name:连接器的唯一名称。尝试使用同一名称重新注册时会导致失败。所有 Kafka Connect 连接器都需要此属性。

  • connector.class:连接器的 Java 类的名称。请始终为 Kafka 连接器使用 io.debezium.connector.spanner.SpannerConnector 值。

  • tasks.max:应为此连接器创建的任务数上限。

  • gcp.spanner.project.id:项目 ID

  • gcp.spanner.instance.id:Spanner 实例 ID

  • gcp.spanner.database.id:Spanner 数据库 ID

  • gcp.spanner.change.stream:Spanner 变更数据流名称

  • gcp.spanner.credentials.json:服务帐号密钥 JSON 对象。

  • gcp.spanner.credentials.path:指向服务帐号密钥 JSON 对象的文件路径。如果未提供上述字段,则为必填。

  • gcp.spanner.database.role:要使用的 Spanner 数据库角色。仅当变更数据流受精细访问权限控制时保护时,才需要执行此操作。数据库角色必须拥有变更数据流的 SELECT 权限,以及变更数据流读取函数的 EXECUTE 权限。如需了解详情,请参阅变更数据流的精细访问权限控制

以下高级配置属性具有适用于大多数情况的默认值,因此很少需要在连接器的配置中指定:

  • gcp.spanner.low-watermark.enabled:指示是否为连接器启用了低水印。默认值为 false。

  • gcp.spanner.low-watermark.update-period.ms:低水印的更新时间间隔。默认值为 1000 毫秒。

  • heartbeat.interval.ms:Spanner 检测信号间隔。默认值为 300000(五分钟)。

  • gcp.spanner.start.time:连接器开始时间。默认值为当前时间。

  • gcp.spanner.end.time:连接器结束时间。默认值为无穷大。

  • tables.exclude.list:要排除更改事件的表。默认值为空。

  • tables.include.list:包含更改事件的表。如果未填充,则包含所有表。默认值为空。

  • gcp.spanner.stream.event.queue.capacity:Spanner 事件队列容量。默认值为 10,000。

  • connector.spanner.task.state.change.event.queue.capacity:任务状态更改事件队列容量。默认值为 1000。

  • connector.spanner.max.missed.heartbeats:在抛出异常之前变更数据流查询错过的检测信号数量上限。默认值为 10。

  • scaler.monitor.enabled:指示是否已启用任务自动扩缩。默认值为 false。

  • tasks.desired.partitions:每个任务的变更数据流分区的首选数量。任务自动扩缩需要使用此参数。默认值为 2。

  • tasks.min:最小任务数量。任务自动扩缩需要使用此参数。默认为 1。

  • connector.spanner.sync.topic:同步主题的名称,这是一个内部连接器主题,用于存储任务之间的通信。如果用户未提供姓名,则默认为 _sync_topic_spanner_connector_connectorname

  • connector.spanner.sync.poll.duration:同步主题的轮询时长。默认值为 500 毫秒。

  • connector.spanner.sync.request.timeout.ms:向同步主题发送请求的超时。默认值为 5000 毫秒。

  • connector.spanner.sync.delivery.timeout.ms:发布到同步主题的超时时间。默认值为 15,000 毫秒。

  • connector.spanner.sync.commit.offsets.interval.ms:为同步主题提交偏移量的时间间隔。默认值为 60000 毫秒。

  • connector.spanner.sync.publisher.wait.timeout:向同步主题发布消息的时间间隔。默认值为 5 毫秒。

  • connector.spanner.rebalancing.topic:再平衡主题的名称。再平衡主题是一个内部连接器主题,用于确定任务是否活跃。如果用户未提供姓名,则默认为 _rebalancing_topic_spanner_connector_connectorname

  • connector.spanner.rebalancing.poll.duration:再平衡主题的轮询时长。默认值为 5000 毫秒。

  • connector.spanner.rebalancing.commit.offsets.timeout:为再平衡主题提交偏移量的超时时间。默认值为 5000 毫秒。

  • connector.spanner.rebalancing.commit.offsets.interval.ms:为同步主题提交偏移量的时间间隔。默认值为 60000 毫秒。

  • connector.spanner.rebalancing.task.waiting.timeout:任务在处理再平衡事件前等待的时长。默认值为 1000 毫秒。

有关可配置连接器属性的更详细的列表,请参阅 GitHub 代码库

限制