Exemplos de canal de jogos para dispositivos móveis

Nesta seção, você verá uma série de exemplos de pipelines do Dataflow que demonstram funcionalidade mais complexa do que o exemplo de WordCount básico. Os pipelines desta seção processam dados de um jogo hipotético que os usuários jogam nos smartphones deles. Os pipelines demonstram processamento em níveis crescentes de complexidade. O primeiro pipeline, por exemplo, mostra como executar um job de análise em lote para gerar dados de pontuação relativamente simples, enquanto os pipelines posteriores usam os recursos de janelas e acionadores do Dataflow para fornecer análises de dados de baixa latência e informações mais complexas sobre padrões de jogo do usuário.

Sempre que um usuário joga uma instância do nosso jogo hipotético para dispositivos móveis, ele gera um evento de dados. Cada evento de dados consiste nas seguintes informações:

  • O ID exclusivo do usuário que está jogando.
  • o ID da equipe à qual o usuário pertence
  • um valor de pontuação dessa instância específica do jogo
  • Um carimbo de data/hora que registra quando a instância específica do jogo aconteceu. Essa é a hora de cada evento de dados do jogo.

Quando o usuário finaliza uma instância do jogo, o smartphone envia o evento de dados para um servidor de jogos, no qual os dados são registrados e armazenados em um arquivo. Geralmente os dados são enviados ao servidor de jogos logo após a finalização. No entanto, os usuários podem jogar "off-line", quando o smartphone não estiver conectado ao servidor, por exemplo, no aeroporto ou fora da área de cobertura da rede. Quando o telefone do usuário se reconecta ao servidor de jogos, ele envia todos os dados acumulados do jogo.

Isso significa que alguns eventos de dados são recebidos pelo servidor de jogos bem mais tarde do que os usuários que os geram. Essa diferença de tempo pode ter implicações de processamento para pipelines que fazem cálculos que consideram quando cada pontuação foi gerada. Esses pipelines rastreiam pontuações geradas durante cada hora do dia, por exemplo, ou calculam a duração do tempo de jogo contínuo dos usuários. Ambos os casos dependem do tempo do evento de cada registro de dados.

Os exemplos de pipeline de Jogo para dispositivos móveis variam em complexidade, de análise em lote a pipelines mais complexos que podem executar análises e detecções de abuso em tempo real. Nesta seção, você verá exemplos e demonstrações de como usar recursos do Dataflow como janelas e gatilhos para expandir as funcionalidades do pipeline.

UserScore: processamento básico de pontuações em lote

O pipeline UserScore é o exemplo mais simples de processamento de dados de jogos para dispositivos móveis. UserScore determina a pontuação total por usuário em um conjunto de dados finito, por exemplo, pontuações de um dia armazenadas no servidor de jogos. Pipelines como UserScore são melhor executados periodicamente depois que todos os dados relevantes estiverem reunidos. Por exemplo, execute UserScore como um job noturno dos dados coletados durante o dia.

O que o UserScore faz?

Nos dados de pontuação de um dia, cada ID de usuário vai ter vários registros, se o usuário jogar mais do que uma instância do jogo durante a janela de análise, cada um com o valor de pontuação e carimbo de data/hora próprios. Para determinar a pontuação total de todas as instâncias que o usuário joga durante o dia, o pipeline precisa agrupar todos os registros por usuário.

À medida que o pipeline processa cada evento, a pontuação do evento é adicionada à soma total desse usuário específico.

UserScore analisa somente os dados necessários de cada registro, especificamente o ID do usuário e o valor da pontuação. O pipeline não considera o horário do evento nos registros. Ele simplesmente processa todos os dados presentes nos arquivos de entrada que você especificou no momento da execução.

O fluxo do pipeline básico de UserScore faz o seguinte:

  1. Lê os dados da pontuação do dia de um arquivo armazenado no Google Cloud Storage.
  2. Soma os valores da pontuação de cada usuário agrupando cada evento do jogo por ID de usuário e combinando os valores da pontuação para ter a pontuação total desse usuário específico.
  3. Grava os dados resultantes em uma tabela do BigQuery.

O diagrama a seguir apresenta os dados da pontuação de vários usuários durante o período de análise do pipeline. No diagrama, cada ponto de dados é um evento que resulta em um par de usuário/pontuação.

Figura 1: dados da pontuação de três usuários.

Este exemplo usa processamento em lote e o eixo Y do diagrama representa o tempo de processamento: primeiro, o pipeline processa os eventos inferiores do eixo Y, e depois, os superiores. O eixo X do diagrama representa o tempo de cada evento do jogo, conforme indicado pelo carimbo de data/hora do evento. Observe que os eventos individuais do diagrama não são processados pelo pipeline na mesma ordem em que ocorrem, ou seja, de acordo com os respectivos carimbos de data/hora.

Depois de ler os eventos de pontuação do arquivo de entrada, o pipeline agrupa todos os pares de usuário/pontuação e soma os valores da pontuação em um valor total por usuário. O UserScore armazena a lógica básica dessa etapa como a transformação composta definida pelo usuário ExtractAndSumScore:

public static class ExtractAndSumScore
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

  private final String field;

  ExtractAndSumScore(String field) {
    this.field = field;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(
      PCollection<GameActionInfo> gameInfo) {

    return gameInfo
      .apply(MapElements
          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
          .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
      .apply(Sum.<String>integersPerKey());
  }
}

ExtractAndSumScore é escrito para ser mais geral, por isso, é possível transmitir no campo em que você quer agrupar os dados (no caso do jogo, por usuário ou equipe únicos). Isso significa que podemos reutilizar ExtractAndSumScore em outros pipelines que agrupam dados da pontuação por equipe, por exemplo.

Este é o método principal de UserScore, mostrando como aplicar as três etapas do pipeline:

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  // Read events from a text file and parse them.
  pipeline.apply(TextIO.Read.from(options.getInput()))
    .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
    // Extract and sum username/score pairs from the event data.
    .apply("ExtractUserScore", new ExtractAndSumScore("user"))
    .apply("WriteUserScoreSums",
        new WriteToBigQuery<KV<String, Integer>>(options.getTableName(),
                                                 configureBigQueryWrite()));

  // Run the batch pipeline.
  pipeline.run();
}

Como trabalhar com os resultados

O UserScore grava os dados em uma tabela do BigQuery (chamada user_score por padrão). Com os dados da tabela do BigQuery, é possível realizar mais uma análise interativa, como consulta à lista dos N usuários com maior pontuação em um determinado dia.

Vamos supor que você queira determinar interativamente os 10 usuários com maior pontuação de um determinado dia. Na interface do usuário do BigQuery, execute a seguinte consulta:

SELECT * FROM [MyGameProject:MyGameDataset.user_score] ORDER BY total_score DESC LIMIT 10

Limitações

Conforme mostrado no exemplo, o pipeline UserScore tem algumas limitações:

  • Os dados resultantes do jogo gerados pelo pipeline UserScore podem estar incompletos. Isso acontece porque alguns dados da pontuação podem ser gerados por jogadores off-line e enviados após o fechamento diário. UserScore só processa o conjunto fixo de entrada presente nos arquivos de entrada quando o pipeline é executado.
  • O UserScore processa todos os eventos de dados presentes no arquivo de entrada no tempo de processamento. Ele não examina nem verifica erros dos eventos baseados na hora. Portanto, os resultados podem incluir alguns valores que tenham tempos de evento fora do período de análise pertinente, por exemplo, registros atrasados do dia anterior.
  • Como o UserScore é executado somente depois que todos os dados já foram coletados, há alta latência entre o momento em que os usuários geram eventos de dados (hora do evento) e o momento em que os resultados são calculados (tempo do processamento).
  • UserScore informa somente o total de resultados do dia inteiro e não fornece informações mais refinadas sobre como os dados são acumulados durante o dia.

A partir do próximo exemplo de pipeline, você verá como usar os recursos do Dataflow para abordar essas limitações.

HourlyTeamScore: processamento avançado em lote com janelas

O pipeline HourlyTeamScore expande os princípios básicos de análise em lote usados no pipeline UserScore e reduz algumas de suas limitações. HourlyTeamScore faz análises mais refinadas, usando outros recursos dos SDKs do Dataflow e abrangendo mais aspectos dos dados do jogo. Por exemplo, HourlyTeamScore pode filtrar dados que não fazem parte do período relevante para a análise.

Como o UserScore, o HourlyTeamScore é uma tarefa a ser executada periodicamente, depois que todos os dados relevantes forem reunidos, por exemplo, uma vez por dia. O pipeline lê um conjunto fixo de dados de um arquivo e grava os resultados em uma tabela do BigQuery, exatamente como o UserScore.

O que o HourlyTeamScore faz?

HourlyTeamScore calcula a pontuação total por equipe, por hora, em um conjunto fixo de dados, por exemplo, dados de um dia.

  • Em vez de operar no conjunto de dados inteiro de uma vez, HourlyTeamScore divide os dados de entrada em janelas lógicas e faz os cálculos delas. Assim, o HourlyUserScore fornece informações sobre dados de pontuação por janela, em que cada janela representa o progresso da pontuação do jogo em intervalos fixos de tempo. Por exemplo, uma vez por hora.
  • O HourlyTeamScore filtra eventos de dados com base na respectiva hora, indicada pelo carimbo de data/hora incorporado, que está dentro do período de análise pertinente. Basicamente, o pipeline verifica o carimbo de data/hora de cada evento do jogo para assegurar que esteja dentro do intervalo da análise (nesse caso, o dia em questão). Os eventos de dados de dias anteriores são descartados e não são incluídos nos totais de pontuação. Isso torna o HourlyTeamScore mais robusto e menos propenso a erros nos dados resultantes do que o UserScore. Além disso, o pipeline leva em consideração os dados atrasados com carimbo de data/hora dentro do período de análise pertinente.

Veja abaixo detalhes sobre cada uma dessas melhorias do HourlyTeamScore:

Janelas de tempo fixo

Com as janelas de tempo fixo, o pipeline fornece informações mais claras sobre os eventos acumulados no conjunto de dados no curso do período de análise. Em nosso caso, ele nos informa em quais horas do dia cada equipe esteve ativa e quanto ela pontuou nessas ocasiões.

O seguinte diagrama mostra como o pipeline processa os dados de pontuação de uma equipe em um dia depois de aplicar janelas de tempo fixo:

Figura 2: dados da pontuação de duas equipes. As pontuações de cada equipe estão divididas em janelas lógicas com base no momento em que ocorreram durante o evento.

Observe que, à medida que o tempo do processamento avança, as somas são agora por janela. Cada janela representa uma hora do tempo do evento durante o dia em que as pontuações ocorreram.

O recurso de gestão de janelas do Dataflow usa as informações intrínsecas do carimbo de data/hora anexadas a cada elemento de PCollection. Como queremos que o pipeline crie janelas com base na hora do evento, precisamos primeiro extrair o carimbo de data/hora incorporado a cada registro de dados e aplicá-lo ao elemento correspondente na PCollection de dados da pontuação. Em seguida, o pipeline aplica a função de gestão de janela para dividir a PCollection em janelas lógicas.

Veja o código, que mostra como HourlyTeamScore usa as transformações WithTimestamps e Window para executar estas operações:

// Add an element timestamp based on the event log, and apply fixed windowing.
.apply("AddEventTimestamps",
       WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
.apply(Window.named("FixedWindowsTeam")
    .<GameActionInfo>into(FixedWindows.of(
          Duration.standardMinutes(options.getWindowDuration()))))

As transformações que o pipeline usa para especificar a gestão de janelas são diferentes das transformações de processamento de dados reais (como ExtractAndSumScores). Essa função proporciona a você flexibilidade na criação do seu pipeline do Dataflow, porque é possível executar transformações atuais em conjuntos de dados com diferentes características de janelas.

Filtrar com base no tempo do evento

HourlyTeamScore usa a filtragem para remover quaisquer eventos do nosso conjunto de dados com carimbos de data/hora que não se enquadram no período de análise relevante (ou seja, eles não foram gerados durante o dia em que estamos interessados). Isso impede que o pipeline inclua erroneamente dados que foram, por exemplo, gerados off-line durante o dia anterior e enviados ao servidor de jogos no dia atual.

Dessa forma, o pipeline inclui dados atrasados relevantes, eventos de dados com carimbos de data/hora válidos, mas que chegaram após o término do período de análise. Se o horário de fechamento do pipeline é às 12h, por exemplo, podemos executar o pipeline às 2h, mas filtrar eventos cujos carimbos de data/hora indiquem que tenham ocorrido após o fechamento das 12h. Os eventos de dados atrasados que chegam entre 12h01min e 2h, que têm carimbos de data/hora que indicam que eles ocorreram antes do fechamentos das 12h, são incluídos no processamento do pipeline.

HourlyTeamScore usa a transformação Filter para realizar essa operação. Quando você aplica Filter, especifica um predicado com que cada registro de dados é comparado. Os registros de dados aprovados na comparação são incluídos, e os eventos reprovados são excluídos. Em nosso caso, o predicado é o horário de fechamento que especificamos e comparamos apenas uma parte dos dados, o campo de carimbo de data/hora.

O código a seguir mostra como HourlyTeamScore usa a transformação Filter para filtrar eventos que ocorrem antes ou depois do período de análise:

.apply("FilterStartTime", Filter.byPredicate(
    (GameActionInfo gInfo)
        -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
.apply("FilterEndTime", Filter.byPredicate(
    (GameActionInfo gInfo)
        -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))

Como calcular a pontuação por equipe, por janela

HourlyTeamScore usa a mesma transformação ExtractAndSumScores do pipeline UserScore, mas passa uma chave diferente (equipe, em vez de usuário). Além disso, como o pipeline aplica ExtractAndSumScores depois de implantar a gestão de janelas de tempo fixo de uma hora aos dados de entrada, os dados ficam agrupados por equipe e por janela. É possível ver a sequência completa de transformações no método principal do HourlyTeamScore:

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
  final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));

  // Read 'gaming' events from a text file.
  pipeline.apply(TextIO.Read.from(options.getInput()))
    // Parse the incoming data.
    .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))

    // Filter out data before and after the given times so that it is not included
    // in the calculations. As we collect data in batches (say, by day), the batch for the day
    // that we want to analyze could potentially include some late-arriving data from the previous
    // day. If so, we want to weed it out. Similarly, if we include data from the following day
    // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
    // that fall after the time period we want to analyze.
    .apply("FilterStartTime", Filter.byPredicate(
        (GameActionInfo gInfo)
            -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
    .apply("FilterEndTime", Filter.byPredicate(
        (GameActionInfo gInfo)
            -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))

    // Add an element timestamp based on the event log, and apply fixed windowing.
    .apply("AddEventTimestamps",
           WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
    .apply(Window.named("FixedWindowsTeam")
        .<GameActionInfo>into(FixedWindows.of(
              Duration.standardMinutes(options.getWindowDuration()))))

    // Extract and sum teamname/score pairs from the event data.
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
    .apply("WriteTeamScoreSums",
      new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName(),
          configureWindowedTableWrite()));

  pipeline.run();
}

Limitações

Na forma que está escrito, HourlyTeamScore continua com uma limitação:

  • O HourlyTeamScore ainda tem alta latência entre o momento em que os eventos de dados ocorrem (hora do evento) e a geração dos resultados (tempo do processamento). Isso acontece porque ele é um pipeline em lote e precisa esperar que todos os eventos de dados estejam presentes para iniciar o processamento.

LeaderBoard: processamento de streaming com dados do jogo em tempo real

Uma maneira de ajudar a solucionar o problema da latência dos pipelines UserScore e HourlyTeamScore é fazer a leitura dos dados de pontuação de uma origem de streaming. O pipeline LeaderBoard apresenta o processamento de stream ao ler os dados de pontuação do jogo a partir do Google Cloud Pub/Sub, em vez de usar um arquivo do servidor de jogos.

O pipeline LeaderBoard também demonstra como processar dados de pontuação do jogo em relação ao tempo do processamento e à hora do evento. O LeaderBoard envia dados sobre pontuações de usuário individual e pontuações de equipe, cada um em relação a um período diferente.

Como o pipeline LeaderBoard lê os dados do jogo de uma origem de streaming quando eles são gerados, pense nele como um job contínuo executado simultaneamente com o processo do jogo. Assim, LeaderBoard fornece informações de baixa latência sobre como os usuários estão jogando em um determinado momento. Isso é útil, por exemplo, quando queremos fornecer um placar ao vivo pela Internet, para os usuários acompanharem seu progresso em relação a outros durante o jogo.

O que o LeaderBoard faz?

O pipeline LeaderBoard lê os dados do jogo publicados no Pub/Sub quase em tempo real, e usa esses dados para executar duas tarefas separadas de processamento:

  • O LeaderBoard calcula a pontuação total de cada usuário único e publica resultados especulativos a cada dez minutos do tempo de processamento. Ou seja, a cada 10 minutos, o pipeline emite a pontuação total por usuário que processou até o momento. Esse cálculo fornece um "placar" atualizado, quase em tempo real, independentemente de quando os eventos reais do jogo foram gerados.
  • O LeaderBoard calcula as pontuações de equipe a cada hora em que o pipeline é executado. Isso é útil quando queremos, por exemplo, recompensar a equipe mais pontuada a cada hora do jogo. O cálculo da pontuação de equipe usa janelas de tempo fixo para dividir os dados de entrada em janelas finitas de uma hora baseadas no tempo do evento, indicado pelo carimbo de data/hora, quando os dados chegam ao pipeline.

    Além disso, o cálculo também usa mecanismos de gatilho do Dataflow para fornecer resultados especulativos a cada hora, que são atualizados a cada cinco minutos até completar uma hora, bem como para capturar dados atrasados e adicioná-los à janela específica de uma hora a que eles pertencem.

Vejamos a seguir essas duas tarefas, em detalhes.

Calcular a pontuação do usuário com base no tempo do processamento

Queremos que o pipeline emita uma pontuação total atualizada de cada usuário a cada dez minutos que o pipeline é executado. Esse cálculo não considera quando a pontuação real foi gerada pela instância do jogo do usuário. Ele simplesmente emite a soma de todas as pontuações desse usuário que chegaram ao pipeline até o momento. Os dados atrasados são incluídos no cálculo sempre que chegam ao pipeline em execução.

Como queremos todos os dados que chegam ao pipeline sempre que atualizamos o cálculo, o pipeline precisa considerar todos os dados da pontuação do usuário em uma única janela global. Essa janela global é ilimitada, mas é possível especificar um tipo de ponto de fechamento temporário a cada 10 minutos de cálculo, usando um acionador do tempo de processamento.

Quando especificamos um acionador do tempo de processamento de 10 minutos para uma janela global, o pipeline tira um "instantâneo" do conteúdo da janela sempre que o acionador dispara, o que acontece em intervalos de 10 minutos. Por ser uma única janela global, cada instantâneo contém todo os dados coletados até esse ponto no tempo. Veja no seguinte diagrama os efeitos do uso de acionador do tempo de processamento na única janela global:

Figura 3: dados da pontuação de três usuários. As pontuações de cada usuário são agrupadas em uma única janela global, com um gatilho que gera um snapshot da saída a cada 10 minutos.

À medida que o tempo do processamento avança e mais pontuações são processadas, o gatilho emite a soma atualizada de cada usuário.

Veja no exemplo de código a seguir mostra como LeaderBoard define o acionador de tempo de processamento para gerar os dados das pontuações do usuário:

/**
 * Extract user/score pairs from the event stream using processing time, via global windowing.
 * Get periodic updates on all users' running scores.
 */
@VisibleForTesting
static class CalculateUserScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration allowedLateness;

  CalculateUserScores(Duration allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> input) {
    return input.apply("LeaderboardUserGlobalWindow",
        Window.<GameActionInfo>into(new GlobalWindows())
            // Get periodic results every ten minutes.
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TEN_MINUTES)))
            .accumulatingFiredPanes()
            .withAllowedLateness(allowedLateness))
        // Extract and sum username/score pairs from the event data.
        .apply("ExtractUserScore", new ExtractAndSumScore("user"));
  }
}

Observe que o LeaderBoard usa um acionador de acumulação para o cálculo da pontuação do usuário (invocando .accumulatingFiredPanes ao definir o acionador). O acionador de acumulação faz com que o pipeline acumule os dados emitidos anteriormente junto com os novos dados que chegarem desde o último disparo do acionador. Isso garante que o LeaderBoard emita a soma atualizada das pontuações do usuário, em vez de uma coleção de somas individuais.

Como calcular a pontuação da equipe com base no tempo do evento

Queremos que o pipeline emita também uma pontuação total de cada equipe durante cada hora de jogo. Ao contrário do cálculo de pontuação do usuário, nas pontuações da equipe, tratamos do momento no tempo do evento em que cada pontuação realmente ocorreu, para considerar cada hora do jogo, individualmente. Queremos também fornecer atualizações especulativas à medida que cada hora avançar, além de incluir no cálculo as instâncias de dados atrasados que chegam depois que uma determinada hora de dados é considerada completa.

Como pensamos em cada hora individualmente, podemos aplicar janelas de tempo fixo aos dados de entrada, exatamente como em HourlyTeamScore. Para fornecer atualizações especulativas e atualizações de dados atrasados, é preciso especificar outros parâmetros do acionador. O acionador faz com que cada janela calcule e emita resultados em um intervalo especificado, nesse caso, a cada cinco minutos, e continua acionando depois que a janela é considerada "completa", para incluir os dados atrasados. Assim como no cálculo de pontuação do usuário, defina o gatilho no modo de acumulação para garantir a soma atualizada de cada janela de uma hora.

Os acionadores de atualizações especulativas e dados atrasados ajudam no problema de distorção de tempo. Os eventos do pipeline não são necessariamente processados na ordem em que ocorreram de acordo com os carimbos de data/hora. Eles podem chegar fora de ordem ao pipeline, ou atrasados, no nosso caso, porque terem sido gerados enquanto o telefone do usuário estava desconectado da rede. O Dataflow precisa de um recurso que ajude a definir quando "todos" os dados já estão em uma determinada janela. Esse recurso é a marca d'água.

O ideal seria que todos os dados fossem processados logo que ocorressem, para que o tempo do processamento fosse igual ou pelo menos tivesse uma relação linear com o tempo do evento. No entanto, em virtude da imprecisão inerente aos sistemas distribuídos, por exemplo, telefones que enviam dados com atraso, o Dataflow normalmente usa uma marca d'água heurística.

Veja no seguinte diagrama a relação entre o tempo do processamento contínuo e o tempo do evento de cada pontuação de duas equipes:

Figura 4: dados de pontuação por equipe, com janelas por tempo do evento. Um gatilho baseado no tempo de processamento faz com que a janela emita resultados antecipados especulativos e inclua resultados atrasados.

A linha pontilhada no diagrama é a marca d'água "ideal": noção do Dataflow de quando considerar que todos os dados de uma determinada janela já chegaram. A linha sólida e irregular representa a marca d'água real, conforme determinado pela origem de dados, em nosso caso, Pub/Sub.

Os dados que chegam acima da linha sólida da marca d'água são os dados atrasados. Esse é um evento de pontuação que atrasou, talvez porque foi gerado off-line, e chegou depois que a janela à qual ele pertence fechou. O acionador de disparo atrasado do pipeline garante que esses dados atrasados sejam incluídos na soma.

O exemplo de código a seguir mostra como LeaderBoard aplica janelas de tempo fixo com os acionadores apropriados para que o pipeline faça os cálculos desejados:

// Extract team/score pairs from the event stream, using hour-long windows by default.
@VisibleForTesting
static class CalculateTeamScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration teamWindowDuration;
  private final Duration allowedLateness;

  CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
    this.teamWindowDuration = teamWindowDuration;
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> infos) {
    return infos.apply("LeaderboardTeamFixedWindows",
        Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
            // We will get early (speculative) results as well as cumulative
            // processing of late data.
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(FIVE_MINUTES))
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(TEN_MINUTES)))
            .withAllowedLateness(allowedLateness)
            .accumulatingFiredPanes())
        // Extract and sum teamname/score pairs from the event data.
        .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
  }
}

Juntas, essas estratégias de processamento permitem resolver problemas de latência e de integridade presentes nos pipelines UserScore e HourlyTeamScore, enquanto usamos as mesmas transformações básicas para processar os dados. Na verdade, ambos os cálculos ainda usam a mesma transformação ExtractAndSumScore que usamos nos pipelines UserScore e HourlyTeamScore.

GameStats: detecção de abuso e análise de uso

Embora LeaderBoard demonstre como usar janelas e gatilhos básicos para realizar uma análise de dados flexível e de baixa latência, também é possível usar técnicas de janelas mais avançadas para fazer análises mais abrangentes. Isso inclui alguns cálculos criados para detectar abusos no sistema como spam, ou para buscar insights do comportamento do usuário. O pipeline GameStats se baseia na funcionalidade de baixa latência em LeaderBoard para demonstrar como é possível usar o Dataflow para realizar esse tipo de análise avançada.

Assim como LeaderBoard, GameStats lê dados de uma fonte de streaming, neste caso, Pub/Sub. Pense nele como um job contínuo que fornece insight do jogo à medida que os usuários jogam.

O que o GameStats faz?

Como LeaderBoard, GameStats calcula a pontuação total por equipe, por hora. No entanto, o pipeline faz também dois tipos de análises mais complexas:

  • O GameStats faz a detecção de abusos do sistema que realiza algumas análises estatísticas simples dos dados da pontuação para determinar quais usuários, se houver, são bots ou criadores de spam. Ele usa a lista de usuários suspeitos de spam/bot para filtrar os bots do cálculo de pontuação da equipe por hora.
  • O GameStats analisa padrões de uso agrupando os dados do jogo que compartilham horas de evento semelhantes por meio da gestão de janelas de sessão. Isso gera informações sobre o período que os usuários tendem a jogar e como a duração do jogo muda com o passar do tempo.

Vejamos a seguir esses recursos mais detalhadamente.

Detecção de abuso

Vamos supor que a pontuação do jogo dependa da velocidade de "clique" do usuário no telefone. A detecção de abuso de GameStats analisa os dados de pontuação de cada usuário para detectar se um usuário tem uma taxa de cliques anormalmente alta e, portanto, uma pontuação anormalmente alta. Isso indica que o jogo está sendo jogado por um bot que opera de modo significativamente mais rápido do que o ser humano.

Para determinar se a pontuação é ou não "anormalmente" alta, o GameStats calcula a média de todas as pontuações dessa janela de tempo fixo e verifica cada pontuação individual em relação à pontuação média multiplicada por um fator de ponderação arbitrário, em nosso caso, 2,5. Assim, qualquer pontuação 2,5 vezes maior que a média é considerada um produto de spam. O pipeline GameStats rastreia uma lista de usuários que criam spam e os filtra nos cálculos de pontuação da equipe do placar.

Como a média depende dos dados do pipeline, precisamos calculá-los e, em seguida, usar esses dados calculados em uma transformação ParDo posterior, que filtra pontuações que excedem o valor ponderado. Para fazer isso, transmita a média calculada como uma entrada secundária para a filtragem ParDo.

Veja no seguinte exemplo de código a transformação composta que processa a detecção de abusos. A transformação usa a transformação Sum.integersPerKey para somar todas as pontuações por usuário, em seguida, a transformação Mean.globally para determinar a pontuação média de todos os usuários. Depois de calculado (como um singleton PCollectionView), podemos enviá-lo para a filtragem ParDo usando .withSideInputs:

public static class CalculateSpammyUsers
    extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
  private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
  private static final double SCORE_WEIGHT = 2.5;

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {

    // Get the sum of scores for each user.
    PCollection<KV<String, Integer>> sumScores = userScores
        .apply("UserSum", Sum.<String>integersPerKey());

    // Extract the score from each element, and use it to find the global mean.
    final PCollectionView<Double> globalMeanScore = sumScores.apply(Values.<Integer>create())
        .apply(Mean.<Integer>globally().asSingletonView());

    // Filter the user sums using the global mean.
    PCollection<KV<String, Integer>> filtered = sumScores
        .apply(ParDo
            .named("ProcessAndFilter")
            // use the derived mean total score as a side input
            .withSideInputs(globalMeanScore)
            .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
              private final Aggregator<Long, Long> numSpammerUsers =
                createAggregator("SpammerUsers", new Sum.SumLongFn());
              @Override
              public void processElement(ProcessContext c) {
                Integer score = c.element().getValue();
                Double gmc = c.sideInput(globalMeanScore);
                if (score > (gmc * SCORE_WEIGHT)) {
                  LOG.info("user " + c.element().getKey() + " spammer score " + score
                      + " with mean " + gmc);
                  numSpammerUsers.addValue(1L);
                  c.output(c.element());
                }
              }
            }));
    return filtered;
  }
}

A transformação abuse-detection gera uma visualização de usuários suspeitos de bots de spam. Mais adiante no pipeline, use essa visualização para filtrar os usuários quando calcular a pontuação de equipe por hora, novamente usando o mecanismo de entrada secundária. Veja no seguinte exemplo de código onde inserir o filtro de spam, entre janelas fixas de pontuações e extração de pontuações da equipe:

// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
  .apply(Window.named("WindowIntoFixedWindows")
      .<GameActionInfo>into(FixedWindows.of(
          Duration.standardMinutes(options.getFixedWindowDuration())))
      )
  // Filter out the detected spammer users, using the side input derived above.
  .apply(ParDo.named("FilterOutSpammers")
          .withSideInputs(spammersView)
          .of(new DoFn<GameActionInfo, GameActionInfo>() {
            @Override
            public void processElement(ProcessContext c) {
              // If the user is not in the spammers Map, output the data element.
              if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
                c.output(c.element());
              }
            }}))
  // Extract and sum teamname/score pairs from the event data.
  .apply("ExtractTeamScore", new ExtractAndSumScore("team"))

Analisar padrões de uso

Busque insights sobre quando os usuários estão jogando e por quanto tempo, examinando os tempos de evento de cada pontuação do jogo e agrupando as pontuações com tempos de evento semelhantes em sessões. O GameStats usa a função integrada de gestão de janelas de sessão do Dataflow para agrupar as pontuações do usuário em sessões com base na hora em que ocorreram.

Ao definir janelas de sessão, você especifica uma duração mínima do intervalo entre os eventos. Todos os eventos cujos tempos de chegada sejam mais próximos do que a duração mínima do intervalo são agrupados na mesma janela. Os eventos em que a diferença no tempo de chegada é maior do que o intervalo são agrupados em janelas separadas. Dependendo de como a duração mínima do intervalo for definida, presume-se com segurança que as pontuações das mesmas janelas de sessão fazem parte da mesma fase (relativamente) ininterrupta do jogo. As pontuações de uma janela diferente indicam que o usuário parou de jogar por pelo menos o tempo mínimo do intervalo até retornar mais tarde.

Veja no diagrama como os dados ficam quando são agrupados em janelas de sessão. Ao contrário das janelas fixas, as janelas de sessão são diferentes para cada usuário e dependem do padrão de jogo de cada usuário:

Um diagrama representando janelas de sessão.
Figura 5: sessões de usuário com duração mínima do intervalo. Observe como as sessões de cada usuário são diferentes de acordo com a quantidade de instâncias que eles jogam e a duração das paradas entre as instâncias.

Use os dados das janelas de sessão para determinar a duração média do tempo ininterrupto de jogo de todos os usuários e a pontuação total deles durante cada sessão. Faça isso no código, primeiro aplicando janelas de sessão, somando a pontuação por usuário e sessão e depois usando uma transformação para calcular a duração de cada sessão individual.

// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
  .apply(Window.named("WindowIntoSessions")
        .<KV<String, Integer>>into(
              Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
    .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
  // For this use, we care only about the existence of the session, not any particular
  // information aggregated over it, so the following is an efficient way to do that.
  .apply(Combine.perKey(x -> 0))
  // Get the duration per session.
  .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))

Isso nos dá um conjunto de sessões de usuário, cada uma com uma duração anexada. Em seguida, calcule a duração média da sessão recriando janelas de tempo fixo para os dados e calculando a média de todas as sessões no final de cada hora.

// Re-window to process groups of session sums according to when the sessions complete.
.apply(Window.named("WindowToExtractSessionMean")
      .<Integer>into(
          FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply("WriteAvgSessionLength",
       new WriteWindowedToBigQuery<Double>(
          options.getTablePrefix() + "_sessions", configureSessionWindowWrite()));

Use as informações resultantes para encontrar, por exemplo, as horas do dia em que os usuários passam mais tempo jogando ou os períodos do dia em que as sessões de jogo são mais curtas.

Informações adicionais

Para mais informações sobre o funcionamento desses exemplos, execute os pipelines de exemplo incluídos em Exemplos de SDK do Dataflow (branch master-1.x) no GitHub. Consulte o README.md do diretório de exemplos para instruções completas sobre como executar exemplos de pipelines incluídos.

As seguintes postagens e vídeos do blog fornecem mais contexto e informações sobre o modelo de dados unificado do Dataflow para processamento em lote e de streaming:

  • Postagem do blog Dataflow e Spark: feita pelos engenheiros do Google que usam exemplos de domínio de jogos para dispositivos móveis para ilustrar a comparação entre o modelo do Dataflow e o Apache Spark (em inglês).
  • Dataflow talk @Scale: apresentação em vídeo tirada da conferência @Scale, pelo engenheiro do Google, Frances Perry, que explica o modelo do Dataflow usando uma versão mais antiga dos exemplos de jogos para dispositivos móveis descritos acima.
  • "The World Beyond Batch: Streaming 101 e Streaming 102": uma série de duas postagens no blog da O'Reilly (em inglês), criada por Tyler Akidau, engenheiro do Google, que examina o futuro do processamento de Big Data.