En esta página, se describen los flujos de cambios en Spanner para bases de datos con dialecto Google SQL y bases de datos con dialecto PostgreSQL, incluidos los siguientes temas:
- El modelo de partición basado en divisiones
- El formato y el contenido de los registros del flujo de cambios
- La sintaxis de bajo nivel que se usa para consultar esos registros
- Ejemplo del flujo de trabajo de consulta
Usas la API de Spanner para consultar flujos de cambios directamente. Las aplicaciones que, en cambio, usan Dataflow para leer datos del flujo de cambios no necesitan trabajar directamente con el modelo de datos que se describe aquí.
Para obtener una guía introductoria más amplia sobre los flujos de cambios, consulta Descripción general de los flujos de cambios.
Cómo cambiar las particiones de flujo de cambios
Cuando se produce un cambio en una tabla que supervisa un flujo de cambios, Spanner escribe un registro de flujo de cambios correspondiente en la base de datos, de forma síncrona en la misma transacción que el cambio de datos. Esto significa que, si la transacción se realiza correctamente, Spanner también capturó y persistió el cambio de forma correcta. De forma interna, Spanner coloca en la misma ubicación el registro del flujo de cambios y el cambio de datos para que el mismo servidor los procese y minimice la sobrecarga de escritura.
Como parte de la DML en una división en particular, Spanner adjunta la operación de escritura a la división de datos de la secuencia de cambios correspondiente en la misma transacción. Debido a esta colocalización, los flujos de cambios no agregan coordinación adicional entre los recursos de publicación, lo que minimiza la sobrecarga de confirmación de transacciones.
Spanner se escala dividiendo y combinando datos de forma dinámica según la carga y el tamaño de la base de datos, y distribuyendo las divisiones entre los recursos de entrega.
Para permitir que las operaciones de lectura y escritura de los flujos de cambios se ajusten a escala, Spanner divide y combina el almacenamiento interno del flujo de cambios junto con los datos de la base de datos, lo que evita automáticamente los puntos calientes. Para admitir la lectura de registros de flujo de cambios casi en tiempo real a medida que se escalan las operaciones de escritura de la base de datos, la API de Spanner está diseñada para que se pueda consultar un flujo de cambios de forma simultánea con particiones de flujo de cambios. Las particiones de flujo de cambios se asignan a las divisiones de datos de flujo de cambios que contienen los registros de flujo de cambios. Las particiones de un flujo de cambios cambian de forma dinámica con el tiempo y se correlacionan con la forma en que Spanner divide y combina de forma dinámica los datos de la base de datos.
Una partición de transmisión de cambios contiene registros de un rango de claves inmutable para un período específico. Cualquier partición del flujo de cambios se puede dividir en una o más particiones del flujo de cambios, o bien combinarse con otras particiones del flujo de cambios. Cuando se producen estos eventos de división o combinación, se crean particiones secundarias para capturar los cambios de sus respectivos rangos de claves inmutables para el siguiente período. Además de los registros de cambios de datos, una consulta de flujo de cambios muestra registros de particiones secundarias para notificar a los lectores sobre las nuevas particiones de flujo de cambios que se deben consultar, así como registros de señales de actividad para indicar el progreso cuando no se realizaron operaciones de escritura recientemente.
Cuando consultas una partición de flujo de cambios en particular, los registros de cambios se muestran en orden de marca de tiempo de confirmación. Cada registro de cambios se muestra exactamente una vez. No se garantiza el orden de los registros de cambios en las particiones del flujo de cambios. Los registros de cambios de una clave primaria en particular se devuelven solo en una partición para un período determinado.
Debido al linaje de particiones principales y secundarias, para procesar los cambios de una clave en particular en el orden de la marca de tiempo de confirmación, los registros que se devuelven de las particiones secundarias solo deben procesarse después de que se hayan procesado los registros de todas las particiones principales.
Cambios en las funciones de lectura de flujo y la sintaxis de consulta
GoogleSQL
Para consultar flujos de cambios, usa la API de ExecuteStreamingSql
. Spanner crea automáticamente una función de lectura especial junto con el flujo de cambios. La función de lectura proporciona acceso a los registros del flujo de cambios. La convención de nombres de la función de lectura es READ_change_stream_name
.
Si suponemos que existe un flujo de cambios SingersNameStream
en la base de datos, la
sintaxis de consulta de GoogleSQL es la siguiente:
SELECT ChangeRecord
FROM READ_SingersNameStream (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
read_options
)
La función read acepta los siguientes argumentos:
Nombre del argumento | Tipo | ¿Es obligatorio? | Descripción |
---|---|---|---|
start_timestamp |
TIMESTAMP |
Obligatorio | Especifica que se deben mostrar los registros con commit_timestamp mayor o igual que start_timestamp . El valor debe estar dentro del período de retención del flujo de cambios y debe ser inferior o igual a la hora actual, y superior o igual a la marca de tiempo de la creación del flujo de cambios. |
end_timestamp |
TIMESTAMP |
Opcional (predeterminado: NULL ) |
Especifica que se deben mostrar los registros con un commit_timestamp menor o igual que end_timestamp . El valor debe estar dentro del período de retención del flujo de cambios y ser mayor o igual que start_timestamp . La consulta finaliza después de mostrar todos los ChangeRecords hasta el end_timestamp o cuando finalizas la conexión. Si end_timestamp se establece en NULL
o no se especifica, la consulta continúa la ejecución hasta que se muestran todos los
ChangeRecords o hasta que finalizas la
conexión. |
partition_token |
STRING |
Opcional (predeterminado: NULL ) |
Especifica qué partición de flujo de cambios consultar en función del contenido de los registros de particiones secundarias. Si es NULL o no se especifica, significa que el lector consulta el flujo de cambios por primera vez y no obtuvo ningún token de partición específico para consultar. |
heartbeat_milliseconds |
INT64 |
Obligatorio | Determina la frecuencia con la que se muestra un ChangeRecord de señal de actividad en caso de que no haya transacciones confirmadas en esta partición.
El valor debe estar entre 1,000 (un segundo) y
300,000 (cinco minutos). |
read_options |
ARRAY |
Opcional (predeterminado: NULL ) |
Agrega opciones de lectura reservadas para usarlas en el futuro. El único valor permitido es NULL . |
Te recomendamos que crees un método auxiliar para compilar el texto de la consulta de la función de lectura y vincularle parámetros, como se muestra en el siguiente ejemplo.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT ChangeRecord FROM READ_SingersNameStream" + "(" + " start_timestamp => @startTimestamp," + " end_timestamp => @endTimestamp," + " partition_token => @partitionToken," + " heartbeat_milliseconds => @heartbeatMillis" + ")"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("startTimestamp") .to(startTimestamp) .bind("endTimestamp") .to(endTimestamp) .bind("partitionToken") .to(partitionToken) .bind("heartbeatMillis") .to(heartbeatMillis) .build(); }
PostgreSQL
Para consultar flujos de cambios, usa la API de ExecuteStreamingSql
. Spanner crea automáticamente una función de lectura especial junto con el flujo de cambios. La función de lectura proporciona acceso a los registros del flujo de cambios. La convención de nombres de la función de lectura es spanner.read_json_change_stream_name
.
Si suponemos que existe un flujo de cambios SingersNameStream
en la base de datos, la sintaxis de consulta para PostgreSQL es la siguiente:
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
start_timestamp,
end_timestamp,
partition_token,
heartbeat_milliseconds,
null
)
La función read acepta los siguientes argumentos:
Nombre del argumento | Tipo | ¿Es obligatorio? | Descripción |
---|---|---|---|
start_timestamp |
timestamp with time zone |
Obligatorio | Especifica que se deben mostrar los registros de cambios con commit_timestamp
mayor o igual que start_timestamp . El valor debe estar dentro del período de retención del flujo de cambios y debe ser inferior o igual a la hora actual, y superior o igual a la marca de tiempo de la creación del flujo de cambios. |
end_timestamp |
timestamp with timezone |
Opcional (predeterminado: NULL ) |
Especifica que se deben mostrar los registros de cambios con commit_timestamp inferior o igual a end_timestamp . El valor debe estar dentro del período de retención del flujo de cambios y ser mayor o igual que start_timestamp .
La consulta finaliza después de mostrar todos los registros de cambios hasta el end_timestamp o hasta que finalices la conexión.
Si es NULL , la consulta continúa la ejecución hasta que se muestran todos los registros de cambios o hasta que finalizas la conexión. |
partition_token |
text |
Opcional (predeterminado: NULL ) |
Especifica qué partición de flujo de cambios consultar en función del contenido de los registros de particiones secundarias. Si es NULL o no se especifica, significa que el lector consulta el flujo de cambios por primera vez y no obtuvo ningún token de partición específico para consultar. |
heartbeat_milliseconds |
bigint |
Obligatorio | Determina con qué frecuencia se muestra un ChangeRecord de sondeo cuando no hay transacciones confirmadas en esta partición.
El valor debe estar entre 1,000 (un segundo) y
300,000 (cinco minutos). |
null |
null |
Obligatorio | Reservado para uso futuro |
Te recomendamos que crees un método auxiliar para compilar el texto de la función de lectura y vincularle parámetros, como se muestra en el siguiente ejemplo.
Java
private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE = "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\"" + "($1, $2, $3, $4, null)"; // Helper method to conveniently create change stream query texts and // bind parameters. public static Statement getChangeStreamQuery( String partitionToken, Timestamp startTimestamp, Timestamp endTimestamp, long heartbeatMillis) { return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE) .bind("p1") .to(startTimestamp) .bind("p2") .to(endTimestamp) .bind("p3") .to(partitionToken) .bind("p4") .to(heartbeatMillis) .build(); }
Cambia el formato de registro de las transmisiones
GoogleSQL
La función de lectura de flujos de cambios muestra una sola columna ChangeRecord
de tipo ARRAY<STRUCT<...>>
. En cada fila, este array siempre contiene un solo elemento.
Los elementos del array tienen el siguiente tipo:
STRUCT < data_change_record ARRAY<STRUCT<...>>, heartbeat_record ARRAY<STRUCT<...>>, child_partitions_record ARRAY<STRUCT<...>> >
Hay tres campos en este STRUCT
: data_change_record
, heartbeat_record
y child_partitions_record
, cada uno de tipo ARRAY<STRUCT<...>>
. En cualquier fila que devuelva la función de lectura del flujo de cambios, solo uno de estos tres campos contiene un valor; los otros dos están vacíos o son NULL
. Estos campos de array contienen, como máximo, un elemento.
En las siguientes secciones, se examina cada uno de estos tres tipos de registros.
PostgreSQL
La función de lectura de flujos de cambios muestra una sola columna ChangeRecord
de tipo JSON
con la siguiente estructura:
{
"data_change_record" : {},
"heartbeat_record" : {},
"child_partitions_record" : {}
}
Hay tres claves posibles en este objeto: data_change_record
, heartbeat_record
y child_partitions_record
, y el tipo de valor correspondiente es JSON
. En cualquier fila que devuelva la función de lectura del flujo de cambios, solo existe una de estas tres claves.
En las siguientes secciones, se examina cada uno de estos tres tipos de registros.
Registros de cambios de datos
Un registro de cambios de datos contiene un conjunto de cambios en una tabla con el mismo tipo de modificación (inserción, actualización o eliminación) confirmado en la misma marca de tiempo de confirmación en una partición de flujo de cambios para la misma transacción. Se pueden mostrar varios registros de cambios de datos para la misma transacción en varias particiones de flujo de cambios.
Todos los registros de cambios de datos tienen campos commit_timestamp
, server_transaction_id
y record_sequence
, que, en conjunto, determinan el orden en el flujo de cambios de un registro de flujo. Estos tres campos son suficientes para derivar el orden de los cambios y proporcionar coherencia externa.
Ten en cuenta que varias transacciones pueden tener la misma marca de tiempo de confirmación si
tocan datos que no se superponen. El campo server_transaction_id
ofrece la capacidad de distinguir qué conjunto de cambios (posiblemente en particiones de flujo de cambios) se emitió dentro de la misma transacción. Si lo vinculas con los campos record_sequence
y
number_of_records_in_transaction
, también puedes almacenar en búfer y ordenar
todos los registros de una transacción en particular.
Los campos de un registro de cambio de datos incluyen lo siguiente:
GoogleSQL
Campo | Tipo | Descripción |
---|---|---|
commit_timestamp |
TIMESTAMP |
Indica la marca de tiempo en la que se confirmó el cambio. |
record_sequence |
STRING |
Indica el número de secuencia del registro dentro de la transacción.
Los números de secuencia son únicos y aumentan monótonamente (pero no necesariamente contiguos) dentro de una transacción. Ordena los registros del mismo server_transaction_id por record_sequence para reconstruir el orden de los cambios dentro de la transacción.
Spanner podría optimizar este orden para mejorar el rendimiento y es posible que no siempre coincida con el orden original que proporcionas. |
server_transaction_id |
STRING |
Proporciona una cadena única a nivel global que representa la transacción en la que se confirmó el cambio. El valor solo debe usarse en el contexto del procesamiento de registros de transmisión de cambios y no está correlacionado con el ID de transacción en la API de Spanner. |
is_last_record_in_transaction_in_partition |
BOOL |
Indica si este es el último registro de una transacción en la partición actual. |
table_name |
STRING |
Es el nombre de la tabla afectada por el cambio. |
value_capture_type |
STRING |
Describe el tipo de captura de valor que se especificó en la configuración del flujo de cambios cuando se capturó este cambio. El tipo de captura de valor puede ser uno de los siguientes:
De forma predeterminada, es |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Indica el nombre de la columna, el tipo de columna, si es una clave primaria y la posición de la columna como se define en el esquema (ordinal_position ). La primera columna de una tabla en el esquema tendría una posición ordinal de 1 . El tipo de columna puede estar anidado para las columnas de array. El formato coincide con la estructura de tipo que se describe en la referencia de la API de Spanner.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Describe los cambios que se realizaron, incluidos los valores de la clave primaria, los valores anteriores y los valores nuevos de las columnas modificadas o a las que se les hizo un seguimiento.
La disponibilidad y el contenido de los valores antiguos y nuevos dependen del value_capture_type configurado. Los campos new_values y old_values solo contienen las columnas que no son clave. |
mod_type |
STRING |
Describe el tipo de cambio. Es uno de los siguientes: INSERT ,
UPDATE o DELETE . |
number_of_records_in_transaction |
INT64 |
Indica la cantidad de registros de cambios de datos que forman parte de esta transacción en todas las particiones del flujo de cambios. |
number_of_partitions_in_transaction |
INT64 |
Indica la cantidad de particiones que muestran registros de cambios de datos para esta transacción. |
transaction_tag |
STRING |
Indica la etiqueta de transacción asociada con esta transacción. |
is_system_transaction |
BOOL |
Indica si la transacción es del sistema. |
PostgreSQL
Campo | Tipo | Descripción |
---|---|---|
commit_timestamp |
STRING |
Indica la marca de tiempo en la que se confirmó el cambio. |
record_sequence |
STRING |
Indica el número de secuencia del registro dentro de la transacción.
Los números de secuencia son únicos y aumentan monótonamente (pero no necesariamente contiguos) dentro de una transacción. Ordena los registros del mismo server_transaction_id por record_sequence para reconstruir el orden de los cambios dentro de la transacción. |
server_transaction_id |
STRING |
Proporciona una cadena única a nivel global que representa la transacción en la que se confirmó el cambio. El valor solo debe usarse en el contexto del procesamiento de registros de transmisión de cambios y no está correlacionado con el ID de transacción en la API de Spanner. |
is_last_record_in_transaction_in_partition |
BOOLEAN |
Indica si este es el último registro de una transacción en la partición actual. |
table_name |
STRING |
Indica el nombre de la tabla afectada por el cambio. |
value_capture_type |
STRING |
Describe el tipo de captura de valor que se especificó en la configuración del flujo de cambios cuando se capturó este cambio. El tipo de captura de valor puede ser uno de los siguientes:
De forma predeterminada, es |
column_types |
[ { "name": "STRING", "type": { "code": "STRING" }, "is_primary_key": BOOLEAN "ordinal_position": NUMBER }, ... ] |
Indica el nombre de la columna, el tipo de columna, si es una clave primaria y la posición de la columna como se define en el esquema (ordinal_position ). La primera columna de una tabla en el esquema tendría una posición ordinal de 1 . El tipo de columna puede estar anidado para las columnas de array. El formato coincide con la estructura de tipo que se describe en la referencia de la API de Spanner.
|
mods |
[ { "keys": {"STRING" : "STRING"}, "new_values": { "STRING" : "VALUE-TYPE", [...] }, "old_values": { "STRING" : "VALUE-TYPE", [...] }, }, [...] ] |
Describe los cambios que se realizaron, incluidos los valores de la clave primaria, los valores anteriores y los valores nuevos de las columnas modificadas o a las que se les hizo un seguimiento. La disponibilidad y el contenido de los valores antiguos y nuevos dependen del value_capture_type configurado. Los campos new_values y old_values solo contienen las columnas que no son clave.
|
mod_type |
STRING |
Describe el tipo de cambio. Es uno de los siguientes: INSERT ,
UPDATE o DELETE . |
number_of_records_in_transaction |
INT64 |
Indica la cantidad de registros de cambios de datos que forman parte de esta transacción en todas las particiones del flujo de cambios. |
number_of_partitions_in_transaction |
NUMBER |
Indica la cantidad de particiones que muestran registros de cambios de datos para esta transacción. |
transaction_tag |
STRING |
Indica la etiqueta de transacción asociada con esta transacción. |
is_system_transaction |
BOOLEAN |
Indica si la transacción es del sistema. |
Ejemplo de registro de cambios de datos
A continuación, se incluyen dos ejemplos de registros de cambios de datos. Describen una sola transacción en la que hay una transferencia entre dos cuentas. Las dos cuentas se encuentran en particiones de flujo de cambios separadas.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z",
"Balance": 1500
},
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
"record_sequence": "00000001",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id2"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 2000
},
"old_values": {
"LastUpdate": "2022-01-20T11:25:00.199915Z",
"Balance": 1500
},
},
...
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "OLD_AND_NEW_VALUES",
"number_of_records_in_transaction": 2,
"number_of_partitions_in_transaction": 2,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false,
}
El siguiente registro de cambio de datos es un ejemplo de un registro con el tipo de captura de valor NEW_VALUES
. Ten en cuenta que solo se propagan los valores nuevos.
Solo se modificó la columna LastUpdate
, por lo que solo se mostró esa columna.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z"
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
El siguiente registro de cambio de datos es un ejemplo de un registro con el tipo de captura de valor NEW_ROW
. Solo se modificó la columna LastUpdate
, pero se muestran todas las columnas a las que se les hizo un seguimiento.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
El siguiente registro de cambio de datos es un ejemplo de un registro con el tipo de captura de valor NEW_ROW_AND_OLD_VALUES
. Solo se modificó la columna LastUpdate
, pero se muestran todas las columnas a las que se les hizo un seguimiento. Este tipo de captura de valor captura el valor nuevo y el valor anterior de LastUpdate
.
"data_change_record": {
"commit_timestamp": "2022-09-27T12:30:00.123456Z",
// record_sequence is unique and monotonically increasing within a
// transaction, across all partitions.
"record_sequence": "00000000",
"server_transaction_id": "6329047911",
"is_last_record_in_transaction_in_partition": true,
"table_name": "AccountBalance",
"column_types": [
{
"name": "AccountId",
"type": {"code": "STRING"},
"is_primary_key": true,
"ordinal_position": 1
},
{
"name": "LastUpdate",
"type": {"code": "TIMESTAMP"},
"is_primary_key": false,
"ordinal_position": 2
},
{
"name": "Balance",
"type": {"code": "INT"},
"is_primary_key": false,
"ordinal_position": 3
}
],
"mods": [
{
"keys": {"AccountId": "Id1"},
"new_values": {
"LastUpdate": "2022-09-27T12:30:00.123456Z",
"Balance": 1000
},
"old_values": {
"LastUpdate": "2022-09-26T11:28:00.189413Z"
}
}
],
"mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
"value_capture_type": "NEW_ROW_AND_OLD_VALUES",
"number_of_records_in_transaction": 1,
"number_of_partitions_in_transaction": 1,
"transaction_tag": "app=banking,env=prod,action=update",
"is_system_transaction": false
}
Registros de latidos
Cuando se muestra un registro de heartbeat, indica que se devolvieron todos los cambios con commit_timestamp
inferior o igual a timestamp
del registro de heartbeat, y los registros de datos futuros en esta partición deben tener marcas de tiempo de confirmación más altas que las que muestra el registro de heartbeat. Los registros de señales de actividad se muestran cuando no hay cambios de datos escritos en una partición. Cuando hay cambios de datos escritos en la partición, se puede usar data_change_record.commit_timestamp
en lugar de heartbeat_record.timestamp
para indicar que el lector está avanzando en la lectura de la partición.
Puedes usar los registros de señales de actividad que se muestran en las particiones para sincronizar lectores en todas las particiones. Una vez que todos los lectores hayan recibido un heartbeat superior o igual a una marca de tiempo A
, o hayan recibido datos o registros de particiones secundarias superiores o iguales a la marca de tiempo A
, los lectores sabrán que recibieron todos los registros confirmados en esa marca de tiempo A
o antes, y pueden comenzar a procesar los registros almacenados en búfer, por ejemplo, ordenar los registros de particiones cruzadas por marca de tiempo y agruparlos por server_transaction_id
.
Un registro de heartbeat contiene solo un campo:
GoogleSQL
Campo | Tipo | Descripción |
---|---|---|
timestamp |
TIMESTAMP |
Indica la marca de tiempo del registro de la señal de monitoreo de funcionamiento. |
PostgreSQL
Campo | Tipo | Descripción |
---|---|---|
timestamp |
STRING |
Indica la marca de tiempo del registro de la señal de monitoreo de funcionamiento. |
Ejemplo de registro de señal de monitoreo de funcionamiento
Ejemplo de un registro de heartbeat que indica que se devolvieron todos los registros con marcas de tiempo menores o iguales a la marca de tiempo de este registro:
heartbeat_record: {
"timestamp": "2022-09-27T12:35:00.312486Z"
}
Registros de particiones secundarias
Los registros de particiones secundarias muestran información sobre las particiones secundarias: sus tokens de partición, los tokens de sus particiones superiores y el start_timestamp
que representa la marca de tiempo más antigua para la que las particiones secundarias contienen registros de cambios. Los registros cuyas marcas de tiempo de confirmación son inmediatamente anteriores a child_partitions_record.start_timestamp
se muestran en la partición actual. Después de mostrar todos los
registros de particiones secundarias de esta partición, esta consulta muestra un
estado de éxito, lo que indica que se mostraron todos los registros de esta
partición.
Los campos de un registro de partición secundaria incluyen lo siguiente:
GoogleSQL
Campo | Tipo | Descripción |
---|---|---|
start_timestamp |
TIMESTAMP |
Indica que los registros de cambios de datos que se muestran de las particiones secundarias en este registro de partición secundaria tienen una marca de tiempo de confirmación superior o igual a start_timestamp . Cuando se consulta una partición
secundaria, la consulta debe especificar el token de partición secundaria y un
start_timestamp mayor o igual que
child_partitions_token.start_timestamp . Todos los registros de particiones secundarias que muestra una partición tienen el mismo start_timestamp y la marca de tiempo siempre se encuentra entre el start_timestamp y el end_timestamp especificados en la consulta. |
record_sequence |
STRING |
Indica un número de secuencia que aumenta de forma monótona y que se puede usar para definir el orden de los registros de partición secundarios cuando hay varios registros de partición secundarios que se devuelven con el mismo start_timestamp en una partición en particular. El token de partición, start_timestamp y record_sequence , identifican de forma única un registro de partición secundario.
|
child_partitions |
[ { "token" : "STRING", "parent_partition_tokens" : ["STRING"] } ] |
Devuelve un conjunto de particiones secundarias y su información asociada. Esto incluye la cadena de tokens de partición que se usa para identificar la partición secundaria en las consultas, así como los tokens de sus particiones superiores. |
PostgreSQL
Campo | Tipo | Descripción |
---|---|---|
start_timestamp |
STRING |
Indica que los registros de cambios de datos que se muestran de las particiones secundarias en este registro de partición secundaria tienen una marca de tiempo de confirmación superior o igual a start_timestamp . Cuando se consulta una partición
secundaria, la consulta debe especificar el token de partición secundaria y un
start_timestamp mayor o igual que
child_partitions_token.start_timestamp . Todos los registros de particiones secundarias que muestra una partición tienen el mismo start_timestamp y la marca de tiempo siempre se encuentra entre el start_timestamp y el end_timestamp especificados en la consulta.
|
record_sequence |
STRING |
Indica un número de secuencia que aumenta de forma monótona y que se puede usar para definir el orden de los registros de partición secundarios cuando hay varios registros de partición secundarios que se devuelven con el mismo start_timestamp en una partición en particular. El token de partición, start_timestamp y record_sequence identifican de forma única un registro de partición secundario.
|
child_partitions |
[ { "token": "STRING", "parent_partition_tokens": ["STRING"], }, [...] ] |
Devuelve un array de particiones secundarias y su información asociada. Esto incluye la cadena de tokens de partición que se usa para identificar la partición secundaria en las consultas, así como los tokens de sus particiones superiores. |
Ejemplo de registro de partición secundaria
El siguiente es un ejemplo de un registro de partición secundario:
child_partitions_record: {
"start_timestamp": "2022-09-27T12:40:00.562986Z",
"record_sequence": "00000001",
"child_partitions": [
{
"token": "child_token_1",
// To make sure changes for a key is processed in timestamp
// order, wait until the records returned from all parents
// have been processed.
"parent_partition_tokens": ["parent_token_1", "parent_token_2"]
}
],
}
Flujo de trabajo de consulta de flujos de cambios
Ejecuta consultas de flujo de cambios con la API de ExecuteStreamingSql
, con una transacción de solo lectura de un solo uso y un límite de marca de tiempo firme. La función de lectura de flujo de cambios te permite especificar start_timestamp
y end_timestamp
para el período de tiempo de interés. Se puede acceder a todos los registros de cambios dentro del período de retención con el límite de marca de tiempo de solo lectura.
Todos los demás TransactionOptions
no son válidos para las consultas de flujo de cambios. Además, si TransactionOptions.read_only.return_read_timestamp
se establece en true
, se muestra un valor especial de kint64max - 1
en el mensaje Transaction
que describe la transacción, en lugar de una marca de tiempo de lectura válida. Este valor especial se debe descartar y no se debe usar para ninguna consulta posterior.
Cada consulta del flujo de cambios puede mostrar cualquier cantidad de filas, cada una de las cuales contiene un registro de cambios de datos, un registro de señales de actividad o un registro de particiones secundarias. No es necesario establecer una fecha límite para la solicitud.
Ejemplo de flujo de trabajo de consulta de flujo de cambios
El flujo de trabajo de la consulta de transmisión comienza con la emisión de la primera consulta de flujo de cambios especificando partition_token
a NULL
. La consulta debe especificar
la función de lectura para el flujo de cambios, la marca de tiempo de inicio y finalización de interés,
y el intervalo de intervalo de tiempo. Cuando end_timestamp
es NULL
, la consulta sigue mostrando cambios de datos hasta que finaliza la partición.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:00Z",
end_timestamp => NULL,
partition_token => NULL,
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:00Z',
NULL,
NULL,
10000,
NULL
) ;
Procesa los registros de datos de esta consulta hasta que se muestren todos los registros de particiones secundarias. En el siguiente ejemplo, se muestran dos registros de particiones secundarias y tres tokens de partición, y luego se finaliza la consulta. Los registros de particiones secundarias de una consulta específica siempre comparten el mismo start_timestamp
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_1",
// Note parent tokens are null for child partitions returned
// from the initial change stream queries.
"parent_partition_tokens": [NULL]
}
{
"token": "child_token_2",
"parent_partition_tokens": [NULL]
}
],
}
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:00:01Z",
"record_sequence": 1000012390,
"child_partitions": [
{
"token": "child_token_3",
"parent_partition_tokens": [NULL]
}
],
}
Para procesar los cambios después de 2022-05-01T09:00:01Z
, crea tres consultas nuevas y
ejecútalas en paralelo. Cuando se usan juntas, las tres consultas muestran cambios de datos para el mismo rango de claves que cubre su elemento superior. Siempre establece el start_timestamp
en el
start_timestamp
en el mismo registro de partición secundaria y usa el mismo
end_timestamp
y el mismo intervalo de intervalo de tiempo para procesar los registros de manera coherente
en todas las consultas.
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_1",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_2",
heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
start_timestamp => "2022-05-01T09:00:01Z",
end_timestamp => NULL,
partition_token => "child_token_3",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_1',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_2',
10000,
NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:00:01Z',
NULL,
'child_token_3',
10000,
NULL
);
La consulta en child_token_2
finaliza después de mostrar otro registro de partición
secundaria. Este registro indica que una partición nueva abarca los cambios de child_token_2
y child_token_3
a partir de 2022-05-01T09:30:15Z
. La consulta en child_token_3
muestra el mismo registro exacto, ya que ambas son las particiones superiores del nuevo child_token_4
. Para garantizar un procesamiento ordenado y estricto de los registros de datos de una clave en particular, la consulta en child_token_4
debe comenzar después de que finalicen todos los elementos superiores. En este caso, los elementos superiores son child_token_2
y child_token_3
. Crea solo una consulta para cada token de partición secundario. El diseño del flujo de trabajo de consulta debe designar un elemento superior para que espere y programe la consulta en child_token_4
.
child_partitions_record: {
"record_type": "child_partitions",
"start_timestamp": "2022-05-01T09:30:15Z",
"record_sequence": 1000012389,
"child_partitions": [
{
"token": "child_token_4",
"parent_partition_tokens": [child_token_2, child_token_3],
}
],
}
GoogleSQL
SELECT ChangeRecord FROM READ_SingersNameStream(
start_timestamp => "2022-05-01T09:30:15Z",
end_timestamp => NULL,
partition_token => "child_token_4",
heartbeat_milliseconds => 10000
);
PostgreSQL
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
'2022-05-01T09:30:15Z',
NULL,
'child_token_4',
10000,
NULL
);
Encuentra ejemplos de cómo controlar y analizar registros de flujo de cambios en el conector de Apache Beam SpannerIO Dataflow en GitHub.