모바일 게임 파이프라인 예

이 섹션에서는 기본적인 WordCount 예시보다 더욱 복잡한 기능을 시연하는 일련의 Dataflow 파이프라인 예시를 살펴봅니다. 이 섹션의 파이프라인은 사용자가 휴대전화에서 플레이하는 가상의 게임에서 추출한 데이터를 처리합니다. 이 파이프라인은 복잡성이 증가하는 처리를 보여줍니다. 예를 들어 첫 번째 파이프라인에서는 배치 분석 작업을 실행하여 비교적 간단한 점수 데이터를 수집하는 방법을 보여주고, 이후 파이프라인에서는 Dataflow의 기간 설정 및 트리거 기능을 사용하여 사용자 플레이 패턴에 대한 지연 시간이 짧은 데이터 분석과 더욱 복잡한 정보를 제공합니다.

사용자가 가상의 모바일 게임 인스턴스를 실행할 때마다 데이터 이벤트가 생성됩니다. 각 데이터 이벤트는 다음 정보로 구성됩니다.

  • 게임을 플레이하는 사용자의 고유 ID
  • 사용자가 속한 팀의 팀 ID
  • 해당 플레이 인스턴스의 점수
  • 특정 플레이 인스턴스를 실행한 시점을 기록하는 타임스탬프 - 각 게임 데이터 이벤트의 이벤트 시간입니다.

사용자가 게임 인스턴스를 완료하면 휴대전화는 데이터 이벤트를 게임 서버로 전송하고, 데이터는 해당 서버에서 로깅되어 파일로 저장됩니다. 일반적으로 데이터는 완료 즉시 게임 서버로 전송됩니다. 하지만 휴대전화가 서버에 연결되어 있지 않은 경우(예: 비행기 안, 네트워크 범위 밖), 사용자는 게임을 '오프라인'으로 즐길 수 있습니다. 게임 서버에 사용자의 휴대전화가 다시 연결되면 휴대전화는 누적된 게임 데이터를 전부 전송합니다.

즉 일부 데이터 이벤트는 사용자가 생성한 시점보다 훨씬 늦게 게임 서버에 수신될 수도 있습니다. 이 시간차는 각 점수가 생성된 시점을 고려하여 계산을 수행하는 파이프라인에서 처리 함의를 가질 수 있습니다. 이러한 파이프라인은 예를 들어 하루 중 매 시간 동안 생성된 점수를 추적하거나 사용자가 게임을 지속하는 시간의 길이를 계산할 수 있습니다. 양쪽 모두 각 데이터 레코드의 이벤트 시간에 의존합니다.

모바일 게임 예제 파이프라인의 복잡성은 단순한 배치 분석부터 실시간 분석과 악용 감지를 수행할 수 있는 좀 더 복잡한 파이프라인까지 다양합니다. 이 섹션에서는 각 예를 살펴보고 기간 설정트리거와 같은 Dataflow 기능을 사용하여 파이프라인 기능을 확장하는 방법을 설명합니다.

UserScore: 기본 배치 점수 처리

UserScore 파이프라인은 모바일 게임 데이터를 처리하는 가장 단순한 예시입니다. UserScore는 유한한 데이터 세트 동안의 사용자별 총 점수(예: 게임 서버에 저장된 하루치 점수)를 결정합니다. UserScore와 같은 파이프라인은 모든 관련 데이터를 수집한 후 주기적으로 실행하는 것이 가장 좋습니다. 예를 들어 UserScore는 해당 일자에 수집한 데이터에 대해 야간 작업으로 실행할 수 있습니다.

UserScore는 어떤 역할을 하나요?

사용자가 분석 기간 중 게임 인스턴스를 두 번 이상 실행했을 경우 하루치의 점수 데이터에서 각 사용자 ID는 각각 점수 값과 타임스탬프가 있는 레코드를 여러 개 가질 수 있습니다. 하루 동안 사용자가 플레이한 모든 인스턴스의 총 점수를 확인하려면 파이프라인에서 모든 레코드를 개별 사용자별로 그룹화해야 합니다.

파이프라인이 각 이벤트를 처리하는 동안 이벤트 점수는 해당 사용자의 총계에 추가됩니다.

UserScore는 각 레코드에서 필요한 데이터, 특히 사용자 ID와 점수 값만을 파싱합니다. 파이프라인은 모든 레코드의 이벤트 시간을 고려하지 않습니다. 파이프라인을 실행할 때 지정한 입력 파일에 있는 모든 데이터를 처리할 뿐입니다.

UserScore의 기본 파이프라인 흐름은 다음 작업을 수행합니다.

  1. Google Cloud Storage에 저장된 파일에서 하루치 점수 데이터를 읽습니다.
  2. 각 게임 이벤트를 사용자 ID로 그룹화하고 점수 값을 결합하여 각 고유 사용자별 점수 값을 합산하고 해당 사용자의 총 점수를 구합니다.
  3. BigQuery 테이블에 결과 데이터를 작성합니다.

다음 다이어그램은 파이프라인 분석 기간 동안 여러 사용자의 점수 데이터를 보여줍니다. 다이어그램에서 각 데이터 포인트는 사용자-점수 쌍 한 개로 나타나는 이벤트입니다.

그림 1: 사용자 3명의 점수 데이터

이 예에서는 일괄 처리를 사용하며, 다이어그램의 Y축은 처리 시간을 나타냅니다. 파이프라인은 Y축 하단의 이벤트를 먼저 처리한 후 축 상단의 이벤트를 처리합니다. 다이어그램의 X축은 각 게임 이벤트의 이벤트 시간을 해당 이벤트의 타임스탬프로 표시한 것입니다. 다이어그램의 개별 이벤트는 파이프라인에서 발생한 시간(타임스탬프 기준)과 동일한 순서로 처리되지 않습니다.

입력 파일에서 점수 이벤트를 읽은 후 파이프라인은 모든 사용자-점수 쌍을 그룹화하고 점수 값을 사용자당 하나의 합계 값으로 합산합니다. UserScore는 이 단계의 핵심 논리를 맞춤 설정 복합 변환 ExtractAndSumScore로 캡슐화합니다.

public static class ExtractAndSumScore
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {

  private final String field;

  ExtractAndSumScore(String field) {
    this.field = field;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(
      PCollection<GameActionInfo> gameInfo) {

    return gameInfo
      .apply(MapElements
          .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
          .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
      .apply(Sum.<String>integersPerKey());
  }
}

ExtractAndSumScore는 데이터를 그룹화할 필드(게임의 경우 고유 사용자 또는 고유 아이템)를 전달할 수 있다는 점에서 보다 일반적으로 작성되었다고 할 수 있습니다. 즉 팀별로 점수 데이터를 그룹화하는 다른 파이프라인에서 ExtractAndSumScore를 다시 사용할 수 있습니다.

파이프라인의 세 단계를 모두 적용하는 UserScore의 주 메소드는 다음과 같습니다.

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  // Read events from a text file and parse them.
  pipeline.apply(TextIO.Read.from(options.getInput()))
    .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))
    // Extract and sum username/score pairs from the event data.
    .apply("ExtractUserScore", new ExtractAndSumScore("user"))
    .apply("WriteUserScoreSums",
        new WriteToBigQuery<KV<String, Integer>>(options.getTableName(),
                                                 configureBigQueryWrite()));

  // Run the batch pipeline.
  pipeline.run();
}

결과 작업

UserScore는 BigQuery 테이블에 데이터를 작성합니다(기본적으로 user_score라고 부릅니다). BigQuery 테이블의 데이터를 사용하여 특정한 날 최고 점수를 기록한 사용자 N명의 목록을 쿼리하는 등 양방향 분석을 추가로 수행할 수 있습니다.

특정한 날 최고 점수를 기록한 사용자 10명을 양방향으로 확인한다고 가정해 봅시다. BigQuery 사용자 인터페이스에서 다음 쿼리를 실행할 수 있습니다.

SELECT * FROM [MyGameProject:MyGameDataset.user_score] ORDER BY total_score DESC LIMIT 10

제한사항

이 예시에서 설명한 것처럼 UserScore 파이프라인에는 몇 가지 제한사항이 있습니다.

  • 일부 점수 데이터는 오프라인 플레이어에서 생성되고 일일 마감 시간 이후에 전송되므로 UserScore 파이프라인에서 생성된 결과 데이터는 불완전할 수 있습니다. UserScore는 파이프라인 실행 시 입력 파일에 있는 고정된 입력 집합만 처리합니다.
  • UserScore는 처리 시간에 입력 파일의 모든 데이터 이벤트를 처리하고 이벤트 시간을 기준으로 이벤트를 검사하거나 별도로 오류를 확인하지 않습니다. 따라서 결과에는 이벤트 시간이 관련 분석 기간에 해당하지 않는 일부 값(예: 늦게 수신된 전날 레코드)이 포함될 수 있습니다.
  • UserScore는 모든 데이터를 수집한 후에만 실행되므로 사용자가 데이터 이벤트를 생성하는 시점(이벤트 시간)과 결과를 계산하는 시점(처리 시간) 사이에 긴 지연 시간이 발생합니다.
  • 또한 UserScore는 하루 전체의 총 결과만 보고하며 하루 중 데이터의 누적 방식에 대한 세부 정보를 제공하지 않습니다.

다음 파이프라인 예부터는 Dataflow의 기능을 사용하여 이러한 제한사항을 해결하는 방법에 대해 알아보겠습니다.

HourlyTeamScore: 기간 설정을 통한 고급 배치 처리

HourlyTeamScore 파이프라인은 UserScore 파이프라인에서 사용한 기본 배치 분석 원칙을 확장하여 일부 제한사항을 개선합니다. HourlyTeamScore는 Dataflow SDK의 추가 기능을 사용하고 게임 데이터의 더 많은 측면을 감안하여 세부적인 분석을 수행합니다. 예를 들어 HourlyTeamScore는 관련 분석 기간에 해당하지 않는 데이터를 필터링할 수 있습니다.

UserScore와 마찬가지로 HourlyTeamScore는 관련 데이터를 모두 수집한 후 주기적으로(예: 하루 1회) 실행하는 것이 가장 좋습니다. 파이프라인은 파일에서 고정 데이터 세트를 읽고 UserScore와 마찬가지로 BigQuery 테이블에 결과를 작성합니다.

HourlyTeamScore는 어떤 역할을 하나요?

HourlyTeamScore는 하루치 점수 등의 고정 데이터 세트에서 팀별 시간당 총 점수를 계산합니다.

  • HourlyTeamScore는 한 번에 전체 데이터 세트를 작업하는 대신 입력 데이터를 논리적 기간으로 분할하여 해당 기간에 대해 계산을 수행합니다. 이를 통해 HourlyUserScore기간별 점수 데이터 정보를 제공할 수 있으며, 여기서 각 기간은 고정된 시간 간격에서의 게임 점수 진행 상태를 나타냅니다(예: 1시간마다 1번).
  • HourlyTeamScore는 삽입된 타임스탬프로 표시되는 이벤트 시간이 관련 분석 기간 내에 있는지 여부를 기준으로 데이터 이벤트를 필터링합니다. 기본적으로 파이프라인은 각 게임 이벤트의 타임스탬프를 검사하여 타임스탬프가 분석할 범위(여기서는 해당 날짜) 내에 있는지 확인합니다. 이전 날짜의 데이터 이벤트는 폐기되고 점수 합계에 포함되지 않습니다. 이를 통해 HourlyTeamScoreUserScore보다 더욱 강력하고 결과 데이터의 오류가 적게 발생합니다. 또한 파이프라인에서 관련 분석 기간 내에 타임스탬프가 있는 늦게 제공된 데이터를 감안할 수 있습니다.

아래에서는 이러한 HourlyTeamScore의 개선사항을 자세히 살펴보겠습니다.

고정 기간 설정

고정 기간 설정을 사용하면 파이프라인에서 분석 기간 동안 데이터 세트의 이벤트 누적 방법에 대해 더 나은 정보를 제공할 수 있습니다. 여기에서는 하루 중 각 팀이 활성이었던 시간과 그 시간 동안 얻은 점수를 알려줍니다.

다음 다이어그램은 고정 기간 설정을 적용한 후 파이프라인에서 한 팀의 하루치 점수 데이터를 처리하는 방법을 보여줍니다.

그림 2: 2개 팀의 점수 데이터. 각 팀의 점수는 이벤트 시간에서 점수가 발생한 시점을 기준으로 논리적 기간으로 분할됩니다.

처리 시간이 진행됨에 따라 기간별로 합계가 집계됩니다. 각 기간은 점수가 발생한 날의 이벤트 시간 1시간을 나타냅니다.

Dataflow의 기간 설정 기능은 PCollection의 각 요소에 연결된 기본 타임스탬프 정보를 사용합니다. 파이프라인에서 이벤트 시간을 기준으로 기간을 설정하려 하므로 우선 각 데이터 레코드에 삽입되어 있는 타임스탬프를 추출하고 점수 데이터 PCollection의 해당 요소에 적용해야 합니다. 그러면 파이프라인은 윈도우 함수를 적용하여 PCollection을 논리적 기간으로 분할할 수 있습니다.

다음은 HourlyTeamScoreWithTimestamps 변환과 Window 변환을 사용하여 이러한 작업을 수행하는 방법을 보여주는 코드입니다.

// Add an element timestamp based on the event log, and apply fixed windowing.
.apply("AddEventTimestamps",
       WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
.apply(Window.named("FixedWindowsTeam")
    .<GameActionInfo>into(FixedWindows.of(
          Duration.standardMinutes(options.getWindowDuration()))))

파이프라인이 기간 설정을 지정하기 위해 사용하는 변환은 실제 데이터 처리 변환(예: ExtractAndSumScores)과 구분됩니다. 이 기능을 사용하면 Dataflow 파이프라인 설계에서 서로 다른 기간 설정 특징을 가진 데이터 세트에 기존 변환을 실행하는 유연성을 확보할 수 있습니다.

이벤트 시간에 따른 필터링

HourlyTeamScore필터링을 사용하여 관련 분석 기간에 속하지 않는 타임스탬프가 있는 데이터세트의 이벤트(즉, 필요한 날짜에 생성되지 않은 이벤트)를 삭제합니다. 이렇게 하면 가령 전날에 오프라인에서 생성되었지만 당일에 게임 서버로 전송된 모든 데이터가 파이프라인에 잘못 포함되지 않습니다.

또한 파이프라인에 관련 지연 데이터, 즉 유효한 타임스탬프가 포함되었지만 분석 기간이 종료된 후 전달된 데이터 이벤트를 추가할 수 있습니다. 파이프라인 마감 시간이 오전 12시일 경우, 파이프라인을 오전 2시에 실행해도 오전 12시 마감 이후 발생한 타임스탬프가 있는 모든 이벤트를 필터링할 수 있습니다. 오전 12시 01분에서 2시 사이에 지연되어 전달되었지만 이벤트가 오전 12시 마감 이전에 발생했음을 나타내는 타임스탬프가 있는 데이터 이벤트는 파이프라인 처리에 포함됩니다.

HourlyTeamScoreFilter 변환을 사용하여 이 작업을 수행합니다. Filter를 적용할 때 각 데이터 레코드를 비교할 조건자를 지정해야 합니다. 비교를 통과한 데이터 레코드는 포함되고, 비교에 실패한 이벤트는 제외됩니다. 이 사례에서 조건자는 지정한 마감 시간이며, 데이터에서 타임스탬프 필드 한 부분만 비교합니다.

다음 코드는 HourlyTeamScoreFilter 변환을 사용하여 관련 분석 기간 전 또는 후에 발생한 이벤트를 필터링하는 방법을 보여줍니다.

.apply("FilterStartTime", Filter.byPredicate(
    (GameActionInfo gInfo)
        -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
.apply("FilterEndTime", Filter.byPredicate(
    (GameActionInfo gInfo)
        -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))

팀별, 기간별 점수 계산

HourlyTeamScoreUserScore 파이프라인과 동일한 ExtractAndSumScores 변환을 사용하지만 다른 키(사용자가 아닌 팀)를 전달합니다. 또한 이 파이프라인은 입력 데이터의 기간 설정을 1시간 고정으로 지정한 ExtractAndSumScores를 적용하므로 데이터가 팀별 기간별로 그룹화됩니다. HourlyTeamScore의 주 메소드에서 전체 변환 시퀀스를 확인할 수 있습니다.

public static void main(String[] args) throws Exception {
  // Begin constructing a pipeline configured by commandline flags.
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline pipeline = Pipeline.create(options);

  final Instant stopMinTimestamp = new Instant(minFmt.parseMillis(options.getStopMin()));
  final Instant startMinTimestamp = new Instant(minFmt.parseMillis(options.getStartMin()));

  // Read 'gaming' events from a text file.
  pipeline.apply(TextIO.Read.from(options.getInput()))
    // Parse the incoming data.
    .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()))

    // Filter out data before and after the given times so that it is not included
    // in the calculations. As we collect data in batches (say, by day), the batch for the day
    // that we want to analyze could potentially include some late-arriving data from the previous
    // day. If so, we want to weed it out. Similarly, if we include data from the following day
    // (to scoop up late-arriving events from the day we're analyzing), we need to weed out events
    // that fall after the time period we want to analyze.
    .apply("FilterStartTime", Filter.byPredicate(
        (GameActionInfo gInfo)
            -> gInfo.getTimestamp() > startMinTimestamp.getMillis()))
    .apply("FilterEndTime", Filter.byPredicate(
        (GameActionInfo gInfo)
            -> gInfo.getTimestamp() < stopMinTimestamp.getMillis()))

    // Add an element timestamp based on the event log, and apply fixed windowing.
    .apply("AddEventTimestamps",
           WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
    .apply(Window.named("FixedWindowsTeam")
        .<GameActionInfo>into(FixedWindows.of(
              Duration.standardMinutes(options.getWindowDuration()))))

    // Extract and sum teamname/score pairs from the event data.
    .apply("ExtractTeamScore", new ExtractAndSumScore("team"))
    .apply("WriteTeamScoreSums",
      new WriteWindowedToBigQuery<KV<String, Integer>>(options.getTableName(),
          configureWindowedTableWrite()));

  pipeline.run();
}

제한사항

위에서 설명한 대로 HourlyTeamScore에는 여전히 제한사항이 남아 있습니다.

  • 배치 파이프라인의 특성상 모든 데이터가 전달될 때까지 처리를 기다려야 하므로 HourlyTeamScore에서는 데이터 이벤트가 발생한 시점(이벤트 시간)과 결과가 생성된 시점(처리 시간) 간에 여전히 긴 지연 시간이 발생합니다.

LeaderBoard: 실시간 게임 데이터를 통한 스트리밍 처리

UserScoreHourlyTeamScore 파이프라인의 지연 시간 문제를 해결하는 방법으로는 스트리밍 소스에서 점수 데이터를 읽는 방법이 있습니다. LeaderBoard 파이프라인은 게임 서버의 파일이 아닌 Google Cloud Pub/Sub에서 게임 점수 데이터를 읽어 스트리밍 처리를 수행합니다.

또한 LeaderBoard 파이프라인은 처리 시간이벤트 시간과 관련된 게임 점수 데이터를 처리하는 방법을 보여줍니다. LeaderBoard는 각각 다른 타임프레임을 포함한 개별 사용자 점수와 팀 점수에 관한 데이터를 출력합니다.

LeaderBoard 파이프라인은 데이터가 생성될 때마다 스트리밍 소스에서 게임 데이터를 읽기 때문에 파이프라인을 게임 진행과 동시에 실행되는 지속적인 작업으로 간주할 수 있습니다. 따라서 LeaderBoard는 사용자가 특정 시점에서 게임을 하는 방식에 대한 지연 시간이 짧은 정보를 제공할 수 있습니다. 이러한 정보는 예를 들어 사용자가 게임을 하면서 다른 사용자와 진행 상태를 비교할 수 있도록 실시간 웹 기반 점수판을 제공하는 때 유용합니다.

LeaderBoard는 어떤 역할을 하나요?

LeaderBoard 파이프라인은 거의 실시간으로 Pub/Sub에 게시된 게임 데이터를 읽고 이 데이터를 사용하여 두 가지 별도의 처리 작업을 수행합니다.

  • LeaderBoard는 각 순 사용자의 총 점수를 계산하고 처리 시간(10분)마다 예측 결과를 게시합니다. 즉 파이프라인은 10분마다 지금까지 처리한 사용자별 총 점수를 출력합니다. 이 계산은 실제 게임 이벤트가 생성된 시점에 상관없이 실시간에 가깝게 실행되는 '리더보드'를 제공합니다.
  • LeaderBoard는 파이프라인이 실행되는 매 시간의 팀 점수를 계산합니다. 이 방법은 예를 들어 매 시간마다 최고 득점을 달성한 팀에게 보상을 제공하려는 경우 유용합니다. 파이프라인에 데이터가 전달되면 팀 점수 계산은 고정 기간 설정을 사용하여 타임스탬프로 표시된 이벤트 시간을 기준으로 입력 데이터를 한 시간 단위의 유한한 기간으로 분할합니다.

    또한 팀 점수 계산은 Dataflow의 트리거 메커니즘을 사용하여 매 시간(시간 종료 시까지 5분 단위로 업데이트)의 예측 결과를 제공하고 지연된 데이터를 캡처하여 해당 시간 단위 기간에 추가합니다.

아래에서는 이러한 작업을 자세히 살펴보겠습니다.

처리 시간을 기반으로 사용자 점수 계산

파이프라인이 실행되는 동안 10분 간격으로 각 사용자의 진행 중인 총 점수를 출력하려고 합니다. 이 계산은 사용자의 게임 인스턴스가 실제 점수를 생성한 시점을 고려하지 않고 출력 시점까지 파이프라인에 전달된 해당 사용자의 모든 점수의 합계를 출력합니다. 지연 데이터는 실행 중인 파이프라인에 전달될 때마다 계산에 포함됩니다.

계산을 업데이트할 때마다 파이프라인에 전달된 모든 데이터가 필요하므로 파이프라인이 단일 전역 기간의 모든 사용자 점수 데이터를 고려해야 합니다. 단일 전역 기간은 제한되지 않지만 처리 시간 트리거를 사용하여 10분 단위로 계산의 임시 마감 지점을 지정할 수 있습니다.

단일 전역 기간에 10분의 처리 시간 트리거를 지정하면 파이프라인은 트리거가 10분 간격으로 실행될 때마다 기간 내 콘텐츠의 '스냅샷'을 생성합니다. 단일 전역 기간을 사용하므로 각 스냅샷은 해당 시점의 모든 데이터를 포함합니다. 다음 다이어그램은 단일 전역 기간에 처리 시간 트리거를 사용하는 효과를 보여줍니다.

그림 3: 사용자 3명의 점수 데이터 각 사용자의 점수는 10분 간격으로 출력용 스냅샷을 생성하는 트리거와 함께 단일 전역 기간으로 그룹화됩니다.

처리 시간이 진행되고 더 많은 점수가 처리되면 트리거는 업데이트된 각 사용자별 합계를 출력합니다.

다음 코드 예시는 LeaderBoard가 처리 시간 트리거를 설정하여 사용자 점수 데이터를 출력하는 방법을 보여줍니다.

/**
 * Extract user/score pairs from the event stream using processing time, via global windowing.
 * Get periodic updates on all users' running scores.
 */
@VisibleForTesting
static class CalculateUserScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration allowedLateness;

  CalculateUserScores(Duration allowedLateness) {
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> input) {
    return input.apply("LeaderboardUserGlobalWindow",
        Window.<GameActionInfo>into(new GlobalWindows())
            // Get periodic results every ten minutes.
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TEN_MINUTES)))
            .accumulatingFiredPanes()
            .withAllowedLateness(allowedLateness))
        // Extract and sum username/score pairs from the event data.
        .apply("ExtractUserScore", new ExtractAndSumScore("user"));
  }
}

LeaderBoard는 트리거를 설정할 때 .accumulatingFiredPanes를 호출하여 사용자 점수 계산에 누적 트리거를 사용합니다. 누적 트리거를 사용하면 파이프라인이 이전에 전송한 데이터에 마지막 트리거가 발생 이후에 전달된 새로운 데이터를 누적합니다. 이렇게 하면 LeaderBoard를 개별 총합의 컬렉션이 아닌 진행 중인 사용자 점수의 총합으로 만들 수 있습니다.

이벤트 시간을 기반으로 팀 점수 계산

파이프라인에서 게임을 플레이하는 동안 매 시간마다 각 팀의 총 점수를 출력하려고 합니다. 사용자 점수 계산과는 달리 팀 점수에서는 각 게임 시간을 개별적으로 고려해야 하므로 각 점수가 실제로 발생한 이벤트 시간의 시점이 중요합니다. 또한 개별 시간이 진행될 때마다 예측 업데이트를 제공하고 지정된 시간의 데이터가 완료된 것으로 간주된 이후 전달된 모든 지연 데이터 인스턴스를 계산에 포함시켜야 합니다.

매 시간을 개별적으로 고려하기 때문에 HourlyTeamScore와 마찬가지로 입력 데이터에 고정 기간 설정을 적용할 수 있습니다. 예측 업데이트 및 지연 데이터의 업데이트를 제공하기 위해 추가 트리거 매개변수를 지정합니다. 이 트리거를 통해 각 기간이 지정한 간격(이 예시에서는 5분 간격)으로 결과를 계산하여 출력하고, 기간이 '종료'된 것으로 간주된 이후에도 트리거를 유지하여 지연 데이터를 계산에 포함시킵니다. 사용자 점수 계산과 마찬가지로 트리거를 누적 모드로 설정하여 매 시간 단위 기간의 진행 중인 합계를 계산합니다.

예측 업데이트 및 지연 데이터 트리거는 시간차 문제 해결을 지원합니다. 파이프라인의 이벤트는 타임스탬프에 따라 실제로 발생한 순서대로 처리되지는 않습니다. 이벤트는 사용자의 휴대전화가 네트워크 접속이 끊겼을 때 생성되므로 비순차적으로, 또는 늦게 파이프라인에 전달될 수 있습니다. Dataflow는 특정 기간 내 데이터를 '전부' 확보하는 시점을 합리적으로 예측할 수 있는 방법이 필요합니다. 이를 워터마크라고 합니다.

이상적인 상황은 모든 데이터가 발생한 즉시 처리되어 처리 시간이 이벤트 시간과 동일하거나 적어도 선형 관계를 유지하는 것입니다. 하지만 분산 시스템에는 휴대전화 보고 지연과 같은 부정확성이 내재되어 있으므로 Dataflow는 주로 휴리스틱 워터마크를 사용합니다.

다음 다이어그램은 두 팀의 진행 중인 처리 시간과 각 점수의 이벤트 시간의 관계를 보여줍니다.

그림 4: 이벤트 시간으로 기간 설정한 팀별 점수 데이터. 처리 시간 기준 트리거를 통해 기간이 조기 예측 결과를 출력하고 지연 결과를 포함합니다.

다이그램의 점선은 '이상적'인 워터마크, 즉 Dataflow에서 특정 기간의 모든 데이터가 전달된 것으로 합리적으로 간주할 수 있는 시점의 개념입니다. 불규칙한 실선은 데이터 소스(이 경우 Pub/Sub)에서 파악된 실제 워터마크를 나타냅니다.

실선으로 표시한 워터마크 위에 전달되는 데이터는 지연 데이터입니다. 이 점수 이벤트는 오프라인에서 생성되었다거나 하는 등의 이유로 지연되어 해당 기간이 종료된 이후 전달됩니다. 이 파이프라인의 지연 전달 트리거는 이 지연 데이터가 총합에 포함될 수 있도록 합니다.

다음 코드 예시는 LeaderBoard가 고정 기간 설정을 적절한 트리거에 적용하여 파이프라인이 원하는 계산을 수행하도록 만드는 방법을 보여줍니다.

// Extract team/score pairs from the event stream, using hour-long windows by default.
@VisibleForTesting
static class CalculateTeamScores
    extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Integer>>> {
  private final Duration teamWindowDuration;
  private final Duration allowedLateness;

  CalculateTeamScores(Duration teamWindowDuration, Duration allowedLateness) {
    this.teamWindowDuration = teamWindowDuration;
    this.allowedLateness = allowedLateness;
  }

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<GameActionInfo> infos) {
    return infos.apply("LeaderboardTeamFixedWindows",
        Window.<GameActionInfo>into(FixedWindows.of(teamWindowDuration))
            // We will get early (speculative) results as well as cumulative
            // processing of late data.
            .triggering(AfterWatermark.pastEndOfWindow()
                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(FIVE_MINUTES))
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(TEN_MINUTES)))
            .withAllowedLateness(allowedLateness)
            .accumulatingFiredPanes())
        // Extract and sum teamname/score pairs from the event data.
        .apply("ExtractTeamScore", new ExtractAndSumScore("team"));
  }
}

이러한 처리 전략을 사용하면 동일한 기본 변환을 사용하여 데이터를 처리하면서 UserScoreHourlyTeamScore 파이프라인의 지연 시간 및 완전성 문제를 해결할 수 있습니다. 사실 두 계산 모두 UserScoreHourlyTeamScore 파이프라인에서 사용한 것과 동일한 ExtractAndSumScore 변환을 사용합니다.

GameStats: 악용 감지 및 사용 분석

LeaderBoard가 기본 기간 설정 및 트리거를 사용하여 지연 시간이 짧고 유연한 데이터 분석을 수행하는 방법을 보여주지만, 고급 기간 설정 기술을 사용하여 보다 포괄적인 분석을 수행할 수 있습니다. 여기에는 시스템 악용(예: 스팸)을 감지하거나 사용자 행동에 대한 정보를 확보하도록 설계된 일부 계산이 포함됩니다. GameStats 파이프라인은 Dataflow를 사용하여 이러한 종류의 고급 분석을 수행하는 방법을 보여기 위해 지연 시간이 짧은 LeaderBoard의 기능을 기반으로 작성되었습니다.

LeaderBoard와 마찬가지로 GameStats는 스트리밍 소스(이 예에서는 Pub/Sub)에서 데이터를 읽습니다. 따라서 GameStats를 사용자가 게임을 플레이할 때 게임에 대한 정보를 제공하는 지속적인 작업으로 간주하는 것이 좋습니다.

GameStats는 어떤 역할을 하나요?

LeaderBoard와 마찬가지로 GameStats는 팀별, 시간별 총 점수를 계산합니다. 하지만 이 파이프라인은 더욱 복잡한 두 가지 분석을 같이 수행합니다.

  • GameStats악용 감지 시스템으로, 점수 데이터에 간단한 통계 분석을 수행하여 사용자가 스팸 발송자 또는 봇인지 파악하고 의심스러운 스팸/봇 사용자 목록을 사용하여 시간별 팀 점수 계산에서 봇을 필터링합니다.
  • GameStats는 세션 기간 설정을 통해 비슷한 이벤트 시간을 공유하는 게임 데이터를 그룹화하여 사용 패턴을 분석합니다. 이를 통해 사용자의 게임 지속 시간과 시간 경과에 따른 게임 길이 변화에 대한 정보를 얻을 수 있습니다.

아래에서는 이러한 기능에 대해 자세히 살펴보겠습니다.

악용 감지

게임에서 사용자의 '클릭' 속도에 따라 점수가 결정된다고 가정하겠습니다. GameStats의 악용 감지 기능은 각 사용자의 점수 데이터를 분석하여 사용자의 '클릭률'이 비정상적으로 높은지, 따라서 점수가 비정상적으로 높은지 감지합니다. 비정상적으로 높은 클릭률은 인간보다 훨씬 빠른 속도로 작동하는 봇이 게임을 하고 있다는 것을 나타낼 수 있습니다.

점수가 '비정상적으로' 높은지 여부를 파악하기 위해 GameStats는 고정 기간 내의 모든 점수의 평균을 계산하고 각 개별 점수를 임의의 가중치(이 경우 2.5)를 곱한 평균 점수와 비교합니다. 따라서 평균보다 2.5배 이상 높은 점수는 스팸의 결과물로 간주됩니다. GameStats 파이프라인은 '스팸' 사용자 목록을 추적하고 해당 사용자를 팀 리더보드의 팀 점수 계산에서 필터링합니다.

평균 값은 파이프라인 데이터를 기반으로 하므로 이 값을 계산하고 계산된 값을 후속 ParDo 변환에 사용하여 가중치가 적용된 값을 초과하는 점수를 필터링해야 합니다. 이를 위해 계산된 평균 값을 부차 입력으로 필터링 ParDo에 전달할 수 있습니다.

다음 코드 예시에서는 악용 감지를 수행하는 복합 변환을 보여줍니다. 이 변환은 Sum.integersPerKey 변환을 사용하여 사용자별 점수를 모두 합산한 다음 Mean.globally 변환으로 모든 사용자의 평균 값을 결정합니다. 이 값을 PCollectionView 싱글톤으로 계산한 다음 .withSideInputs를 통해 ParDo를 필터링에 전달할 수 있습니다.

public static class CalculateSpammyUsers
    extends PTransform<PCollection<KV<String, Integer>>, PCollection<KV<String, Integer>>> {
  private static final Logger LOG = LoggerFactory.getLogger(CalculateSpammyUsers.class);
  private static final double SCORE_WEIGHT = 2.5;

  @Override
  public PCollection<KV<String, Integer>> apply(PCollection<KV<String, Integer>> userScores) {

    // Get the sum of scores for each user.
    PCollection<KV<String, Integer>> sumScores = userScores
        .apply("UserSum", Sum.<String>integersPerKey());

    // Extract the score from each element, and use it to find the global mean.
    final PCollectionView<Double> globalMeanScore = sumScores.apply(Values.<Integer>create())
        .apply(Mean.<Integer>globally().asSingletonView());

    // Filter the user sums using the global mean.
    PCollection<KV<String, Integer>> filtered = sumScores
        .apply(ParDo
            .named("ProcessAndFilter")
            // use the derived mean total score as a side input
            .withSideInputs(globalMeanScore)
            .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
              private final Aggregator<Long, Long> numSpammerUsers =
                createAggregator("SpammerUsers", new Sum.SumLongFn());
              @Override
              public void processElement(ProcessContext c) {
                Integer score = c.element().getValue();
                Double gmc = c.sideInput(globalMeanScore);
                if (score > (gmc * SCORE_WEIGHT)) {
                  LOG.info("user " + c.element().getKey() + " spammer score " + score
                      + " with mean " + gmc);
                  numSpammerUsers.addValue(1L);
                  c.output(c.element());
                }
              }
            }));
    return filtered;
  }
}

악용 감지 변환은 스팸 봇으로 의심되는 사용자 뷰를 생성합니다. 이후 파이프라인에서 이 뷰를 사용하여 시간당 팀 점수 계산 시 다시 부가 입력 메커니즘을 사용하여 해당 사용자를 필터링합니다. 다음 코드 예시는 고정 기간으로 점수 기간을 설정하는 작업과 팀 점수를 추출하는 작업 사이에 스팸 필터를 삽입하는 위치를 보여줍니다.

// Calculate the total score per team over fixed windows,
// and emit cumulative updates for late data. Uses the side input derived above-- the set of
// suspected robots-- to filter out scores from those users from the sum.
// Write the results to BigQuery.
rawEvents
  .apply(Window.named("WindowIntoFixedWindows")
      .<GameActionInfo>into(FixedWindows.of(
          Duration.standardMinutes(options.getFixedWindowDuration())))
      )
  // Filter out the detected spammer users, using the side input derived above.
  .apply(ParDo.named("FilterOutSpammers")
          .withSideInputs(spammersView)
          .of(new DoFn<GameActionInfo, GameActionInfo>() {
            @Override
            public void processElement(ProcessContext c) {
              // If the user is not in the spammers Map, output the data element.
              if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {
                c.output(c.element());
              }
            }}))
  // Extract and sum teamname/score pairs from the event data.
  .apply("ExtractTeamScore", new ExtractAndSumScore("team"))

사용 패턴 분석

각 게임 점수의 이벤트 시간을 검토하고 이벤트 시간이 비슷한 점수를 세션으로 그룹화하여 사용자가 게임을 하는 시점과 게임 길이에 대한 정보를 얻을 수 있습니다. GameStats는 Dataflow의 기본 제공 세션 윈도우 함수를 사용하여 발생한 시간을 기준으로 사용자 점수를 세션으로 그룹화합니다.

세션 기간을 설정할 때 이벤트 간의 최소 간격 기간을 지정합니다. 전달 시간의 차이가 최소 간격 기간보다 짧은 모든 이벤트는 동일한 기간으로 그룹화됩니다. 전달 시간의 차이가 간격 기간보다 긴 이벤트는 별도의 간격으로 그룹화됩니다. 최소 간격 기간 설정에 따라 동일한 세션 기간 내의 점수는 (비교적) 연속적인 게임 진행 속에서 발생했다고 가정할 수 있습니다. 서로 다른 기간의 점수는 사용자가 적어도 최소 간격 기간 동안 게임을 중단했다가 다시 시작했다는 것을 나타냅니다.

다음 다이어그램은 세션 기간으로 그룹화한 데이터의 모습을 나타냅니다. 고정 기간과는 달리 세션 기간은 각 사용자별로 다르며 개별 사용자의 플레이 패턴에 따라 결정됩니다.

세션 기간 설정을 보여주는 그림
그림 5: 최소 간격 기간을 가지는 사용자 세션. 각 사용자는 플레이한 인스턴스의 개수와 인스턴스 간 중단 시간 길이에 따라 서로 다른 세션을 가집니다.

세션 기간 설정 데이터를 사용하여 모든 사용자의 연속적인 플레이 시간의 평균 길이와 각 세션 동안 달성한 총 점수를 확인할 수 있습니다. 이 과정은 코드에서 먼저 세션 기간을 적용하고 사용자 및 세션별로 점수를 합산한 다음 변환을 통해 개별 세션의 길이를 계산하여 수행할 수 있습니다.

// Detect user sessions-- that is, a burst of activity separated by a gap from further
// activity. Find and record the mean session lengths.
// This information could help the game designers track the changing user engagement
// as their set of games changes.
userEvents
  .apply(Window.named("WindowIntoSessions")
        .<KV<String, Integer>>into(
              Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
    .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
  // For this use, we care only about the existence of the session, not any particular
  // information aggregated over it, so the following is an efficient way to do that.
  .apply(Combine.perKey(x -> 0))
  // Get the duration per session.
  .apply("UserSessionActivity", ParDo.of(new UserSessionInfoFn()))

이렇게 하면 길이가 포함된 사용자 세션 세트를 확인할 수 있습니다. 이 데이터를 고정 기간으로 재설정한 다음 매 시간 종료 시마다 모든 세션의 평균을 계산하여 평균 세션 길이를 계산할 수 있습니다.

// Re-window to process groups of session sums according to when the sessions complete.
.apply(Window.named("WindowToExtractSessionMean")
      .<Integer>into(
          FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
// Find the mean session duration in each window.
.apply(Mean.<Integer>globally().withoutDefaults())
// Write this info to a BigQuery table.
.apply("WriteAvgSessionLength",
       new WriteWindowedToBigQuery<Double>(
          options.getTablePrefix() + "_sessions", configureSessionWindowWrite()));

결과 정보를 사용하면 예를 들어 사용자가 하루 중 게임을 가장 오래 하는 시간대나 게임 세션이 짧은 시간대를 확인할 수 있습니다.

추가 정보

GitHub의 Dataflow SDK 예(master-1.x 브랜치)에 포함된 예시 파이프라인을 실행하여 이러한 예가 작동하는 방식을 자세하게 확인할 수 있습니다. 포함된 예시 파이프라인의 전체 안내는 예시 디렉토리의 README.md를 참조하세요.

다음 블로그 게시글 및 동영상은 Dataflow의 배치 및 스트리밍 처리용 통합 데이터 모델에 관한 추가 컨텍스트와 정보를 제공합니다.

  • Dataflow & Spark 블로그 게시물 — Google 엔지니어가 작성한 블로그 게시물로, 모바일 게임 도메인의 예를 사용하여 Dataflow 모델과 Apache Spark 모델을 비교하는 방법을 설명합니다.
  • Dataflow talk @Scale — @Scale 컨퍼런스에서 촬영한 Google 엔지니어 Frances Perry의 프레젠테이션 동영상으로, 위에서 설명한 모바일 게임 예시의 초기 버전을 사용하여 Dataflow 모델을 설명합니다.
  • 'The World Beyond Batch: Streaming 101 and Streaming 102' — Google 엔지니어인 Tyler Akidau가 작성한 O'Reilly 블로그의 2부작 게시물로, 빅데이터 처리 기술의 향후 전망을 살펴봅니다.
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.