Utiliser Datastream et Dataflow pour importer des données en flux continu dans BigQuery

Cette page présente les bonnes pratiques à suivre pour transmettre des données en flux continu dans BigQuery à l'aide de Datastream et Dataflow.

Partitionner les ensembles de données des réplicas sur des clés définies par l'utilisateur

L'ensemble de données d'approvisionnement dans BigQuery est partitionné automatiquement. Toutefois, par défaut, l'ensemble de données répliqué n'est pas partitionné, car les clés de partitionnement des tables répliquées doivent être définies en fonction d'une logique métier spécifique, au lieu d'être appliquées par Datastream et Dataflow.

Pour chaque table de l'ensemble de données du réplicat qui doit être partitionnée :

  1. Arrêtez et videz la tâche Dataflow.

  2. Utilisez l'éditeur SQL de BigQuery pour exécuter le script SQL suivant pour chaque table du jeu de données du réplica. Dans cet exemple, la table actor de l'ensemble de données datastream_cdc comporte une colonne last_update que nous souhaitons définir comme clé de partitionnement. En exécutant le script, vous recréez la table avec la clé de partition appropriée.

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
  3. Utilisez le modèle Datastream vers BigQuery pour recréer une tâche Dataflow.

Exécuter des fonctions définies par l'utilisateur pour manipuler les données d'événement

Vous pouvez utiliser le modèle Datastream vers BigQuery pour exécuter une fonction JavaScript définie par l'utilisateur. Pour ce faire, commencez par placer un fichier contenant la fonction à un emplacement spécifique dans Cloud Storage. Ensuite, procédez comme suit :

  • Utilisez le paramètre javascriptTextTransformGcsPath dans le modèle pour spécifier l'emplacement du fichier dans Cloud Storage qui contient votre fonction définie par l'utilisateur.
  • Utilisez le paramètre javascriptTextTransformFunctionName pour spécifier le nom de la fonction JavaScript que vous souhaitez appeler en tant que fonction définie par l'utilisateur.

Par exemple, vous pouvez exécuter une fonction définie par l'utilisateur pour conserver les enregistrements supprimés dans les tables de l'ensemble de données dupliqué dans BigQuery. Ce processus est appelé "suppression réversible".

Pour ce faire, créez une fonction qui copie la valeur de la colonne _metadata_deleted dans une nouvelle colonne nommée is_deleted, puis réinitialise la valeur de la colonne _metadata_deleted sur false. La tâche Dataflow ignore alors les événements de suppression et conserve les enregistrements supprimés lors de la mise à jour de l'ensemble de données du réplica dans BigQuery.

Voici l'exemple de code pour cette fonction définie par l'utilisateur:

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

Définir la fréquence de fusion

Utilisez le paramètre mergeFrequencyMinutes du modèle Datastream vers BigQuery pour définir la fréquence de fusion. Il s'agit du nombre de minutes entre les fusions pour une table donnée dans l'ensemble de données du réplica dans BigQuery. Pendant que les données historiques sont renseignées, nous vous recommandons de limiter la fréquence de fusion (12 ou 24 heures) afin de contrôler les coûts.

Par exemple, si vous définissez la valeur de ce paramètre sur 10 minutes, Dataflow exécutera la tâche qui utilise le modèle toutes les 10 minutes. Toutefois, la première fois que le job s'exécute, un délai de cinq minutes est observé. Dans cet exemple, si le job s'exécute à 9h14, la première fusion aura lieu à 9h29 (10 minutes pour la fusion et 5 minutes pour le retard). La seconde fusion aura lieu à 9h39 et toutes les fusions suivantes auront lieu toutes les 10 minutes (9h49, 9h59, 10h09, etc.).

Si vous définissez la fréquence de fusion sur 60 minutes, la tâche s'exécutera à l'heure, après un délai de cinq minutes pour l'exécution initiale de la tâche. Si l'exécution du job est planifiée à 10h, elle s'exécutera en réalité à 10h05 en raison du délai de cinq minutes. Toutes les fusions ultérieures auront lieu à intervalles de 60 minutes (11h05, 12h05, 13h05, etc.).

Que ce soit pour contrôler les coûts ou pour d'autres raisons, vous ne pourrez peut-être pas effectuer une fusion à une fréquence adaptée aux besoins de votre entreprise. Vous ne disposez peut-être pas des données les plus récentes. Pour accéder aux données les plus récentes, créez une vue au-dessus des tables des ensembles de données de préproduction et des ensembles de données répliqués dans BigQuery. La vue imite la fusion. Cette vue est créée en tant que table logique (pour les ensembles de données de préproduction et de réplication). Si la fréquence de fusion est faible et que vous avez besoin d'un accès plus rapide aux données, utilisez la vue.