Cette page présente les bonnes pratiques à suivre pour transmettre des données en flux continu dans BigQuery à l'aide de Datastream et Dataflow.
Partitionner les ensembles de données des réplicas sur des clés définies par l'utilisateur
L'ensemble de données d'approvisionnement dans BigQuery est partitionné automatiquement. Toutefois, par défaut, l'ensemble de données de la réplication n'est pas partitionné, car les clés de partition des tables de réplication doivent être définies en fonction d'une logique métier spécifique, au lieu d'être appliquées par Datastream et Dataflow.
Pour chaque table de l'ensemble de données du réplicat qui doit être partitionnée:
Utilisez l'éditeur SQL de BigQuery pour exécuter le script SQL suivant pour chaque table du jeu de données du réplica. Dans cet exemple, la table
actor
de l'ensemble de donnéesdatastream_cdc
comporte une colonnelast_update
que nous souhaitons définir comme clé de partitionnement. En exécutant le script, vous recréez la table avec la clé de partition appropriée.create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new'
partition by date(last_update) as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor' drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor' alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor' Utilisez le modèle Datastream vers BigQuery pour recréer une tâche Dataflow.
Exécuter des fonctions définies par l'utilisateur pour manipuler les données d'événement
Vous pouvez utiliser le modèle Datastream vers BigQuery pour exécuter une fonction JavaScript définie par l'utilisateur. Pour ce faire, placez d'abord un fichier contenant la fonction à un emplacement spécifique dans Cloud Storage. Ensuite, procédez comme suit :
- Utilisez le paramètre
javascriptTextTransformGcsPath
dans le modèle pour spécifier l'emplacement du fichier dans Cloud Storage qui contient votre fonction définie par l'utilisateur. - Utilisez le paramètre
javascriptTextTransformFunctionName
pour spécifier le nom de la fonction JavaScript que vous souhaitez appeler en tant que fonction définie par l'utilisateur.
Par exemple, vous pouvez exécuter une fonction définie par l'utilisateur pour conserver les enregistrements supprimés dans les tables de l'ensemble de données du réplica dans BigQuery. Ce processus est appelé "suppression temporaire".
Pour ce faire, créez une fonction qui copie la valeur de la colonne _metadata_deleted
dans une nouvelle colonne nommée is_deleted
, puis réinitialise la valeur de la colonne _metadata_deleted
sur false
. La tâche Dataflow ignore alors les événements de suppression et conserve les enregistrements supprimés lors de la mise à jour de l'ensemble de données du réplica dans BigQuery.
Voici un exemple de code pour cette fonction définie par l'utilisateur:
/** * This function changes the behavior of the Datastream to * BigQuery template to allow soft deletes. * @param {string} messageString from DatastreamIO data * @return {string} same as an input message with an added property */ function transform(messageString) { // messageString is a JSON object as a string var messageJson = JSON.parse(messageString); // Moving the deleted flag to a new column will cause the pipeline to soft delete data. messageJson['is_deleted'] = messageJson['_metadata_deleted']; messageJson['_metadata_deleted'] = false; return JSON.stringify(messageJson); }
Définir la fréquence de fusion
Utilisez le paramètre mergeFrequencyMinutes
du modèle Datastream vers BigQuery pour définir la fréquence de fusion. Il s'agit du nombre de minutes entre les fusions pour une table donnée dans l'ensemble de données du réplica dans BigQuery. Pendant que les données historiques sont renseignées, nous vous recommandons de limiter la fréquence de fusion (12 ou 24 heures) afin de maîtriser les coûts.
Par exemple, si vous définissez la valeur de ce paramètre sur 10 minutes, Dataflow exécutera la tâche qui utilise le modèle toutes les 10 minutes. Toutefois, la première fois que la tâche s'exécute, un délai de cinq minutes est appliqué. Dans cet exemple, si la tâche s'exécute à 9h14, la première fusion aura lieu à 9h29 (10 minutes pour la fusion et 5 minutes pour le délai). La deuxième fusion aura lieu à 9h39, et toutes les fusions suivantes auront lieu à intervalles de 10 minutes (9h49, 9h59, 10h09, etc.).
Si vous définissez la fréquence de fusion sur 60 minutes, la tâche s'exécutera à l'heure, après un délai de cinq minutes pour l'exécution initiale de la tâche. Si la tâche est planifiée pour 10h, elle s'exécutera en réalité à 10h05 en raison du délai de cinq minutes. Toutes les fusions ultérieures auront lieu à intervalles de 60 minutes (11h05, 12h05, 13h05, etc.).
Pour des raisons de contrôle des coûts ou pour d'autres raisons, vous ne pourrez peut-être pas effectuer de fusion à une fréquence qui répond à vos besoins commerciaux. Vous ne disposez peut-être pas des données les plus récentes. Pour accéder aux données les plus récentes, créez une vue au-dessus des tables des ensembles de données de préproduction et de réplication dans BigQuery, où la vue imite la fusion. Cette vue est créée en tant que table logique (pour les ensembles de données de préproduction et de réplication). Si la fréquence de fusion est faible et que vous avez besoin d'un accès plus rapide aux données, utilisez la vue.