Exemples de pipelines pour les jeux mobiles

Ce tutoriel propose une série d'exemples de pipelines Dataflow qui présentent des fonctionnalités plus complexes que l'exemple de base WordCount. Dans cette section, les pipelines traitent les données d’un jeu imaginaire auquel les utilisateurs participent sur leur téléphone portable. Les pipelines exposent des niveaux de complexité croissants dans leurs processus de traitement. Le premier pipeline montre comment exécuter une tâche d'analyse par lots pour obtenir des données de score relativement simples, tandis que les suivants utilisent les fonctionnalités de fenêtrage et de déclencheurs de Dataflow pour fournir une analyse de données à faible latence et une compréhension plus élaborée des modèles de jeu de l'utilisateur.

Chaque fois qu'un utilisateur exécute une instance de notre jeu imaginaire sur mobile, un événement de données est généré. Chaque événement de données comprend les informations suivantes :

  • L'ID unique du joueur.
  • L'ID de l'équipe du joueur.
  • Une valeur de score pour cette instance de jeu particulière.
  • Un horodatage qui enregistre le moment auquel l'instance de jeu s'est produite, soit l'heure de l'événement pour chaque événement de données de jeu.

Lorsque l'utilisateur termine une instance du jeu, son téléphone envoie l'événement de données à un serveur de jeu où les données sont consignées et stockées dans un fichier. En règle générale, les données sont immédiatement envoyées au serveur dès la fin du jeu. Toutefois, les utilisateurs peuvent jouer "hors connexion" lorsque leur téléphone est hors d'atteinte du serveur (en avion ou en dehors de la zone de couverture du réseau, par exemple). Lorsque le téléphone de l'utilisateur entre à nouveau en contact avec le serveur de jeu, le téléphone envoie toutes les données de jeu accumulées.

Certains événements de données peuvent donc être reçus par le serveur de jeu bien après que les utilisateurs les ont générés. Cette différence temporelle peut avoir des incidences sur le processus de traitement des pipelines, qui doivent tenir compte dans leurs calculs du moment où chaque score a été généré. Ces pipelines peuvent, par exemple, suivre les scores générés à chaque heure de la journée ou calculer la durée des sessions de jeu continues (ces deux éléments dépendant de l'heure de l'événement de chaque enregistrement de données).

Les exemples de pipelines de jeu mobile varient en complexité, allant de la simple analyse par lots à des pipelines plus complexes pouvant effectuer une analyse en temps réel et détecter les abus. Cette section vous guide à travers chaque exemple et montre comment utiliser les fonctionnalités Dataflow telles que le fenêtrage et les déclencheurs pour étendre les capacités de votre pipeline.

UserScore : Pipeline de traitement de base des scores par lots

Le pipeline UserScore est l'exemple le plus simple de traitement des données de jeu sur mobile. UserScore détermine le score total par utilisateur sur un ensemble de données fini (par exemple, les scores d'une journée stockés sur le serveur de jeu). Les pipelines comme UserScore sont exécutés périodiquement une fois que toutes les données pertinentes ont été collectées. Par exemple, UserScore peut s'exécuter la nuit sur les données collectées au cours de la journée.

Que fait UserScore ?

Au cours d'une journée de données de score, chaque ID utilisateur peut avoir plusieurs enregistrements (si l'utilisateur joue plusieurs instances du jeu pendant la fenêtre d'analyse), chacun avec ses propres valeurs de score et d'horodatage. Si nous voulons déterminer le score total de toutes les instances jouées par un utilisateur au cours de la journée, notre pipeline devra regrouper tous les enregistrements par utilisateur.

À mesure que le pipeline traite chaque événement, le score de l'événement est ajouté à la somme totale de cet utilisateur particulier.

UserScore analyse uniquement les données dont il a besoin dans chaque enregistrement, en particulier l'ID utilisateur et la valeur du score. Le pipeline ne prend en compte l'heure de l'événement pour aucun des enregistrements. Il traite simplement toutes les données présentes dans les fichiers d'entrée que vous spécifiez lors de l'exécution du pipeline.

Le flux de pipeline de base UserScore effectue les opérations suivantes :

  1. Lit les données de score de la journée à partir d'un fichier stocké dans Google Cloud Storage.
  2. Calcule la somme des valeurs de score pour chaque utilisateur unique en regroupant chaque événement de jeu par ID utilisateur et en combinant les valeurs de score pour obtenir le score total de cet utilisateur particulier.
  3. Écrit les résultats dans une table BigQuery.

Le diagramme suivant présente les données de score de plusieurs utilisateurs sur la période d'analyse du pipeline. Chaque point de données qui apparaît correspond à un événement représenté par la paire utilisateur/score :

Figure 1 : Données de score de trois utilisateurs.

Cet exemple utilise le traitement par lots, l'axe Y du diagramme représentant l'heure du traitement : le pipeline traite les événements par ordre ascendant sur l'axe Y à mesure qu'ils se succèdent. L'axe X du diagramme représente l'heure de l'événement pour chaque événement de jeu, comme indiqué par son horodatage. Notez que les événements individuels dans le diagramme ne sont pas traités par le pipeline dans l'ordre où ils se sont produits (en fonction de leur horodatage).

Après avoir lu les événements de score dans le fichier d'entrée, le pipeline regroupe toutes les paires utilisateur/score et fait la somme des valeurs de score pour obtenir le score total par utilisateur. UserScore intègre la logique de base de cette étape en tant que transformation composite définie par l'utilisateur ExtractAndSumScore :

public static class ExtractAndSumScore
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

  private final String field;

  ExtractAndSumScore(String field) {
    this.field = field;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(
      PCollection<GameActionInfo> gameInfo) {

    return gameInfo
      .apply(MapElements
          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
          .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
      .apply(Sum.<String>integersPerKey());
  }
}

La ligne ExtractAndSumScore remplit une fonction plus générique permettant de spécifier le champ par lequel vous voulez regrouper les données (dans le cas de notre jeu, par utilisateur ou par équipe). Nous pouvons ainsi réutiliser l'écriture ExtractAndSumScore dans d'autres pipelines qui, par exemple, regroupent les données de score par équipe.

Voici la principale méthode utilisée pour le pipeline UserScore et son application aux trois étapes successives.

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  // Read events from a text file and parse them.
  pipeline.apply(TextIO.Read.from(options.getInput()))
    .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
    // Extract and sum username/score pairs from the event data.
    .apply("ExtractUserScore", new ExtractAndSumScore("user"))
    .apply("WriteUserScoreSums",
        new WriteToBigQuery<KV<String, Integer>>(options.getTableName(),
                                                 configureBigQueryWrite()));

  // Run the batch pipeline.
  pipeline.run();
}

Analyse des résultats

UserScore écrit les données dans une table BigQuery (nommée user_score par défaut). À partir des données de la table BigQuery, nous pouvons exécuter une analyse interactive plus poussée, par exemple pour obtenir la liste des N utilisateurs ayant réalisé le meilleur score au cours d'une certaine journée.

Supposons que nous voulons déterminer de façon interactive le top 10 des utilisateurs pour un jour donné. Dans l'interface utilisateur BigQuery, nous exécutons la requête suivante :

SELECT * FROM [MyGameProject:MyGameDataset.user_score] ORDER BY total_score DESC LIMIT 10

Limites

Comme le montre l'écriture de l'exemple, le pipeline UserScore présente quelques limites :

  • Certaines données de score pouvant être générées par des joueurs hors connexion et envoyées après la coupure quotidienne, les données de résultat de jeu générées par le pipeline UserScore peuvent être incomplètes. UserScore ne traite que l'ensemble d'entrées fixes présent dans les fichiers d'entrée lors de l'exécution du pipeline.
  • UserScore traite tous les événements de données présents dans le fichier d'entrée au moment du traitement et n'effectue aucun examen ni d'autre vérification sur la base de l'heure de l'événement. Par conséquent, les résultats peuvent inclure certaines valeurs dont l'heure d'événement se situe en dehors de la période d'analyse concernée, par exemple les enregistrements tardifs du jour précédent.
  • Comme UserScore ne s'exécute qu'une fois toutes les données collectées, il existe une latence élevée entre le moment où les utilisateurs génèrent des événements de données (l'heure de l'événement) et le moment où les résultats sont calculés (l'heure du traitement).
  • Par ailleurs, UserScore ne communique que les résultats totaux de la journée entière et ne fournit aucune information sur la manière dont les données ont été accumulées au cours de la journée.

L'exemple de pipeline suivant montre comment utiliser les fonctionnalités Dataflow pour contourner ces limites.

HourlyTeamScore : Pipeline de traitement avancé par lots avec fenêtrage

Le pipeline HourlyTeamScore développe les principes de base de l'analyse par lots employés dans le pipeline UserScore et atténue certaines de ses limites. HourlyTeamScore effectue une analyse plus fine en tirant parti des fonctionnalités supplémentaires offertes par le SDK Dataflow et d'autres éléments contenus dans les données de jeu. Par exemple, HourlyTeamScore peut exclure les données qui n'appartiennent pas à la période d'analyse concernée.

Tout comme le pipeline UserScore, HourlyTeamScore est conçu pour s'exécuter périodiquement une fois que toutes les données pertinentes ont été rassemblées (par exemple, une fois par jour). Le pipeline lit un ensemble de données fixe stocké dans un fichier et écrit les résultats dans une table BigQuery, exactement comme le fait UserScore.

Que fait HourlyTeamScore ?

HourlyTeamScore calcule le score total par équipe et par heure dans un ensemble de données fixe (par exemple, les données d'une journée).

  • Plutôt que de traiter l’ensemble de données complet en une seule fois, HourlyTeamScore répartit les données d’entrée dans des fenêtres logiques et effectue les calculs sur ces fenêtres. Cela permet au pipeline HourlyUserScore de fournir des informations sur les données de score par fenêtre, chaque fenêtre représentant la progression des scores à intervalles réguliers (par exemple, toutes les heures).
  • HourlyTeamScore filtre les événements de données en fonction de l'heure de l'événement (indiquée par l'horodatage intégré) dans la période d'analyse spécifiée. Sommairement, le pipeline vérifie l'horodatage de chaque événement de jeu et s'assure qu'il se situe dans la période que nous souhaitons analyser (dans ce cas, la journée choisie). Les événements de données des jours précédents sont ignorés et ne sont pas inclus dans les totaux des scores. Le pipeline HourlyTeamScore est ainsi plus stable et génère moins d'erreurs que son homologue UserScore. Cela permet également de comptabiliser les données tardives dont l'horodatage se situe dans la période d'analyse concernée.

Examinons maintenant en détail chacune des améliorations apportées par HourlyTeamScore :

Fenêtrage à heure fixe

Le fenêtrage à heure fixe permet au pipeline de fournir des informations plus pertinentes sur la manière dont les événements se sont accumulés dans l'ensemble de données au cours de la période d'analyse. Dans notre exemple, nous voyons à quel moment le groupe a été actif pour chaque journée et le score réalisé par l'équipe à ce moment-là.

Le diagramme suivant montre comment le pipeline traite les données de score d'une seule équipe dans une journée après application du fenêtrage à heure fixe :

Figure 2: Données de score pour deux équipes. Les scores de chaque équipe sont répartis dans des fenêtres logiques en fonction du moment où ces scores se sont produits à l'heure de l'événement.

Notez comment l'heure du traitement progresse : les sommes s'affichent maintenant par fenêtre, chaque fenêtre représentant un intervalle dans l'heure de l'événement au cours de la journée d'enregistrement des scores.

La fonctionnalité de fenêtrage Dataflow utilise les informations d'horodatage intrinsèques associées à chaque élément d'une classe PCollection. Comme nous voulons que notre pipeline affiche une fenêtre en fonction de l'heure de l'événement, nous devons d'abord extraire l'horodatage intégré à chaque enregistrement de données et l'appliquer à l'élément correspondant dans les données de score PCollection. Ensuite, le pipeline pourra appliquer la fonctionnalité de fenêtrage pour répartir les données PCollection en fenêtres logiques.

Voici le code, qui montre comment HourlyTeamScore utilise les transformations WithTimestamps et Window pour effectuer ces opérations :

// Add an element timestamp based on the event log, and apply fixed windowing.
.apply("AddEventTimestamps",
       WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
.apply(Window.named("FixedWindowsTeam")
    .<GameActionInfo>into(FixedWindows.of(
          Duration.standardMinutes(options.getWindowDuration()))))

Notez que les transformations utilisées par le pipeline pour spécifier le fenêtrage sont distinctes des transformations de traitement effectif des données (telles que ExtractAndSumScores). Cette fonctionnalité offre une certaine flexibilité dans la conception du pipeline Dataflow. En effet, vous pouvez exécuter des transformations existantes sur des ensembles de données qui présentent des caractéristiques de fenêtrage différentes.

Filtrage en fonction de l'heure de l'événement

HourlyTeamScore utilise les fonctions de filtrage pour supprimer tous les événements de notre ensemble de données dont les horodatages sont en dehors de la période d'analyse concernée (c'est-à-dire non générés pendant la journée qui nous intéresse). Cela empêche le pipeline d'inclure par erreur les données qui, par exemple, ont été générées hors connexion le jour précédent, mais envoyées au serveur de jeu pendant la journée en cours.

Cela permet également au pipeline d'inclure les données tardives pertinentes, soit des événements de données dont les horodatages sont valides, mais qui ont été reçus après la fin de la période d'analyse. Si, par exemple, l'heure de coupure de notre pipeline est à 12h00 et que nous programmons l'exécution à 02h00 du matin, nous pouvons filtrer tous les événements qui se sont produits après la coupure de 12h00 en fonction des indications de l'horodatage. Les événements de données en retard et reçus entre 12h01 et 02h00, mais dont l'horodatage indique qu'ils se sont produits avant l'heure limite de 12h00, seront inclus dans le traitement du pipeline.

HourlyTeamScore utilise la transformation Filter pour effectuer cette opération. Lorsque vous appliquez les critères Filter, vous spécifiez un prédicat auquel chaque enregistrement de données est comparé. Les enregistrements de données acceptés au filtrage sont inclus, les autres sont exclus. Dans notre cas, le prédicat est l'heure limite que nous spécifions et nous ne comparons qu'une partie des données, soit le champ d'horodatage.

Le code suivant montre comment HourlyTeamScore utilise la transformation Filter pour filtrer les événements qui se produisent avant ou après la période d'analyse concernée :

.apply("FilterStartTime", Filter.byPredicate(
    (GameActionInfo gInfo)
        -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
.apply("FilterEndTime", Filter.byPredicate(
    (GameActionInfo gInfo)
        -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))

Calcul des scores par équipe et par fenêtre

HourlyTeamScore utilise la même transformation ExtractAndSumScores que le pipeline UserScore, mais transfère une clé différente (équipe au lieu d'utilisateur). De plus, comme le pipeline applique ExtractAndSumScores après application d'un fenêtrage fixe d'une heure aux données d'entrée, celles-ci sont regroupées à la fois par équipe et par fenêtre. La séquence complète des transformations dans la méthode principale HourlyTeamScore est exposée ici :

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
  final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));

  // Read 'gaming' events from a text file.
  pipeline.apply(TextIO.Read.from(options.getInput()))
    // Parse the incoming data.
    .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))

    // Filter out data before and after the given times so that it is not included
    // in the calculations. As we collect data in batches (say, by day), the batch for the day
    // that we want to analyze could potentially include some late-arriving data from the previous
    // day. If so, we want to weed it out. Similarly, if we include data from the following day
    // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
    // that fall after the time period we want to analyze.
    .apply("FilterStartTime", Filter.byPredicate(
        (GameActionInfo gInfo)
            -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
    .apply("FilterEndTime", Filter.byPredicate(
        (GameActionInfo gInfo)
            -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))

    // Add an element timestamp based on the event log, and apply fixed windowing.
    .apply("AddEventTimestamps",
           WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
    .apply(Window.named("FixedWindowsTeam")
        .<GameActionInfo>into(FixedWindows.of(
              Duration.standardMinutes(options.getWindowDuration()))))

    // Extract and sum teamname/score pairs from the event data.
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
    .apply("WriteTeamScoreSums",
      new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName(),
          configureWindowedTableWrite()));

  pipeline.run();
}

Limites

Tel qu'il est écrit, le pipeline HourlyTeamScore présente encore une limite :

  • HourlyTeamScore comporte encore une latence élevée entre le moment où les événements de données se produisent (heure de l'événement) et celui où les résultats sont générés (heure du traitement), car, en tant que pipeline par lots, il doit attendre que tous les événements de données soient présents.

LeaderBoard : Pipeline de traitement par flux des données de jeu en temps réel

Pour résoudre le problème de latence présent dans les pipelines UserScore et HourlyTeamScore, une solution consiste à lire les données de score à partir d'une source de flux. Le pipeline LeaderBoard applique un traitement par flux en lisant les données de score à partir de Google Cloud Pub/Sub, plutôt que dans un fichier stocké sur le serveur de jeu.

Le pipeline LeaderBoard montre également comment traiter simultanément les données de score en fonction de l'heure du traitement et de l'heure de l'événement. LeaderBoard fournit des données sur les scores individuels des utilisateurs et sur les scores des équipes, chacun en fonction d'une période différente.

Comme le pipeline LeaderBoard lit les données de jeu à partir d'une source de flux à mesure qu'elles sont générées, nous pouvons le considérer comme une tâche continue qui s'exécute en même temps que le processus de jeu. LeaderBoard fournit ainsi des informations à faible latence sur le jeu des utilisateurs à un moment donné. Ces données sont utiles pour mettre en ligne un tableau de scores permettant aux utilisateurs de suivre en direct leurs progrès par rapport aux autres joueurs.

Que fait LeaderBoard ?

Le pipeline LeaderBoard lit les données de jeu publiées sur Pub/Sub quasiment en temps réel et les utilise pour effectuer deux tâches de traitement distinctes :

  • LeaderBoard calcule le score total pour chaque utilisateur et publie des résultats spéculatifs toutes les dix minutes pendant l'heure du traitement. C'est-à-dire que toutes les dix minutes, le pipeline génère les données de score total par utilisateur que le pipeline a traitées jusqu'à présent. Ce calcul fournit un "tableau de classement" quasiment en temps réel, quelle que soit l'heure à laquelle les événements de jeu ont été générés.
  • LeaderBoard calcule les scores par équipe toutes les heures pendant l'exécution du pipeline. Cela est utile si, par exemple, nous voulons récompenser l’équipe qui a obtenu le meilleur score par heure de jeu. Le calcul des scores par équipe utilise un fenêtrage à intervalles fixes pour répartir les données d'entrée par fenêtres finies d'une heure en fonction de l'heure de l'événement (indiquée par l'horodatage) à mesure que les données arrivent dans le pipeline.

    En outre, le calcul des scores par équipe utilise les mécanismes déclencheurs de Dataflow pour fournir des résultats spéculatifs toutes les heures (mis à jour par intervalles de cinq minutes jusqu'à ce que l'heure soit écoulée), ainsi que pour capturer les données en retard et les ajouter à la fenêtre horaire correspondante.

Ces deux tâches sont examinées en détail ci-après.

Calcul du score utilisateur en fonction de l'heure du traitement

Nous voulons que notre pipeline génère un score total cumulé pour chaque utilisateur toutes les dix minutes pendant l'exécution. Ce calcul ne tient pas compte du moment auquel le score réel a été généré par l'instance de jeu de l'utilisateur. Il affiche simplement la somme de tous les scores de cet utilisateur qui sont arrivés dans le pipeline jusqu'à présent. Les données en retard sont incluses dans le calcul chaque fois qu'elles arrivent dans le pipeline pendant l'exécution.

Comme nous voulons prendre en compte toutes les données arrivées dans le pipeline à chaque mise à jour du calcul, nous demandons au pipeline de traiter toutes les données de score des utilisateurs dans une seule fenêtre globale. Bien que la fenêtre globale soit illimitée, nous pouvons spécifier une sorte de point de coupure temporaire pour chaque calcul de dix minutes à l'aide d'un déclencheur de l'heure du traitement.

Une fois le déclencheur de l'heure du traitement spécifié pour la fenêtre globale, le pipeline prend un "instantané" du contenu de la fenêtre à chaque activation du déclencheur (soit toutes les dix minutes). Comme nous utilisons une seule fenêtre globale, chaque instantané contient toutes les données collectées jusqu'à ce moment-là. Le diagramme suivant montre les effets de l’utilisation d’un déclencheur de l'heure du traitement sur la fenêtre globale :

Figure 3: Données de score pour trois utilisateurs. Les scores de chaque utilisateur sont regroupés dans une seule fenêtre globale, avec un déclencheur qui génère un instantané des données de résultat toutes les dix minutes.

À mesure que l'heure du traitement avance et que d'autres scores sont traités, le déclencheur génère le total mis à jour pour chaque utilisateur.

L'exemple de code suivant montre comment LeaderBoard définit le déclencheur de l'heure du traitement pour générer les données des scores utilisateur :

/**
 * Extract user/score pairs from the event stream using processing time, via global windowing.
 * Get periodic updates on all users' running scores.
 */
@VisibleForTesting
static class CalculateUserScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration allowedLateness;

  CalculateUserScores(Duration allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> input) {
    return input.apply("LeaderboardUserGlobalWindow",
        Window.<GameActionInfo>into(new GlobalWindows())
            // Get periodic results every ten minutes.
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TEN_MINUTES)))
            .accumulatingFiredPanes()
            .withAllowedLateness(allowedLateness))
        // Extract and sum username/score pairs from the event data.
        .apply("ExtractUserScore", new ExtractAndSumScore("user"));
  }
}

Notez que LeaderBoard utilise un déclencheur d'accumulation pour le calcul du score utilisateur (par appel de la commande .accumulatingFiredPanes spécifiée dans les paramètres du déclencheur). En appliquant un déclencheur d'accumulation, le pipeline cumule les données générées jusqu'ici avec les nouvelles données arrivées depuis la dernière activation du déclencheur. LeaderBoard dispose ainsi des scores cumulés de l'utilisateur, au lieu d'une série de sommes individuelles.

Calculer le score par équipe en fonction de l'heure du traitement

Nous voulons que notre pipeline génère également le score total de chaque équipe pour chaque heure de jeu. Contrairement au calcul du score de l'utilisateur, nous allons prendre en compte ici le moment auquel chaque score a effectivement eu lieu pendant l'heure de l'événement, car nous voulons traiter chaque heure de jeu individuellement. Nous souhaitons également fournir des mises à jour spéculatives à mesure que chaque heure avance et inclure dans nos calculs les instances de données en retard (qui arrivent après l'heure à laquelle les données sont considérées comme complètes).

Comme nous considérons chaque heure individuellement, nous pouvons appliquer un fenêtrage à heure fixe à nos données d'entrée, comme dans HourlyTeamScore. Pour fournir les mises à jour spéculatives et celles des données en retard, nous allons spécifier des paramètres de déclenchement supplémentaires. Le déclencheur fera en sorte que chaque fenêtre calcule et diffuse les résultats à un intervalle spécifié (dans ce cas, toutes les cinq minutes), et continue de se déclencher une fois que la fenêtre est considérée comme "terminée" pour prendre en compte les données en retard. De la même façon que pour le calcul du score de l'utilisateur, nous allons définir le déclencheur en mode accumulation pour être certains d'obtenir une somme cumulée pour chaque fenêtre d'une heure.

Les déclencheurs de mises à jour spéculatives et des données tardives permettent de résoudre le problème du décalage temporel. Les événements en cours ne sont pas nécessairement traités dans l'ordre dans lequel ils se sont réellement produits selon l'horodatage. Ils peuvent arriver en désordre ou en retard dans le pipeline (dans notre cas, ils ont été générés alors que le téléphone de l'utilisateur n'était plus connecté à un réseau). Dataflow doit pouvoir déterminer à quel moment il peut raisonnablement décider qu'il dispose de "toutes" les données pour la fenêtre concernée. Ce processus est nommé filigrane.

Dans l'absolu, toutes les données devraient être traitées immédiatement dès leur arrivée, de sorte que l'heure du traitement concorde avec l'heure de l'événement (ou qu'au moins une relation linéaire soit établie). Cependant, l'imprécision étant inhérente aux systèmes distribués (comme les délais de rapports transmis par nos téléphones), Dataflow utilise souvent un filigrane heuristique.

Le diagramme suivant montre la relation entre l'heure du traitement en cours et l'heure de l’événement pour chaque score des deux équipes :

Figure 4 : Données de score par équipe, avec fenêtrage par heure d'événement. Un déclencheur basé sur l'heure du traitement force la fenêtre à diffuser les résultats préliminaires spéculatifs et à inclure les résultats tardifs.

La ligne en pointillé dans le diagramme représente le filigrane "idéal" : le concept de Dataflow selon lequel toutes les données d'une fenêtre peuvent raisonnablement être considérées comme étant arrivées. La ligne continue irrégulière représente le filigrane réel, comme déterminé par la source de données (dans notre cas, Pub/Sub).

Les données qui arrivent au-dessus de la ligne continue du filigrane sont des données en retard, soit un événement de score qui a été retardé (peut-être généré hors connexion) et arrivé après la fermeture de la fenêtre correspondante. Le déclencheur en mode tardif de notre pipeline garantit la prise en compte de ces données en retard dans le calcul du total.

L'exemple de code suivant montre comment notre pipeline LeaderBoard applique un fenêtrage à heure fixe avec les déclencheurs adéquats afin d'effectuer les calculs souhaités :

// Extract team/score pairs from the event stream, using hour-long windows by default.
@VisibleForTesting
static class CalculateTeamScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration teamWindowDuration;
  private final Duration allowedLateness;

  CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
    this.teamWindowDuration = teamWindowDuration;
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> infos) {
    return infos.apply("LeaderboardTeamFixedWindows",
        Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
            // We will get early (speculative) results as well as cumulative
            // processing of late data.
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(FIVE_MINUTES))
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(TEN_MINUTES)))
            .withAllowedLateness(allowedLateness)
            .accumulatingFiredPanes())
        // Extract and sum teamname/score pairs from the event data.
        .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
  }
}

Dans leur ensemble, ces stratégies de traitement nous permettent de résoudre les problèmes de latence et d'exhaustivité présents dans les pipelines UserScore et HourlyTeamScore, tout en utilisant les mêmes transformations de base pour traiter les données. En réalité, les deux calculs appliquent la même transformation ExtractAndSumScore que celle utilisée pour les pipelines UserScore et HourlyTeamScore.

GameStats : Pipeline de détection des abus et d'analyse de l'utilisation

Si le fenêtrage de base et les déclencheurs du pipeline LeaderBoard permettent d'obtenir une analyse de données flexible et à faible latence, nous disposons d'autres techniques de fenêtrage plus avancées pour réaliser une analyse exhaustive. Certaines méthodes de calcul conçues pour détecter les abus du système (comme le spam) ou mieux comprendre le comportement des utilisateurs en font partie. Le pipeline GameStats s’appuie sur la fonctionnalité à faible latence de LeaderBoard pour montrer comment vous pouvez utiliser Dataflow pour effectuer ce type d’analyse avancée.

Tout comme LeaderBoard, le pipeline GameStats lit les données à partir d'une source de flux, dans notre cas Pub/Sub. Le programme doit être conçu comme une tâche en continu qui fournit des tendances au fil des actions de jeu des utilisateurs.

Que fait GameStats ?

Comme LeaderBoard, GameStats calcule le score total par équipe et par heure. Cependant, le pipeline réalise également deux types d’analyses plus complexes :

  • GameStats comporte un mécanisme de détection d'abus qui effectue une simple analyse statistique des données de score afin de détecter, le cas échéant, les utilisateurs susceptibles d'être des spammeurs ou des robots. La liste des spammeurs ou des robots présumés est ensuite appliquée pour filtrer ces utilisateurs frauduleux et les exclure du calcul du score horaire de l'équipe.
  • GameStats analyse les modèles d'utilisation en regroupant les données de jeu reçues pendant les mêmes heures d'événement à l'aide d'un fenêtrage de session. Nous obtenons ainsi des tendances permettant de mieux comprendre la durée moyenne des sessions de jeu ainsi que leur évolution dans le temps.

Ces fonctionnalités sont examinées plus en détail ci-après.

Détection des abus

Supposons que le calcul des scores dans notre jeu dépend de la vitesse de clic d'un utilisateur sur son téléphone. Le mécanisme de détection des abus de GameStats analyse les données de score de chaque utilisateur pour déterminer s'il présente un "taux de clics" anormalement élevé et, partant, un score anormalement élevé. Cela pourrait indiquer la présence d'un bot qui agit beaucoup plus vite qu'un humain.

Pour déterminer si un score est "anormalement" élevé, GameStats calcule la moyenne de chaque score dans cette fenêtre à heure fixe, puis compare chaque score individuel au score moyen, multiplié par un coefficient de pondération arbitraire (dans notre cas, 2,5). Ainsi, tout score supérieur à 2,5 fois la moyenne est considéré comme étant du spam. Le pipeline GameStats suit une liste de "spammeurs" et les exclut des calculs de score par équipe pour le tableau de classement.

Comme la moyenne dépend des données du pipeline, nous devons la calculer, puis utiliser la valeur calculée dans une transformation ParDo suivante qui filtre les scores dépassant la valeur pondérée. Pour ce faire, nous pouvons inscrire la moyenne calculée comme entrée secondaire dans le filtrage ParDo.

L'exemple de code suivant montre la transformation composite qui gère la détection des abus. Nous utilisons la transformation Sum.integersPerKey pour additionner tous les scores par utilisateur, puis la transformation Mean.globally pour déterminer le score moyen de tous les utilisateurs. Une fois le calcul effectué (en tant que PCollectionView), nous pouvons l'appliquer au filtrage ParDo en utilisant .withSideInputs.

public static class CalculateSpammyUsers
    extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
  private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
  private static final double SCORE_WEIGHT = 2.5;

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {

    // Get the sum of scores for each user.
    PCollection<KV<String, Integer>> sumScores = userScores
        .apply("UserSum", Sum.<String>integersPerKey());

    // Extract the score from each element, and use it to find the global mean.
    final PCollectionView<Double> globalMeanScore = sumScores.apply(Values.<Integer>create())
        .apply(Mean.<Integer>globally().asSingletonView());

    // Filter the user sums using the global mean.
    PCollection<KV<String, Integer>> filtered = sumScores
        .apply(ParDo
            .named("ProcessAndFilter")
            // use the derived mean total score as a side input
            .withSideInputs(globalMeanScore)
            .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
              private final Aggregator<Long, Long> numSpammerUsers =
                createAggregator("SpammerUsers", new Sum.SumLongFn());
              @Override
              public void processElement(ProcessContext c) {
                Integer score = c.element().getValue();
                Double gmc = c.sideInput(globalMeanScore);
                if (score > (gmc * SCORE_WEIGHT)) {
                  LOG.info("user " + c.element().getKey() + " spammer score " + score
                      + " with mean " + gmc);
                  numSpammerUsers.addValue(1L);
                  c.output(c.element());
                }
              }
            }));
    return filtered;
  }
}

La transformation de la détection d’abus génère une vue des utilisateurs suspectés d’être des spambots. Cette vue nous permet ensuite de filtrer tous les utilisateurs de ce type au moment du calcul du score d'équipe par heure, en utilisant à nouveau le mécanisme d'entrée secondaire. L'exemple de code suivant montre à quel endroit nous insérons le filtre antispam entre le fenêtrage des scores sous forme de fenêtres fixes et l'extraction des scores d'équipe :

// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
  .apply(Window.named("WindowIntoFixedWindows")
      .<GameActionInfo>into(FixedWindows.of(
          Duration.standardMinutes(options.getFixedWindowDuration())))
      )
  // Filter out the detected spammer users, using the side input derived above.
  .apply(ParDo.named("FilterOutSpammers")
          .withSideInputs(spammersView)
          .of(new DoFn<GameActionInfo, GameActionInfo>() {
            @Override
            public void processElement(ProcessContext c) {
              // If the user is not in the spammers Map, output the data element.
              if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
                c.output(c.element());
              }
            }}))
  // Extract and sum teamname/score pairs from the event data.
  .apply("ExtractTeamScore", new ExtractAndSumScore("team"))

Analyse des modèles d'utilisation

En examinant les heures d'événement pour chaque score de jeu et en rassemblant les scores par heures d'événement semblables dans des sessions, nous parvenons à savoir à quel moment les utilisateurs jouent à notre jeu et pendant combien de temps. GameStats utilise la fonction intégrée de fenêtrage de session de Dataflow pour regrouper les scores des utilisateurs dans des sessions en fonction de l'heure à laquelle ils se sont produits.

Lorsque vous définissez le fenêtrage de session, vous spécifiez un intervalle de pause minimal entre les événements. Tous les événements qui se succèdent de façon plus rapprochée que l'intervalle de pause minimal sont regroupés dans la même fenêtre. Les événements dont le différentiel d’heure d’arrivée est supérieur à cet intervalle sont regroupés dans des fenêtres distinctes. Selon la manière dont nous définissons l'intervalle de pause minimal, nous pouvons sans risque supposer que les scores d'une même fenêtre de session font partie de la même séquence de jeu (relativement) ininterrompue. Les scores dans une fenêtre différente indiquent que l'utilisateur a interrompu le jeu pendant au moins l'intervalle de pause minimal avant de le reprendre plus tard.

Le diagramme suivant montre à quoi ressemblent les données lorsqu'elles sont regroupées dans des fenêtres de session. Contrairement aux fenêtres fixes, les fenêtres de session sont différentes pour chaque utilisateur et dépendent du modèle de jeu de chacun :

Un diagramme représentant le fenêtrage de session.
Figure 5 : Sessions utilisateur, avec un intervalle de pause minimal. Notez les différences de sessions entre utilisateurs, selon le nombre d'instances de jeu et la durée des pauses entre ces instances.

Les données de fenêtrage de sessions nous permettent donc de déterminer la durée moyenne de jeu sans interruption pour tous les utilisateurs, ainsi que le score total réalisé au cours de chaque session. Pour traduire tout cela dans le code, nous appliquons d'abord le fenêtrage de session en additionnant les scores par utilisateur et par session, puis nous utilisons une transformation pour calculer la durée de chaque session :

// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
  .apply(Window.named("WindowIntoSessions")
        .<KV<String, Integer>>into(
              Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
    .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
  // For this use, we care only about the existence of the session, not any particular
  // information aggregated over it, so the following is an efficient way to do that.
  .apply(Combine.perKey(x -> 0))
  // Get the duration per session.
  .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))

Nous obtenons ainsi un ensemble de sessions utilisateur, chacune avec une durée de session. Nous pouvons ensuite calculer la durée moyenne de session. Pour cela, nous appliquons à nouveau la fonctionnalité de fenêtrage des données pour obtenir des fenêtres temporelles fixes, puis nous calculons la moyenne pour toutes les sessions qui se terminent chaque heure :

// Re-window to process groups of session sums according to when the sessions complete.
.apply(Window.named("WindowToExtractSessionMean")
      .<Integer>into(
          FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply("WriteAvgSessionLength",
       new WriteWindowedToBigQuery<Double>(
          options.getTablePrefix() + "_sessions", configureSessionWindowWrite()));

Les informations obtenues nous permettent de déterminer, par exemple, à quel moment de la journée nos utilisateurs jouent le plus longtemps ou à quel moment les sessions de jeu sont généralement les plus courtes.

Informations supplémentaires

Pour en savoir plus sur le fonctionnement de ces exemples, vous pouvez exécuter les pipelines proposés dans les Exemples de SDK Dataflow sur GitHub (branche master-1.x). Consultez le fichier README.md dans le répertoire d'exemples pour obtenir des instructions complètes sur l'exécution des exemples de pipelines inclus.

Les vidéos et articles de blog suivants fournissent des informations supplémentaires en contexte concernant le modèle unifié de traitement des données par flux et par lots de Dataflow :

  • Article de blog Dataflow & Spark : article de blog rédigé par les ingénieurs Google, qui présente une comparaison, à l'aide d'exemples, des modèles de programmation Dataflow et Apache Spark.
  • Dataflow talk @Scale : présentation vidéo issue de la conférence @Scale, réalisée par Frances Perry, ingénieur chez Google, qui explique le modèle Dataflow à l'aide d'une version antérieure des exemples de jeux mobiles décrits ci-dessus.
  • "The World Beyond Batch, Streaming 101 et Streaming 102" : série d'articles en deux parties publiée sur le blog O'Reilly par Tyler Akidau, ingénieur chez Google, consacrée aux perspectives futures du traitement Big Data.
Cette page vous a-t-elle été utile ? Évaluez-la :

Envoyer des commentaires concernant…

Besoin d'aide ? Consultez notre page d'assistance.