Utiliser Datastream et Dataflow pour diffuser des données dans BigQuery

Sur cette page, vous trouverez les bonnes pratiques à suivre pour utiliser Datastream et Dataflow afin de diffuser des données en flux continu dans BigQuery.

Partitionner les ensembles de données répliqués sur des clés définies par l'utilisateur

L'ensemble de données intermédiaire dans BigQuery est partitionné automatiquement. Toutefois, par défaut, l'ensemble de données répliqué n'est pas partitionné, car les clés de partition des tables répliquées doivent être définies en fonction d'une logique métier spécifique, au lieu d'être appliquées par Datastream et Dataflow.

Pour chaque table de l'ensemble de données répliqué qui doit être partitionnée :

  1. Arrêter et drainer la tâche Dataflow

  2. Utilisez l'éditeur SQL de BigQuery pour exécuter le script SQL suivant pour chaque table du jeu de données répliqué. Dans cet exemple, la table actor de l'ensemble de données datastream_cdc comporte une colonne last_update que nous souhaitons définir comme clé de partition. En exécutant le script, vous recréez la table avec la clé de partition appropriée.

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
  3. Utilisez le modèle Datastream vers BigQuery pour recréer une tâche Dataflow.

Exécuter des fonctions définies par l'utilisateur pour manipuler les données d'événement

Vous pouvez utiliser le modèle Datastream vers BigQuery pour exécuter une fonction JavaScript définie par l'utilisateur. Pour ce faire, commencez par placer un fichier contenant la fonction dans un emplacement spécifique de Cloud Storage. Ensuite, procédez comme suit :

  • Utilisez le paramètre javascriptTextTransformGcsPath dans le modèle pour spécifier l'emplacement du fichier dans Cloud Storage qui contient votre fonction définie par l'utilisateur.
  • Utilisez le paramètre javascriptTextTransformFunctionName pour spécifier le nom de la fonction JavaScript que vous souhaitez appeler en tant que fonction définie par l'utilisateur.

Par exemple, vous pouvez exécuter une fonction définie par l'utilisateur pour conserver les enregistrements supprimés dans les tables de l'ensemble de données répliqué dans BigQuery. Ce processus est appelé suppression logicielle.

Pour ce faire, créez une fonction qui copie la valeur de la colonne _metadata_deleted dans une nouvelle colonne nommée is_deleted, puis réinitialise la valeur de la colonne _metadata_deleted sur false. La tâche Dataflow ignore alors les événements de suppression et conserve les enregistrements supprimés lors de la mise à jour de l'ensemble de données répliqué dans BigQuery.

Voici l'exemple de code pour cette fonction définie par l'utilisateur :

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

Définir la fréquence de fusion

Utilisez le paramètre mergeFrequencyMinutes du modèle Datastream vers BigQuery pour définir la fréquence de fusion. Il s'agit du nombre de minutes entre les fusions pour une table donnée dans l'ensemble de données répliquées dans BigQuery. Pendant que les données historiques sont en cours de remplissage, nous vous recommandons de maintenir une faible fréquence de fusion (12 ou 24 heures) pour maîtriser les coûts.

Par exemple, si vous définissez la valeur de ce paramètre sur 10 minutes, Dataflow exécutera le job qui utilise le modèle toutes les 10 minutes. Toutefois, la première fois que le job s'exécute, il y a un délai de cinq minutes. Dans cet exemple, si le job s'exécute à 9h14, la première fusion aura lieu à 9h29 (10 minutes pour la fusion et 5 minutes pour le délai). La deuxième fusion aura lieu à 9h39, et toutes les fusions suivantes auront lieu à intervalles de 10 minutes (9h49, 9h59, 10h09, etc.).

Si vous définissez la fréquence de fusion sur 60 minutes, le job s'exécutera toutes les heures, après un délai de cinq minutes pour la première exécution. Si la tâche est planifiée pour s'exécuter à 10h, elle s'exécutera en fait à 10h05 en raison du délai de cinq minutes. Toutes les fusions suivantes auront lieu à intervalles de 60 minutes (11h05, 12h05, 13h05, etc.).

Que ce soit pour contrôler les coûts ou pour d'autres raisons, il est possible que vous ne puissiez pas effectuer de fusion à une fréquence qui répond aux besoins de votre entreprise. Il est possible que vous n'ayez pas les données les plus récentes. Pour accéder aux données les plus récentes, créez une vue au-dessus des tables des ensembles de données de préparation et de réplique dans BigQuery, où la vue imite la fusion. Cette vue est créée sous la forme d'une table logique (pour les ensembles de données de préparation et de réplique). Si la fréquence de fusion est faible et que vous avez besoin d'accéder plus rapidement aux données, utilisez la vue.