Cambia particiones, registros y consultas de transmisiones

En esta página, se describen en detalle los siguientes atributos de los flujos de cambios:

  • Su modelo de partición basado en divisiones
  • El formato y el contenido de los registros de flujos de cambios
  • La sintaxis de bajo nivel que se usa para consultar esos registros
  • Un ejemplo del flujo de trabajo de las consultas

La información de esta página es más relevante para usar la API de Spanner para consultar flujos de cambios de forma directa. Las aplicaciones que, en cambio, usan Dataflow para leer datos del flujo de cambios no necesitan trabajar directamente con el modelo de datos que se describe aquí.

Para obtener una guía de introducción más amplia sobre los flujos de cambios, consulta Descripción general de los flujos de cambios.

Cambia particiones de transmisión

Cuando se produce un cambio en una tabla observada por una transmisión de cambios, Spanner escribe un registro de flujo de cambios correspondiente en la base de datos, de forma síncrona en la misma transacción que los datos. Esto garantiza que, si la transacción se realiza correctamente, Spanner también capturó y mantuvo el cambio de forma correcta. De forma interna, Spanner ubica el registro del flujo de cambios y los datos cambian de modo que el mismo servidor los procese para minimizar la sobrecarga de escritura.

Como parte del DML en una división en particular, Spanner agrega la escritura a la división de datos del flujo de cambios correspondiente en la misma transacción. Debido a esta colocación, los flujos de cambios no agregan coordinación adicional entre los recursos de entrega, lo que minimiza la sobrecarga de confirmación de la transacción.

imagen

Spanner divide y combina datos de forma dinámica según la carga y el tamaño de la base de datos, y distribuye las divisiones entre los recursos de entrega.

Para habilitar las operaciones de escritura y lectura de flujos de cambios a escala, Spanner divide y combina el almacenamiento de flujos de cambios internos junto con los datos de la base de datos, lo que evita los hotspots de forma automática. A fin de admitir la lectura de registros de flujos de cambios casi en tiempo real a medida que se escalan las escrituras de la base de datos, la API de Spanner está diseñada para que se consulte un flujo de cambios de forma simultánea mediante particiones de flujos de cambios. Las particiones de flujo de cambios se asignan para cambiar las divisiones de datos de transmisión que contienen los registros del flujo de cambios. Las particiones de un flujo de cambios cambian de forma dinámica con el tiempo y se correlacionan con la forma en que Spanner divide y combina los datos de la base de datos de forma dinámica.

Una partición de flujo de cambios contiene registros para un rango de claves inmutable para un intervalo de tiempo específico. Cualquier partición de flujo de cambios se puede dividir en una o más particiones, o bien combinarse con otras particiones de flujos de cambios. Cuando ocurren estos eventos de división o combinación, se crean particiones secundarias para capturar los cambios en sus respectivos rangos de claves inmutables para el próximo intervalo de tiempo. Además de los registros de cambios de datos, una consulta de flujo de cambios muestra registros de partición secundarios para notificar a los lectores sobre las nuevas particiones de flujo de cambios que deben consultarse, así como registros de señal de monitoreo de funcionamiento para indicar el progreso hacia delante cuando no se realizaron operaciones de escritura recientemente.

Cuando se consulta una partición de flujo de cambios en particular, los registros de cambios se muestran en orden de marca de tiempo de confirmación. Cada registro de cambios se muestra exactamente una vez. En las particiones de los flujos de cambios, no hay un orden garantizado de los registros de cambios. Los registros de cambios de una clave primaria específica se muestran solo en una partición durante un intervalo de tiempo determinado.

Debido al linaje de particiones superior-secundario, para procesar los cambios de una clave en particular en el orden de la marca de tiempo de confirmación, los registros que se muestran desde las particiones secundarias deben procesarse solo después de que se hayan procesado los registros de todas las particiones superiores.

Cambia las funciones de lectura y la sintaxis de consulta del flujo

GoogleSQL

Puedes consultar los flujos de cambios mediante la API de ExecuteStreamingSql. Spanner crea automáticamente una función de lectura especial junto con el flujo de cambios. La función de lectura proporciona acceso a los registros del flujo de cambios. La convención de nombres de las funciones de lectura es READ_change_stream_name.

Suponiendo que existe un SingersNameStream de flujo de cambios en la base de datos, la sintaxis de consulta para GoogleSQL es la siguiente:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

La función de lectura acepta los siguientes argumentos:

Nombre del argumento Tipo ¿Es obligatorio? Descripción
start_timestamp TIMESTAMP Requeridos Especifica que se deben mostrar los registros con commit_timestamp mayor o igual que start_timestamp. El valor debe estar dentro del período de retención del flujo de cambios y debe ser menor o igual que la hora actual y mayor o igual que la marca de tiempo de la creación del flujo de cambios.
end_timestamp TIMESTAMP Opcional (predeterminado: NULL) Especifica que se deben mostrar los registros con commit_timestamp menor o igual que end_timestamp. El valor debe estar dentro del período de retención del flujo de cambios y debe ser mayor o igual que start_timestamp. La consulta finaliza después de mostrar todos los ChangeRecords hasta end_timestamp o cuando el usuario finaliza la conexión. Si es NULL o no se especifica, la consulta se ejecuta hasta que se muestren todos los ChangeRecords o el usuario finalice la conexión.
partition_token STRING Opcional (predeterminado: NULL) Especifica qué partición de transmisión de cambios se debe consultar según el contenido de los registros de particiones secundarias. Si es NULL o no se especifica, significa que el lector consulta el flujo de cambios por primera vez y no obtuvo tokens de partición específicos para consultar.
heartbeat_milliseconds INT64 Requeridos Determina la frecuencia con la que se muestra un ChangeRecord de señal de monitoreo de funcionamiento en caso de que no haya transacciones confirmadas en esta partición. El valor debe estar entre 1,000 (un segundo) y 30,0000 (cinco minutos).
read_options ARRAY Opcional (predeterminado: NULL) Opciones de lectura adicionales reservadas para uso futuro. En este momento, el único valor permitido es NULL.

Te recomendamos crear un método conveniente para compilar el texto de la consulta de la función de lectura y vincular parámetros a ella, como se muestra en el siguiente ejemplo.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

Puedes consultar las flujos de cambios mediante la API de ExecuteStreamingSql. Spanner crea automáticamente una función de lectura especial junto con el flujo de cambios. La función de lectura proporciona acceso a los registros del flujo de cambios. La convención de nombres de las funciones de lectura es spanner.read_json_change_stream_name.

Si suponemos que existe un SingersNameStream de flujo de cambios en la base de datos, la sintaxis de consulta para PostgreSQL es la siguiente:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

La función de lectura acepta los siguientes argumentos:

Nombre del argumento Tipo ¿Es obligatorio? Descripción
start_timestamp timestamp with time zone Requeridos Especifica que se deben mostrar los registros de cambio con commit_timestamp mayor o igual que start_timestamp. El valor debe estar dentro del período de retención del flujo de cambios y debe ser menor o igual que la hora actual y mayor o igual que la marca de tiempo de la creación del flujo de cambios.
end_timestamp timestamp with timezone Opcional (predeterminado: NULL) Especifica que se deben mostrar los registros de cambio con un commit_timestamp menor o igual que end_timestamp. El valor debe estar dentro del período de retención del flujo de cambios y debe ser mayor o igual que start_timestamp. La consulta finaliza después de mostrar todos los registros de cambios hasta end_timestamp o el usuario cancela la conexión. Si es NULL, la consulta se ejecuta hasta que se muestren todos los registros de cambios o hasta que el usuario finalice la conexión.
partition_token text Opcional (predeterminado: NULL) Especifica qué partición de transmisión de cambios se debe consultar según el contenido de los registros de particiones secundarias. Si es NULL o no se especifica, significa que el lector consulta el flujo de cambios por primera vez y no obtuvo tokens de partición específicos para consultar.
heartbeat_milliseconds bigint Requeridos Determina la frecuencia con la que se mostrará un ChangeRecord de señal de monitoreo de funcionamiento en caso de que no haya transacciones confirmadas en esta partición. El valor debe estar entre 1,000 (un segundo) y 300,000 (cinco minutos).
null null Requeridos Reservado para uso futuro

Te recomendamos que crees un método conveniente para compilar el texto de la función de lectura y vincular parámetros a ella, como se muestra en el siguiente ejemplo.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Cambiar el formato de registro de las transmisiones

GoogleSQL

La función de lectura de flujos de cambios muestra una sola columna ChangeRecord de tipo ARRAY<STRUCT<...>>. En cada fila, este array siempre contiene un solo elemento.

Los elementos del array tienen los siguientes tipos:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Esta struct tiene tres campos: data_change_record, heartbeat_record y child_partitions_record, cada uno de tipo ARRAY<STRUCT<...>>. En cualquier fila que muestre la función de lectura de flujos de cambios, solo uno de estos tres campos contiene un valor; los otros dos están vacíos o son NULL. Estos campos de array contienen, como máximo, un elemento.

En las siguientes secciones, se examina cada uno de estos tres tipos de registro.

PostgreSQL

La función de lectura de flujos de cambios muestra una sola columna ChangeRecord de tipo JSON con la siguiente estructura:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

Hay tres claves posibles en este objeto: data_change_record, heartbeat_record y child_partitions_record; el tipo de valor correspondiente es JSON. En cualquier fila que muestre la función de lectura de flujos de cambios, solo existe una de estas tres claves.

En las siguientes secciones, se examina cada uno de estos tres tipos de registro.

Registros de cambios de datos

Un registro de cambios de datos contiene un conjunto de cambios en una tabla con el mismo tipo de modificación (inserción, actualización o eliminación) confirmados en la misma marca de tiempo de confirmación en una partición de flujo de cambios para la misma transacción. Se pueden mostrar varios registros de cambios de datos para la misma transacción en varias particiones de flujos de cambios.

Todos los registros de cambios de datos tienen campos commit_timestamp, server_transaction_id y record_sequence, que juntos determinan el orden en el flujo de cambios para un registro de transmisión. Estos tres campos son suficientes para derivar el orden de los cambios y proporcionar coherencia externa.

Ten en cuenta que varias transacciones pueden tener la misma marca de tiempo de confirmación si tocan datos no superpuestos. El campo server_transaction_id ofrece la capacidad de distinguir qué conjunto de cambios (posiblemente en particiones del flujo de cambios) se emitió dentro de la misma transacción. La sincronización con los campos record_sequence y number_of_records_in_transaction también te permite almacenar en búfer y ordenar todos los registros de una transacción en particular.

Los campos de un registro de cambios de datos incluyen lo siguiente:

GoogleSQL

Campo Tipo Descripción
commit_timestamp TIMESTAMP La marca de tiempo en la que se confirmó el cambio
record_sequence STRING Es el número de secuencia del registro dentro de la transacción. Se garantiza que los números de secuencia sean únicos y que aumenten monótonamente (pero no necesariamente contiguos) en una transacción. Ordena los registros de la misma server_transaction_id por record_sequence para reconstruir el orden de los cambios dentro de la transacción.
server_transaction_id STRING Es una cadena única a nivel global que representa la transacción en la que se confirmó el cambio. El valor solo se debe usar en el contexto del procesamiento de registros de flujos de cambios y no está correlacionado con el ID de transacción en la API de Spanner.
is_last_record_in_transaction_in_partition BOOL Indica si este es el último registro de una transacción en la partición actual.
table_name STRING Nombre de la tabla afectada por el cambio.
value_capture_type STRING

Describe el tipo de captura de valor que se especificó en la configuración del flujo de cambios cuando se capturó este cambio.

El tipo de captura de valores puede ser "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" o "NEW_ROW_AND_OLD_VALUES". De forma predeterminada, es "OLD_AND_NEW_VALUES". Para obtener más información, consulta los tipos de captura de valores.

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
El nombre de la columna, el tipo de columna, si es una clave primaria y la posición de la columna según se define en el esquema ("ordinal_position"). La primera columna de una tabla en el esquema tendría una posición ordinal de "1". El tipo de columna puede anidarse en columnas de array. El formato coincide con la estructura de tipos descrita en la referencia de la API de Spanner.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Describe los cambios que se realizaron, incluidos los valores clave primaria, los valores anteriores y los nuevos de las columnas modificadas o con seguimiento. La disponibilidad y el contenido de los valores anteriores y nuevos dependerán del value_capture_type configurado. Los campos new_values y old_values solo contienen las columnas sin clave.
mod_type STRING Describe el tipo de cambio. Puede ser INSERT, UPDATE o DELETE.
number_of_records_in_transaction INT64 La cantidad de registros de cambios de datos que forman parte de esta transacción en todas las particiones del flujo de cambios.
number_of_partitions_in_transaction INT64 La cantidad de particiones que mostrarán registros de cambios de datos para esta transacción.
transaction_tag STRING Es la etiqueta de transacción asociada con esta transacción.
is_system_transaction BOOL Indica si la transacción es una transacción del sistema.

PostgreSQL

Campo Tipo Descripción
commit_timestamp STRING La marca de tiempo en la que se confirmó el cambio
record_sequence STRING Es el número de secuencia del registro dentro de la transacción. Se garantiza que los números de secuencia sean únicos y que aumenten monótonamente (pero no necesariamente contiguos) en una transacción. Ordena los registros del mismo “server_transaction_id” por “record_Sequence” para reconstruir el orden de los cambios dentro de la transacción.
server_transaction_id STRING Es una cadena única a nivel global que representa la transacción en la que se confirmó el cambio. El valor solo debe usarse en el contexto del procesamiento de registros de flujos de cambios y no está correlacionado con el ID de transacción en la API de Spanner
is_last_record_in_transaction_in_partition BOOLEAN Indica si este es el último registro de una transacción en la partición actual.
table_name STRING Nombre de la tabla afectada por el cambio.
value_capture_type STRING

Describe el tipo de captura de valor que se especificó en la configuración del flujo de cambios cuando se capturó este cambio.

El tipo de captura de valores puede ser "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" o "NEW_ROW_AND_OLD_VALUES". De forma predeterminada, es "OLD_AND_NEW_VALUES". Para obtener más información, consulta los tipos de captura de valores.

column_types

[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
El nombre de la columna, el tipo de columna, si es una clave primaria y la posición de la columna según se define en el esquema ("ordinal_position"). La primera columna de una tabla en el esquema tendría una posición ordinal de "1". El tipo de columna puede anidarse en columnas de array. El formato coincide con la estructura de tipos descrita en la referencia de la API de Spanner.
mods

[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
Describe los cambios que se realizaron, incluidos los valores clave primaria, los valores anteriores y los nuevos de las columnas modificadas o con seguimiento. La disponibilidad y el contenido de los valores anteriores y nuevos dependerán del value_capture_type configurado. Los campos new_values y old_values solo contienen las columnas sin clave.
mod_type STRING Describe el tipo de cambio. Puede ser INSERT, UPDATE o DELETE.
number_of_records_in_transaction INT64 La cantidad de registros de cambios de datos que forman parte de esta transacción en todas las particiones del flujo de cambios.
number_of_partitions_in_transaction NUMBER La cantidad de particiones que mostrarán registros de cambios de datos para esta transacción.
transaction_tag STRING Es la etiqueta de transacción asociada con esta transacción.
is_system_transaction BOOLEAN Indica si la transacción es una transacción del sistema.

A continuación, se incluye un par de ejemplos de registros de cambios en los datos. Describen una sola transacción en la que hay una transferencia entre dos cuentas. Ten en cuenta que las dos cuentas se encuentran en particiones de transmisión de cambios separadas.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

El siguiente registro de cambios de datos es un ejemplo de un registro con el tipo de captura de valor "NEW_VALUES". Ten en cuenta que solo se propagan los valores nuevos. Solo se modificó la columna "LastUpdate", por lo que solo se mostró esa columna.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

El siguiente registro de cambios de datos es un ejemplo de un registro con el tipo de captura de valor "NEW_ROW". Solo se modificó la columna "LastUpdate", pero se muestran todas las columnas con seguimiento.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

El siguiente registro de cambios de datos es un ejemplo de un registro con el tipo de captura de valor "NEW_ROW_AND_OLD_VALUES". Solo se modificó la columna "LastUpdate", pero se muestran todas las columnas con seguimiento. Este tipo de captura de valores captura el valor nuevo y el anterior de LastUpdate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Récords de señales de monitoreo de funcionamiento

Cuando se muestra un registro de señal de monitoreo de funcionamiento, esto indica que se mostraron todos los cambios con un commit_timestamp menor o igual que el timestamp del registro de señal de monitoreo de funcionamiento, y los registros de datos futuros en esta partición deben tener marcas de tiempo de confirmación más altas que las que muestra el registro de señal de monitoreo de funcionamiento. Los registros de señal de monitoreo de funcionamiento se muestran cuando no hay cambios de datos escritos en una partición. Cuando hay cambios de datos escritos en la partición, se puede usar data_change_record.commit_timestamp en lugar de heartbeat_record.timestamp para indicar que el lector avanza en la lectura de la partición.

Puedes usar los registros de señal de monitoreo de funcionamiento que se muestran en las particiones para sincronizar los lectores en todas las particiones. Una vez que todos los lectores hayan recibido una señal de monitoreo de funcionamiento mayor que o igual a alguna marca de tiempo A, o bien hayan recibido datos o registros de partición secundaria superiores o iguales a la marca de tiempo A, los lectores sabrán que recibieron todos los registros confirmados en esa marca de tiempo A o antes, y pueden comenzar a procesar los registros almacenados en búfer, por ejemplo, ordenar los registros de partición cruzada por marca de tiempo y agruparlos por server_transaction_id.

Un registro de señal de monitoreo de funcionamiento contiene solo un campo:

GoogleSQL

Campo Tipo Descripción
timestamp TIMESTAMP La marca de tiempo del registro de la señal de monitoreo de funcionamiento.

PostgreSQL

Campo Tipo Descripción
timestamp STRING La marca de tiempo del registro de la señal de monitoreo de funcionamiento.

Un ejemplo de registro de señal de monitoreo de funcionamiento, que informa que se mostraron todos los registros con marcas de tiempo menores o iguales que la marca de tiempo de este registro:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Registros de particiones secundarias

Un registro de particiones secundarias muestra información sobre las particiones secundarias: los tokens de partición, los tokens de las particiones superiores y el start_timestamp que representa la marca de tiempo más temprana para la que las particiones secundarias contienen registros de cambios. Los registros cuyas marcas de tiempo de confirmación son inmediatamente anteriores a child_partitions_record.start_timestamp se muestran en la partición actual. Después de mostrar todos los registros de particiones secundarias para esta partición, esta consulta mostrará un estado de éxito, lo que indica que todos los registros se mostraron para esta partición.

Los campos de un registro de particiones secundarias incluyen lo siguiente:

GoogleSQL

Campo Tipo Descripción
start_timestamp TIMESTAMP Los registros de cambios de datos que muestran las particiones secundarias en este registro de partición secundaria tienen una marca de tiempo de confirmación mayor o igual que start_timestamp. Cuando se consulta una partición secundaria, la consulta debe especificar el token de partición secundaria y un start_timestamp mayor o igual que child_partitions_token.start_timestamp. Todos los registros de particiones secundarias que muestra una partición tienen el mismo start_timestamp y la marca de tiempo siempre se encuentra entre el start_timestamp y la end_timestamp especificados de la consulta.
record_sequence STRING Un número de secuencia monótonamente creciente que se puede usar para definir el orden del registro de particiones secundarias cuando hay varios registros de particiones secundarias que se muestran con el mismo start_timestamp en una partición específica. El token de partición, start_timestamp y record_sequence, identifican de forma única un registro de partición secundaria.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Muestra un conjunto de particiones secundarias y su información asociada. Esto incluye la string de token de partición que se usa para identificar la partición secundaria en las consultas, así como los tokens de sus particiones superiores.

PostgreSQL

Campo Tipo Descripción
start_timestamp STRING Los registros de cambios de datos que muestran las particiones secundarias en este registro tienen una marca de tiempo de confirmación mayor o igual que start_timestamp. Cuando se consulta una partición secundaria, la consulta debe especificar el token de partición secundaria y un start_timestamp mayor o igual que child_partitions_token.start_timestamp. Todos los registros de particiones secundarias que muestra una partición tienen el mismo start_timestamp y la marca de tiempo siempre se encuentra entre el start_timestamp y la end_timestamp especificados de la consulta.
record_sequence STRING Un número de secuencia monótonamente creciente que se puede usar para definir el orden del registro de particiones secundarias cuando hay varios registros de particiones secundarias que se muestran con el mismo start_timestamp en una partición específica. El token de partición, start_timestamp y record_sequence, identifican de forma única un registro de partición secundaria.
child_partitions

[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
Muestra un arreglo de particiones secundarias y su información asociada. Esto incluye la string de token de partición que se usa para identificar la partición secundaria en las consultas, así como los tokens de sus particiones superiores.

El siguiente es un ejemplo de un registro de partición secundaria:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Cambia el flujo de trabajo de las consultas de transmisiones

Ejecuta consultas de flujos de cambios mediante la API de ExecuteStreamingSql, con una transacción de solo lectura de un solo uso y un límite de marca de tiempo sólido. La función de lectura de flujos de cambios te permite especificar los valores de start_timestamp y end_timestamp para el intervalo de tiempo de interés. Se puede acceder a todos los registros de cambios dentro del período de retención con el límite de marca de tiempo de solo lectura.

Todos los demás TransactionOptions no son válidos para las consultas de flujos de cambios. Además, si TransactionOptions.read_only.return_read_timestamp se configura como verdadero, se mostrará un valor especial de kint64max - 1 en el mensaje Transaction que describe la transacción, en lugar de una marca de tiempo de lectura válida. Este valor especial se debe descartar y no se debe usar para consultas posteriores.

Cada consulta de flujo de cambios puede mostrar cualquier cantidad de filas, cada una con un registro de cambios de datos, un registro de señal de monitoreo de funcionamiento o un registro de particiones secundarias. No es necesario establecer una fecha límite para la solicitud.

Ejemplo:

El flujo de trabajo de la consulta de transmisión comienza con la emisión de la primera consulta de transmisión de cambios mediante la especificación de partition_token como NULL. La consulta debe especificar la función de lectura para la transmisión de cambios, la marca de tiempo de interés de inicio y finalización, y el intervalo de la señal de monitoreo de funcionamiento. Cuando end_timestamp es NULL, la consulta sigue mostrando cambios de datos hasta que finaliza la partición.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Procesa los registros de datos de esta consulta hasta que se muestren los registros de partición secundarios. En el siguiente ejemplo, se muestran dos registros de partición secundarios y tres tokens de partición; luego, finaliza la consulta. Los registros de partición secundarios de una consulta específica siempre comparten el mismo start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Para procesar los cambios futuros después de 2022-05-01T09:00:01Z, crea tres consultas nuevas y ejecútalas en paralelo. Las tres consultas juntas muestran los cambios de datos futuros para el mismo rango de claves que cubre su superior. Siempre configura start_timestamp como start_timestamp en el mismo registro de partición secundaria y usa el mismo end_timestamp y el mismo intervalo de señal de monitoreo de funcionamiento para procesar los registros de manera coherente en todas las consultas.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

Después de un tiempo, la consulta en child_token_2 finaliza después de mostrar otro registro de partición secundaria. Este registro indica que una partición nueva cubrirá los cambios futuros de child_token_2 y child_token_3 a partir del 2022-05-01T09:30:15Z. La consulta en child_token_3 mostrará el mismo registro, ya que ambos son particiones superiores de la child_token_4 nueva. Para garantizar un procesamiento ordenado estricto de los registros de datos de una clave en particular, la consulta en child_token_4 solo debe comenzar después de que hayan finalizado todos los superiores, que en este caso son child_token_2 y child_token_3. Solo debes crear una consulta para cada token de partición secundaria; el diseño del flujo de trabajo de consultas debe designar un superior para que espere y programe la consulta en child_token_4.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Encuentra ejemplos de cómo controlar y analizar los registros de flujos de cambios en el conector de Dataflow SpannerIO de Apache Beam en GitHub.