Ejemplos de canalizaciones de videojuegos para dispositivos móviles

En esta sección, se brinda una explicación de varios ejemplos de canalizaciones de Dataflow que demuestran una funcionalidad más compleja que el ejemplo de WordCount básico. Las canalizaciones en esta sección procesan datos de un juego hipotético que los usuarios juegan en sus teléfonos celulares. Las canalizaciones demuestran el procesamiento en niveles crecientes de complejidad. Por ejemplo, la primera canalización muestra cómo ejecutar un trabajo de análisis por lotes para obtener datos de puntuación relativamente simples, mientras que las canalizaciones posteriores usan las funciones del sistema de ventanas y de los activadores de Dataflow para proporcionar un análisis de datos de latencia baja y una información más compleja acerca de los patrones de juego del usuario.

Cada usuario genera un evento de datos cada vez que reproduce una instancia de nuestro supuesto juego para dispositivos móviles. Cada evento de datos consta de la información siguiente:

  • El ID único del usuario que juega.
  • El ID de equipo para el equipo al que pertenece el usuario
  • Un valor de puntuación para esa instancia particular de juego
  • Una marca de tiempo que registra cuándo ocurrió la instancia en particular del juego: este es el tiempo del evento para cada evento de datos del juego

Cuando el usuario completa una instancia del juego, su teléfono envía el evento de datos a un servidor del juego, en el que los datos se registran y almacenan en un archivo. En general, los datos se envían al servidor del juego inmediatamente después de su finalización. Sin embargo, los usuarios pueden jugar “sin conexión” cuando sus teléfonos no están en contacto con el servidor (como cuando están en un avión o fuera del área de cobertura de la red). Cuando el teléfono del usuario vuelva a estar en contacto con el servidor de juegos, el teléfono enviará todos los datos acumulados del juego.

Esto significa que el servidor del juego puede recibir algunos eventos de datos mucho después del momento en que los generan los usuarios. Esta diferencia de tiempo puede tener implicaciones de procesamiento para las canalizaciones que realizan cálculos que consideran cuándo se generó cada puntuación. Esas canalizaciones podrían realizar un seguimiento de las puntuaciones generadas durante cada hora del día, por ejemplo, o calcular la cantidad de tiempo que los usuarios juegan de forma continua, que dependen del tiempo del evento de cada registro de datos.

El ejemplo de canalizaciones de juegos para dispositivos móviles varía en complejidad, desde un simple análisis por lotes hasta canalizaciones más complejas que pueden realizar análisis en tiempo real y detección de abuso. En esta sección, se explica cada ejemplo y se demuestra cómo usar las funciones de Dataflow como el sistema de ventanas y los activadores para expandir las funciones de las canalizaciones.

UserScore: procesamiento de puntuación básico en lotes

La canalización UserScore es el ejemplo más simple para procesar datos de juegos para dispositivos móviles. UserScore determina la puntuación total por usuario en un conjunto de datos finitos (por ejemplo, todas las puntuaciones de un día almacenadas en el servidor del juego). Las canalizaciones como UserScore se ejecutan mejor de manera periódica después de que se recopilan todos los datos relevantes. Por ejemplo, UserScore podría ejecutarse como un trabajo nocturno sobre los datos recopilados durante ese día.

¿Qué hace UserScore?

En el valor de los datos de puntuación de un día, cada ID de usuario puede tener varios registros (si el usuario usa más de una instancia del juego durante la ventana de análisis), cada uno con su propio valor de puntuación y marca de tiempo. Si queremos determinar la puntuación total en todas las instancias que un usuario juega durante el día, nuestra canalización deberá agrupar todos los registros por usuario individual.

A medida que la canalización procesa los eventos, la puntuación de cada uno se agrega a la suma total para ese usuario en particular.

UserScore analiza solo los datos que necesita de cada registro, en especial, el ID del usuario y el valor de la puntuación. La canalización no tiene en cuenta el tiempo del evento para ningún registro; tan solo procesa todos los datos presentes en los archivos de entrada que especificas cuando ejecutas la canalización.

Esto es lo que hace el flujo de canalización básico de UserScore:

  1. Lee los datos de puntuación del día de un archivo almacenado en Google Cloud Storage.
  2. Suma los valores de puntuación para cada usuario único agrupando cada evento de juego por ID de usuario y combinando los valores de puntuación a fin de obtener la puntuación total para ese usuario específico.
  3. Escribe los datos del resultado en una tabla de BigQuery.

En el diagrama siguiente, se muestran los datos de puntuación de varios usuarios durante el período de análisis de la canalización. En el diagrama, cada dato es un evento que tiene como resultado un par usuario/puntuación:

Figura 1: datos de puntuación de tres usuarios.

En este ejemplo, se usa el procesamiento por lotes, y el eje Y del diagrama representa el tiempo de procesamiento: la canalización procesa los eventos más bajos en el eje Y primero y los eventos más arriba del eje después. El eje X del diagrama representa el tiempo del evento para cada evento del juego, como indica la marca de tiempo del evento. Ten en cuenta que la canalización no procesa los eventos individuales en el diagrama en el mismo orden en que ocurrieron (de acuerdo con sus marcas de tiempo).

Después de leer los eventos de puntuación del archivo de entrada, la canalización agrupa todos esos pares de usuario/puntuación y suma los valores de puntuación para completar un valor total por usuario único. UserScore encapsula la lógica central para ese paso como la transformación compuesta definida por el usuario ExtractAndSumScore:

public static class ExtractAndSumScore
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

  private final String field;

  ExtractAndSumScore(String field) {
    this.field = field;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(
      PCollection<GameActionInfo> gameInfo) {

    return gameInfo
      .apply(MapElements
          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
          .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
      .apply(Sum.<String>integersPerKey());
  }
}

ExtractAndSumScore está escrito para ser más general, ya que puedes pasar el campo en el que quieres agrupar los datos (en el caso de nuestro juego, por usuario único o equipo único). Esto significa que podemos volver a usar ExtractAndSumScore en otras canalizaciones que agrupen datos de puntuación por equipo, por ejemplo.

Este es el método principal de UserScore, que muestra cómo aplicamos los tres pasos de la canalización:

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  // Read events from a text file and parse them.
  pipeline.apply(TextIO.Read.from(options.getInput()))
    .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
    // Extract and sum username/score pairs from the event data.
    .apply("ExtractUserScore", new ExtractAndSumScore("user"))
    .apply("WriteUserScoreSums",
        new WriteToBigQuery<KV<String, Integer>>(options.getTableName(),
                                                 configureBigQueryWrite()));

  // Run the batch pipeline.
  pipeline.run();
}

Trabaja con los resultados

UserScore escribe los datos en una tabla de BigQuery (llamada user_score de forma predeterminada). Con los datos en la tabla BigQuery, podríamos realizar un análisis interactivo más profundo, como consultar una lista de los usuarios N con mayor puntuación de un día determinado.

Supongamos que queremos determinar de manera interactiva los 10 usuarios con mayor puntuación en un día específico. En la interfaz de usuario de BigQuery, podemos ejecutar la siguiente consulta:

SELECT * FROM [MyGameProject:MyGameDataset.user_score] ORDER BY total_score DESC LIMIT 10

Limitaciones

Como se indica en el ejemplo, la canalización UserScore tiene algunas limitaciones:

  • Debido a que los jugadores sin conexión pueden generar datos de puntuación y enviarlos después del corte diario para los datos del juego, es posible que los datos de resultado generados por la canalización UserScore estén incompletos. UserScore solo procesa el conjunto de entrada fijo presente en los archivos de entrada cuando se ejecuta la canalización.
  • UserScore procesa todos los eventos de datos presentes en el archivo de entrada en el momento del procesamiento y no examina ni verifica de otra manera los eventos basados en el tiempo del evento. Por lo tanto, los resultados pueden incluir algunos valores cuyos tiempos del evento estén fuera del período de análisis relevante, como los registros tardíos del día anterior.
  • Debido a que UserScore se ejecuta solo después de que se recopilaron todos los datos, tiene una latencia alta entre el momento en que los usuarios generan eventos de datos (el tiempo del evento) y cuando se calculan los resultados (el tiempo de procesamiento).
  • UserScore también solo informa el total de los resultados de todo el día y no proporciona información más precisa sobre cómo se acumularon los datos durante el día.

Para analizar cómo puedes usar las funciones de Dataflow a fin de abordar estas limitaciones, empezaremos con el ejemplo de canalización a continuación.

HourlyTeamScore: procesamiento avanzado por lotes con sistema de ventanas

La canalización HourlyTeamScore amplía los principios básicos del análisis por lotes usados en la canalización UserScore y mejora algunas de sus limitaciones. HourlyTeamScore realiza un análisis más detallado mediante el uso de funciones adicionales en los SDK de Dataflow y la consideración de más aspectos de los datos del juego. Por ejemplo, HourlyTeamScore puede filtrar datos que no formen parte del período de análisis relevante.

Al igual que UserScore, se considera a HourlyTeamScore como un trabajo que se ejecutará de manera periódica después la recopilación de todos los datos relevantes (por ejemplo, una vez al día). La canalización lee un conjunto de datos fijo de un archivo y escribe los resultados en una tabla de BigQuery, al igual que UserScore.

¿Qué hace HourlyTeamScore?

HourlyTeamScore calcula la puntuación total por equipo, por hora, en un conjunto de datos fijo (como el valor de un día de datos).

  • En lugar de operar en todo el conjunto de datos a la vez, HourlyTeamScore divide los datos de entrada en ventanas lógicas y realiza cálculos en ellas. Esto permite que HourlyUserScore proporcione información sobre los datos de puntuación por ventana, en los que cada ventana representa el progreso de la puntuación del juego en intervalos fijos de tiempo (como una vez por hora).
  • HourlyTeamScore filtra los eventos de datos en función de si el tiempo de evento (como lo indica la marca de tiempo incrustada) se encuentra dentro del período de análisis relevante. En resumen, la canalización verifica la marca de tiempo de cada evento del juego y asegura que se encuentre dentro del rango que queremos analizar (en este caso, el día en cuestión). Los eventos de datos de días anteriores se descartan y no se incluyen en los totales de puntuación. Esto hace que HourlyTeamScore sea más robusto y menos propenso a generar datos de resultados erróneos que UserScore. También permite a la canalización tener en cuenta los datos que llegan tarde y que tienen una marca de tiempo dentro del período de análisis relevante.

A continuación, veremos en detalle cada una de estas mejoras en HourlyTeamScore:

Sistema de ventanas de tiempo fijo

El uso del sistema de ventanas de tiempo fijo permite que la canalización proporcione mejor información sobre cómo se acumulan los eventos en el conjunto de datos durante el transcurso del período de análisis. En nuestro caso, nos dice en qué momento del día estuvo activo cada equipo y cuánta puntuación lograron en esas horas.

En el diagrama siguiente, se muestra cómo la canalización procesa el valor de un día de los datos de puntuación de un solo equipo después de aplicar el sistema de ventanas de tiempo fijo:

Figura 2: datos de puntuación de dos equipos. Las puntuaciones de cada equipo se dividen en ventanas lógicas en función de cuándo se produjeron esas puntuaciones en el tiempo del evento.

Ten en cuenta que a medida que avanza el tiempo de procesamiento, las sumas se realizan por ventana, y cada ventana representa una hora del tiempo del evento durante el día en que se producen las puntuaciones.

La función de sistema de ventanas de Dataflow usa la información de la marca de tiempo intrínseca adjunta a cada elemento de un PCollection. Como queremos que nuestra canalización a ventana se base en el tiempo del evento, primero debemos extraer la marca de tiempo presente en cada registro de datos y aplicarla al elemento correspondiente en el PCollection de los datos de puntuación. Luego, la canalización puede aplicar la función de sistema de ventanas para dividir PCollection en ventanas lógicas.

Aquí está el código, que muestra cómo HourlyTeamScore usa las transformaciones WithTimestamps y Window para realizar estas operaciones:

// Add an element timestamp based on the event log, and apply fixed windowing.
.apply("AddEventTimestamps",
       WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
.apply(Window.named("FixedWindowsTeam")
    .<GameActionInfo>into(FixedWindows.of(
          Duration.standardMinutes(options.getWindowDuration()))))

Observa que las transformaciones que usa la canalización para especificar las ventanas son distintas de las transformaciones de procesamiento de datos reales (como ExtractAndSumScores). Esta funcionalidad proporciona cierta flexibilidad en el diseño de tu canalización de Dataflow, ya que puedes ejecutar transformaciones existentes sobre conjuntos de datos con diferentes características del sistema de ventanas.

Filtra según el tiempo del evento

HourlyTeamScore usa el filtrado para quitar cualquier evento de nuestro conjunto de datos cuyas marcas de tiempo no se encuentren dentro del período de análisis relevante (es decir, que no se generaron durante el día que nos interesa). Esto evita que la canalización incluya por error cualquier dato que, por ejemplo, se generó sin conexión durante el día anterior, pero se envió al servidor del juego durante el día actual.

También permite que la canalización incluya datos tardíos relevantes (eventos de datos con marcas de tiempo, pero que llegaron después de la finalización de nuestro período de análisis). Por ejemplo, si el tiempo límite de nuestra canalización es 12:00 a.m., podemos ejecutar la canalización a las 2:00 a.m., pero deberíamos filtrar cualquier evento cuya marca de tiempo indique que ocurrió después del límite de las 12:00 a.m. Los eventos de datos que se retrasaron y llegaron entre las 12:01 a.m. y las 2:00 a.m., pero cuyas marcas de tiempo indican que ocurrieron antes del límite de las 12:00 a.m., se incluirían en el procesamiento de las canalizaciones.

HourlyTeamScore usa la transformación Filter para realizar esta operación. Cuando aplicas Filter, se especifica un predicado con el que se compara cada registro de datos. Se incluyen los registros de datos que pasan la comparación y se excluyen los eventos que no. En nuestro caso, el predicado es el tiempo límite que especificamos, y comparamos solo una parte de los datos: el campo de marca de tiempo.

El código siguiente muestra cómo HourlyTeamScore usa la transformación Filter para filtrar eventos que ocurren antes o después del período de análisis relevante:

.apply("FilterStartTime", Filter.byPredicate(
    (GameActionInfo gInfo)
        -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
.apply("FilterEndTime", Filter.byPredicate(
    (GameActionInfo gInfo)
        -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))

Calcula la puntuación por equipo y por ventana

HourlyTeamScore usa la misma transformación ExtractAndSumScores que la canalización UserScore, pero pasa una clave diferente (equipo, en lugar de usuario). Además, debido a que la canalización aplica ExtractAndSumScores después de aplicar el sistema de ventanas de tiempo fijo de 1 hora a los datos de entrada, los datos se agrupan por equipo y por ventana. Puedes ver la secuencia completa de transformaciones en el método principal de HourlyTeamScore:

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
  final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));

  // Read 'gaming' events from a text file.
  pipeline.apply(TextIO.Read.from(options.getInput()))
    // Parse the incoming data.
    .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))

    // Filter out data before and after the given times so that it is not included
    // in the calculations. As we collect data in batches (say, by day), the batch for the day
    // that we want to analyze could potentially include some late-arriving data from the previous
    // day. If so, we want to weed it out. Similarly, if we include data from the following day
    // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
    // that fall after the time period we want to analyze.
    .apply("FilterStartTime", Filter.byPredicate(
        (GameActionInfo gInfo)
            -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
    .apply("FilterEndTime", Filter.byPredicate(
        (GameActionInfo gInfo)
            -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))

    // Add an element timestamp based on the event log, and apply fixed windowing.
    .apply("AddEventTimestamps",
           WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
    .apply(Window.named("FixedWindowsTeam")
        .<GameActionInfo>into(FixedWindows.of(
              Duration.standardMinutes(options.getWindowDuration()))))

    // Extract and sum teamname/score pairs from the event data.
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
    .apply("WriteTeamScoreSums",
      new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName(),
          configureWindowedTableWrite()));

  pipeline.run();
}

Limitaciones

Tal como se describe, HourlyTeamScore aún tiene una limitación:

  • HourlyTeamScore todavía tiene una latencia alta entre los eventos de datos (el tiempo del evento) y el momento en que se generan resultados (el tiempo de procesamiento) porque, como canaliza por lotes, necesita esperar hasta que todos los eventos de datos estén presentes antes de comenzar a procesar.

LeaderBoard: procesamiento de transmisión con datos de juego en tiempo real

Una forma en que podemos ayudar a abordar el problema de latencia presente en las canalizaciones UserScore y HourlyTeamScore es mediante la lectura de los datos de puntuación de una fuente de transmisión. La canalización LeaderBoard pone en marcha el procesamiento de transmisión cuando lee los datos de puntuación del juego desde Google Cloud Pub/Sub, en lugar de hacerlo desde un archivo en el servidor del juego.

La canalización LeaderBoard también muestra cómo procesar los datos de la puntuación del juego con respecto al tiempo de procesamiento y al tiempo del evento. LeaderBoard genera datos sobre las puntuaciones de los usuarios individuales y las de los equipos, cada una con respecto a un marco de tiempo diferente.

Debido a que la canalización LeaderBoard lee los datos del juego desde una fuente de transmisión a medida que se generan, se puede considerar la canalización como un trabajo continuo que se ejecuta de manera simultánea con el proceso de juego. Es por esto que LeaderBoard puede proporcionar estadísticas de latencia baja sobre cómo los usuarios juegan en un momento determinado. Es útil si, por ejemplo, queremos proporcionar un marcador en vivo basado en la Web de modo que los usuarios pueden seguir su progreso con respecto a otros usuarios mientras juegan.

¿Qué hace LeaderBoard?

La canalización LeaderBoard lee los datos del juego publicados en Pub/Sub en tiempo casi real y los usa para realizar dos tareas de procesamiento distintas:

  • LeaderBoard calcula la puntuación total de cada usuario único y publica resultados especulativos por cada diez minutos de tiempo de procesamiento. Es decir, cada diez minutos, la canalización genera la puntuación total por usuario que la canalización procesó hasta la fecha. Este cálculo proporciona una “tabla principal” continua casi en tiempo real, sin tener en cuenta cuándo se generaron los eventos reales del juego.
  • LeaderBoard calcula las puntuaciones del equipo por cada hora que se ejecuta la canalización. Por ejemplo, esto es útil si queremos recompensar al equipo con la mejor puntuación por cada hora de juego. El cálculo de la puntuación del equipo usa el sistema de ventanas de tiempo fijo para dividir los datos de entrada en ventanas finitas de una hora en función del tiempo del evento (indicado por la marca de tiempo) a medida que los datos llegan a la canalización.

    Además, el cálculo de la puntuación del equipo usa los mecanismos del activador de Dataflow a fin de proporcionar resultados especulativos por hora (que se actualizan cada cinco minutos hasta que se acabe la hora) y, también, con el fin de capturar cualquier dato tardío y agregarlo a la ventana específica de una hora a la que pertenece.

A continuación, veremos estas dos tareas en detalle.

Cómo calcular la puntuación del usuario según el tiempo de procesamiento

Queremos que nuestra canalización genere una puntuación total continua para cada usuario por cada diez minutos que se ejecute la canalización. Este cálculo no considera cuándo la instancia de juego del usuario generó la puntuación real. Sino que simplemente genera la suma de todas las puntuaciones que hayan llegado a la canalización hasta la fecha para ese usuario. Los datos tardíos se incluyen en el cálculo cada vez que llegan a la canalización mientras se están ejecutando.

Como necesitamos todos los datos que llegan a la canalización cada vez que actualizamos nuestro cálculo, la canalización debe considerar todos los datos de puntuación de usuario en una ventana global única. La ventana global única es ilimitada, pero podemos especificar una categoría de punto de límite temporal para cada cálculo de diez minutos mediante el uso de un activador de tiempo de procesamiento.

Cuando especificamos un activador de tiempo de procesamiento de diez minutos para la ventana global única, la canalización toma eficazmente una “instantánea” de los contenidos de la ventana cada vez que el activador se activa (lo hace en intervalos de diez minutos). Como estamos usando una ventana global única, cada instantánea contiene todos los datos recopilados hasta ese momento. En el diagrama siguiente, se muestran los efectos de usar un activador del tiempo de procesamiento en la ventana global única:

Figura 3: datos de puntuación de tres usuarios. Las puntuaciones de cada usuario se agrupan en una ventana global única, con un activador que genera una instantánea para los resultados cada diez minutos.

A medida que avanza el tiempo de procesamiento y se procesan más puntuaciones, el activador genera la suma actualizada para cada usuario.

El ejemplo siguiente de código muestra cómo LeaderBoard establece el activador del tiempo de procesamiento para generar los datos destinados a las puntuaciones de los usuarios:

/**
 * Extract user/score pairs from the event stream using processing time, via global windowing.
 * Get periodic updates on all users' running scores.
 */
@VisibleForTesting
static class CalculateUserScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration allowedLateness;

  CalculateUserScores(Duration allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> input) {
    return input.apply("LeaderboardUserGlobalWindow",
        Window.<GameActionInfo>into(new GlobalWindows())
            // Get periodic results every ten minutes.
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TEN_MINUTES)))
            .accumulatingFiredPanes()
            .withAllowedLateness(allowedLateness))
        // Extract and sum username/score pairs from the event data.
        .apply("ExtractUserScore", new ExtractAndSumScore("user"));
  }
}

Ten en cuenta que LeaderBoard usa un activador de acumulación para el cálculo de la puntuación del usuario (mediante la invocación de .accumulatingFiredPanes cuando se configura el activador). El uso de un activador de acumulación hace que la canalización acumule los datos emitidos con anterioridad junto con cualquier dato nuevo que haya llegado desde la última activación del activador. Esto garantiza a LeaderBoard una suma continua para las puntuaciones de los usuarios, en lugar de una colección de sumas individuales.

Calcula la puntuación del equipo según el tiempo del evento

Queremos que nuestra canalización también genere la puntuación total para cada equipo durante cada hora de juego. A diferencia del cálculo de puntuación del usuario, para las puntuaciones de equipo, nos preocupamos de saber en qué momento del tiempo del evento ocurrió cada puntuación en realidad, porque queremos considerar cada hora de juego individualmente. También queremos proporcionar actualizaciones especulativas a medida que avanza cada hora y permitir que cualquier instancia de datos tardíos (datos que llegan después de que los datos de una hora determinada se consideren completos) se incluya en nuestro cálculo.

Debido a que consideramos cada hora de forma individual, podemos aplicar el sistema de ventanas de tiempo fijo a nuestros datos de entrada, como en HourlyTeamScore. Para proporcionar las actualizaciones especulativas y las actualizaciones sobre datos tardíos, especificaremos parámetros de activador adicionales. El activador hará que cada ventana calcule y emita resultados en un intervalo que especifiquemos (en este caso, cada cinco minutos), y también que la activación continúe después de que la ventana se considere “completa” para incluir los datos tardíos. Al igual que el cálculo de la puntuación del usuario, estableceremos el activador en modo de acumulación para asegurarnos de obtener una suma continua por cada ventana de una hora de duración.

Los activadores para actualizaciones especulativas y datos tardíos ayudan con el problema de retraso de tiempo. Los eventos en la canalización no se procesan necesariamente en el orden en que ocurrieron según sus marcas de tiempo. Es posible que lleguen a la canalización en otro orden o tarde (en nuestro caso, porque se generaron cuando el teléfono del usuario no estaba en contacto con una red). Dataflow necesita una forma de determinar cuándo puede suponer de manera razonable que tiene “todos” los datos en una ventana determinada: esto se denomina marca de agua.

En un mundo ideal, todos los datos se procesarían de manera inmediata cuando ocurren, por lo que el tiempo de procesamiento sería igual (o al menos tendría una relación lineal) al tiempo del evento. Sin embargo, debido a que los sistemas distribuidos contienen alguna inexactitud inherente (como nuestros teléfonos con notificaciones tardías), Dataflow, a menudo, usa una marca de agua heurística.

En el diagrama siguiente, se muestra la relación entre el tiempo de procesamiento continuo y el tiempo del evento de cada puntuación para dos equipos:

Figura 4: datos de puntuación por equipo, con ventanas por tiempo del evento. Un activador basado en el tiempo de procesamiento hace que la ventana emita resultados tempranos especulativos y que incluya resultados tardíos.

La línea de puntos en el diagrama es la marca de agua “ideal”: la noción de Dataflow de cuándo se puede considerar de forma razonable que todos los datos en una ventana determinada han llegado. La línea sólida irregular representa la marca de agua real, según lo determinado por la fuente de datos (en nuestro caso, Pub/Sub).

Los datos que llegan por encima de la línea de marca de agua sólida son datos tardíos: un evento de puntuación que se retrasó (quizás se generó sin conexión) y llegó después de que se cerrara la ventana a la que pertenece. El activador de activación tardía de nuestra canalización garantiza que estos datos tardíos aún se incluyan en la suma.

En el ejemplo de código siguiente, se muestra cómo LeaderBoard aplica el sistema de ventanas de tiempo fijo con los activadores adecuados para que nuestra canalización realice los cálculos que queremos:

// Extract team/score pairs from the event stream, using hour-long windows by default.
@VisibleForTesting
static class CalculateTeamScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration teamWindowDuration;
  private final Duration allowedLateness;

  CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
    this.teamWindowDuration = teamWindowDuration;
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> infos) {
    return infos.apply("LeaderboardTeamFixedWindows",
        Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
            // We will get early (speculative) results as well as cumulative
            // processing of late data.
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(FIVE_MINUTES))
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(TEN_MINUTES)))
            .withAllowedLateness(allowedLateness)
            .accumulatingFiredPanes())
        // Extract and sum teamname/score pairs from the event data.
        .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
  }
}

En conjunto, estas estrategias de procesamiento nos permiten abordar los problemas de latencia y compleción presentes en las canalizaciones UserScore y HourlyTeamScore, mientras usamos las mismas transformaciones básicas para procesar los datos. De hecho, ambos cálculos usan la misma transformación ExtractAndSumScore que usamos en las canalizaciones y HourlyTeamScore.

GameStats: análisis de uso y detección de abuso

Si bien LeaderBoard demuestra cómo usar activadores y sistemas de ventanas básicas para realizar análisis de datos flexibles y de latencia baja, podemos usar técnicas de sistemas de ventanas más avanzadas para realizar análisis más completos. Esto podría incluir algunos cálculos diseñados para detectar el abuso del sistema (como el spam) o conseguir información sobre el comportamiento del usuario. La canalización GameStats compila la funcionalidad de latencia baja en LeaderBoard a fin de demostrar cómo puedes usar Dataflow para realizar este tipo de análisis avanzado.

Al igual que LeaderBoard, GameStats lee los datos de una fuente de transmisión, en este caso, Pub/Sub. Se lo considera como un trabajo continuo que proporciona estadísticas sobre el juego mientras los usuarios juegan.

¿Qué hace GameStats?

Al igual que LeaderBoard, GameStats calcula la puntuación total por equipo por hora. Sin embargo, la canalización también realiza dos tipos de análisis más complejos:

  • GameStats aplica un sistema de detección de abuso que realiza un análisis estadístico simple de los datos de puntuación para determinar qué usuarios, si los hay, pueden ser spammers o bots. Luego, usa la lista de usuarios sospechosos de spam/bot para filtrar los bots del cálculo de puntuación por hora del equipo.
  • GameStats analiza los patrones de uso mediante la agrupación de los datos del juego que comparten tiempos de eventos similares con el sistema de ventanas de sesión. Esto nos permite obtener información acerca de cuánto tiempo los usuarios suelen jugar y cómo cambia la duración del juego a lo largo del tiempo.

A continuación, analizaremos estas funciones en detalle.

Detección de abuso

Supongamos que la puntuación en nuestro juego depende de la velocidad a la que un usuario puede “hacer clic” en su teléfono. La detección de abuso de GameStats analiza los datos de puntuación de cada usuario para detectar si un usuario tiene una “tasa de clics” más alta de lo normal y, por lo tanto, una puntuación más alta de lo normal. Esto podría indicar que un bot está jugando y operando mucho más rápido de lo que podría jugar una persona.

Para determinar si una puntuación es o no más alta "de lo normal", GameStats calcula el promedio de cada puntuación en esa ventana de tiempo fijo y, luego, compara cada puntuación individual con la puntuación promedio multiplicada por un factor de peso arbitrario (en nuestro caso, 2.5). Por lo tanto, cualquier puntuación de más de 2.5 veces el promedio se considera producto de spam. La canalización GameStats hace seguimiento de una lista de usuarios “spam” y los filtra fuera de los cálculos de puntuación del equipo para la tabla principal del equipo.

Dado que el promedio depende de los datos de la canalización, debemos calcularlo y, luego, usar esos datos calculados en una transformación ParDo posterior que filtra las puntuaciones que exceden el valor ponderado. Para hacer esto, podemos pasar el promedio calculado como una entrada lateral al filtro ParDo.

En el ejemplo de código siguiente, se muestra la transformación compuesta que controla la detección de abuso. La transformación usa la transformación Sum.integersPerKey con el fin de sumar todos las puntuaciones por usuario y, luego, la transformación Mean.globally a fin de determinar la puntuación promedio de todos los usuarios. Una vez que se calculó (como un singleton PCollectionView), podemos pasarlo al filtro ParDo con .withSideInputs:

public static class CalculateSpammyUsers
    extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
  private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
  private static final double SCORE_WEIGHT = 2.5;

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {

    // Get the sum of scores for each user.
    PCollection<KV<String, Integer>> sumScores = userScores
        .apply("UserSum", Sum.<String>integersPerKey());

    // Extract the score from each element, and use it to find the global mean.
    final PCollectionView<Double> globalMeanScore = sumScores.apply(Values.<Integer>create())
        .apply(Mean.<Integer>globally().asSingletonView());

    // Filter the user sums using the global mean.
    PCollection<KV<String, Integer>> filtered = sumScores
        .apply(ParDo
            .named("ProcessAndFilter")
            // use the derived mean total score as a side input
            .withSideInputs(globalMeanScore)
            .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
              private final Aggregator<Long, Long> numSpammerUsers =
                createAggregator("SpammerUsers", new Sum.SumLongFn());
              @Override
              public void processElement(ProcessContext c) {
                Integer score = c.element().getValue();
                Double gmc = c.sideInput(globalMeanScore);
                if (score > (gmc * SCORE_WEIGHT)) {
                  LOG.info("user " + c.element().getKey() + " spammer score " + score
                      + " with mean " + gmc);
                  numSpammerUsers.addValue(1L);
                  c.output(c.element());
                }
              }
            }));
    return filtered;
  }
}

La transformación de detección de abuso genera una vista de los usuarios sospechosos de ser spambots. Más adelante en la canalización, utilizamos esa vista para filtrar a cualquiera de esos usuarios cuando calculamos la puntuación del equipo por hora, nuevamente con el mecanismo de entrada lateral. En el ejemplo de código siguiente, se muestra dónde insertamos el filtro de spam, entre la calificación de las puntuaciones por sistemas de ventanas en ventanas fijas y la extracción de las puntuaciones del equipo:

// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
  .apply(Window.named("WindowIntoFixedWindows")
      .<GameActionInfo>into(FixedWindows.of(
          Duration.standardMinutes(options.getFixedWindowDuration())))
      )
  // Filter out the detected spammer users, using the side input derived above.
  .apply(ParDo.named("FilterOutSpammers")
          .withSideInputs(spammersView)
          .of(new DoFn<GameActionInfo, GameActionInfo>() {
            @Override
            public void processElement(ProcessContext c) {
              // If the user is not in the spammers Map, output the data element.
              if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
                c.output(c.element());
              }
            }}))
  // Extract and sum teamname/score pairs from the event data.
  .apply("ExtractTeamScore", new ExtractAndSumScore("team"))

Analiza patrones de uso

Podemos obtener información sobre cuándo los usuarios juegan nuestro juego y durante cuánto tiempo. Para esto, debemos examinar las horas del evento de la puntuación de cada juego y agrupar las puntuaciones con horas del evento similares en sesiones. GameStats usa la función de sistema de ventanas de sesión incorporada de Dataflow para agrupar las puntuaciones de los usuarios en sesiones según la hora en que ocurrieron.

Cuando configuras el sistema de ventanas de sesión, especificas una duración de intervalo mínima entre eventos. Se agrupan en la misma ventana todos los eventos cuyas horas de llegada se acercan más a la duración de intervalo mínima. Se agrupan en ventanas separadas los eventos en que la diferencia en la hora de llegada es mayor que el intervalo. Según cómo establezcamos la duración de intervalo mínima, podemos suponer con seguridad que las puntuaciones en la misma ventana de sesión son parte del mismo tramo de juego (relativamente) ininterrumpido. Las puntuaciones en una ventana diferente indican que el usuario dejó de jugar el juego durante, al menos, el tiempo de intervalo mínimo antes de volver más tarde.

En el siguiente diagrama, se muestra cómo podrían verse los datos cuando se agrupan en ventanas de sesión. A diferencia de las ventanas fijas, las ventanas de sesión son diferentes para cada usuario y dependen del patrón de juego de cada usuario:

Un diagrama que representa un sistema de ventanas de sesión
Figura 5: Sesiones de usuario con una duración de intervalo mínima. Ten en cuenta cómo cada usuario tiene sesiones diferentes según la cantidad de instancias que jueguen y el tiempo de descanso entre instancias.

Podemos usar los datos de sesiones de ventanas si queremos determinar la duración promedio del tiempo de juego ininterrumpido de todos nuestros usuarios, además de la puntuación total que alcanzan durante cada sesión. Podemos hacer esto en el código, primero mediante la aplicación de ventanas de sesión, la suma de la puntuación por usuario y sesión y, luego, con una transformación para calcular la duración de cada sesión individual:

// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
  .apply(Window.named("WindowIntoSessions")
        .<KV<String, Integer>>into(
              Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
    .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
  // For this use, we care only about the existence of the session, not any particular
  // information aggregated over it, so the following is an efficient way to do that.
  .apply(Combine.perKey(x -> 0))
  // Get the duration per session.
  .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))

Esto nos da un conjunto de sesiones de usuario, cada una con una duración adjunta. A continuación, podemos calcular la duración promedio de las sesiones si volvemos a clasificar los datos en sistemas de ventanas de tiempo fijo y, luego, calculamos el promedio de todas las sesiones que finalizan cada hora:

// Re-window to process groups of session sums according to when the sessions complete.
.apply(Window.named("WindowToExtractSessionMean")
      .<Integer>into(
          FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply("WriteAvgSessionLength",
       new WriteWindowedToBigQuery<Double>(
          options.getTablePrefix() + "_sessions", configureSessionWindowWrite()));

Podemos usar la información resultante para encontrar, por ejemplo, las horas del día en que los usuarios juegan durante más tiempo o los tramos del día con mayor probabilidad de ver sesiones de juego más cortas.

Más información

Puedes obtener más información sobre cómo funcionan estos ejemplos si ejecutas las canalizaciones de ejemplo, que se incluyen en los ejemplos del SDK de Dataflow (rama master-1.x) en GitHub. Consulta README.md en el directorio de ejemplos para obtener instrucciones completas sobre cómo ejecutar las canalizaciones de ejemplo incluidas.

Los videos y las entradas de blog que se muestran a continuación proporcionan más información y contexto sobre el modelo de datos unificados de Dataflow para el procesamiento de transmisión y por lotes.

  • Entrada de blog de Dataflow & Spark: una entrada de blog creada por ingenieros de Google que usa ejemplos en el dominio de los videojuegos para dispositivos móviles a fin de ilustrar cómo se compara el modelo de Dataflow con el de Apache Spark.
  • Dataflow talk @Scale: Una presentación en video tomada de la conferencia @Scala, en manos del ingeniero de Google Frances Perry, que explica el modelo de Dataflow con una versión anterior de los ejemplos de videojuegos para dispositivos móviles descritos con anterioridad.
  • “El mundo más allá de los lotes: transmisión 101 y transmisión 102”: Una serie de dos partes sobre entradas en el blog O'Reilly del ingeniero de Google Tyler Akidau, que examina el futuro del procesamiento de los macrodatos.