Modifier les partitions, les enregistrements et les requêtes de flux

Cette page décrit en détail les attributs suivants des flux de modifications:

  • Son modèle de partitionnement basé sur le partage
  • Format et contenu des enregistrements de flux de modifications
  • Syntaxe de bas niveau utilisée pour interroger ces enregistrements
  • Exemple de workflow de requête

Les informations de cette page sont les plus pertinentes pour utiliser l'API Spanner pour interroger directement des flux de modification. Les applications qui utilisent Dataflow pour lire les données de flux de modifications n'ont pas besoin de travailler directement avec le modèle de données décrit ici.

Pour en savoir plus sur la modification des flux, consultez la section Modifier les flux.

Modifier les partitions de flux

Lorsqu'une modification se produit sur une table surveillée par un flux de modifications, Cloud Spanner écrit un enregistrement de flux de modification correspondant dans la base de données de manière synchrone dans la même transaction que la modification des données. Cela garantit que si la transaction aboutit, Spanner a bien réussi à enregistrer et à conserver la modification. En interne, Spanner colocalise l'enregistrement de flux de modification et la modification des données afin qu'elles soient traitées par le même serveur afin de minimiser les coûts d'écriture.

Dans le cadre du LMD pour une répartition donnée, Spanner ajoute l'écriture à la répartition des données du flux de modifications correspondante dans la même transaction. En raison de cette colocation, les flux de modification n'ajoutent pas de coordination supplémentaire entre les ressources de diffusion, ce qui réduit les coûts de commit de transaction.

image

Spanner évolue en divisant et fusionnant dynamiquement les données en fonction de la charge et de la taille de la base de données, et en répartissant les répartitions entre les ressources de diffusion.

Pour permettre les flux de modifications en écriture et en lecture à l'échelle, Spanner divise et fusionne le stockage interne des flux de modifications avec les données de la base de données, en évitant automatiquement les hotspots. Pour permettre la lecture des enregistrements de flux de modifications quasiment en temps réel à mesure que les données de la base de données évoluent, l'API Spanner est conçue pour interroger un flux de modifications en même temps que les partitions de flux de modification. Modifier la carte des partitions de flux pour modifier les répartitions des données de flux contenant les enregistrements de flux de modifications Les partitions d'un flux de modification changent de façon dynamique au fil du temps. Elles sont corrélées à la manière dont Spanner divise et fusionne les données de la base de données de manière dynamique.

Une partition de flux de modification contient des enregistrements pour une plage de clés immuable pour une période spécifique. Toute partition de flux de modification peut être scindée en une ou plusieurs partitions de flux de modifications, ou fusionnée avec d'autres partitions de flux de modification. Lorsque ces événements de division ou de fusion se produisent, des partitions enfants sont créées pour capturer les modifications apportées à leurs plages de clés immuables respectives pour la prochaine période. En plus des enregistrements de modification de données, une requête de flux de modification renvoie des enregistrements de partition enfant pour avertir les lecteurs des nouvelles partitions de flux de modification qui doivent être interrogées, ainsi que des enregistrements de pulsation pour indiquer la progression avant qu'aucune écriture ne se soit récemment effectuée.

Lors de l'interrogation d'une partition de flux de modification spécifique, les enregistrements de modifications sont renvoyés dans l'ordre d'horodatage de commit. Chaque enregistrement de modification est renvoyé une seule fois. Dans les partitions de flux de modifications, il n'existe pas d'ordre garanti des enregistrements de modifications. Les enregistrements de modifications d'une clé primaire spécifique ne sont renvoyés que sur une partition pour une période donnée.

En raison de la traçabilité des partitions parent-enfant, pour que les modifications apportées à une clé particulière dans l'ordre d'horodatage de commit soient traitées, les enregistrements renvoyés par les partitions enfants ne doivent être traités qu'après le traitement des enregistrements de toutes les partitions parentes.

Modifier la syntaxe d'une requête de flux

Les flux de modifications sont interrogés avec l'API ExecuteStreamingSql. Une fonction de valeur table (TVF) spéciale est automatiquement créée avec le flux de modifications. Il permet d'accéder aux enregistrements du flux de modifications. La convention de nommage TVF est READ_change_stream_name.

En supposant qu'un flux de modifications SingersNameStream existe dans la base de données, la syntaxe de la requête est la suivante:

SELECT ChangeRecord
    FROM READ_SingersNameStream (
        start_timestamp,
        end_timestamp,
        partition_token,
        heartbeat_milliseconds
    )

La fonction accepte les arguments suivants:

Nom de l'argument Type Obligatoire ? Description
start_timestamp TIMESTAMP Requis Indique que les enregistrements dont la valeur commit_timestamp est supérieure ou égale à start_timestamp doivent être renvoyés. La valeur doit se situer dans la période de conservation du flux de modifications, et être inférieure ou égale à l'heure actuelle et supérieure ou égale à l'horodatage de la création du flux de modifications.
end_timestamp TIMESTAMP Facultatif (par défaut: NULL) Indique que les enregistrements dont la valeur commit_timestamp est inférieure ou égale à end_timestamp doivent être renvoyés. La valeur doit se situer dans la durée de conservation du changement et être supérieure ou égale à start_timestamp. La requête se terminera une fois que vous aurez renvoyé tous les enregistrements ChangeRecords jusqu'à end_timestamp ou renvoyé un ensemble d'enregistrements de partition enfant. Si NULL ou n'est pas spécifié, la requête s'exécute jusqu'à ce que la partition actuelle soit terminée et que tous les enregistrements ChangeRecords avec leurs champs child_partition_record définis soient renvoyés. Spécifier NULL pour end_timestamp indique de toujours lire les dernières modifications en temps réel.
partition_token STRING Facultatif (par défaut : NULL) Spécifie la modification de la partition de flux à interroger, en fonction du contenu des enregistrements de partitions enfants. Si la valeur est NULL ou n'est pas spécifiée, cela signifie que le lecteur interroge le flux de modifications pour la première fois et qu'il n'a pas obtenu de jetons de partition spécifiques à interroger.
heartbeat_milliseconds INT64 Requis Détermine la fréquence à laquelle la valeur ChangeRecord de pulsation sera affichée en cas d'absence de transaction dans cette partition. La valeur doit être comprise entre 1000 (une seconde) et 300000 (cinq minutes).

Nous vous recommandons de créer une méthode pratique pour créer le texte de la requête TVF et des paramètres de liaison, comme illustré dans l'exemple suivant.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT ChangeRecord FROM READ_SingersNameStream"
            + "("
            + "   start_timestamp => @startTimestamp,"
            + "   end_timestamp => @endTimestamp,"
            + "   partition_token => @partitionToken,"
            + "   heartbeat_milliseconds => @heartbeatMillis"
            + ")";

// Helper method to conveniently create change stream query texts and bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {
    return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("startTimestamp")
                    .to(startTimestamp)
                    .bind("endTimestamp")
                    .to(endTimestamp)
                    .bind("partitionToken")
                    .to(partitionToken)
                    .bind("heartbeatMillis")
                    .to(heartbeatMillis)
                    .build();
}

Modifier le format d'enregistrement du flux

La TVF du flux de modifications renvoie une seule colonne ChangeRecord de type ARRAY<STRUCT<...>>. Dans chaque ligne, ce tableau contient toujours un seul élément.

Les éléments du tableau possèdent le type suivant:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Cette structure comporte trois champs : data_change_record, heartbeat_record et child_partitions_record, de type ARRAY<STRUCT<...>>. Dans une ligne renvoyée par le flux TVF des flux de modifications, seul l'un de ces trois champs contient une valeur. Les deux autres sont vides ou NULL. Ces champs de tableau ne contiennent pas plus d'un élément.

Les sections suivantes analysent chacun de ces trois types d'enregistrements.

Enregistrements des modifications des données

Un enregistrement de modifications des données contient un ensemble de modifications apportées à une table dont le type de modification (insertion, mise à jour ou suppression) est validé avec le même horodatage de commit dans une partition de flux de modifications pour la même transaction. Plusieurs enregistrements de modifications peuvent être renvoyés pour la même transaction sur plusieurs partitions de flux de modifications.

Tous les enregistrements de modifications comportent des champs commit_timestamp, server_transaction_id et record_sequence, qui déterminent l'ordre dans le flux de modifications d'un enregistrement de flux. Ces trois champs suffisent pour déterminer l'ordre des modifications et assurer la cohérence externe.

Notez que plusieurs transactions peuvent avoir le même horodatage de commit si elles touchent des données qui ne se chevauchent pas. Le champ server_transaction_id permet de distinguer l'ensemble de modifications (peut-être entre différentes partitions de flux de modifications) qui a été émis dans la même transaction. L'association aux champs record_sequence et number_of_records_in_transaction vous permet également de mettre en mémoire tampon et de classer tous les enregistrements d'une transaction particulière.

Les champs d'un enregistrement de modification de données incluent les éléments suivants:

Champ Type Description
commit_timestamp TIMESTAMP Horodatage de la validation de la modification.
record_sequence STRING Numéro de séquence de l'enregistrement dans la transaction. Les numéros de séquence sont garantis de façon unique et croissante (mais pas contigus) au cours d'une transaction. Triez les enregistrements de la même valeur "server_transaction_id" par "record_séquence" pour recréer l'ordre des modifications dans la transaction.
server_transaction_id STRING Chaîne unique qui représente la transaction dans laquelle la modification a été validée. Cette valeur ne doit être utilisée que dans le contexte du traitement des enregistrements de flux de modification, et n'est pas liée à l'ID de transaction dans l'API Spanner, par exemple "TransactionSelector.id". Ces deux éléments identifient de manière unique une transaction par rapport à d'autres valeurs dans le même contexte (par exemple, "data_change_records" ou "API Spanner").
is_last_record_in_transaction_in_partition BOOL Indique s'il s'agit du dernier enregistrement d'une transaction de la partition actuelle.
table_name STRING Nom de la table concernée par la modification.
value_capture_type STRING

Décrit le type de capture de valeur spécifié dans la configuration du flux de modification lorsque cette modification a été capturée.

Actuellement, utilisez toujours "OLD_AND_NEW_VALUES".

column_types ARRAY<STRUCT<
name STRING,
 type JSON,
 is_primary_key BOOL,
 ordinal_position INT64
>>
Nom de la colonne, type de colonne, s'il s'agit d'une clé primaire et position de la colonne telle que définie dans le schéma ("ordinal_position"). La première colonne d'une table du schéma possède une position ordinale "1". Le type de colonne peut être imbriqué pour les colonnes de tableau. Le format correspond à la structure de type décrite dans la documentation de référence de l'API Spanner.
mods ARRAY<STRUCT<
keys JSON,
 new_values JSON,
 old_values JSON
>>
Décrit les modifications effectuées, y compris les clés-valeurs principales, et les anciennes et nouvelles valeurs des colonnes modifiées si le flux de modifications est configuré avec "value_capture_type". Les champs new_values et old_values ne contiennent que les colonnes non clés.
mod_type STRING Décrit le type de modification. Un objet INSERT, UPDATE ou DELETE.
number_of_records_in_transaction INT64 Nombre d'enregistrements de modifications de données inclus dans cette transaction sur toutes les partitions de flux de modification.
number_of_partitions_in_transaction INT64 Nombre de partitions qui renvoient les enregistrements de modification de données pour cette transaction.

Voici deux exemples d'enregistrements de modifications de données : Elles décrivent une seule transaction dans laquelle un transfert entre deux comptes est effectué. Notez que les deux comptes figurent dans des partitions de flux de modifications distinctes.

data_change_record: {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
         "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
}
data_change_record: {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
}

Battements

Lorsqu'un enregistrement de pulsation est renvoyé, cela signifie que toutes les modifications pour lesquelles commit_timestamp est inférieur ou égal à l'enregistrement de battement de cœur (timestamp) ont été renvoyées. Les futurs enregistrements de données de cette partition doivent avoir des horodatages de commit supérieurs à ceux renvoyés par l'enregistrement de pulsation. Les enregistrements de pulsation sont renvoyés lorsqu'aucune modification de données n'est écrite dans une partition. Lorsque des modifications sont apportées aux données de la partition, data_change_record.commit_timestamp peut être utilisé à la place de heartbeat_record.timestamp pour indiquer que le lecteur progresse dans la lecture de la partition.

Vous pouvez utiliser des enregistrements de pulsation renvoyés sur des partitions pour synchroniser les lecteurs sur toutes les partitions. Une fois que tous les lecteurs ont reçu un battement de cœur supérieur ou égal à un horodatage A, ou des données ou des enregistrements de partition enfants supérieurs ou égaux à l'horodatage A, ils savent qu'ils ont reçu tous les enregistrements validés dans cet horodatage ou avant celui-ci, et peuvent commencer à traiter les enregistrements mis en mémoire tampon (par exemple, en triant les enregistrements multi-partitions par horodatage et en les regroupant par server_transaction_id).

Un enregistrement de pulsation ne contient qu'un seul champ:

Champ Type Description
timestamp TIMESTAMP Horodatage de l'enregistrement cardiaque.

Exemple d'enregistrement de pulsation indiquant que tous les enregistrements dont les horodatages sont inférieurs ou égaux à cet horodatage:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Enregistrements des partitions enfants

Un enregistrement de partitions enfants renvoie des informations sur les partitions enfants: leurs jetons de partition, les jetons de leurs partitions parentes et la propriété start_timestamp qui représente le premier horodatage pour lequel les partitions enfants contiennent des enregistrements de modifications. Les enregistrements dont les horodatages de commit sont immédiats avant le child_partitions_record.start_timestamp sont renvoyés dans la partition actuelle. Après avoir renvoyé tous les enregistrements de partitions enfants de cette partition, cette requête renvoie un état de réussite, indiquant que tous les enregistrements de cette partition ont été renvoyés.

Les champs d'un enregistrement de partitions enfants incluent les éléments suivants:

Champ Type Description
start_timestamp TIMESTAMP Les enregistrements de modifications renvoyés par les partitions enfants de cet enregistrement auront un horodatage de commit supérieur ou égal à start_timestamp. Lors de l'interrogation d'une partition enfant, la requête doit spécifier le jeton de partition enfant et un start_timestamp supérieur ou égal à child_partitions_token.start_timestamp. Tous les enregistrements de partitions enfants renvoyés par une partition ont le même start_timestamp, et l'horodatage est toujours compris entre les valeurs start_timestamp et end_timestamp spécifiées par la requête.
record_sequence STRING Nombre séquentiel monotonique pouvant être utilisé pour définir l'ordre des enregistrements de partitions enfants lorsque plusieurs enregistrements de partitions enfants sont renvoyés avec le même start_timestamp dans une partition particulière. Le jeton de partition, start_timestamp et record_sequence, identifient de manière unique un enregistrement de partitions enfant.
child_partitions ARRAY<STRUCT<
token STRING,
parent_partition_tokens
ARRAY<STRING>
>>
Renvoie un ensemble de partitions enfants et les informations qui leur sont associées. Cela inclut la chaîne de jeton de partition permettant d'identifier la partition enfant dans les requêtes, ainsi que les jetons de ses partitions parentes.

Exemple d'enregistrement de partitions enfants:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"],
    }
  ],
}

Workflow de requête de modification des flux

Exécutez des requêtes de flux de modification à l'aide de l'API ExecuteStreamingSql, avec une transaction en lecture seule à usage unique et une limitation d'horodatage forte. Ce paramètre permet aux utilisateurs de spécifier les valeurs start_timestamp et end_timestamp pour la période qui vous intéresse. Tous les enregistrements de modifications enregistrés au cours de la période de conservation sont accessibles à l'aide d'une limite d'horodatage en lecture seule forte.

Tous les autres TransactionOptions ne sont pas valides pour les requêtes de flux de modification. De plus, si TransactionOptions.read_only.return_read_timestamp est défini sur "true", une valeur spéciale de kint64max - 1 est renvoyée dans le message Transaction qui décrit la transaction, au lieu d'un horodatage de lecture valide. Cette valeur spéciale doit être supprimée et ne doit pas être utilisée pour les requêtes suivantes.

Chaque requête de flux de modifications peut renvoyer n'importe quel nombre de lignes, chacune contenant un enregistrement de modifications de données, un enregistrement de pulsations ou un enregistrement de partitions enfants. Vous n'avez pas besoin de définir de date limite pour la demande.

Exemple :

Le workflow de requête en streaming commence par émettre la première requête de flux de modifications en spécifiant partition_token sur NULL. La requête doit spécifier la fonction TVF du flux de modifications, l'horodatage de début et de fin de l'intérêt, ainsi que l'intervalle du rythme cardiaque. Lorsque end_timestamp est défini sur NULL, la requête continue de renvoyer des modifications jusqu'à la création des partitions enfants.

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:00-00",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

Traitez les enregistrements de données de cette requête jusqu'à ce que les enregistrements de la partition enfant soient renvoyés. Dans l'exemple ci-dessous, deux enregistrements de partition enfants et trois jetons de partition sont renvoyés, puis la requête s'arrête. Les enregistrements de partition enfants d'une requête spécifique partagent toujours le même start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:00:01-00",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
      // from the initial change stream queries.
      "parent_partition_tokens": [NULL],
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL],
    }
  ],
}
child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:00:01-00",
  "record_sequence": 1000012390,
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL],
    }
  ],
}

Pour traiter les modifications ultérieures à la 2022-05-01 09:00:01-00, créez trois nouvelles requêtes et exécutez-les en parallèle. Les trois requêtes afficheront les modifications de données futures pour la même plage de clés que leurs parents. Définissez toujours start_timestamp sur start_timestamp dans le même enregistrement de partition enfant, et utilisez les mêmes end_timestamp et l'intervalle de pulsation pour traiter les enregistrements de manière cohérente dans toutes les requêtes.

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000);
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:00:01-00",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000);

Après un certain temps, la requête sur child_token_2 se termine après avoir renvoyé un autre enregistrement de partition enfant. Cela indique qu'une nouvelle partition couvrira les modifications futures pour child_token_2 et child_token_3 à partir de 2022-05-01 09:30:15-00. Le même enregistrement est renvoyé par la requête sur child_token_3, car il s'agit des deux partitions parentes du nouveau child_token_4. Pour garantir le traitement strict des enregistrements de données pour une clé spécifique, la requête sur child_token_4 ne doit commencer qu'une fois tous les parents terminés. Dans ce cas, il s'agit de child_token_2 et de child_token_3. Créez une seule requête pour chaque jeton de partition enfant. La conception du workflow de requête doit désigner un parent pour attendre et planifier la requête sur child_token_4.

child partitions record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01 09:30:15-00",
  "record_sequence": 1000012389,
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": [child_token_2, child_token_3],
    }
  ],
}
SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01 09:30:15-00",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

Accédez à GitHub pour découvrir des exemples de gestion et d'analyse des enregistrements de flux de modifications dans le connecteur Dataflow BeamIO.