Kafka への変更ストリーム接続を構築する

このページでは、Kafka コネクタを使用して Spanner の変更ストリームデータの使用と転送を行う方法について説明します。

基本コンセプト

Kafka コネクタの基本コンセプトは次のとおりです。

Debezium

Debezium は、変更データ キャプチャ用の低レイテンシ データ ストリーミング プラットフォームを提供するオープンソース プロジェクトです。

Kafka コネクタ

Kafka コネクタは、Spanner API を抽象化して、Spanner 変更ストリームを Kafka に公開します。このコネクタを使用すると、変更ストリームのパーティション ライフサイクルを管理する必要がありません。これは、Spanner API を直接使用するときに必要です。

Kafka コネクタは、データ変更レコードのすべてに対して変更イベントを生成し、変更イベント レコードをトラックするテーブルごとに、変更イベント レコードを個別の Kafka トピックにダウンストリームに送信します。データ変更レコードの mod は、キャプチャされた単一の変更(挿入、更新、または削除)を表します。1 つのデータ変更レコードに複数の mod を含めることができます。

Kafka コネクタの出力

Kafka コネクタは、変更ストリーム レコードを別の Kafka トピックに直接転送します。出力トピック名は connector_name.table_name でなければなりません。トピックが存在しない場合、Kafka コネクタはその名前の下にトピックを自動的に作成します。

指定したトピックにレコードを再ルーティングするようにトピック ルーティング変換を構成することもできます。トピックのルーティングを使用する場合は、低ウォーターマーク機能を無効にします。

レコードの順序指定

レコードは、Kafka トピックの主キーごとの commit タイムスタンプで並べ替えられます。異なる主キーに属するレコードには、順序は保証されません。同じ主キーを持つレコードは、同じ Kafka トピック パーティションに保存されます。トランザクション全体を処理する場合は、データ変更レコードserver_transaction_id フィールドと number_of_records_in_transaction フィールドを使用して Spanner トランザクションを組み立てることもできます。

変更イベント

Kafka コネクタは、INSERTUPDATEDELETE のオペレーションごとにデータ変更イベントを生成します。各イベントには、変更された行のキーと値が含まれます。

Kafka Connect コンバータを使用して、ProtobufAVROJSON または JSON Schemaless 形式のデータ変更イベントを生成できます。スキーマを生成する Kafka Connect コンバータを使用する場合、イベントにはキーと値の個別のスキーマが含まれます。それ以外の場合、イベントにはキーと値のみが含まれます。

キーのスキーマは変更されません。値のスキーマは、コネクタの開始時間から変更ストリームが追跡したすべての列を組み合わせたものです。

JSON イベントを生成するようにコネクタを構成すると、出力変更イベントに 5 つのフィールドが含まれます。

  • 最初の schema フィールドには、Spanner キースキーマを記述する Kafka Connect スキーマを指定します。

  • 最初の payload フィールドには、前の schema フィールドで説明した構造があり、変更された行のキーが含まれています。

  • 2 番目の schema フィールドには、変更された行のスキーマを記述する Kafka Connect スキーマを指定します。

  • 2 番目の payload フィールドには、前の schema フィールドで説明した構造があり、変更された行の実際のデータが含まれています。

  • source フィールドは、イベントのソース メタデータを記述する必須フィールドです。

データ変更イベントの例を次に示します。

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": {
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": {
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c",
  "ts_ms": 1559033904863 //
}

低ウォーターマーク

低ウォーターマークは、Kafka コネクタが T 未満のタイムスタンプすべてのイベントをストリーミングして Kafka トピックにパブリッシュされることが保証されている時間 T を表します。

gcp.spanner.low-watermark.enabled パラメータを使用して、Kafka コネクタで低ウォーターマークを有効にできます。このパラメータはデフォルトで無効になっています。低ウォーターマークが有効な場合、変更ストリーム データ変更レコードの low_watermark フィールドには、Kafka コネクタの現在の低ウォーターマーク タイムスタンプが入力されます。

生成されているレコードがない場合、Kafka コネクタはコネクタによって検出された Kafka 出力トピックに定期的なウォーターマーク「ハートビート」を送信します。

これらのウォーターマークのハートビートは、low_watermark フィールドを除いて空のレコードです。その後、低ウォーターマークを使用して時間ベースの集計を実行できます。たとえば、低ウォーターマークを使用して、主キーにわたる commit タイムスタンプごとにイベントを並べ替えることができます。

メタデータのトピック

Kafka コネクタと Kafka Connect フレームワークは、コネクタに関連する情報を保存するためのメタデータ トピックを作成します。これらのメタデータ トピックの構成やコンテンツを変更することはおすすめできません。

メタデータのトピックは次のとおりです。

  • _consumer_offsets: Kafka によって自動的に作成されたトピック。Kafka コネクタで作成されたコンシューマのコンシューマ オフセットを保存します。
  • _kafka-connect-offsets: Kafka Connect によって自動的に作成されるトピック。コネクタ オフセットを保存します。
  • _sync_topic_spanner_connector_connectorname: コネクタによって自動的に作成されるトピック。変更ストリーム パーティションに関するメタデータを保存します。
  • _rebalancing_topic_spanner_connector_connectorname: コネクタによって自動的に作成されるトピック。コネクタタスクのアライブの判定に使用されます。
  • _debezium-heartbeat.connectorname: Spanner 変更ストリーム ハートビートの処理に使用されるトピック。

Kafka コネクタ ランタイム

Kafka コネクタ ランタイムについて説明します。

スケーラビリティ

Kafka コネクタは水平スケーリング可能であり、複数の Kafka Connect ワーカーに分散された 1 つ以上のタスクで実行されます。

メッセージ配信の保証

Kafka コネクタは、at-least-once(最低 1 回)配信の保証をサポートします。

フォールト トレラント

Kafka コネクタはエラーが許容されます。Kafka コネクタが変更を読み取ってイベントを生成すると、変更ストリーム パーティションごとに最後に処理された commit タイムスタンプを記録します。なんらかの理由で(通信障害、ネットワークの問題、ソフトウェア障害など)Kafka コネクタが停止すると、Kafka コネクタは再起動すると、最後に中断されたレコードのストリーミングを続行します。

Kafka コネクタは、Kafka コネクタの開始タイムスタンプで情報スキーマを読み取り、スキーマ情報を取得します。デフォルトでは、Spanner はバージョン保持期間(デフォルトでは 1 時間)より前のタイムスタンプで情報スキーマを読み取ることはできません。1 時間より前のコネクタを開始する場合は、データベースのバージョン保持期間を長くする必要があります。

Kafka コネクタを設定する

変更ストリームを作成

変更ストリームの作成方法の詳細については、変更ストリームの作成をご覧ください。次のステップに進むには、変更ストリームが構成された Spanner インスタンスが必要です。

データ変更イベントごとに変更された列と変更されていない列の両方を返す場合は、値キャプチャ タイプ NEW_ROW を使用します。詳細については、値キャプチャ タイプをご覧ください。

Kafka コネクタ JAR をインストールする

ZookeeperKafkaKafka Connect がインストールされている場合、Kafka コネクタをデプロイする残りのタスクは、Connectors プラグイン アーカイブをダウンロードし、JAR ファイルを Kafka Connect 環境に解凍し、JAR ファイルを含むディレクトリを Kafka Connect の plugin.path に追加します。その後、Kafka Connect プロセスを再起動して新しい JAR ファイルを取得する必要があります。

不変のコンテナを使用している場合は、Zookeeper、Kafka、Kafka Connect 用の Debezium のコンテナ イメージからイメージを pull できます。Kafka Connect イメージには、Spanner コネクタがプリインストールされています。

Debezium ベースの Kafka コネクタ JAR をインストールする方法の詳細については、Debezium のインストールをご覧ください。

Kafka コネクタを構成する

次に示すのは、インスタンス test-instance とプロジェクト test-project のデータベース users にある changeStreamAll という変更ストリームに接続する Kafka コネクタの構成例です。

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

この構成には、以下が含まれます。

  • Kafka Connect サービスに登録されたときのコネクタの名前。

  • この Spanner コネクタクラスの名前。

  • プロジェクト ID

  • Spanner インスタンス ID。

  • Spanner データベース ID。

  • 変更ストリームの名前。

  • サービス アカウント キーの JSON オブジェクト。

  • (省略可)使用する Spanner データベース ロール。

  • タスクの最大数。

コネクタ プロパティの一覧については、Kafka コネクタの構成プロパティをご覧ください。

Kafka Connect にコネクタ構成を追加する

Spanner コネクタの実行を開始するには:

  1. Spanner コネクタの構成を作成します。

  2. Kafka Connect REST API を使用して、そのコネクタ構成を Kafka Connect クラスタに追加します。

この構成は、POST コマンドを使用して、実行中の Kafka Connect サービスに送信できます。デフォルトでは、Kafka Connect サービスはポート 8083 で実行されます。このサービスは構成を記録し、Spanner データベースに接続し、変更イベント レコードを Kafka トピックにストリーミングするコネクタタスクを開始します。

POST コマンドの例を以下に示します。

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

成功したレスポンスの例:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Kafka コネクタの構成を更新する

コネクタ構成を更新するには、同じコネクタ名で実行中の PUT コマンドを Kafka Connect サービスに送信します。

前のセクションの構成で実行中のコネクタがあるとします。PUT コマンドの例を以下に示します。

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

成功したレスポンスの例:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Kafka コネクタを停止する

コネクタを停止するには、同じコネクタ名で実行中の Kafka Connect サービスに DELETE コマンドを送信します。

前のセクションの構成で実行中のコネクタがあるとします。DELETE コマンドの例を以下に示します。

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

成功したレスポンスの例:

HTTP/1.1 204 No Content

Kafka コネクタをモニタリングする

Kafka コネクタは、標準の Kafka Connect と Debezium 指標に加えて、独自の指標をエクスポートします。

  • MilliSecondsLowWatermark: コネクタタスクの現在の低いウォーターマーク(ミリ秒単位)。低ウォーターマークは、コネクタがタイムスタンプ < T を持つすべてのイベントをストリーミングしたことが保証された時間 T を表します

  • MilliSecondsLowWatermarkLag: 現在の時刻より遅れている低レイテンシのウォーターマーク(ミリ秒単位)。タイムスタンプが T 未満のすべてのイベントをストリーミングします。

  • LatencyLowWatermark<Variant>MilliSeconds: 現在の時刻より低いウォーターマークのラグ(ミリ秒単位)。P50、P95、P99、平均、最小、最大のバリアントが用意されています。

  • LatencySpanner<Variant>MilliSeconds: Spanner-commit-timestamp-to-connector-read レイテンシ。 P50、P95、P99、Average、Min、Max のバリアントが用意されています。

  • LatencyReadToEmit<Variant>MilliSeconds: Spanner-read-timestamp-to-connector-emit レイテンシ。P50、P95、P99、平均、最小、最大のバリアントが用意されています。

  • LatencyCommitToEmit<Variant>tMilliSeconds: Spanner-commit-timestamp-to-connector-emit レイテンシ。P50、P95、P99、平均、最小、最大のバリアントが用意されています。

  • LatencyCommitToPublish<Variant>MilliSeconds: Spanner-commit-timestamp-to-Kafka-publish-timestamp のレイテンシ。P50、P95、P99、Average、Min、Max のバリアントが用意されています。

  • NumberOfChangeStreamPartitionsDetected: 現在のコネクタタスクによって検出されたパーティションの合計数。

  • NumberOfChangeStreamQueriesIssued: 現在のタスクによって発行された変更ストリーム クエリの合計数。

  • NumberOfActiveChangeStreamQueries: 現在のコネクタタスクによって検出された変更ストリーム クエリのアクティブな数。

  • SpannerEventQueueCapacity: StreamEventQueue の合計容量(変更ストリーム クエリから受信した要素を保存するキュー)。

  • SpannerEventQueueCapacity: 残りの StreamEventQueue 容量。

  • TaskStateChangeEventQueueCapacity: TaskStateChangeEventQueue の合計容量。コネクタで発生するイベントを保存するキューです。

  • RemainingTaskStateChangeEventQueueCapacity: 残りの TaskStateChangeEventQueue 容量。

  • NumberOfActiveChangeStreamQueries: 現在のコネクタタスクによって検出された変更ストリーム クエリのアクティブな数。

Kafka コネクタの構成プロパティ

コネクタに必要な構成プロパティは次のとおりです。

  • name: コネクタの一意の名前。同じ名前で再度登録しようとすると、失敗します。このプロパティは、すべての Kafka Connect コネクタで必要になります。

  • connector.class: コネクタの Java クラスの名前。Kafka コネクタには、常に io.debezium.connector.spanner.SpannerConnector の値を使用します。

  • tasks.max: このコネクタに作成するタスクの最大数。

  • gcp.spanner.project.id: プロジェクト ID

  • gcp.spanner.instance.id: Spanner インスタンス ID

  • gcp.spanner.database.id: Spanner データベース ID

  • gcp.spanner.change.stream: Spanner 変更ストリーム名

  • gcp.spanner.credentials.json: サービス アカウント キーの JSON オブジェクト。

  • gcp.spanner.credentials.path: サービス アカウント キーの JSON オブジェクトへのファイルパス。上記のフィールドが指定されていない場合は必須です。

  • gcp.spanner.database.role : 使用する Spanner データベース ロール。これは、変更ストリームがきめ細かいアクセス制御によって保護されている場合にのみ必要です。データベース ロールには、変更ストリームに対する SELECT 権限と、変更ストリームの読み取り関数に対する EXECUTE 権限が必要です。詳細については、変更ストリームのきめ細かいアクセス制御をご覧ください。

次の詳細構成プロパティには、ほとんどの場合にデフォルトの設定が適用されるため、コネクタの構成で指定する必要はほとんどありません。

  • gcp.spanner.low-watermark.enabled: 低ウォーターマークがコネクタに対して有効になっているかどうかを示します。デフォルトは false です。

  • gcp.spanner.low-watermark.update-period.ms: 低いウォーターマークが更新される間隔。デフォルトは 1,000 ミリ秒です。

  • heartbeat.interval.ms: Spanner のハートビート間隔。デフォルトは 300000(5 分)です。

  • gcp.spanner.start.time: コネクタの開始時刻。デフォルトは現在の時刻です。

  • gcp.spanner.end.time: コネクタの終了時間。デフォルトは無限です。

  • tables.exclude.list: 変更イベントを除外するテーブル。デフォルトは空です。

  • tables.include.list: 変更イベントを含めるテーブル。入力されていない場合は、すべてのテーブルが含まれます。デフォルトは空です。

  • gcp.spanner.stream.event.queue.capacity: Spanner イベント キューの容量。デフォルトは 10000 です。

  • connector.spanner.task.state.change.event.queue.capacity: タスクの状態変更イベントキューの容量。デフォルトは 1,000 です。

  • connector.spanner.max.missed.heartbeats: 例外がスローされるまでの変更ストリーム クエリの欠落しているハートビートの最大数。デフォルトは 10 です。

  • scaler.monitor.enabled: タスクの自動スケーリングが有効かどうかを示します。デフォルトは false です。

  • tasks.desired.partitions: タスクごとの変更ストリーム パーティションの優先数。このパラメータはタスクの自動スケーリングに必要です。デフォルトは 2 です。

  • tasks.min: タスクの最小数。このパラメータはタスクの自動スケーリングに必要です。デフォルトは 1 です。

  • connector.spanner.sync.topic: 同期トピックの名前。タスク間の通信に使用する内部コネクタトピックです。ユーザーが名前を指定しなかった場合は、デフォルトで _sync_topic_spanner_connector_connectorname になります。

  • connector.spanner.sync.poll.duration: 同期トピックのポーリング期間。デフォルトは 500 ミリ秒です。

  • connector.spanner.sync.request.timeout.ms: 同期トピックへのリクエストのタイムアウト。デフォルトは 5000 ミリ秒です。

  • connector.spanner.sync.delivery.timeout.ms: 同期トピックへのパブリッシュのタイムアウト。デフォルトは 15000 ミリ秒です。

  • connector.spanner.sync.commit.offsets.interval.ms: 同期トピックのオフセットを commit する間隔。デフォルトは 60000 ミリ秒です。

  • connector.spanner.sync.publisher.wait.timeout: 同期トピックにメッセージが公開される間隔。デフォルトは 5 ミリ秒です。

  • connector.spanner.rebalancing.topic: 再調整トピックの名前。再調整トピックは、タスクの有効期間を判断するために使用する内部コネクタのトピックです。ユーザーが名前を指定しなかった場合は、デフォルトで _rebalancing_topic_spanner_connector_connectorname になります。

  • connector.spanner.rebalancing.poll.duration: 再調整トピックのポーリング期間。デフォルトは 5000 ミリ秒です。

  • connector.spanner.rebalancing.commit.offsets.timeout: 再調整トピックのオフセットを commit するタイムアウト。デフォルトは 5000 ミリ秒です。

  • connector.spanner.rebalancing.commit.offsets.interval.ms: 同期トピックのオフセットを commit する間隔。デフォルトは 60000 ミリ秒です。

  • connector.spanner.rebalancing.task.waiting.timeout: 再調整イベントを処理するまでタスクが待機する時間。デフォルトは 1000 ミリ秒です。

構成可能なコネクタのプロパティの詳細については、GitHub リポジトリをご覧ください。

制限事項