モバイルゲーム パイプラインの例

このセクションでは、Dataflow パイプライン例のシリーズのチュートリアルを提供し、基本の WordCount の例より複雑な機能を実際に示します。このセクションのパイプラインでは、ユーザーがスマートフォンでプレイする架空のゲームのデータを処理します。パイプラインでは、レベルごとに複雑になっていく処理を示します。たとえば、最初のパイプラインでは一括分析ジョブを実行して、比較的単純なスコアデータを取得しますが、後のほうのパイプラインでは、Dataflow のウィンドウ処理を使用して、低レイテンシのデータ分析とユーザーのプレイパターンに関する複雑なインテリジェンスを実現する機能をトリガーします。

架空のモバイルゲームをユーザーがプレイするたびに、データイベントが生成されます。各データイベントは、次の情報で構成されます。

  • ゲームをプレイするユーザーの一意の ID
  • ユーザーが所属するチームのチーム ID
  • プレイの特定のインスタンスのスコア値。
  • プレイの特定のインスタンスが発生した時点を記録するタイムスタンプ。これは、各ゲームデータ イベントのイベント時間です。

ユーザーがゲームのインスタンスを完了すると、スマートフォンからゲームサーバーにデータイベントが送信され、そこでデータが記録されてファイルに保存されます。一般に、データは完了時にすぐにゲームサーバーに送信されます。ただし、スマートフォンがサーバーと通信していない場合、つまり飛行機内やネットワーク圏外などの場合、ユーザーは「オフライン」でゲームをプレイできます。ユーザーのスマートフォンがゲームサーバーと通信可能になると、蓄積したゲームデータがすべて送信されます。

つまり、一部のデータイベントは、ユーザーが生成した時点より大幅に遅れてゲームサーバーで受信されます。この時間差が、各スコアが生成されたタイミングを計算する、パイプラインの処理に影響することがあります。このようなパイプラインでは、たとえば、1 時間ごとに生成されたスコアのトラッキングや、ユーザーがゲームを継続してプレイした時間の計算を行います。このどちらも、各データレコードのイベント時間に依存しています。

モバイルゲームの例では、パイプラインは簡単な一括分析から、リアルタイム分析と不正行為の検出を実行できる複雑なものまで、その複雑さはさまざまです。このセクションでは、ウィンドウ処理などの Dataflow 機能を使用する方法と、パイプラインの機能展開をトリガーする方法をそれぞれ例で示します。

UserScore: 基本スコアの一括処理

UserScore パイプラインはモバイルゲーム データの最も簡単な処理例です。UserScore は有限のデータセット(ゲームサーバーに保存されている 1 日分のスコアなど)に対するユーザーごとの合計スコアを決定します。UserScore のようなパイプラインは、すべての関連データが収集された後に定期的に実行すると、最も効率よくなります。たとえば、UserScore はその日に収集したデータの夜間ジョブとして実行できます。

UserScore の機能

1 日分のスコアデータには、(分析ウィンドウ処理中にユーザーがゲームのインスタンスを複数回プレイした場合)各ユーザー ID に複数のレコードが存在し、それぞれに独自のスコア値とタイムスタンプがあります。1 日にユーザーがプレイしたインスタンスすべての合計スコアを出したい場合、パイプラインでは、ユーザー別にすべてのレコードをグループ化する必要があります。

パイプラインで各イベントを処理すると、イベントスコアがその特定のユーザーの合計値に追加されます。

UserScore は、各レコードから必要なデータ、特にユーザー ID とスコア値のみを解析します。パイプラインでは、レコードのイベント時間は考慮されません。パイプラインを実行するとき、指定した入力ファイル内にあるすべてのデータが処理されるだけです。

UserScore の基本的なパイプラインのフローでは、次のことが行われます。

  1. Google Cloud Storage から 1 日のスコアデータを読み取ります
  2. ゲームイベントをユーザー ID ごとにグループ化し、特定のユーザーのスコア値をまとめて合計スコアを算出することで、個々のユーザーごとのスコア値を算出します
  3. BigQuery テーブルに結果データを書き込みます

次の図は、パイプライン分析期間中における複数ユーザーのスコアデータを示しています。図では、それぞれのデータポイントは、ユーザーとスコア組み合わせのごとのイベントです。

図 1: 3 ユーザーのスコアデータ。

この例では、バッチ処理を使用しており、図の Y 軸は処理時間を示しています。パイプライン処理イベントは、Y 軸の下から始まり、徐々に高くなります。図の X 軸は各ゲームイベントのイベント時間を表しており、イベントのタイムスタンプで示されます。図のそれぞれのイベントは、タイムスタンプに応じたパイプラインでの発生順には処理されないことに注意してください。

パイプラインは入力ファイルからスコアイベントを読み取った後、そのユーザーとスコアの組み合わせをすべてグループ化し、一意のユーザーごとにスコア値を合計します。UserScore は、ユーザー定義の複合変換 ExtractAndSumScore としてこのステップのコアロジックをカプセル化します。

public static class ExtractAndSumScore
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

  private final String field;

  ExtractAndSumScore(String field) {
    this.field = field;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(
      PCollection<GameActionInfo> gameInfo) {

    return gameInfo
      .apply(MapElements
          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
          .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
      .apply(Sum.<String>integersPerKey());
  }
}

ExtractAndSumScore の記述は汎用的であり、データをグループ化したいフィールド(このゲームの場合は一意のユーザーまたは一意のチーム)に渡すことができます。つまり、ExtractAndSumScore は、チーム別のスコアデータなどをグループ化する他のパイプラインでも再利用できます。

ここでは、パイプラインの 3 つのステップすべてを適用する方法を示す、UserScore の主なメソッドを示します。

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  // Read events from a text file and parse them.
  pipeline.apply(TextIO.Read.from(options.getInput()))
    .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
    // Extract and sum username/score pairs from the event data.
    .apply("ExtractUserScore", new ExtractAndSumScore("user"))
    .apply("WriteUserScoreSums",
        new WriteToBigQuery<KV<String, Integer>>(options.getTableName(),
                                                 configureBigQueryWrite()));

  // Run the batch pipeline.
  pipeline.run();
}

結果の操作

UserScore はデータを BigQuery テーブル(デフォルトの名前は user_score)に書き込みます。BigQuery テーブルのデータを使用すると、特定の日のスコア上位 N ユーザーのクエリなど、さらにインタラクティブな分析を実行できます。

特定の日のスコア上位 10 ユーザーをインタラクティブに決定するとします。BigQuery ユーザー インターフェースでは、次のクエリを実行できます。

SELECT * FROM [MyGameProject:MyGameDataset.user_score] ORDER BY total_score DESC LIMIT 10

制限事項

例に示すとおり、UserScore パイプラインにはいくつかの制限があります。

  • 一部のスコアデータはオフライン プレーヤーによって生成されて毎日の締め切り後に送信される場合があるので、ゲームデータでは、UserScore パイプラインで生成される結果データが不完全な可能性がありますUserScore では、パイプライン実行時に入力ファイルに事前に送信されている固定入力セットのみを処理します。
  • UserScore は、処理時点で入力ファイルに存在するすべてのデータイベントを処理しますが、イベント時間に基づいてイベントの検査やエラーチェックは行いません。このため、結果には、前日からの遅延レコードなど、関連する分析期間外のイベント時間の値が含まれる場合があります。
  • UserScore はすべてのデータの収集後に初めて実行されるので、ユーザーがデータイベントを生成したとき(イベント時間)と結果が計算されるとき(処理時間)の間のレイテンシが高くなります。
  • UserScore は 1 日の合計結果のみをレポートし、その日にデータがどのように累積されたかを示す詳細な情報は提供されません。

次のパイプラインの例から、Dataflow の機能を使用して、このような制限に対処する方法について説明します。

HourlyTeamScore: ウィンドウ処理による高度な一括処理

HourlyTeamScore パイプラインは、UserScore パイプラインで使用されている基本の一括分析の原則に従って展開し、その制限の一部に改良を加えます。HourlyTeamScore は Dataflow SDK の追加機能を使用し、ゲームデータのその他の部分を取り入れることによって、詳細な分析を実行します。たとえば、HourlyTeamScore では関連する分析期間に関係のないデータを除外できます。

UserScoreと同様に、HourlyTeamScore も、すべての関連データを収集した後に、定期的(1 日 1 回など)に実行するジョブとして最適です。パイプラインはファイルから固定データセットを読み取り、結果を UserScore のように BigQuery テーブルに書き込みます。

HourlyTeamScore の機能

HourlyTeamScore は固定データセット(1 日分のデータなど)で時間ごとにチーム別の合計スコアを計算します。

  • HourlyTeamScore はデータセット全体を一括して操作するのではなく、入力データを複数の論理ウィンドウに分割し、そのウィンドウで計算を実行します。これにより、HourlyUserScore はウィンドウ別のスコアデータに関する情報を提供し、各ウィンドウが一定間隔で(たとえば 1 時間おきに)ゲームスコアの進行状況を表します。
  • HourlyTeamScore は、(埋め込みのタイムスタンプで示される)イベント時間が該当する分析期間に入るかどうかに基づいて、データイベントをフィルタリングします。基本的に、パイプラインは各ゲームイベントのタイムスタンプが分析対象の範囲(この場合は日)になるかどうかを確認します。前日までのデータイベントは破棄され、スコア合計には含まれなくなります。これにより、HourlyTeamScore はさらに強固になり、UserScore 結果データのエラーも少なくなります。また、このパイプラインでは、タイムスタンプが関連する分析期間内であれば、遅れて到着したデータも考慮されます。

HourlyTeamScore のそれぞれの拡張について詳しく説明します。

固定時間のウィンドウ処理

固定時間のウィンドウ処理を使用すると、パイプラインから、分析期間中にデータセットにイベントをどのように累積したかに関する詳しい情報が提供されます。この場合、1 日に各チームがいつアクティブであったか、またその時間中にチームが獲得したスコアがわかります。

次の図は、固定時間のウィンドウ処理を適用した後、パイプラインによってあるチームの 1 日分のスコアデータがどのように処理されるかを示しています。

図 2: 2 つのチームのスコアデータ。各チームのスコアは、イベント時間中、そのスコアがいつ発生したかに基づいて、複数の論理ウィンドウに分割されます。

処理時間が進むにつれ、合計はウィンドウ別になり、各ウィンドウはスコアが発生した日のイベント時間での 1 時間を表すようになります。

Dataflow のウィンドウ処理機能では、PCollection の各要素に付加されている内的タイムスタンプ情報を使用します。パイプラインではイベント時間に基づいてウィンドウ処理する必要があるので、各データレコードに埋め込まれているタイムスタンプを最初に抽出し、それをスコアデータの PCollection 内にある対応する要素に適用します。次に、パイプラインはウィンドウ処理関数を適用して、PCollection を複数の論理ウィンドウに分割できます。

次のコードでは、HourlyTeamScoreWithTimestamps 変換と Window 変換を使用して、これらのオペレーションを実行しています。

// Add an element timestamp based on the event log, and apply fixed windowing.
.apply("AddEventTimestamps",
       WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
.apply(Window.named("FixedWindowsTeam")
    .<GameActionInfo>into(FixedWindows.of(
          Duration.standardMinutes(options.getWindowDuration()))))

ウィンドウ処理を指定するためにパイプラインで使用される変換は、実際のデータ処理変換(ExtractAndSumScores など)とは区別される点に注意してください。この機能により、Dataflow パイプラインを柔軟に設計できるため、ウィンドウ処理特性が異なるデータセットでも、既存の変換を実行できます。

イベント時間に基づくフィルタリング

HourlyTeamScoreフィルタリングを使用して、タイムスタンプが該当する分析期間外になるイベント(対象となる日に生成されたものではないイベント)をデータセットから除外します。これにより、前日にオフラインで生成され、翌日にゲームサーバーに送信されたデータなどのためにパイプラインにエラーが生じることを回避できます。

また、タイムスタンプが有効で、分析期間の終了後に受信したデータイベントなど、関連する遅延データをパイプラインに含めることもできます。たとえば、パイプラインの締め切り時間が午前 12 時の場合、午前 2 時にパイプラインを実行しますが、タイムスタンプに午前 12 時の締め切り後に発生したことが示されているイベントは除外します。遅延のため午前 12 時 1 分から午前 2 時までの間に受信し、タイムスタンプが締め切りの午前 12 時前となっているデータイベントは、パイプライン処理に含まれることになります。

HourlyTeamScoreFilter 変換を使用してこのオペレーションを実行します。Filter を適用する場合、各データレコードを比較するための述語を指定します。比較に該当するデータが含まれ、比較に該当しないイベントは除外されます。このケースでは、述語は指定する締め切り時間であり、比較はデータの一部分である、タイムスタンプ フィールドのみを比較します。

次のコードでは、HourlyTeamScoreFilter 変換がどのように使用され、関連する分析期間の前後に発生したイベントをフィルタリングするかを示しています。

.apply("FilterStartTime", Filter.byPredicate(
    (GameActionInfo gInfo)
        -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
.apply("FilterEndTime", Filter.byPredicate(
    (GameActionInfo gInfo)
        -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))

チーム別、ウィンドウ別のスコア計算

HourlyTeamScoreUserScore パイプラインと同じ ExtractAndSumScores 変換を使用しますが、渡されるキーは異なります(ユーザーではなくチーム)。また、パイプラインでは固定時間 1 時間のウィンドウ処理を入力データに適用した後で ExtractAndSumScores を適用するので、データがチーム別およびウィンドウ別にグループ化されます。一連の変換はすべて、HourlyTeamScore の主なメソッドで示されます。

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
  final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));

  // Read 'gaming' events from a text file.
  pipeline.apply(TextIO.Read.from(options.getInput()))
    // Parse the incoming data.
    .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))

    // Filter out data before and after the given times so that it is not included
    // in the calculations. As we collect data in batches (say, by day), the batch for the day
    // that we want to analyze could potentially include some late-arriving data from the previous
    // day. If so, we want to weed it out. Similarly, if we include data from the following day
    // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
    // that fall after the time period we want to analyze.
    .apply("FilterStartTime", Filter.byPredicate(
        (GameActionInfo gInfo)
            -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
    .apply("FilterEndTime", Filter.byPredicate(
        (GameActionInfo gInfo)
            -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))

    // Add an element timestamp based on the event log, and apply fixed windowing.
    .apply("AddEventTimestamps",
           WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
    .apply(Window.named("FixedWindowsTeam")
        .<GameActionInfo>into(FixedWindows.of(
              Duration.standardMinutes(options.getWindowDuration()))))

    // Extract and sum teamname/score pairs from the event data.
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
    .apply("WriteTeamScoreSums",
      new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName(),
          configureWindowedTableWrite()));

  pipeline.run();
}

制限事項

記述のとおり、HourlyTeamScore には他にも制限があります。

  • HourlyTeamScore はバッチ パイプラインであり、すべてのデータイベントが存在するようになるのを待ってから処理を開始するので、データイベント発生(イベント時間)から結果の生成(処理時間)までのレイテンシが依然として高くなります

LeaderBoard: リアルタイムでのゲームデータのストリーミング処理

UserScore および HourlyTeamScore のパイプラインで生じるレイテンシの問題に対処するには、スコアデータをストリーミング ソースから読み取る方法があります。LeaderBoard パイプラインは、ゲーム スコアデータをゲームサーバー上のファイルからではなく、Google Cloud Pub/Sub から読み取ってストリーミング処理を開始します。

また、LeaderBoard パイプラインは、処理時間とイベント時間の両方に関してゲーム スコアデータを処理する方法も示しています。LeaderBoard は異なる時間について、個々のユーザースコアとチームスコアの両方に関するデータを出力します。

LeaderBoard パイプラインではゲームデータが生成された時点でそれをストリーミング ソースから読み取るため、パイプラインをゲームプロセスと同時に実行されるジョブとみなすことができます。LeaderBoard はこのため、たとえばライブのウェブベース スコアカードを提供して、ユーザーが他のユーザーのプレイの進行状況をトラックできるようにするなど、ユーザーが任意のタイミングでゲームをプレイしているときのレイテンシを低くすることができます。

LeaderBoard の機能

LeaderBoard パイプラインはほぼリアルタイムで Pub/Sub に公開されたゲームデータを読み取り、そのデータを使用して 2 つの別の処理タスクを実行します。

  • LeaderBoard は一意のユーザー全員の合計スコアを計算し、処理時間の 10 分ごとに推測的な結果を公開します。つまり、パイプラインでその日に処理されたユーザー別の合計スコアが 10 分おきに出力されます。この計算では、実際のゲームイベントがいつ生成されたかに関係なく、ほぼリアルタイムで「スコアボード」を付けることができます。
  • LeaderBoard はパイプラインで実行される時間ごとのチームのスコアを計算します。これは、1 時間ごとにプレイの上位スコアのチームに褒賞を与える場合などに役立ちます。チームスコアの計算には、固定時間のウィンドウ処理を使用して、データがパイプラインで受信されると、入力データをイベント時間(タイムスタンプで表示)に基づく 1 時間ごとの有限のウィンドウに分割する固定時間のウィンドウ処理を使用します。

    また、チームスコアの計算では、Dataflow のトリガー メカニズムを使用して 1 時間ごとの見込みの結果(これは時間切れになるまで 5 分おきに更新されます)を提供します。遅延データがあればそれをキャプチャし、そのデータが属する特定時間のウィンドウに追加します。

次に、この両方のタスクについて詳しく説明します。

処理時間に基づくユーザースコアの計算

パイプラインの実行中に 10 分おきに各ユーザーのその時点の合計スコアを出力できるようにします。この計算では、ユーザーのプレイ インスタンスで実際のスコアがいつ生成されたかは考慮されず、単にその日にパイプラインで受信されたユーザーのすべてのスコアの合計が出力されます。パイプラインの実行中であればいつでも、受信した遅延データが計算に含まれます。

計算を更新するときは、常にパイプラインで受信されたデータがすべて必要なため、パイプラインでは、すべてのユーザーのスコアデータが単一のグローバル ウィンドウになるものとみなします。単一のグローバル ウィンドウは制限されていませんが、処理時間トリガーを使用して、10 分おきの計算のために一時的な締め切りを指定できます。

単一のグローバル ウィンドウで 10 分の処理時間トリガーを指定すると、ウィンドウでトリガーされるたび(10 分おき)に、パイプラインによってコンテンツの「スナップショット」が実際にとられるようになります。単一のグローバル ウィンドウを使用しているため、それぞれのスナップショットにはその時点までに収集されたデータがすべて含まれます。次の図は、処理時間トリガーを単一のグローバル ウィンドウで使用した場合の結果を示します。

図 3: 3 ユーザーのスコアデータ。各ユーザーのスコアは単一のグローバル ウィンドウにまとめられ、10 分おきに出力のスナップショットを生成するトリガーを含みます。

処理時間が経過し、処理されたスコアが増加すると、トリガーによって各ユーザーの合計の更新値が出力されます。

次のコード例では、LeaderBoard がどのように処理時間トリガーを設定して、ユーザーのスコアデータを出力するかを示します。

/**
 * Extract user/score pairs from the event stream using processing time, via global windowing.
 * Get periodic updates on all users' running scores.
 */
@VisibleForTesting
static class CalculateUserScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration allowedLateness;

  CalculateUserScores(Duration allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> input) {
    return input.apply("LeaderboardUserGlobalWindow",
        Window.<GameActionInfo>into(new GlobalWindows())
            // Get periodic results every ten minutes.
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TEN_MINUTES)))
            .accumulatingFiredPanes()
            .withAllowedLateness(allowedLateness))
        // Extract and sum username/score pairs from the event data.
        .apply("ExtractUserScore", new ExtractAndSumScore("user"));
  }
}

LeaderBoard はユーザースコア計算に累積トリガーを使用します(トリガーを設定するときに .accumulatingFiredPanes を呼び出します)。累積トリガーを使用すると、これまでに返されたデータと、前回トリガーを実行してから受信した新しいデータがパイプラインで累積されるようになります。これにより、LeaderBoard は個々の合計のコレクションではなく、ユーザースコアのその時点の合計になります。

イベント時間に基づくチームスコアの計算

パイプラインでは、プレイ中の 1 時間おきに、各チームの合計スコアも出力する必要があります。ユーザースコアの計算とは異なり、チームスコアでは 1 時間ごとのプレイをそれぞれ考慮する必要があるため、各スコアが実際に発生したイベント時間を考慮する必要があります。また、1 時間ごとの進行状況に応じた見込みの更新を示し、特定のデータの完了後に受信した遅延データのインスタンスも計算に含められるようにする必要があります。

それぞれの時間は個別に考慮されるため、HourlyTeamScore と同様に、固定時間のウィンドウ処理を入力データにも適用できます。見込みの更新と遅延データの更新を提供するため、追加のトリガー パラメータも指定します。トリガーによって、それぞれのウィンドウで指定した間隔(この場合は 5 分間隔)で計算され、その結果が得られ、また、遅延データのためにウィンドウが「完了」した後にもトリガーが保持されます。ユーザースコアの計算と同様に、トリガーを累積モードに設定して、1 時間ごとのウィンドウでその時点の合計が得られるようにトリガーを設定します。

見込みの更新と遅延データのトリガーによって、タイムスキューの問題を解決できます。パイプラインでのイベントは、タイムスタンプに応じて実際に発生した順序で処理する必要はないため、パイプラインには決まった順序なく、または(ユーザーのスマートフォンがネットワークに接続されていないときに生成された場合など)遅れて送信されることがあります。Dataflow では適宜、特定のウィンドウでいつ「すべて」のデータが揃ったかを判断できる手段が必要であり、これは「ウォーターマーク」と呼ばれます。

理想としては、すべてのデータを発生時にすぐに処理し、処理時間がイベント時間と同じ、または最低限、直線の関係になることが望ましいとされます。ただし、分散システムには、それぞれ不確実な要素(スマートフォンからの報告の遅れなど)があるため、Dataflow ではヒューリスティックなウォーターマークを使用することがあります。

次の図は、2 つのチームにおける継続中の処理時間と、それぞれのスコアのイベント時間の関係を示しています。

図 4: イベント時間ごとに振動処理されたチーム別スコアデータ。処理時間に基づくトリガーによって、見込みの早期結果が得られますが、遅延結果も含むことができます。

図の点線は、指定のウィンドウですべてのデータを適宜受信しているとみなす、Dataflow の概念である「想定される」ウォーターマークを示します。不規則な実線は実際のウォーターマークを表し、データソース(この場合は Pub/Sub)によって決定したものです。

固定のウォーターマークより上で受信されたデータは、遅延データ、つまり、遅延したスコアイベント(オフラインで生成されたものなど)であり、そのウィンドウが終了した後に受信されたものです。このパイプラインの遅延呼び出しトリガーでは、このような遅延データも合計に含むことができます。

次のコード例では、LeaderBoard によって固定時間のウィンドウ処理と適切なトリガーがどのように適用され、パイプラインで必要な計算が実行されるかを示します。

// Extract team/score pairs from the event stream, using hour-long windows by default.
@VisibleForTesting
static class CalculateTeamScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration teamWindowDuration;
  private final Duration allowedLateness;

  CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
    this.teamWindowDuration = teamWindowDuration;
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> infos) {
    return infos.apply("LeaderboardTeamFixedWindows",
        Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
            // We will get early (speculative) results as well as cumulative
            // processing of late data.
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(FIVE_MINUTES))
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(TEN_MINUTES)))
            .withAllowedLateness(allowedLateness)
            .accumulatingFiredPanes())
        // Extract and sum teamname/score pairs from the event data.
        .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
  }
}

まとめると、このような処理戦略では、データ処理に同じ基本的な変換を使用していても、UserScore および HourlyTeamScore パイプラインのレイテンシと完全性に関する問題に対応できます。つまり、両方の計算に、 と HourlyTeamScore の両方のパイプラインで使用したものと同じ ExtractAndSumScore 変換を継続して使用できます。

GameStats: 不正行為の検出と使用分析

LeaderBoard では基本的なウィンドウ処理の使用法を示し、柔軟な低レイテンシのデータ分析を実行できますが、さらに高度なウィンドウ処理技術を使用して、より広範囲な分析を実行することもできます。これにはシステムの不正行為(スパムなど)の検出、またはユーザーの行動の分析のための計算があります。GameStats パイプラインでは、Dataflow を使用して、この種の高度な分析を実行する方法を示す、低レイテンシの機能を LeaderBoard に構築しています。

LeaderBoard と同様に、GameStats ではストリーミング ソース(この場合は Pub/Sub)からデータが読み取られます。これは、ユーザーのプレイ中にゲームを分析する、進行中ジョブとして最も優れた手段です。

GameStats の機能

LeaderBoard と同様に、GameStats では 1 時間ごとのチーム別のスコアを計算します。ただし、このパイプラインではより複雑な 2 種類の分析も実行されます。

  • GameStats不正行為の検出を行います。その際、スコアデータの簡単な統計分析を実行し、スパマーまたはボットが存在するかどうか、存在する場合はどのユーザーかを判断します。次に、疑わしいスパムユーザーまたはボットユーザーのリストを使用して、1 時間ごとのチームスコア計算からそのボットを除外します。
  • GameStats使用パターンを分析します。その際、セッションのウィンドウ処理を使用して、互いに近いイベント時間を持つゲームデータをまとめます。これによって、ユーザーのプレイ時間の傾向や、ゲーム時間の変化などに関する情報を収集できます。

これらの機能の詳細について、次に示します。

不正行為の検出

ゲームのスコアは、ユーザーがスマートフォンを「クリック」する速度によって決まるものと仮定します。GameStats の不正行為検出では、各ユーザーのスコアデータを分析して、ユーザーの「クリック速度」が異常に速く、スコアも異常に高くなっているものを検出します。これは、ゲームをボットがプレイしており、人がプレイできない速度で操作されている可能性があることを示しています。

スコアが「異常に」高いかどうかを判断するために、GameStats ではその固定時間のウィンドウですべてのスコアの平均を計算し、平均スコアに任意の重み付け係数(この場合は 2.5)を乗算したものに対して個々のスコアを確認します。これによって、平均の 2.5 倍以上になるスコアは、スパムによるものと考えられます。GameStats パイプラインは「スパム」ユーザーのリストをトラックし、そのようなユーザーを、チームのスコアボードでのチームスコア計算から除外します。

平均はパイプライン データによって異なるため、平均を計算し、その計算データをそれ以降の ParDo 変換に使用して、重み付けした値を超えるスコアを除外する必要があります。これを行うには、計算した平均値を副入力としてフィルタリング ParDo に渡します。

次のコード例では、不正行為検出を扱う複合変換を示します。この変換では、Sum.integersPerKey 変換を使用してユーザー別のスコアをすべて合計し、Mean.globally 変換を使用してすべてのユーザーの平均スコアを決定します。この計算が(PCollectionView シングルトンとして)完了したら、.withSideInputs を使用してフィルタリング ParDo に渡すことができます。

public static class CalculateSpammyUsers
    extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
  private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
  private static final double SCORE_WEIGHT = 2.5;

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {

    // Get the sum of scores for each user.
    PCollection<KV<String, Integer>> sumScores = userScores
        .apply("UserSum", Sum.<String>integersPerKey());

    // Extract the score from each element, and use it to find the global mean.
    final PCollectionView<Double> globalMeanScore = sumScores.apply(Values.<Integer>create())
        .apply(Mean.<Integer>globally().asSingletonView());

    // Filter the user sums using the global mean.
    PCollection<KV<String, Integer>> filtered = sumScores
        .apply(ParDo
            .named("ProcessAndFilter")
            // use the derived mean total score as a side input
            .withSideInputs(globalMeanScore)
            .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
              private final Aggregator<Long, Long> numSpammerUsers =
                createAggregator("SpammerUsers", new Sum.SumLongFn());
              @Override
              public void processElement(ProcessContext c) {
                Integer score = c.element().getValue();
                Double gmc = c.sideInput(globalMeanScore);
                if (score > (gmc * SCORE_WEIGHT)) {
                  LOG.info("user " + c.element().getKey() + " spammer score " + score
                      + " with mean " + gmc);
                  numSpammerUsers.addValue(1L);
                  c.output(c.element());
                }
              }
            }));
    return filtered;
  }
}

不正行為検出の変換によって、スパムボットとして疑わしいユーザーのビューが生成されます。その後、パイプライン内でそのビューを使用して、1 時間ごとのチームスコアを計算するときに副入力のメカニズムを再度使用して、疑わしいユーザーを除外できます。次のコードでは、スコアを固定ウィンドウでウィンドウ処理し、チームスコアを抽出するまでの間にスパムフィルタを挿入する例を示します。

// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
  .apply(Window.named("WindowIntoFixedWindows")
      .<GameActionInfo>into(FixedWindows.of(
          Duration.standardMinutes(options.getFixedWindowDuration())))
      )
  // Filter out the detected spammer users, using the side input derived above.
  .apply(ParDo.named("FilterOutSpammers")
          .withSideInputs(spammersView)
          .of(new DoFn<GameActionInfo, GameActionInfo>() {
            @Override
            public void processElement(ProcessContext c) {
              // If the user is not in the spammers Map, output the data element.
              if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
                c.output(c.element());
              }
            }}))
  // Extract and sum teamname/score pairs from the event data.
  .apply("ExtractTeamScore", new ExtractAndSumScore("team"))

使用パターンの分析

それぞれのゲームスコアのイベント時間を調べ、イベント時間が近いスコアをセッションにグループ化して、ユーザーがいつ、何時間ゲームをプレイしているかを分析できます。 GameStats では Dataflow 組み込みのセッション ウィンドウ処理関数を使用して、ユーザーのスコアをその発生時間に基づいてセッションに分割します。

セッションのウィンドウ処理を設定する場合は、イベント間の最小ギャップ時間を指定します。受信時間の間隔が最小ギャップ時間より近いイベントはすべて、同じウィンドウにまとめられます。受信時間の間隔がこのギャップ時間より長いイベントは、複数のウィンドウにまとめられます。最小ギャップ時間の設定に応じて、同じセッション ウィンドウ内のスコアが(相対的に)連続している同じプレイ時間に含まれるものと想定できます。ウィンドウが異なるスコアは、そのユーザーがゲームを停止し、プレイを再開するまで最小ギャップ時間以上の時間が経過したことを示します。

セッション ウィンドウにまとめられたデータは、次の図のように表示されます。固定ウィンドウとは異なり、セッション ウィンドウはユーザーのプレイパターンに応じて、ユーザーごとに異なります。

セッション ウィンドウ処理を表す図
図 5: ユーザー セッションと最小ギャップ時間。各ユーザーのセッションは、プレイしたインスタンス数とインスタンス間の休憩時間によって異なることに注意してください。

セッションのウィンドウ処理データを使用して、すべてのユーザーが連続してプレイした時間の平均と、各セッションでユーザーが得た合計スコアを計算できます。これは、最初にセッション ウィンドウを適用し。ユーザーごと、およびセッションごとのスコアを合計して、変換を使用して各セッションの長さを計算することによって得ることができます。

// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
  .apply(Window.named("WindowIntoSessions")
        .<KV<String, Integer>>into(
              Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
    .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
  // For this use, we care only about the existence of the session, not any particular
  // information aggregated over it, so the following is an efficient way to do that.
  .apply(Combine.perKey(x -> 0))
  // Get the duration per session.
  .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))

これにより、一連のユーザー セッションとその時間を把握できます。次に、データを固定時間ウィンドウに再度ウィンドウ処理して、平均セッション時間を計算し、1 時間ごとに終了したすべてのセッションの平均時間を計算できます。

// Re-window to process groups of session sums according to when the sessions complete.
.apply(Window.named("WindowToExtractSessionMean")
      .<Integer>into(
          FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply("WriteAvgSessionLength",
       new WriteWindowedToBigQuery<Double>(
          options.getTablePrefix() + "_sessions", configureSessionWindowWrite()));

結果として生じた情報を使用して、ユーザーが何時頃に最も長時間プレイしているか、または何時頃のセッションのプレイが最も短時間で終わっているかを調べることができます。

詳細情報

これらの例の詳細については、GitHub の Dataflow SDK の例master-1.x ブランチ)に含まれているサンプル パイプラインを実行して確認できます。パイプラインの例を実行する手順については、サンプル ディレクトリにある README.md をご覧ください。

次のブログ投稿と動画では、パッチ処理とストリーミング処理のための Dataflow の統一データモデルに関する詳細なコンテキストと情報を提供しています。

  • Dataflow と Spark のブログ投稿 - Google エンジニアによるブログ投稿。モバイルゲーム分野の例を使用して Dataflow モデルと Apache Spark のモデルを比較しています。
  • Dataflow トーク @Scale - @Scale カンファレンスで、Google エンジニア Frances Perry が行ったプレゼンテーションの動画。前述のモバイルゲームの早期バージョンを使用した Dataflow モデルについて説明します。
  • The World Beyond Batch: Streaming 101 および Streaming 102 - Google エンジニア Tyler Akidau による、O'Reilly ブログへの 2 部連続投稿。将来のビッグデータ処理について説明します。
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。