Ejemplos de canalizaciones de videojuegos para dispositivos móviles

En esta sección, se brinda una explicación de varios ejemplos de canalizaciones de Dataflow que demuestran una funcionalidad más compleja que el ejemplo de WordCount básico. Las canalizaciones en esta sección procesan datos de un juego hipotético que los usuarios juegan en sus teléfonos celulares. Las canalizaciones demuestran el procesamiento en niveles crecientes de complejidad. Por ejemplo, la primera canalización muestra cómo ejecutar un trabajo de análisis por lotes para obtener datos de puntuación relativamente simples, mientras que las canalizaciones posteriores usan las funciones del sistema de ventanas y de los activadores de Dataflow para proporcionar un análisis de datos de latencia baja y una información más compleja acerca de los patrones de juego del usuario.

Cada usuario genera un evento de datos cada vez que reproduce una instancia de nuestro juego hipotético para dispositivos móviles. Cada evento de datos consta de la siguiente información:

  • El ID único del usuario que juega
  • El ID de equipo para el equipo al que pertenece el usuario
  • Un valor de puntuación para esa instancia particular de juego
  • Una marca de tiempo que registra cuándo ocurrió la instancia particular del juego: este es el tiempo del evento para cada evento de datos del juego

Cuando el usuario completa una instancia del juego, su teléfono envía el evento de datos a un servidor del juego, en el que los datos se registran y se almacenan en un archivo. En general, los datos se envían al servidor del juego inmediatamente después de su finalización. Sin embargo, los usuarios pueden jugar “sin conexión” cuando sus teléfonos no están en contacto con el servidor (como cuando están en un avión o fuera del área de cobertura de la red). Cuando el teléfono del usuario vuelva a estar en contacto con el servidor de juegos, el teléfono enviará todos los datos acumulados del juego.

Esto significa que el servidor del juego puede recibir algunos eventos de datos mucho después del momento en que los generan los usuarios. Esta diferencia de tiempo puede tener implicaciones de procesamiento para las canalizaciones que realizan cálculos que consideran cuándo se generó cada puntuación. Esas canalizaciones podrían realizar un seguimiento de las puntuaciones generadas durante cada hora del día, por ejemplo, o calcular la cantidad de tiempo que los usuarios ocupan continuamente en el juego, los cuales dependen del tiempo del evento de cada registro de datos.

El ejemplo de canalizaciones de juegos para dispositivos móviles varía en complejidad, desde un simple análisis por lotes hasta canalizaciones más complejas, que pueden realizar análisis en tiempo real y detección de abuso. En esta sección, se explica cada ejemplo y se demuestra cómo usar las funciones de Dataflow como sistemas de ventanas y activadores para expandir las funciones de las canalizaciones.

UserScore: Procesamiento de puntuación básico en lotes

La canalización UserScore es el ejemplo más simple para el procesamiento de datos de juegos en dispositivos móviles. UserScore determina la puntuación total por usuario sobre un conjunto de datos finitos (por ejemplo, el valor de las puntuaciones de un día almacenadas en el servidor del juego). Las canalizaciones como UserScore se ejecutan mejor periódicamente después de que se recopilan todos los datos relevantes. Por ejemplo, UserScore podría ejecutarse como un trabajo nocturno sobre los datos recopilados durante ese día.

¿Qué hace el UserScore?

En el valor de los datos de puntuación de un día, cada ID de usuario puede tener varios registros (si el usuario usa más de una instancia del juego durante la ventana de análisis), cada uno con su propio valor de puntuación y marca de tiempo. Si queremos determinar la puntuación total en todas las instancias que un usuario juega durante el día, nuestra canalización deberá agrupar todos los registros por usuario individual.

A medida que la canalización procesa cada evento, la puntuación del evento se agrega a la suma total para ese usuario en particular.

UserScore analiza solo los datos que necesita de cada registro, específicamente el ID del usuario y el valor de la puntuación. La canalización no considera el tiempo del evento para ningún registro; simplemente procesa todos los datos presentes en los archivos de entrada que especificas cuando ejecutas la canalización.

El flujo de canalización básica de UserScore hace lo siguiente:

  1. Lee los datos de puntuación del día de un archivo almacenado en Google Cloud Storage.
  2. Suma los valores de puntuación para cada usuario único agrupando cada evento de juego por ID de usuario y combinando los valores de puntuación a fin de obtener la puntuación total para ese usuario específico.
  3. Escribe los datos del resultado en una tabla de BigQuery.

El diagrama siguiente muestra los datos de puntuación de varios usuarios durante el período de análisis de la canalización. En el diagrama, cada dato es un evento que tiene como resultado un par usuario/puntuación:

Figura 1: Datos de puntuación para tres usuarios

En este ejemplo, se usa el procesamiento por lotes, y el eje Y del diagrama representa el tiempo de procesamiento: la canalización procesa los eventos más bajos en el eje Y primero y los eventos más arriba del eje después. El eje X del diagrama representa el tiempo del evento para cada evento del juego, como indica la marca de tiempo del evento. Ten en cuenta que la canalización no procesa los eventos individuales en el diagrama en el mismo orden en que ocurrieron (de acuerdo con sus marcas de tiempo).

Después de leer los eventos de puntuación del archivo de entrada, la canalización agrupa todos esos pares de usuario/puntuación y suma los valores de puntuación para completar un valor total por usuario único. UserScore encapsula la lógica central para ese paso como transformación compuesta definida por el usuario ExtractAndSumScore:

public static class ExtractAndSumScore
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

  private final String field;

  ExtractAndSumScore(String field) {
    this.field = field;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(
      PCollection<GameActionInfo> gameInfo) {

    return gameInfo
      .apply(MapElements
          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
          .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
      .apply(Sum.<String>integersPerKey());
  }
}

ExtractAndSumScore está escrito para ser más general, ya que puedes pasar el campo por el que deseas agrupar los datos (en el caso de nuestro juego, por usuario único o equipo único). Esto significa que podemos volver a usar ExtractAndSumScore en otras canalizaciones que agrupan datos de puntuación por equipo, por ejemplo.

Aquí está el método principal de UserScore, que muestra cómo aplicamos los tres pasos de la canalización:

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  // Read events from a text file and parse them.
  pipeline.apply(TextIO.Read.from(options.getInput()))
    .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
    // Extract and sum username/score pairs from the event data.
    .apply("ExtractUserScore", new ExtractAndSumScore("user"))
    .apply("WriteUserScoreSums",
        new WriteToBigQuery<KV<String, Integer>>(options.getTableName(),
                                                 configureBigQueryWrite()));

  // Run the batch pipeline.
  pipeline.run();
}

Cómo trabajar con los resultados

UserScore escribe los datos en una tabla de BigQuery (llamada user_score por configuración predeterminada). Con los datos en la tabla de BigQuery, podríamos realizar un análisis interactivo adicional, como consultar una lista de los N usuarios con mayor puntuación en un día determinado.

Supongamos que queremos determinar de manera interactiva los 10 usuarios con mayor puntuación en un día específico. En la interfaz de usuario de BigQuery, podemos ejecutar la siguiente consulta:

SELECT * FROM [MyGameProject:MyGameDataset.user_score] ORDER BY total_score DESC LIMIT 10

Limitaciones

Como está escrito en el ejemplo, la canalización UserScore tiene algunas limitaciones:

  • Debido a que jugadores sin conexión pueden generar algunos datos de puntuación y enviarlos después del punto límite diario, los datos de resultados que genera la canalización UserScore para los datos del juego pueden estar incompletos. UserScore solo procesa el conjunto de entrada fijo presente en los archivos de entrada cuando se ejecuta la canalización.
  • UserScore procesa todos los eventos de datos presentes en el archivo de entrada en el momento del procesamiento y no examina ni verifica los eventos de error en función del tiempo del evento. Por lo tanto, los resultados pueden incluir algunos valores cuyos tiempos del evento están fuera del período de análisis relevante, como los registros tardíos del día anterior.
  • Debido a que UserScore se ejecuta solo después de que se hayan recopilado todos los datos, tiene latencia alta entre el momento en que los usuarios generan eventos de datos (el tiempo del evento) y el momento en que se calculan los resultados (el tiempo de procesamiento).
  • UserScore también solo informa los resultados totales de todo el día y no proporciona información más detallada sobre cómo se acumulan los datos durante el día.

Para analizar cómo puedes usar las funciones de Dataflow a fin de abordar estas limitaciones, empezaremos con el ejemplo de canalización a continuación.

HourlyTeamScore: Procesamiento avanzado por lotes con sistema de ventanas

La canalización HourlyTeamScore amplía los principios básicos de análisis por lotes que se usan en la canalización UserScore y mejora algunas de sus limitaciones. Para realizar un análisis más detallado, HourlyTeamScore usa funciones adicionales en los SDK de Dataflow y considera más aspectos de los datos del juego. Por ejemplo, HourlyTeamScore puede filtrar los datos que no forman parte del período de análisis relevante.

Al igual que UserScore, se considera que HourlyTeamScore es un trabajo que debe ejecutarse periódicamente después de que se recopilaron todos los datos relevantes (por ejemplo, una vez por día). La canalización lee un conjunto de datos fijo de un archivo y escribe los resultados en una tabla de BigQuery, como UserScore.

¿Qué hace HourlyTeamScore?

HourlyTeamScore calcula la puntuación total por equipo, por hora, en un conjunto de datos fijo (como el valor de un día de datos).

  • En lugar de operar en todo el conjunto de datos a la vez, HourlyTeamScore divide los datos de entrada en ventanas lógicas y realiza cálculos en esas ventanas. Esto permite que HourlyUserScore proporcione información sobre los datos de puntuación por ventana: cada ventana representa el progreso de la puntuación del juego a intervalos fijos en el tiempo (por ejemplo, una vez por hora).
  • HourlyTeamScore filtra los eventos de datos según si su tiempo del evento (como lo indica la marca de tiempo incorporada) se incluye en el período de análisis relevante. Básicamente, la canalización verifica la marca de tiempo de cada evento del juego y se asegura de que se encuentre dentro del rango que queremos analizar (en este caso, el día en cuestión). Los eventos de datos de días anteriores se descartan y no se incluyen en los totales de puntuación. Esto hace que HourlyTeamScore sea más sólido y menos propenso a datos de resultados erróneos que UserScore. También permite a la canalización tener en cuenta los datos que llegan tarde y que tienen una marca de tiempo dentro del período de análisis relevante.

A continuación, examinaremos cada una de estas mejoras en HourlyTeamScore en detalle:

Sistema de ventanas de tiempo fijo

El uso del sistema de ventanas de tiempo fijo permite que la canalización proporcione mejor información sobre cómo se acumulan los eventos en el conjunto de datos durante el transcurso del período de análisis. En nuestro caso, nos dice en qué momento del día estuvo activo cada equipo y cuánta puntuación lograron en esas horas.

En el siguiente diagrama, se muestra cómo la canalización procesa el valor de un día de los datos de puntuación de un solo equipo después de aplicar el sistema de ventanas de tiempo fijo:

Figura 2: Datos de puntuación para dos equipos. Las puntuaciones de cada equipo se dividen en ventanas lógicas en función de cuándo se produjeron esas puntuaciones en el tiempo del evento.

Ten en cuenta que a medida que avanza el tiempo de procesamiento, las sumas ahora son por ventana, y cada ventana representa una hora del tiempo del evento durante el día en que se producen las puntuaciones.

La función del sistema de ventanas de Dataflow usa la información de marca de tiempo intrínseca adjunta a cada elemento de una PCollection. Si queremos que nuestra canalización a ventana se base en el tiempo del evento, primero debemos extraer la marca de tiempo que está incorporada en cada registro de datos, aplicarla al elemento correspondiente en la PCollection de los datos de puntuación. Luego, la canalización puede aplicar la función de sistema de ventanas para dividir la PCollection en ventanas lógicas.

Este es el código que muestra cómo HourlyTeamScore usa las transformaciones WithTimestamps y Window para realizar estas operaciones.

// Add an element timestamp based on the event log, and apply fixed windowing.
.apply("AddEventTimestamps",
       WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
.apply(Window.named("FixedWindowsTeam")
    .<GameActionInfo>into(FixedWindows.of(
          Duration.standardMinutes(options.getWindowDuration()))))

Ten en cuenta que las transformaciones que utiliza la canalización para especificar el sistema de ventanas son distintas de las transformaciones de procesamiento de datos reales (como ExtractAndSumScores). Esta funcionalidad te proporciona cierta flexibilidad en el diseño de tu canalización de Dataflow, ya que puedes ejecutar transformaciones existentes sobre conjuntos de datos con diferentes características del sistema de ventanas.

Cómo filtrar según el tiempo del evento

HourlyTeamScore utiliza el filtrado para quitar cualquier evento de nuestro conjunto de datos cuyas marcas de tiempo no se encuentren dentro del período de análisis relevante (es decir, no se generaron durante el día que nos interesa). Esto evita que la canalización incluya erróneamente cualquier dato que, por ejemplo, se generó sin conexión durante el día anterior, pero se envió al servidor del juego durante el día actual.

También permite que la canalización incluya datos tardíos relevantes (eventos de datos con marcas de tiempo, pero que llegaron después de la finalización de nuestro período de análisis). Por ejemplo, si el tiempo límite de nuestra canalización es 12:00 a.m., podemos ejecutar la canalización a las 2:00 a.m., pero deberíamos filtrar cualquier evento cuya marca de tiempo indique que ocurrió después del límite de las 12:00 a.m. Los eventos de datos que se retrasaron y llegaron entre las 12:01 a.m. y las 2:00 a.m., pero cuyas marcas de tiempo indican que ocurrieron antes del límite de las 12:00 a.m., se incluirían en el procesamiento de las canalizaciones.

HourlyTeamScore usa la transformación Filter para realizar esta operación. Cuando aplicas Filter, estás especificando un predicado con el que se compara cada registro de datos. Se incluyen los registros de datos que pasan la comparación y se excluyen los eventos que no. En nuestro caso, el predicado es el tiempo límite que especificamos, y comparamos solo una parte de los datos: el campo de marca de tiempo.

El código siguiente muestra cómo HourlyTeamScore utiliza la transformación Filter para filtrar los eventos que ocurren antes o después del período de análisis relevante:

.apply("FilterStartTime", Filter.byPredicate(
    (GameActionInfo gInfo)
        -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
.apply("FilterEndTime", Filter.byPredicate(
    (GameActionInfo gInfo)
        -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))

Cómo calcular la puntuación por equipo y por ventana

HourlyTeamScore usa la misma transformación ExtractAndSumScores que la canalización de UserScore, pero pasa una clave diferente (equipo, a diferencia de usuario). Además, debido a que la canalización aplica ExtractAndSumScores después de aplicar el sistema de ventanas de tiempo fijo de 1 hora a los datos de entrada, los datos se agrupan por equipo y por ventana. Puedes ver una secuencia completa de transformaciones en el método principal de HourlyTeamScore:

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
  final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));

  // Read 'gaming' events from a text file.
  pipeline.apply(TextIO.Read.from(options.getInput()))
    // Parse the incoming data.
    .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))

    // Filter out data before and after the given times so that it is not included
    // in the calculations. As we collect data in batches (say, by day), the batch for the day
    // that we want to analyze could potentially include some late-arriving data from the previous
    // day. If so, we want to weed it out. Similarly, if we include data from the following day
    // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
    // that fall after the time period we want to analyze.
    .apply("FilterStartTime", Filter.byPredicate(
        (GameActionInfo gInfo)
            -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
    .apply("FilterEndTime", Filter.byPredicate(
        (GameActionInfo gInfo)
            -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))

    // Add an element timestamp based on the event log, and apply fixed windowing.
    .apply("AddEventTimestamps",
           WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
    .apply(Window.named("FixedWindowsTeam")
        .<GameActionInfo>into(FixedWindows.of(
              Duration.standardMinutes(options.getWindowDuration()))))

    // Extract and sum teamname/score pairs from the event data.
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
    .apply("WriteTeamScoreSums",
      new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName(),
          configureWindowedTableWrite()));

  pipeline.run();
}

Limitaciones

Como está escrito, HourlyTeamScore todavía tiene una limitación:

  • HourlyTeamScore todavía tiene una latencia alta entre el momento en que ocurren los eventos de datos (el tiempo del evento) y el momento en que se generan los resultados (el tiempo de procesamiento), ya que, como una canalización por lotes, debe esperar para comenzar a procesarse hasta que todos los eventos de datos estén presentes.

LeaderBoard: Procesamiento de transmisión con datos de juego en tiempo real

Una forma en que podemos ayudar a solucionar el problema de latencia presente en las canalizaciones de UserScore y HourlyTeamScore es mediante la lectura de los datos de puntuación de una fuente de transmisión. La canalización LeaderBoard ingresa el procesamiento de transmisión mediante la lectura de los datos de puntuación del juego desde Google Cloud Pub/Sub, y no desde un archivo en el servidor del juego.

La canalización LeaderBoard también demuestra cómo procesar los datos de puntuación del juego con respecto al tiempo de procesamiento y al tiempo del evento. LeaderBoard genera datos sobre puntuaciones de usuario individual y puntuaciones de equipo, cada una con respecto a un período diferente.

Debido a que la canalización de LeaderBoard lee los datos del juego desde una fuente de transmisión a medida que se generan esos datos, puedes pensar en la canalización como un trabajo en curso que se ejecuta simultáneamente con el proceso del juego. LeaderBoard puede proporcionar información de latencia baja sobre cómo los usuarios juegan en cualquier momento (por ejemplo, es útil si deseamos proporcionar puntuaciones en vivo basadas en la Web para que los usuarios puedan seguir su progreso frente a otros usuarios mientras juegan).

¿Qué hace LeaderBoard?

La canalización LeaderBoard lee los datos del juego publicados en Pub/Sub casi en tiempo real, y utiliza esos datos para realizar dos tareas de procesamiento separadas:

  • LeaderBoard calcula la puntuación total para cada usuario único y publica resultados especulativos por cada diez minutos del tiempo de procesamiento. Es decir, cada diez minutos, la canalización genera la puntuación total por usuario que la canalización procesó hasta la fecha. Este cálculo proporciona una “tabla principal” continua casi en tiempo real, independientemente de cuándo se generaron los eventos reales del juego.
  • LeaderBoard calcula las puntuaciones del equipo para cada hora que se ejecuta la canalización. Por ejemplo, esto es útil si queremos recompensar al equipo con la mejor puntuación por cada hora de juego. El cálculo de la puntuación del equipo usa el sistema de ventanas de tiempo fijo para dividir los datos de entrada en ventanas finitas de una hora de duración en función del tiempo del evento (indicado por la marca de tiempo) a medida que los datos llegan a la canalización.

    Además, el cálculo de la puntuación del equipo utiliza los mecanismos de activador de Dataflow a fin de proporcionar resultados especulativos para cada hora (que se actualizan cada cinco minutos hasta que se acabe la hora), y también con el fin de capturar cualquier dato tardío y agregarlo a la ventana específica de una hora de duración a la que pertenece.

A continuación, veremos estas dos tareas en detalle.

Cómo calcular la puntuación del usuario según el tiempo de procesamiento

Queremos que nuestra canalización genere una puntuación total continua para cada usuario por cada diez minutos que se ejecute la canalización. Este cálculo no considera cuándo la instancia de juego del usuario generó la puntuación real. Sino que simplemente genera la suma de todas las puntuaciones que hayan llegado a la canalización hasta la fecha para ese usuario. Los datos tardíos se incluyen en el cálculo cada vez que llegan a la canalización mientras se están ejecutando.

Como queremos todos los datos que llegan a la canalización cada vez que actualizamos nuestro cálculo, la canalización debe considerar todos los datos de puntuación de usuario en una ventana global única. La ventana global única es ilimitada, pero podemos especificar un tipo de punto de límite temporal para cada cálculo de diez minutos mediante el uso de un activador de tiempo de procesamiento.

Cuando especificamos un activador de tiempo de procesamiento de diez minutos para la ventana global única, la canalización toma eficazmente una “instantánea” de los contenidos de la ventana cada vez que el activador se activa (lo hace en intervalos de diez minutos). Como estamos usando una ventana global única, cada instantánea contiene todos los datos recopilados hasta ese momento. El siguiente diagrama muestra los efectos de usar un activador del tiempo de procesamiento en la ventana global única:

Figura 3: Datos de puntuación para tres usuarios. Las puntuaciones de cada usuario se agrupan en una ventana global única, con un activador que genera una instantánea para los resultados cada diez minutos.

A medida que avanza el tiempo de procesamiento y se procesan más puntuaciones, el activador genera la suma actualizada para cada usuario.

El siguiente ejemplo de código muestra cómo LeaderBoard establece el activador de tiempo de procesamiento a fin de generar los datos para las puntuaciones de los usuarios:

/**
 * Extract user/score pairs from the event stream using processing time, via global windowing.
 * Get periodic updates on all users' running scores.
 */
@VisibleForTesting
static class CalculateUserScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration allowedLateness;

  CalculateUserScores(Duration allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> input) {
    return input.apply("LeaderboardUserGlobalWindow",
        Window.<GameActionInfo>into(new GlobalWindows())
            // Get periodic results every ten minutes.
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TEN_MINUTES)))
            .accumulatingFiredPanes()
            .withAllowedLateness(allowedLateness))
        // Extract and sum username/score pairs from the event data.
        .apply("ExtractUserScore", new ExtractAndSumScore("user"));
  }
}

Ten en cuenta que LeaderBoard usa un activador de acumulación para el cálculo de la puntuación del usuario (mediante la invocación de .accumulatingFiredPanes cuando configuras el activador). El uso de un activador de acumulación hace que la canalización acumule los datos emitidos previamente junto con cualquier dato nuevo que haya llegado desde la última activación del activador. Esto garantiza a LeaderBoard una suma continua para las puntuaciones de los usuarios, en lugar de una colección de sumas individuales.

Cómo calcular la puntuación del equipo según el tiempo del evento

Queremos que nuestra canalización también genere la puntuación total para cada equipo durante cada hora de juego. A diferencia del cálculo de puntuación del usuario, para las puntuaciones de equipo, nos preocupamos de saber en qué momento del tiempo del evento ocurrió cada puntuación en realidad, porque queremos considerar cada hora de juego individualmente. También queremos proporcionar actualizaciones especulativas a medida que avanza cada hora y permitir que cualquier instancia de datos tardíos (datos que llegan después de que los datos de una hora determinada se consideren completos) se incluyan en nuestro cálculo.

Debido a que consideramos cada hora de forma individual, podemos aplicar sistema de ventanas de tiempo fijo a nuestros datos de entrada, como en HourlyTeamScore. Para proporcionar las actualizaciones especulativas y las actualizaciones sobre datos tardíos, especificaremos parámetros de activador adicionales. El activador hará que cada ventana calcule y emita resultados en un intervalo que especifiquemos (en este caso, cada cinco minutos), y también que la activación continúe después de que la ventana se considere “completa” para incluir los datos tardíos. Al igual que el cálculo de la puntuación del usuario, estableceremos el activador en modo de acumulación para asegurarnos de obtener una suma continua por cada ventana de una hora de duración.

Los activadores para actualizaciones especulativas y datos tardíos ayudan con el problema de retraso de tiempo. Los eventos en la canalización no se procesan necesariamente en el orden en que ocurrieron según sus marcas de tiempo. Es posible que lleguen a la canalización en otro orden o tarde (en nuestro caso, porque se generaron cuando el teléfono del usuario no estaba en contacto con una red). Dataflow necesita una forma de determinar cuándo puede suponer razonablemente que tiene "todos" los datos en una ventana determinada: esto se denomina marca de agua.

En un mundo ideal, todos los datos se procesarían de manera inmediata cuando ocurren, por lo que el tiempo de procesamiento sería igual (o al menos tendría una relación lineal) al tiempo del evento. Sin embargo, debido a que los sistemas distribuidos contienen alguna inexactitud inherente (como nuestros teléfonos con notificaciones tardías), Dataflow, a menudo, usa una marca de agua heurística.

En el siguiente diagrama, se muestra la relación entre el tiempo de procesamiento continuo y el tiempo del evento de cada puntuación para dos equipos:

Figura 4: Datos de puntuación por equipo, con ventanas por tiempo del evento. Un activador basado en el tiempo de procesamiento hace que la ventana emita resultados tempranos especulativos y que incluya resultados tardíos.

La línea de puntos en el diagrama es la marca de agua “ideal”: la noción de Dataflow de cuándo se puede considerar razonablemente que todos los datos en una ventana determinada hayan llegado. La línea sólida irregular representa la marca de agua real, según lo determinado por la fuente de datos (en nuestro caso, Pub/Sub).

Los datos que llegan por encima de la línea de marca de agua sólida son datos tardíos: un evento de puntuación que se retrasó (quizás se generó sin conexión) y llegó después de que se cerrara la ventana a la que pertenece. El activador de activación tardía de nuestra canalización garantiza que estos datos tardíos aún se incluyan en la suma.

En el siguiente ejemplo de código, se muestra cómo LeaderBoard aplica el sistema de ventanas de tiempo fijo con los activadores adecuados para que nuestra canalización realice los cálculos que deseamos:

// Extract team/score pairs from the event stream, using hour-long windows by default.
@VisibleForTesting
static class CalculateTeamScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration teamWindowDuration;
  private final Duration allowedLateness;

  CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
    this.teamWindowDuration = teamWindowDuration;
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> infos) {
    return infos.apply("LeaderboardTeamFixedWindows",
        Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
            // We will get early (speculative) results as well as cumulative
            // processing of late data.
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(FIVE_MINUTES))
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(TEN_MINUTES)))
            .withAllowedLateness(allowedLateness)
            .accumulatingFiredPanes())
        // Extract and sum teamname/score pairs from the event data.
        .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
  }
}

En conjunto, estas estrategias de procesamiento nos permiten abordar los problemas de integridad y de latencia presentes en las canalizaciones UserScore y HourlyTeamScore, mientras seguimos con las mismas transformaciones básicas para procesar los datos (de hecho, ambos cálculos todavía usan la misma transformación ExtractAndSumScore que usamos en las canalizaciones UserScore y HourlyTeamScore).

GameStats: Análisis de uso y detección de abuso

Si bien LeaderBoard demuestra cómo usar activadores y sistemas de ventanas básicas para realizar análisis de datos flexibles y de latencia baja, podemos usar técnicas de sistemas de ventanas más avanzadas para realizar análisis más completos. Esto podría incluir algunos cálculos diseñados para detectar el abuso del sistema (como el spam) o conseguir información sobre el comportamiento del usuario. La canalización GameStats se compila en la funcionalidad de latencia baja en LeaderBoard a fin de demostrar cómo puedes usar Dataflow para realizar este tipo de análisis avanzado.

Al igual que LeaderBoard, GameStats lee datos desde una fuente de transmisión, en este caso Pub/Sub. Se lo considera como un trabajo continuo que proporciona estadísticas sobre el juego mientras los usuarios juegan.

¿Qué hace GameStats?

Al igual que LeaderBoard, GameStats calcula la puntuación total por equipo, por hora. Sin embargo, la canalización también realiza dos tipos de análisis más complejos:

  • GameStats realiza un sistema de detección de abuso que lleva a cabo un análisis estadístico simple sobre los datos de puntuación para determinar qué usuarios pueden ser spammers o bots. Luego utiliza la lista de usuarios sospechosos de spam/bot para filtrar los bots del cálculo de puntuación por hora del equipo.
  • GameStats analiza patrones de uso mediante la agrupación de datos de juego que comparten horas de evento similares con sistema de ventanas de sesión. Esto nos permite obtener información acerca de cuánto tiempo los usuarios suelen jugar y cómo cambia la duración del juego a lo largo del tiempo.

A continuación, analizaremos estas funciones en detalle.

Detección de abuso

Supongamos que la puntuación en nuestro juego depende de la velocidad a la que un usuario puede “hacer clic” en su teléfono. La detección de abuso de GameStats analiza los datos de puntuación de cada usuario para detectar si un usuario tiene una "tasa de clics" anormalmente alta y, por lo tanto, una puntuación anormalmente alta. Esto podría indicar que un bot que opera significativamente más rápido que una persona podría estar jugando.

Para determinar si una puntuación es o no “anormalmente” alta, GameStats calcula el promedio de cada puntuación en esa ventana de tiempo fijo, y, luego, compara cada puntuación individual con la puntuación promedio multiplicada por un factor de peso arbitrario (en nuestro caso, 2.5). Por lo tanto, cualquier puntuación de más de 2.5 veces el promedio se considera producto de spam. La canalización GameStats hace seguimiento de una lista de usuarios “spam” y filtra a esos usuarios fuera de los cálculos de puntuación del equipo para la tabla principal del equipo.

Dado que el promedio depende de los datos de la canalización, debemos calcularlo y, luego, usar esos datos calculados en una transformación ParDo posterior que filtre las puntuaciones que excedan el valor ponderado. Para hacer esto, podemos pasar el promedio calculado como una entrada lateral al filtrado ParDo.

En el siguiente ejemplo de código, se muestra la transformación compuesta que controla la detección de abuso. La transformación utiliza la transformación Sum.integersPerKey para sumar todas las puntuaciones por usuario y, luego, la transformación Mean.globally a fin de determinar la puntuación promedio para todos los usuarios. Una vez que eso se haya calculado (como un singleton de PCollectionView), podemos pasarlo al filtrado ParDo con .withSideInputs:

public static class CalculateSpammyUsers
    extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
  private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
  private static final double SCORE_WEIGHT = 2.5;

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {

    // Get the sum of scores for each user.
    PCollection<KV<String, Integer>> sumScores = userScores
        .apply("UserSum", Sum.<String>integersPerKey());

    // Extract the score from each element, and use it to find the global mean.
    final PCollectionView<Double> globalMeanScore = sumScores.apply(Values.<Integer>create())
        .apply(Mean.<Integer>globally().asSingletonView());

    // Filter the user sums using the global mean.
    PCollection<KV<String, Integer>> filtered = sumScores
        .apply(ParDo
            .named("ProcessAndFilter")
            // use the derived mean total score as a side input
            .withSideInputs(globalMeanScore)
            .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
              private final Aggregator<Long, Long> numSpammerUsers =
                createAggregator("SpammerUsers", new Sum.SumLongFn());
              @Override
              public void processElement(ProcessContext c) {
                Integer score = c.element().getValue();
                Double gmc = c.sideInput(globalMeanScore);
                if (score > (gmc * SCORE_WEIGHT)) {
                  LOG.info("user " + c.element().getKey() + " spammer score " + score
                      + " with mean " + gmc);
                  numSpammerUsers.addValue(1L);
                  c.output(c.element());
                }
              }
            }));
    return filtered;
  }
}

La transformación de detección de abuso genera una vista de los usuarios sospechosos de ser spambots. Más adelante en la canalización, utilizamos esa vista para filtrar a cualquiera de esos usuarios cuando calculamos la puntuación del equipo por hora, nuevamente con el mecanismo de entrada lateral. En el siguiente ejemplo de código, se muestra dónde insertamos el filtro de spam, entre la calificación de las puntuaciones por ventanas en ventanas fijas y la extracción de las puntuaciones del equipo:

// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
  .apply(Window.named("WindowIntoFixedWindows")
      .<GameActionInfo>into(FixedWindows.of(
          Duration.standardMinutes(options.getFixedWindowDuration())))
      )
  // Filter out the detected spammer users, using the side input derived above.
  .apply(ParDo.named("FilterOutSpammers")
          .withSideInputs(spammersView)
          .of(new DoFn<GameActionInfo, GameActionInfo>() {
            @Override
            public void processElement(ProcessContext c) {
              // If the user is not in the spammers Map, output the data element.
              if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
                c.output(c.element());
              }
            }}))
  // Extract and sum teamname/score pairs from the event data.
  .apply("ExtractTeamScore", new ExtractAndSumScore("team"))

Cómo analizar patrones de uso

Podemos obtener información sobre cuándo los usuarios juegan nuestro juego y durante cuánto tiempo. Para esto, debemos examinar las horas del evento de la puntuación de cada juego y agrupar las puntuaciones con horas del evento similares en sesiones. GameStats usa una función integrada de sistema de ventanas de sesión de Dataflow para agrupar puntuaciones del usuario en sesiones según el tiempo en que ocurren.

Cuando configuras el sistema de ventanas de sesión, especificas una duración de intervalo mínima entre eventos. Se agrupan en la misma ventana todos los eventos cuyas horas de llegada se acercan más a la duración de intervalo mínima. Se agrupan en ventanas separadas los eventos en que la diferencia en la hora de llegada es mayor que el intervalo. Según cómo establezcamos la duración de intervalo mínima, podemos suponer con seguridad que las puntuaciones en la misma ventana de sesión son parte del mismo tramo de juego (relativamente) ininterrumpido. Las puntuaciones en una ventana diferente indican que el usuario dejó de jugar el juego durante, al menos, el tiempo de intervalo mínimo antes de volver más tarde.

En el siguiente diagrama, se muestra cómo podrían verse los datos cuando se agrupan en ventanas de sesión. A diferencia de las ventanas fijas, las ventanas de sesión son diferentes para cada usuario y dependen del patrón de juego de cada usuario:

Un diagrama que representa el sistema de ventanas de sesión.
Figura 5: Sesiones de usuario, con una duración de intervalo mínima. Ten en cuenta cómo cada usuario tiene sesiones diferentes según la cantidad de instancias que jueguen y el tiempo de descanso entre instancias.

Podemos usar los datos con sesiones de ventanas si queremos determinar la duración promedio del tiempo de juego ininterrumpido para todos nuestros usuarios, además de la puntuación total que alcanzan durante cada sesión. Podemos hacer esto en el código, primero mediante la aplicación de ventanas de sesión, la suma de la puntuación por usuario y sesión, y luego con una transformación para calcular la duración de cada sesión individual:

// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
  .apply(Window.named("WindowIntoSessions")
        .<KV<String, Integer>>into(
              Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
    .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
  // For this use, we care only about the existence of the session, not any particular
  // information aggregated over it, so the following is an efficient way to do that.
  .apply(Combine.perKey(x -> 0))
  // Get the duration per session.
  .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))

Esto nos da un conjunto de sesiones de usuario, cada una con una duración adjunta. A continuación, podemos calcular la duración promedio de las sesiones si volvemos a clasificar los datos en ventanas de tiempo fijo y, luego, calculamos el promedio para todas las sesiones que finalizan cada hora:

// Re-window to process groups of session sums according to when the sessions complete.
.apply(Window.named("WindowToExtractSessionMean")
      .<Integer>into(
          FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply("WriteAvgSessionLength",
       new WriteWindowedToBigQuery<Double>(
          options.getTablePrefix() + "_sessions", configureSessionWindowWrite()));

Podemos usar la información resultante para encontrar, por ejemplo, las horas del día en que los usuarios juegan durante más tiempo o los tramos del día con mayor probabilidad de ver sesiones de juego más cortas.

Más información

Para saber más sobre cómo funcionan estos ejemplos, ejecuta las canalizaciones de ejemplo que se incluyen en los ejemplos del SDK de Dataflow (rama master-1.x) en GitHub. Consulta README.md en el directorio de ejemplos para obtener instrucciones completas sobre cómo ejecutar las canalizaciones de ejemplo incluidas:

Los videos y las entradas de blog que se muestran a continuación proporcionan más información y contexto sobre el modelo de datos unificados de Dataflow para el procesamiento de transmisión y por lotes.

  • Entrada de blog de Dataflow & Spark: Una entrada de blog creada por ingenieros de Google que utiliza ejemplos en el dominio de los videojuegos para dispositivos móviles a fin de ilustrar cómo se compara el modelo de Dataflow con el de Apache Spark.
  • Dataflow talk @Scale: Una presentación en video tomada de la conferencia @Scala, por el ingeniero de Google, Frances Perry, que explica el modelo de Dataflow con una versión anterior de los ejemplos de videojuegos para dispositivos móviles descritos anteriormente.
  • “El mundo más allá de los lotes: transmisión 101 y transmisión 102”: Una serie de dos partes sobre entradas en el blog O'Reilly, del ingeniero de Google Tyler Akidau, que examina el futuro del procesamiento de los macrodatos.
¿Te ha resultado útil esta página? Enviar comentarios:

Enviar comentarios sobre...

Si necesitas ayuda, visita nuestra página de asistencia.