Kafka への変更ストリーム接続を構築する

このページでは、Kafka コネクタを使用して Spanner の変更ストリームデータの使用と転送を行う方法について説明します。

基本コンセプト

以下では、Kafka コネクタのコアコンセプトについて説明します。

Debezium

Debezium は、変更データ キャプチャ用の低レイテンシのデータ ストリーミング プラットフォームを提供するオープンソース プロジェクトです。

Kafka コネクタ

Kafka コネクタは、Spanner API の抽象化を提供し、Spanner 変更ストリームを Kafka にパブリッシュします。このコネクタを使用すると、Spanner API を直接使用する際に必要な変更ストリーム パーティションのライフサイクルを管理する必要がなくなります。

Kafka コネクタは、データ変更レコードの変更ごとに変更イベントを生成し、変更ストリームで追跡されるテーブルごとに、変更イベント レコードをダウンストリームの個別の Kafka トピックに送信します。データ変更レコードの mod は、キャプチャされた単一の変更(挿入、更新、または削除)を表します。1 つのデータ変更レコードに複数の mod を含めることができます。

Kafka コネクタの出力

Kafka コネクタは、変更ストリーム レコードを個別の Kafka トピックに直接転送します。出力トピック名は connector_name.table_name にする必要があります。トピックが存在しない場合、Kafka コネクタはその名前でトピックを自動的に作成します。

トピック ルーティング変換を構成して、指定したトピックにレコードを再ルーティングすることもできます。トピック ルーティングを使用する場合は、低水位マーカー機能を無効にします。

レコードの順序

レコードは、Kafka トピックの主キーごとに commit タイムスタンプで並べ替えられます。異なる主キーに属するレコードには、順序が保証されません。同じ主キーを持つレコードは、同じ Kafka トピック パーティションに保存されます。トランザクション全体を処理する場合は、データ変更レコードserver_transaction_id フィールドと number_of_records_in_transaction フィールドを使用して Spanner トランザクションを組み立てることもできます。

変更イベント

Kafka コネクタは、INSERTUPDATEDELETE オペレーションごとにデータ変更イベントを生成します。各イベントには、変更された行のキーと値が含まれています。

Kafka Connect コンバータを使用して、ProtobufAVROJSONJSON Schemaless 形式のデータ変更イベントを生成できます。スキーマを生成する Kafka Connect コンバータを使用する場合、イベントにはキーと値の個別のスキーマが含まれます。それ以外の場合、イベントにはキーと値のみが含まれます。

鍵のスキーマは変更されません。値のスキーマは、コネクタの開始時間以降に変更ストリームが追跡したすべての列の統合です。

JSON イベントを生成するコネクタを構成すると、出力変更イベントには次の 5 つのフィールドが含まれます。

  • 最初の schema フィールドには、Spanner キースキーマを記述する Kafka Connect スキーマを指定します。

  • 最初の payload フィールドは、前の schema フィールドで記述されている構造を持ち、変更された行のキーが含まれています。

  • 2 番目の schema フィールドには、変更された行のスキーマを記述する Kafka Connect スキーマを指定します。

  • 2 番目の payload フィールドは、前の schema フィールドで記述されている構造を持ち、変更された行の実際のデータが含まれています。

  • source フィールドは、イベントのソース メタデータを記述する必須フィールドです。

データ変更イベントの例を次に示します。

{
  // The schema for the Spanner key.
  "schema": {
    "type": "struct",
    "name": "customers.Key",
    "optional": false,
    "fields": [
      {
        "type": "int64",
        "optional": "false"
        "field": "false"
      }
    ]
  },
  // The value of the Spanner key.
  "payload": {
      "id": "1"
  },
  // The schema for the payload, which contains the before and after values
  // of the changed row. The schema for the payload contains all the
  // columns that the change stream has tracked since the connector start
  // time.
  "schema": { 
    "type": "struct",
    "fields": [
      {
        // The schema for the before values of the changed row.
        "type": "struct",
        "fields": [
            {
                "type": "int32",
                "optional": false,
                "field": "id"
            },
            {
                "type": "string",
                "optional": true,
                "field": "first_name"
            }
        ],
        "optional": true,
        "name": "customers.Value",
        "field": "before"
      },
      {
        // The schema for the after values of the changed row.
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          }
        ],
          "optional": true,
          "name": "customers.Value",
          "field": "after"
        },
        {
          // The schema for the source metadata for the event.
          "type": "struct",
          "fields": [
            {
                "type": "string",
                "optional": false,
                "field": "version"
            },
            {
                "type": "string",
                "optional": false,
                "field": "connector"
            },
            {
                "type": "string",
                "optional": false,
                "field": "name"
            },
            {
                "type": "int64",
                "optional": false,
                "field": "ts_ms"
            },
            {
                "type": "boolean",
                "optional": true,
                "default": false,
                "field": "snapshot"
            },
            {
                "type": "string",
                "optional": false,
                "field": "db"
            },
            {
                "type": "string",
                "optional": false,
                "field": "sequence"
            },
            {
                "type": "string",
                "optional": false,
                "field": "project_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "instance_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "database_id"
            },
            {
                "type": "string",
                "optional": false,
                "field": "change_stream_name"
            },
            {
                "type": "string",
                "optional": true,
                "field": "table"
            }
            {
                "type": "string",
                "optional": true,
                "field": "server_transaction_id"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "low_watermark"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "read_at_timestamp"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_records_in_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "transaction_tag"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "system_transaction"
            }
            {
                "type": "string",
                "optional": true,
                "field": "value_capture_type"
            }
            {
                "type": "string",
                "optional": true,
                "field": "partition_token"
            }
            {
                "type": "int32",
                "optional": true,
                "field": "mod_number"
            }
            {
                "type": "boolean",
                "optional": true,
                "field": "is_last_record_in_transaction_in_partition"
            }
            {
                "type": "int64",
                "optional": true,
                "field": "number_of_partitions_in_transaction"
            }
          ],
          "optional": false,
          "name": "io.debezium.connector.spanner.Source",
          "field": "source"
        },
      ]
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      }
    ],
    "optional": false,
    "name": "connector_name.customers.Envelope"
  },
  "payload": {
    // The values of the row before the event.
    "before": null,
    // The values of the row after the event.
    "after": { 
        "id": 1,
        "first_name": "Anne",
    }
  },
  // The source metadata.
  "source": {
    "version": "{debezium-version}",
    "connector": "spanner",
    "name": "spanner_connector",
    "ts_ms": 1670955531785,
    "snapshot": "false",
    "db": "database",
    "sequence": "1",
    "project_id": "project",
    "instance_id": "instance",
    "database_id": "database",
    "change_stream_name": "change_stream",
    "table": "customers",
    "server_transaction_id": "transaction_id",
    "low_watermark": 1670955471635,
    "read_at_timestamp": 1670955531791,
    "number_records_in_transaction": 2,
    "transaction_tag": "",
    "system_transaction": false,
    "value_capture_type": "OLD_AND_NEW_VALUES",
    "partition_token": "partition_token",
    "mod_number": 0,
    "is_last_record_in_transaction_in_partition": true,
    "number_of_partitions_in_transaction": 1
  },
  "op": "c", 
  "ts_ms": 1559033904863 //
}

低水位マーカー

低ウォーターマークは、Kafka コネクタが T 未満のタイムスタンプすべてのイベントをストリーミングして Kafka トピックにパブリッシュされることが保証されている時間 T を表します。

Kafka コネクタで低ウォーターマークを有効にするには、gcp.spanner.low-watermark.enabled パラメータを使用します。このパラメータはデフォルトで無効になっています。低ウォーターマークが有効になっている場合、変更ストリーム データ変更レコードの low_watermark フィールドには、Kafka コネクタの現在の低ウォーターマーク タイムスタンプが入力されます。

レコードが生成されていない場合、Kafka コネクタは、コネクタによって検出された Kafka 出力トピックに定期的なウォーターマーク「ハートビート」を送信します。

これらのウォーターマーク ハートビート レコードは、low_watermark フィールドを除き空です。その後、低ウォーターマークを使用して時間ベースの集計を実行できます。たとえば、低ウォーターマークを使用して、主キー全体で commit タイムスタンプ別にイベントを並べ替えることができます。

メタデータのトピック

Kafka コネクタと Kafka Connect フレームワークは、コネクタ関連情報を保存するために複数のメタデータ トピックを作成します。これらのメタデータ トピックの構成やコンテンツを変更することはおすすめしません。

メタデータのトピックは次のとおりです。

  • _consumer_offsets: Kafka によって自動的に作成されるトピック。Kafka コネクタで作成されたコンシューマのコンシューマ オフセットを保存します。
  • _kafka-connect-offsets: Kafka Connect によって自動的に作成されるトピック。コネクタのオフセットを保存します。
  • _sync_topic_spanner_connector_connectorname: コネクタによって自動的に作成されるトピック。変更ストリーム パーティションに関するメタデータを格納します。
  • _rebalancing_topic_spanner_connector_connectorname: コネクタによって自動的に作成されるトピック。コネクタ タスクの存続を確認するために使用されます。
  • _debezium-heartbeat.connectorname: Spanner 変更ストリームのハートビートを処理するために使用されるトピック。

Kafka コネクタ ランタイム

以下では、Kafka コネクタ ランタイムについて説明します。

スケーラビリティ

Kafka コネクタは水平方向にスケーリング可能で、複数の Kafka Connect ワーカーに分散された 1 つ以上のタスクで実行されます。

メッセージ配信の保証

Kafka コネクタは、at-least-once 配信の保証をサポートしています。

フォールト トレラント

Kafka コネクタは障害に耐性があります。変更を読み取ってイベントを生成する際に、Kalfa コネクタは、処理された直近の commit タイムスタンプを変更ストリームのパーティションごとに記録します。Kafka コネクタがなんらかの理由(通信障害、ネットワークの問題、ソフトウェア障害など)で停止した場合、再起動すると、Kafka コネクタは最後に停止したところからレコードのストリーミングを再開します。

Kafka コネクタは、Kafka コネクタの開始タイムスタンプで情報スキーマを読み取り、スキーマ情報を取得します。デフォルトでは、Spanner はバージョン保持期間(デフォルトは 1 時間)より前の読み取りタイムスタンプで情報スキーマを読み取ることができません。1 時間より前のコネクタを開始する場合は、データベースのバージョン保持期間を長くする必要があります。

Kafka コネクタを設定する

変更ストリームを作成

変更ストリームの作成方法の詳細については、変更ストリームの作成をご覧ください。次のステップに進むには、変更ストリームが構成された Spanner インスタンスが必要です。

データ変更イベントごとに変更された列と変更されていない列の両方を返す場合は、値キャプチャ タイプ NEW_ROW を使用します。詳細については、値キャプチャ タイプをご覧ください。

Kafka コネクタ JAR をインストールする

ZookeeperKafkaKafka Connect がインストールされている場合、Kafka コネクタをデプロイする残りのタスクは、Connectors プラグイン アーカイブをダウンロードし、JAR ファイルを Kafka Connect 環境に解凍し、JAR ファイルを含むディレクトリを Kafka Connect の plugin.path に追加します。その後、Kafka Connect プロセスを再起動して新しい JAR ファイルを取得する必要があります。

不変のコンテナを使用している場合は、Zookeeper、Kafka、Kafka Connect の Debezium のコンテナ イメージからイメージを pull できます。Kafka Connect イメージには、Spanner コネクタがプリインストールされています。

Debezium ベースの Kafka コネクタ JAR のインストール方法については、Debezium のインストールをご覧ください。

Kafka コネクタを構成する

次の例は、インスタンス test-instance とプロジェクト test-project のデータベース userschangeStreamAll という変更ストリームに接続する Kafka コネクタの構成を示しています。

"name": "spanner-connector",
"config": {
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{"client_id": user@example.com}",
    "gcp.spanner.database.role": "cdc-role",
    "tasks.max": "10"
}

この構成には次のものが含まれます。

  • Kafka Connect サービスに登録されたときのコネクタの名前。

  • この Spanner コネクタ クラスの名前。

  • プロジェクト ID

  • Spanner インスタンス ID。

  • Spanner データベース ID。

  • 変更ストリーム名。

  • サービス アカウント キーの JSON オブジェクト。

  • (省略可)使用する Spanner データベース ロール。

  • タスクの最大数。

コネクタ プロパティの一覧については、Kafka コネクタの構成プロパティをご覧ください。

Kafka Connect にコネクタ構成を追加する

Spanner コネクタの実行を開始するには:

  1. Spanner コネクタの構成を作成します。

  2. Kafka Connect REST API を使用して、そのコネクタ構成を Kafka Connect クラスタに追加します。

この構成は、POST コマンドを使用して、実行中の Kafka Connect サービスに送信できます。デフォルトでは、Kafka Connect サービスはポート 8083 で実行されます。サービスは構成を記録し、Spanner データベースに接続して変更イベント レコードを Kafka トピックにストリーミングするコネクタタスクを開始します。

POST コマンドの例を以下に示します。

POST /connectors HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
  "name": "spanner-connector"
  "config": {
      "connector.class": "io.debezium.connector.spanner.SpannerConnector",
      "gcp.spanner.project.id": "test-project",
      "gcp.spanner.instance.id": "test-instance",
      "gcp.spanner.database.id": "users",
      "gcp.spanner.change.stream": "changeStreamAll",
      "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
      "heartbeat.interval.ms": "100",
      "tasks.max": "10"
  }
}

正常なレスポンスの例:

HTTP/1.1 201 Created
Content-Type: application/json
{
    "name": "spanner-connector",
    "config": {
        "connector.class": "io.debezium.connector.spanner.SpannerConnector",
        "gcp.spanner.project.id": "test-project",
        "gcp.spanner.instance.id": "test-instance",
        "gcp.spanner.database.id": "users",
        "gcp.spanner.change.stream": "changeStreamAll",
        "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
        "heartbeat.interval.ms": "100",
        "tasks.max": "10"
    },
    "tasks": [
        { "connector": "spanner-connector", "task": 1 },
        { "connector": "spanner-connector", "task": 2 },
        { "connector": "spanner-connector", "task": 3 }
    ]
}

Kafka コネクタの構成を更新する

コネクタ構成を更新するには、同じコネクタ名で実行中の Kafka Connect サービスに PUT コマンドを送信します。

前のセクションの構成で実行されているコネクタがあるとします。PUT コマンドの例を以下に示します。

PUT /connectors/spanner-connector/config HTTP/1.1
Host: http://localhost:8083
Accept: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

正常なレスポンスの例:

HTTP/1.1 200 OK
Content-Type: application/json
{
    "connector.class": "io.debezium.connector.spanner.SpannerConnector",
    "tasks.max": "10",
    "gcp.spanner.project.id": "test-project",
    "gcp.spanner.instance.id": "test-instance",
    "gcp.spanner.database.id": "users",
    "gcp.spanner.change.stream": "changeStreamAll",
    "gcp.spanner.credentials.json": "{\"client_id\": \"XXXX\".... }",
    "heartbeat.interval.ms": "100",
    "tasks.max": "10"
}

Kafka コネクタを停止する

コネクタを停止するには、同じコネクタ名の実行中の Kafka Connect サービスに DELETE コマンドを送信します。

前のセクションの構成で実行されているコネクタがあるとします。DELETE コマンドの例を以下に示します。

DELETE /connectors/spanner-connector HTTP/1.1
Host: http://localhost:8083

正常なレスポンスの例:

HTTP/1.1 204 No Content

Kafka コネクタをモニタリングする

Kafka コネクタは、標準の Kafka Connect 指標と Debezium 指標に加えて、独自の指標をエクスポートします。

  • MilliSecondsLowWatermark: コネクタタスクの現在の低水位マーカー(ミリ秒単位)。低ウォーターマークは、コネクタが T 未満のタイムスタンプを持つすべてのイベントをストリーミングしたことが保証されている時間 T を表します。

  • MilliSecondsLowWatermarkLag: 低ウォーターマークが現在の時刻から遅れている時間(ミリ秒単位)。タイムスタンプが T より小さいすべてのイベントをストリーミング出力

  • LatencyLowWatermark<Variant>MilliSeconds: 低ウォーターマークの現在の時刻からの遅延(ミリ秒単位)。P50、P95、P99、平均、最小、最大の各バリアントがあります。

  • LatencySpanner<Variant>MilliSeconds: Spanner の commit タイムスタンプからコネクタ読み取りまでのレイテンシ。P50、P95、P99、平均、最小、最大の各バリアントがあります。

  • LatencyReadToEmit<Variant>MilliSeconds: Spanner の読み取りタイムスタンプからコネクタへの出力までのレイテンシ。P50、P95、P99、平均、最小、最大の各バリアントがあります。

  • LatencyCommitToEmit<Variant>tMilliSeconds: Spanner の commit タイムスタンプからコネクタへのエミットのレイテンシ。P50、P95、P99、平均、最小、最大の各バリアントがあります。

  • LatencyCommitToPublish<Variant>MilliSeconds: Spanner の commit タイムスタンプから Kafka の publish タイムスタンプまでのレイテンシ。P50、P95、P99、平均、最小、最大の各バリアントがあります。

  • NumberOfChangeStreamPartitionsDetected: 現在のコネクタタスクで検出されたパーティションの合計数。

  • NumberOfChangeStreamQueriesIssued: 現在のタスクによって発行された変更ストリーム クエリの合計数。

  • NumberOfActiveChangeStreamQueries: 現在のコネクタタスクによって検出されたアクティブな変更ストリーム クエリの数。

  • SpannerEventQueueCapacity: 変更ストリーム クエリから受信した要素を格納するキューである StreamEventQueue の合計容量。

  • SpannerEventQueueCapacity: 残りの StreamEventQueue 容量。

  • TaskStateChangeEventQueueCapacity: コネクタで発生したイベントを格納するキューである TaskStateChangeEventQueue の合計容量。

  • RemainingTaskStateChangeEventQueueCapacity: 残りの TaskStateChangeEventQueue 容量。

  • NumberOfActiveChangeStreamQueries: 現在のコネクタタスクによって検出されたアクティブな変更ストリーム クエリの数。

Kafka コネクタの構成プロパティ

コネクタに必要な構成プロパティは次のとおりです。

  • name: コネクタの一意の名前。同じ名前で再度登録しようとすると、失敗します。このプロパティは、すべての Kafka Connect コネクタで必須です。

  • connector.class: コネクタの Java クラスの名前。Kafka コネクタには常に io.debezium.connector.spanner.SpannerConnector の値を使用します。

  • tasks.max: このコネクタ用に作成するタスクの最大数。

  • gcp.spanner.project.id: プロジェクト ID

  • gcp.spanner.instance.id: Spanner インスタンス ID

  • gcp.spanner.database.id: Spanner データベース ID

  • gcp.spanner.change.stream: Spanner 変更ストリーム名

  • gcp.spanner.credentials.json: サービス アカウント キーの JSON オブジェクト。

  • gcp.spanner.credentials.path: サービス アカウント キーの JSON オブジェクトのファイルパス。上記のフィールドが指定されていない場合は必須です。

  • gcp.spanner.database.role: 使用する Spanner データベース ロール。これは、変更ストリームがきめ細かいアクセス制御で保護されている場合にのみ必要です。データベースのロールには、変更ストリームに対する SELECT 権限と、変更ストリームの読み取り機能に対する EXECUTE 権限が必要です。詳細については、変更ストリームのきめ細かなアクセス制御をご覧ください。

次の高度な構成プロパティには、ほとんどの状況で機能するデフォルト値があるため、コネクタの構成で指定する必要はほとんどありません。

  • gcp.spanner.low-watermark.enabled: コネクタで低ウォーターマークが有効かどうかを示します。デフォルトは false です。

  • gcp.spanner.low-watermark.update-period.ms: 低ウォーターマークが更新される間隔。デフォルトは 1,000 ミリ秒です。

  • heartbeat.interval.ms: Spanner ハートビート間隔。デフォルトは 300000(5 分)です。

  • gcp.spanner.start.time: コネクタの開始時間。デフォルトは現在の時刻です。

  • gcp.spanner.end.time: コネクタの終了時間。デフォルトは無限大です。

  • tables.exclude.list: 変更イベントを除外するテーブル。デフォルトは空です。

  • tables.include.list: 変更イベントを含めるテーブル。入力されていない場合、すべてのテーブルが含まれます。デフォルトは空です。

  • gcp.spanner.stream.event.queue.capacity: Spanner イベントキューの容量。デフォルトは 10000 です。

  • connector.spanner.task.state.change.event.queue.capacity: タスク状態変更イベントキューの容量。デフォルトは 1,000 です。

  • connector.spanner.max.missed.heartbeats: 例外がスローされる前に変更ストリーム クエリで発生するハートビートの最大数。デフォルトは 10 です。

  • scaler.monitor.enabled: タスクの自動スケーリングが有効かどうかを示します。デフォルトは false です。

  • tasks.desired.partitions: タスクあたりの変更ストリーム パーティションの推奨数。このパラメータは、タスクの自動スケーリングに必要です。デフォルトは 2 です。

  • tasks.min: タスクの最小数。このパラメータは、タスクの自動スケーリングに必要です。デフォルトは 1 です。

  • connector.spanner.sync.topic: 同期トピックの名前。タスク間の通信に使用される内部コネクタ トピックです。ユーザーが名前を指定しなかった場合、デフォルトは _sync_topic_spanner_connector_connectorname です。

  • connector.spanner.sync.poll.duration: 同期トピックのポーリング時間。デフォルトは 500 ミリ秒です。

  • connector.spanner.sync.request.timeout.ms: 同期トピックへのリクエストのタイムアウト。デフォルトは 5000 ミリ秒です。

  • connector.spanner.sync.delivery.timeout.ms: 同期トピックへのパブリッシュのタイムアウト。デフォルトは 15000 ミリ秒です。

  • connector.spanner.sync.commit.offsets.interval.ms: 同期トピックのオフセットが commit される間隔。デフォルトは 60000 ミリ秒です。

  • connector.spanner.sync.publisher.wait.timeout: 同期トピックにメッセージがパブリッシュされる間隔。デフォルトは 5 ミリ秒です。

  • connector.spanner.rebalancing.topic: 再バランシング トピックの名前。再バランシング トピックは、タスクの存続を確認するために使用される内部コネクタ トピックです。ユーザーが名前を指定しなかった場合、デフォルトは _rebalancing_topic_spanner_connector_connectorname です。

  • connector.spanner.rebalancing.poll.duration: 再バランシング トピックのポーリング時間。デフォルトは 5000 ミリ秒です。

  • connector.spanner.rebalancing.commit.offsets.timeout: 再バランシング トピックのオフセットを commit するタイムアウト。デフォルトは 5000 ミリ秒です。

  • connector.spanner.rebalancing.commit.offsets.interval.ms: 同期トピックのオフセットが commit される間隔。デフォルトは 60000 ミリ秒です。

  • connector.spanner.rebalancing.task.waiting.timeout: タスクが再バランシング イベントを処理するまでの待機時間。デフォルトは 1000 ミリ秒です。

構成可能なコネクタ プロパティの詳細なリストについては、GitHub リポジトリをご覧ください。

制限事項