Utiliser Datastream et Dataflow pour insérer des données en flux continu dans BigQuery

Sur cette page, vous trouverez les bonnes pratiques pour utiliser Datastream et Dataflow pour diffuser des données en flux continu dans BigQuery.

Partitionner des ensembles de données répliqués en fonction de clés définies par l'utilisateur

L'ensemble de données de préproduction dans BigQuery est partitionné automatiquement. Toutefois, l'instance répliquée de l'ensemble de données n'est pas partitionnée par défaut, car les clés de partition des tables répliquées doivent être définies en fonction d'une logique métier spécifique, et non appliquées par Datastream et Dataflow.

Pour chaque table de l'ensemble de données répliquée qui nécessite un partitionnement:

  1. Arrêtez et drainez la tâche Dataflow.

  2. À l'aide de l'éditeur SQL dans BigQuery, exécutez le script SQL suivant pour chaque table de l'ensemble de données répliqué. Pour cet exemple, la table actor de l'ensemble de données datastream_cdc comporte une colonne last_update que nous voulons définir comme clé de partition. En exécutant le script, vous recréez la table avec la clé de partition appropriée.

    create table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' partition by date(last_update)
    as SELECT * FROM '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    drop table '[BigQuery_PROJECT_ID].datastream_cdc.actor'
    
    alter table '[BigQuery_PROJECT_ID].datastream_cdc.actor_new' rename to 'actor'
    
    
  3. Utilisez le modèle Datastream vers BigQuery pour recréer un job Dataflow.

Exécuter des fonctions définies par l'utilisateur pour manipuler des données d'événement

Vous pouvez utiliser le modèle Datastream vers BigQuery pour exécuter une fonction JavaScript définie par l'utilisateur. Pour ce faire, commencez par placer un fichier contenant la fonction à un emplacement spécifique de Cloud Storage. Ensuite, procédez comme suit :

  • Utilisez le paramètre javascriptTextTransformGcsPath dans le modèle pour spécifier l'emplacement dans Cloud Storage du fichier contenant votre fonction définie par l'utilisateur.
  • Utilisez le paramètre javascriptTextTransformFunctionName pour spécifier le nom de la fonction JavaScript que vous souhaitez appeler en tant que fonction définie par l'utilisateur.

Par exemple, vous pouvez exécuter une fonction définie par l'utilisateur pour conserver des enregistrements supprimés dans les tables de l'ensemble de données répliquée dans BigQuery. Ce processus est appelé suppression réversible.

Pour ce faire, créez une fonction qui copie la valeur de la colonne _metadata_deleted dans une nouvelle colonne nommée is_deleted, puis réinitialise la valeur de la colonne _metadata_deleted sur false. Ainsi, la tâche Dataflow ignore les événements de suppression et conserve les enregistrements supprimés lors de la mise à jour de l'ensemble de données répliqué dans BigQuery.

Voici l'exemple de code pour cette fonction définie par l'utilisateur:

/**
* This function changes the behavior of the Datastream to
* BigQuery template to allow soft deletes.
* @param {string} messageString from DatastreamIO data
* @return {string} same as an input message with an added property
*/
function transform(messageString) {
   // messageString is a JSON object as a string
   var messageJson = JSON.parse(messageString);
    // Moving the deleted flag to a new column will cause the pipeline to soft delete data.
   messageJson['is_deleted'] = messageJson['_metadata_deleted'];
   messageJson['_metadata_deleted'] = false;
    return JSON.stringify(messageJson);
 }

Définir la fréquence de fusion

Utilisez le paramètre mergeFrequencyMinutes du modèle Datastream vers BigQuery pour définir la fréquence de fusion. Il s'agit du nombre de minutes entre les fusions pour une table donnée dans l'ensemble de données répliquée dans BigQuery. Pendant que les données historiques sont remplies, nous vous recommandons de limiter la fréquence de fusion (12 ou 24 heures) pour maîtriser les coûts.

Par exemple, si vous définissez la valeur de ce paramètre sur 10 minutes, Dataflow exécutera la tâche qui utilise le modèle toutes les 10 minutes. Toutefois, la première fois que la tâche s'exécute, un délai de cinq minutes est nécessaire. Dans cet exemple, si la tâche s'exécute à 9h14, la première fusion aura lieu à 9h29 (10 minutes pour la fusion et 5 minutes pour le retard). La seconde fusion aura lieu à 9h39 et toutes les fusions suivantes auront lieu toutes les 10 minutes (9h49, 9h59, 10h09, etc.).

Si vous définissez la fréquence de fusion sur 60 minutes, le job sera exécuté toutes les heures, après un délai de 5 minutes pour son exécution initiale. Si le job est programmé pour s'exécuter à 10h, il s'exécutera en fait à 10h05 en raison du délai de cinq minutes. Toutes les fusions ultérieures auront lieu toutes les 60 minutes (11 h 05, 12 h 05, 13 h 05, etc.).

Que ce soit pour contrôler les coûts ou pour d'autres raisons, vous ne pourrez peut-être pas effectuer une fusion à une fréquence adaptée aux besoins de votre entreprise. Vous ne disposez peut-être pas des données les plus récentes. Pour accéder aux données les plus récentes, créez une vue au-dessus des tables des ensembles de données de préproduction et d'instances répliquées dans BigQuery, où la vue imite la fusion. Cette vue est créée sous la forme d'une table logique unique (pour les ensembles de données de préproduction et d'instances répliquées). Si la fréquence de fusion est faible et que vous avez besoin d'accéder plus rapidement aux données, utilisez la vue.