Cambia las particiones, los registros y las consultas de las transmisiones

En esta página, se describen en detalle los siguientes atributos de los flujos de cambios:

  • Su modelo de partición basado en divisiones
  • El formato y el contenido de los registros del flujo de cambios
  • La sintaxis de bajo nivel que se usa para consultar esos registros
  • Ejemplo del flujo de trabajo de consulta

La información de esta página es más relevante para usar la API de Spanner para consultar flujos de cambios directamente. Las aplicaciones que, en cambio, usan Dataflow para leer datos del flujo de cambios no necesitan trabajar directamente con el modelo de datos que se describe aquí.

Para obtener una guía introductoria más amplia sobre los flujos de cambios, consulta Flujos de cambios descripción general.

Cómo cambiar las particiones de flujo de cambios

Cuando se produce un cambio en una tabla que supervisa un flujo de cambios, Spanner escribe un registro de flujo de cambios correspondiente en la base de datos, de forma síncrona en la misma transacción que el cambio de datos. Esta garantiza que, si la transacción se realiza correctamente, Spanner también capturaron correctamente y conservaron el cambio. De forma interna, Spanner coloca en la misma ubicación el registro del flujo de cambios y el cambio de datos para que el mismo servidor los procese y minimice la sobrecarga de escritura.

Como parte de la DML en una división en particular, Spanner adjunta la operación de escritura a la división de datos de la secuencia de cambios correspondiente en la misma transacción. Debido a esta colocación, cambiar no agregan coordinación adicional entre los recursos de entrega, minimiza la sobrecarga de confirmación de la transacción.

imagen

Spanner se escala dividiendo y combinando datos de forma dinámica según la carga y el tamaño de la base de datos, y distribuyendo las divisiones entre los recursos de entrega.

Para permitir que las operaciones de lectura y escritura de los flujos de cambios se escalen, Spanner divide y combina el almacenamiento interno del flujo de cambios junto con los datos de la base de datos, lo que evita automáticamente los puntos calientes. Para admitir la lectura de registros de flujos de cambios en casi tiempo real a medida que se escalan las operaciones de escritura en la base de datos, la API de Spanner está diseña para que un flujo de cambios se consulte de forma simultánea con particiones de flujo de cambios. Las particiones de flujo de cambios se asignan a divisiones de datos de flujo de cambios que contienen los registros de flujo de cambios. Las particiones de un flujo de cambios cambian de forma dinámica con el tiempo y se correlacionan con la forma en que Spanner divide y combina de forma dinámica los datos de la base de datos.

Una partición de transmisión de cambios contiene registros de un rango de claves inmutable para un período específico. Cualquier partición de flujo de cambios se puede dividir en una o más particiones de flujo de cambios, o bien se puede combinar con otras particiones de flujos de cambios. Cuando ocurren estos eventos de división o combinación, se crean particiones secundarias para capturar los cambios de sus respectivos rangos de claves inmutables para el siguiente período. Además, a los registros de cambios de datos, una consulta de flujos de cambios devuelve los registros de particiones secundarios notificar a los lectores sobre las nuevas particiones del flujo de cambios que deben consultarse como registros de señal de monitoreo de funcionamiento para indicar el progreso hacia delante cuando no se han producido operaciones de escritura. recientemente.

Cuando consultas una partición de flujo de cambios en particular, los registros de cambios se muestran en orden de marca de tiempo de confirmación. Cada registro de cambios se devuelve una vez. En las particiones del flujo de cambios, no se garantiza el orden de los registros de cambios. Los registros de cambios de una clave primaria en particular se devuelven solo en una partición para un período determinado.

Debido al linaje de particiones principales y secundarias, para procesar los cambios de una clave en particular en el orden de la marca de tiempo de confirmación, los registros que se devuelven de las particiones secundarias solo deben procesarse después de que se hayan procesado los registros de todas las particiones principales.

Cambios en las funciones de lectura de flujo y la sintaxis de consulta

GoogleSQL

Puedes consultar flujos de cambios con el ExecuteStreamingSql API de gcloud. Spanner crea automáticamente una función de lectura especial junto con el flujo de cambios. La función de lectura proporciona acceso a los registros del flujo de cambios. La convención de nomenclatura de la función de lectura es READ_change_stream_name

Si suponemos que existe un flujo de cambios SingersNameStream en la base de datos, el la sintaxis de la consulta para GoogleSQL es la siguiente:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

La función de lectura acepta los siguientes argumentos:

Nombre del argumento Tipo ¿Es obligatorio? Descripción
start_timestamp TIMESTAMP Obligatorio Especifica que los registros con commit_timestamp mayor o igual que start_timestamp debería mostrarse. El valor debe estar dentro del período de retención del flujo de cambios y debe ser menor o igual que la hora actual, y mayor o igual que la marca de tiempo de la creación del flujo de cambios.
end_timestamp TIMESTAMP Opcional (predeterminado: NULL) Especifica que se registre con commit_timestamp menos. mayor o igual que end_timestamp debe se devuelvan. El valor debe estar dentro del período de retención del flujo de cambios y ser mayor o igual que start_timestamp. La consulta finaliza después de mostrar todos los ChangeRecords hasta end_timestamp o el usuario finalizará la conexión. Si es NULL o no especificado, la consulta se ejecuta hasta que se devuelvan todos los ChangeRecords o el usuario cancela la conexión.
partition_token STRING Opcional (predeterminado: NULL) Especifica qué partición de flujo de cambios consultar en función del contenido de los registros de particiones secundarias. Si es NULL o no se especifica, esto significa que lector consulta el flujo de cambios por primera vez y tiene no obtuvo tokens de partición específicos para consultar.
heartbeat_milliseconds INT64 Obligatorio Determina la frecuencia con la que se devuelve una señal de monitoreo de funcionamiento. en caso de que no haya transacciones confirmadas en esta partición. El valor debe estar entre 1,000 (un segundo) y 300,000 (cinco minutos).
read_options ARRAY Opcional (predeterminado: NULL) Opciones de lectura adicionales reservadas para uso futuro. Actualmente, el único valor permitido es NULL.

Se recomienda crear un método conveniente para compilar el texto del leer la consulta de función y vincular parámetros a ella, como se muestra a continuación ejemplo.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

PostgreSQL

Para consultar los flujos de cambios, usa la API de ExecuteStreamingSql. Spanner crea automáticamente una función de lectura especial junto con el flujo de cambios. La función de lectura proporciona acceso a los registros del flujo de cambios. La convención de nombres de la función de lectura es spanner.read_json_change_stream_name.

Si suponemos que existe un flujo de cambios SingersNameStream en la base de datos, la sintaxis de consulta para PostgreSQL es la siguiente:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

La función read acepta los siguientes argumentos:

Nombre del argumento Tipo ¿Es obligatorio? Descripción
start_timestamp timestamp with time zone Obligatorio Especifica que cambian los registros en los que commit_timestamp es mayor o igual que start_timestamp debería mostrarse. El valor debe estar dentro del período de retención del flujo de cambios y debe ser menor o igual que la hora actual, y mayor o igual que la marca de tiempo de la creación del flujo de cambios.
end_timestamp timestamp with timezone Opcional (predeterminado: NULL) Especifica que cambia los registros con commit_timestamp menor o igual que end_timestamp debe se devuelvan. El valor debe estar dentro del período de retención del flujo de cambios y ser mayor o igual que start_timestamp. La consulta finaliza después de mostrar todos los registros de cambios hasta end_timestamp o cuando el usuario finaliza la conexión. Si es NULL, la consulta se ejecuta hasta que se muestran todos los registros de cambios o el usuario finaliza la conexión.
partition_token text Opcional (predeterminado: NULL) Especifica qué partición de flujo de cambios consultar, según el contenido de particiones secundarias registros. Si es NULL o no se especifica, significa que el lector consulta el flujo de cambios por primera vez y no obtuvo ningún token de partición específico para consultar.
heartbeat_milliseconds bigint Obligatorio Determina la frecuencia con la que se mostrará un ChangeRecord de sondeo en caso de que no haya transacciones confirmadas en esta partición. El valor debe estar entre 1,000 (un segundo) y 300,000 (cinco minutos).
null null Obligatorio Reservado para uso futuro

Te recomendamos que crees un método conveniente para compilar el texto de la función de lectura y vincularle parámetros, como se muestra en el siguiente ejemplo.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Cambia el formato de registro de las transmisiones

GoogleSQL

La función de lectura de flujos de cambios muestra una sola columna ChangeRecord de tipo ARRAY<STRUCT<...>>. En cada fila, este array siempre contiene un solo elemento.

Los elementos del array tienen el siguiente tipo:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Hay tres campos en esta estructura: data_change_record, heartbeat_record y child_partitions_record, cada uno de tipo ARRAY<STRUCT<...>>. En cualquier fila que devuelva la función de lectura del flujo de cambios, solo uno de estos tres campos contiene un valor; los otros dos están vacíos o son NULL. Estos campos de array contienen, como máximo, un elemento.

En las siguientes secciones, se examina cada uno de estos tres tipos de registros.

PostgreSQL

La función de lectura de flujos de cambios muestra una sola columna ChangeRecord de escribe JSON con la siguiente estructura:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

Hay tres claves posibles en este objeto: data_change_record, heartbeat_record y child_partitions_record, el valor correspondiente tipo es JSON. En cualquier fila que muestre la función de lectura del flujo de cambios, solo existe una de estas tres claves.

En las siguientes secciones, se examina cada uno de estos tres tipos de registros.

Registros de cambios de datos

Un registro de cambios de datos contiene un conjunto de cambios en una tabla con el mismo tipo de modificación (inserción, actualización o eliminación) confirmado en la misma marca de tiempo de confirmación en una partición de flujo de cambios para la misma transacción. Se pueden mostrar varios registros de cambios de datos para la misma transacción en varias particiones de flujo de cambios.

Todos los registros de cambios de datos tienen campos commit_timestamp, server_transaction_id y record_sequence, que, en conjunto, determinan el orden en el flujo de cambios de un registro de flujo. Estos tres campos son suficientes para derivar el orden de los cambios y proporcionar coherencia externa.

Ten en cuenta que varias transacciones pueden tener la misma marca de tiempo de confirmación si cuando tocan datos que no se superponen. El campo server_transaction_id ofrece la capacidad de distinguir qué conjunto de cambios (posiblemente en particiones de flujo de cambios) se emitió dentro de la misma transacción. Si lo vinculas con los campos record_sequence y number_of_records_in_transaction, también puedes almacenar en búfer y ordenar todos los registros de una transacción en particular.

Los campos de un registro de cambios de datos incluyen los siguientes:

GoogleSQL

Campo Tipo Descripción
commit_timestamp TIMESTAMP La marca de tiempo en la que se confirmó el cambio.
record_sequence STRING Es el número de secuencia del registro dentro de la transacción. Se garantiza que los números de secuencia sean únicos y aumenten monótonamente (pero no necesariamente contiguos) dentro de una transacción. Ordena los registros del mismo server_transaction_id por record_sequence para reconstruir el orden de los cambios dentro de la transacción. Spanner puede optimizar este orden para mejorar el rendimiento y es posible que no siempre coincida con el orden original que proporcionan los usuarios.
server_transaction_id STRING Es una cadena única a nivel global que representa la transacción en la que se confirmó el cambio. El valor solo debe ser se usan en el contexto del procesamiento de registros de flujos de cambios y no se correlacionado con el ID de transacción en la API de Spanner.
is_last_record_in_transaction_in_partition BOOL Indica si este es el último registro de una transacción en la partición actual.
table_name STRING Es el nombre de la tabla afectada por el cambio.
value_capture_type STRING

Describe el tipo de captura de valor que se especificó en la configuración del flujo de cambios cuando se capturó este cambio.

El tipo de captura de valor puede ser "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" o "NEW_ROW_AND_OLD_VALUES". De forma predeterminada, es "OLD_AND_NEW_VALUES". Para obtener más información, consulta los tipos de captura de valores.

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
El nombre de la columna, el tipo de columna, si es una clave primaria y la posición de la columna como se define en el esquema ("ordinal_position"). La primera columna de una tabla en el esquema tendría una posición ordinal de "1". El tipo de columna puede estar anidado para las columnas de array. El formato coincide con la estructura de tipo que se describe en la referencia de la API de Spanner.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Describe los cambios que se realizaron, incluida la clave primaria anteriores, así como los valores nuevos de las columnas modificadas o a las que se les hace seguimiento. La disponibilidad y el contenido de los valores anteriores y nuevos dependerán del value_capture_type configurado. Los campos new_values y old_values solo contienen las columnas sin clave.
mod_type STRING Describe el tipo de cambio. Es uno de los siguientes: INSERT, UPDATE o DELETE.
number_of_records_in_transaction INT64 Es la cantidad de registros de cambios de datos que forman parte de esta transacción en todas las particiones de transmisión de cambios.
number_of_partitions_in_transaction INT64 Es la cantidad de particiones que mostrarán registros de cambios de datos para esta transacción.
transaction_tag STRING Etiqueta de transacción asociada a esta transacción.
is_system_transaction BOOL Indica si la transacción es del sistema.

PostgreSQL

Campo Tipo Descripción
commit_timestamp STRING La marca de tiempo en la que se confirmó el cambio.
record_sequence STRING Es el número de secuencia del registro dentro de la transacción. Se garantiza que los números de secuencia sean únicos y aumenten monótonamente (pero no necesariamente contiguos) dentro de una transacción. Ordenar los registros por la misma `server_transaction_id` por `record_seq` para reconstruir el orden de los cambios dentro del transacción.
server_transaction_id STRING Una cadena única a nivel global que representa la transacción en en la que se confirmó el cambio. El valor solo debe ser se usan en el contexto del procesamiento de registros de flujos de cambios y no se correlacionado con el ID de transacción en la API de Spanner
is_last_record_in_transaction_in_partition BOOLEAN Indica si este es el último registro de una transacción en la partición actual.
table_name STRING Es el nombre de la tabla afectada por el cambio.
value_capture_type STRING

Describe el tipo de captura de valor que se especificó en la configuración del flujo de cambios cuando se capturó este cambio.

El tipo de captura de valor puede ser "OLD_AND_NEW_VALUES", "NEW_ROW", "NEW_VALUES" o "NEW_ROW_AND_OLD_VALUES" De forma predeterminada, es "OLD_AND_NEW_VALUES". Para obtener más información, consulta los tipos de captura de valores.

column_types
[
  {
      "name": <STRING>,
      "type": {
        "code": <STRING>
      },
      "is_primary_key": <BOOLEAN>,
      "ordinal_position": <NUMBER>
    },
    ...
]
El nombre de la columna, el tipo de columna si es una clave primaria, y la posición de la columna como se define en el esquema (“ordinal_position”). La primera columna de una tabla en el esquema tendrían una posición ordinal "1". El tipo de columna pueden estar anidadas en columnas de array. El formato debe coincidir con la estructura de tipos que se describe en la referencia de la API de Spanner.
mods
[
  {
    "keys": {<STRING> : <STRING>},
    "new_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
    "old_values": {
      <STRING> : <VALUE-TYPE>,
      [...]
    },
  },
  [...]
]
Describe los cambios que se realizaron, incluidos los valores de la clave principal, los valores anteriores y los valores nuevos de las columnas modificadas o a las que se les hizo un seguimiento. La disponibilidad y el contenido de los valores antiguos y nuevos dependerán del value_capture_type configurado. Los campos new_values y old_values solo contienen las columnas que no son clave.
mod_type STRING Describe el tipo de cambio. Es uno de los siguientes: INSERT, UPDATE o DELETE.
number_of_records_in_transaction INT64 Es la cantidad de registros de cambios de datos que forman parte de esta transacción en todas las particiones de transmisión de cambios.
number_of_partitions_in_transaction NUMBER La cantidad de particiones que devolverán registros de cambios de datos para esta transacción.
transaction_tag STRING Etiqueta de transacción asociada a esta transacción.
is_system_transaction BOOLEAN Indica si la transacción es una transacción del sistema.

A continuación, se incluyen dos ejemplos de registros de cambios de datos. Describen una sola transacción en la que hay una transferencia entre dos cuentas. Ten en cuenta que las dos cuentas están en flujo de cambios independiente y particiones.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

El siguiente registro de cambios de datos es un ejemplo de un registro con el valor tipo de captura "NEW_VALUES". Ten en cuenta que solo se propagan los valores nuevos. Solo se modificó la columna "LastUpdate", por lo que solo esa columna fue devuelto.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

El siguiente registro de cambio de datos es un ejemplo de un registro con el tipo de captura de valor "NEW_ROW". Solo el "LastUpdate" se modificó la columna, pero se muestran todas las columnas con seguimiento.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

El siguiente registro de cambio de datos es un ejemplo de un registro con el tipo de captura de valor "NEW_ROW_AND_OLD_VALUES". Solo se modificó la columna "LastUpdate", pero se muestran todas las columnas de seguimiento. Este tipo de captura de valores captura el valor nuevo y el valor anterior de LastUpdate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Registros de latidos

Cuando se muestra un registro de estado, se indica que se devolvieron todos los cambios con commit_timestamp inferior o igual a timestamp del registro de estado, y los registros de datos futuros en esta partición deben tener marcas de tiempo de confirmación más altas que las que muestra el registro de estado. Los registros de señal de monitoreo de funcionamiento se muestran cuando no hay datos los cambios escritos en una partición. Cuando hay cambios de datos escritos en la partición, se puede usar data_change_record.commit_timestamp en lugar de heartbeat_record.timestamp para indicar que el lector está avanzando en la lectura de la partición.

Puedes usar los registros de señal de monitoreo de funcionamiento que se muestran en particiones para sincronizar lectores de todas las particiones. Una vez que todos los lectores hayan recibido un señal de monitoreo de funcionamiento superior o igual a alguna marca de tiempo A, o bien que recibieron datos o elementos secundarios registros de partición mayores o iguales a la marca de tiempo A, los lectores sabrán que recibieron todos los registros confirmados antes o después de esa marca de tiempo A y pueden comenzar procesar los registros almacenados en búfer, por ejemplo, ordenar la partición cruzada registros por marca de tiempo y agrupándolos por server_transaction_id.

Un registro de señal de monitoreo de funcionamiento contiene solo un campo:

GoogleSQL

Campo Tipo Descripción
timestamp TIMESTAMP La marca de tiempo del registro de la señal de monitoreo de funcionamiento.

PostgreSQL

Campo Tipo Descripción
timestamp STRING La marca de tiempo del registro de la señal de monitoreo de funcionamiento.

Un ejemplo de registro de señal de monitoreo de funcionamiento, en el que se comunica que todos los registros con marcas de tiempo menores o iguales que la marca de tiempo de este registro.

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Registros de particiones secundarias

Un registro de particiones secundarias devuelve información sobre las particiones secundarias: sus tokens de partición, los tokens de sus particiones superiores y los start_timestamp, que representa la marca de tiempo más antigua que el elemento secundario particiones contienen registros de cambios. Registros cuyas marcas de tiempo de confirmación inmediatamente anteriores a la child_partitions_record.start_timestamp se que se devuelven en la partición actual. Después de mostrar todos los secundarios para esta partición, esta consulta devolverá un estado de éxito que indica que todos los registros se devolvieron para este por cada partición.

Los campos de un registro de particiones secundarias incluyen los siguientes:

GoogleSQL

Campo Tipo Descripción
start_timestamp TIMESTAMP Registros de cambios de datos que muestra el elemento secundario las particiones en este registro de partición secundario tienen una marca de tiempo de confirmación mayor o igual que start_timestamp. Cuando se consulta una partición secundaria, la consulta debe especifica el token de partición secundario y un start_timestamp mayor o igual que child_partitions_token.start_timestamp Todos los registros de particiones secundarias que muestra una partición tienen el mismo start_timestamp, y la marca de tiempo siempre se encuentra entre el start_timestamp y el end_timestamp especificados en la consulta.
record_sequence STRING Es un número de secuencia que aumenta de forma monótona y que se puede usar para definir el orden del registro de particiones secundarias cuando hay varios registros de particiones secundarias que se muestran con la misma marca de tiempo de inicio en una partición en particular. El token de partición, start_timestamp y record_sequence, identifican de forma única un registro de particiones secundarias.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Muestra un conjunto de particiones secundarias y su información asociada. Esto incluye la cadena de tokens de partición que se usa para identificar la partición secundaria en las consultas, así como los tokens de sus particiones superiores.

PostgreSQL

Campo Tipo Descripción
start_timestamp STRING Registros de cambios de datos que muestra el elemento secundario las particiones en este registro de particiones secundarias tienen una marca de tiempo de confirmación mayor o igual que start_timestamp. Cuando se consulta una partición secundaria, la consulta debe especificar el token de partición secundaria y un start_timestamp mayor o igual que child_partitions_token.start_timestamp. Todas las particiones secundarias los registros que devuelve una partición tienen el mismo start_timestamp y la marca de tiempo siempre se encuentra entre la start_timestamp especificada de la consulta y end_timestamp
record_sequence STRING Es un número de secuencia que aumenta de forma monótona y se puede usar para definir el orden del registro de particiones secundarias cuando hay varios registros de particiones secundarias que se muestran con la misma marca de tiempo de inicio en una partición en particular. El token de partición, start_timestamp y record_sequence, identifican de forma única un registro de particiones secundarias.
child_partitions
[
  {
    "token": <STRING>,
    "parent_partition_tokens": [<STRING>],
  }, [...]
]
Devuelve un array de particiones secundarias y su información asociada. Esto incluye la cadena del token de partición que se usa para identificar al elemento secundario por cada partición en las consultas, así como los tokens de su y particiones.

El siguiente es un ejemplo de un registro de partición secundario:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Flujo de trabajo de consulta de transmisiones de cambios

Ejecutar consultas de flujos de cambios con el API de ExecuteStreamingSql, con una función de un solo uso solo lectura de transacciones y una un límite de marca de tiempo estricto La función de lectura de flujo de cambios te permite especificar start_timestamp y end_timestamp para el período de tiempo de interés. Todos los registros de cambios dentro del período de retención son accesibles con la estrategia límite de marca de tiempo.

Todos los demás TransactionOptions no son válidos para las consultas de flujo de cambios. Además, si TransactionOptions.read_only.return_read_timestamp se establece como verdadero, se mostrará un valor especial de kint64max - 1 en el mensaje Transaction que describe la transacción, en lugar de una marca de tiempo de lectura válida. Este valor especial se debe descartar y no se debe usar para ninguna consulta posterior.

Cada consulta de flujo de cambios puede mostrar cualquier cantidad de filas, cada una de las cuales contiene un registro de cambio de datos, un registro de verificación de estado o un registro de particiones secundarias. No es necesario establecer una fecha límite para la solicitud.

Ejemplo:

El flujo de trabajo de consulta de transmisión comienza con la emisión de la primera consulta de flujo de cambios especificando partition_token a NULL. La consulta debe especificar la función de lectura para el flujo de cambios, la marca de tiempo de inicio y finalización de interés, y el intervalo de intervalo de tiempo de actividad. Cuando end_timestamp es NULL, la consulta sigue mostrando cambios de datos hasta que finaliza la partición.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Procesar registros de datos de esta consulta hasta que los registros de partición secundaria se que se devuelven. En el siguiente ejemplo, dos registros de particiones secundarios y tres registros los tokens y, luego, finaliza la consulta. Registros de partición secundaria de un esta consulta específica siempre comparte el mismo start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Para procesar cambios futuros después de 2022-05-01T09:00:01Z, crea tres consultas nuevas y ejecútalas en paralelo. Las tres consultas juntas muestran los cambios de datos futuros para el mismo rango de claves que cubre su elemento superior. Siempre establece start_timestamp al start_timestamp en el mismo registro de partición secundaria y usa el mismo end_timestamp y el mismo intervalo de señal de monitoreo de funcionamiento para procesar los registros de forma coherente en todas las consultas.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

Después de un tiempo, la consulta en child_token_2 finaliza después de mostrar otra. registro de partición secundario, indica que se creará una partición nueva que cubre cambios futuros tanto para child_token_2 como para child_token_3 a partir de 2022-05-01T09:30:15Z La consulta devolverá exactamente el mismo registro el child_token_3, ya que ambas son las particiones superiores del elemento child_token_4 nuevo Para garantizar un procesamiento estricto y ordenado de los registros de datos para una clave en particular, La consulta en child_token_4 solo debe comenzar después de que hayan terminado todos los elementos superiores. que en este caso son child_token_2 y child_token_3. Solo crea una consulta para cada token de partición secundario. El diseño del flujo de trabajo de la consulta debe designar un elemento superior para esperar y programar la consulta en child_token_4.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Encuentra ejemplos de cómo controlar y analizar registros de flujo de cambios en el conector de Apache Beam SpannerIO para Dataflow en GitHub.