Cloud Dataflow を使用した大規模なログの処理

Google Cloud Platform は、大規模かつ多様なログ分析オペレーションを処理するために必要なスケーラブルなインフラストラクチャを提供します。このソリューションでは、Cloud Platform を使用して、複数のソースからのログエントリを処理する分析パイプラインを構築する方法について説明します。意味のある情報を抽出し、分析、評価、レポート作成に使用可能なデータから得られた分析情報を持続できるような方法でログデータを結合します。

概要

アプリケーションが複雑になるほど、ログにキャプチャされたデータから分析情報を得ることが難しくなります。ログは増え続けるソースから生成されるため、有益な情報を照合したり、問い合わせたりするのが難しい可能性があります。大規模なログデータを分析するためのインフラストラクチャの構築、運用、保守には、稼働中の分散システムとストレージに関する豊富な専門知識が必要です。この種の専用インフラストラクチャは 1 回限りの資本経費にされることが多く、その結果、容量が固定され、初期投資を超えてスケーリングすることが難しくなります。このような制限は、データから意味のある実現可能な分析情報を得ることの障害となり、ビジネスに影響を与える可能性があります。

このソリューションは、Cloud Platform を使用することによって、このような制限を克服する方法を示します。このソリューションでは、サンプル マイクロサービスのセットが Google Kubernetes Engine 上で動作してウェブサイトを実装します。 Stackdriver Logging がこうしたサービスからログを収集し、Google Cloud Storage バケットに保存します。 その後で、Google Cloud Dataflow がメタデータを抽出し、基本集計を計算することによって、ログを処理します。Cloud Dataflow パイプラインは、毎日ログ要素を処理し、日々のログに基づいてサーバー レスポンス答時間の集計指標を生成するように設計されています。最後に、Cloud Dataflow からの出力が Google BigQuery テーブルに読み込まれます。そこでは、データを分析してビジネス インテリジェンスを得ることができます。このソリューションでは、低レイテンシで非同期のログ処理のために、ストリーミング モードで実行するようにパイプラインを変更する方法についても説明します。

ソリューションではいくつかの Cloud Platform コンポーネントを使用します

付属のチュートリアルでは、サンプル Cloud Dataflow パイプライン、サンプル ウェブ アプリケーション、構成情報、サンプルの実行手順を提供します。

アプリについて

サンプル デプロイは、ショッピング アプリをモデル化します。このサンプルでは、ユーザーは小売サイトのホームページを訪問して、個別の商品を閲覧してから、近くの実店舗で商品を確認できます。アプリは、3 つのマイクロサービス(HomeServiceBrowseServiceLocateService)で構成されます。各サービスは、共有名前空間内の API エンドポイントから利用できます。ユーザーは、/home/browse/locate をベース URL に追加することによって、サービスにアクセスします。

アプリケーションは、受信 HTTP リクエストを stdout にログ出力するように構成されています。

Kubernetes Engine と Stackdriver Logging の使用

この例では、マイクロサービスが Kubernetes Engine クラスタ内で動作します。このクラスタは、Kubernetes を実行する Google Compute Engine インスタンス(ノード)からなるグループです。デフォルトで、Kubernetes Engine は、モニタリング、ヘルスチェック、集中型ロギングを含む、さまざまなサービスを提供するように各ノードを構成します。このソリューションでは、Stackdriver Logging が各マイクロサービスから Cloud Storage にログを送信するためにこの組み込みサポートを使用します。このソリューションの対象外である情報をファイルに記録するアプリケーションの代替手段として、Kubernetes を使用したクラスタレベルのロギングを構成できます。

各マイクロサービスは、クラスタ内の個別のポッド上で動作します。各ポッドは、Kubernetes Engine サービスを使用することにより、1 つのノード上で動作し、単一の HTTP エンドポイントとして公開されます。

マイクロサービスは個別のノード上で動作します

クラスタ内の各ノードで、ログメッセージをキャプチャする Stackdriver Logging エージェントが実行されます。ログが Stackdriver Logging 内で使用可能になると、Cloud SDK から入手可能な Stackdriver Logging サポートを使用して、スクリプトが自動的に Cloud Storage バケットにログをエクスポートします。logging.sh では、次の例のようなコマンドをサンプルが実行します。

# Create a Cloud Storage Bucket
gsutil -q mb gs://BUCKET_NAME

# Allow Stackdriver Logging access to the bucket
gsutil -q acl ch -g cloud-logs@google.com:O gs://BUCKET_NAME

# For each microservice, set up Stackdriver Logging exports
gcloud beta logging sinks create SINK_NAME \
  storage.googleapis.com/BUCKET_NAME \
  --log=”kubernetes.home_service…” --project=PROJECT_ID

ログビューアを使用すれば、Cloud Storage にエクスポートするようにログを構成することもできます。このソリューションでは SDK を使用します。これは、複数のログをエクスポートするときに必要になるためです。

ログのエクスポート先として Cloud Storage を使用する場合は、タイプ LogEntry のログエントリが 1 時間ごとのバッチで個別の JSON ファイルに保存されます。これらの構造化された Cloud Logging エントリには、各ログメッセージが作成された時刻、それを生成したのはリソースかインスタンスか、その重大度レベルなどを指定する追加のメタデータが含まれています。次の Stackdriver Logging エントリの例では、structPayload.log 要素で、マイクロサービスが生成したオリジナルのログメッセージを見ることができます。

{
  "metadata": {
    "projectId": "...",
    "serviceName": "compute.googleapis.com",
    "zone": "us-central1-f",
    "labels": {
      "compute.googleapis.com/resource_id": "4154944251817867710",
      "compute.googleapis.com/resource_type": "instance"
    },
    "timestamp": "2015-09-22T20:01:13Z"
  },
  "insertId": "2015-09-22|13:01:17.636360-07|10.106.196.103|1124257302",
  "log": "kubernetes.browse-service-iaus6_default_sample-browse-service",
  "structPayload": {
    "stream": "stdout",
    "log": "2015/09/22 - 20:01:13 | 404 | 176ns | 10.160.0.1:34790 | GET /browse/46"
  }
}

Cloud Dataflow パイプラインの作成

Cloud Dataflow は、何種類ものデータ処理タスクに使用できるシンプルながらも強力なシステムです。Dataflow SDK は、絶えず更新されるデータソースからの無制限のまたは無限のデータセットを含む、任意のサイズのデータセットを表すことが可能な統合データモデルを提供します。そのため、このソリューションにおけるログデータの操作に最適です。Dataflow マネージド サービスは、バッチジョブとストリーミング ジョブの両方を実行できます。これは、非同期または同期、リアルタイム、イベント駆動型のデータ処理に単一のコードベースが使用できることを意味します。

Dataflow SDK は、PCollection という名前の特殊なコレクション クラスを介した単純なデータ表現を提供します。この SDK は、PTransform クラスを介した組み込みとカスタムのデータ変換を提供します。Cloud Dataflow では、変換がパイプラインの処理ロジックを表します。変換は、データの結合、値の数学的計算、データ出力のフィルタリング、データの形式間の変換などのさまざまな処理オペレーションに使用できます。パイプライン、PCollection、変換、I/O ソースとシンクの詳細については、Dataflow プログラミング モデルをご覧ください。

次の図は、Cloud Storage に保存されたログデータのパイプライン オペレーションを示しています。

Cloud Dataflow パイプラインはいくつかの手順を辿ります

図では複雑に見えるかもしれませんが、Cloud Dataflow がパイプラインの構築と使用を容易にします。次のセクションでは、パイプラインの各ステージでの特定のオペレーションについて説明します。

データの受信

パイプラインは、3 つのマイクロサービスからのログが含まれている Cloud Storage バケットからの入力を処理することによって開始します。ログのコレクションのそれぞれが String 要素の PCollection になります。また、それぞれの要素は単一の LogEntry オブジェクトに対応します。次のスニペットでは、homeLogsbrowseLogslocateLogsPCollection<String>: 型です。

homeLogs = p.apply(TextIO.Read.named("homeLogsTextRead").from(options.getHomeLogSource()));
browseLogs = p.apply(TextIO.Read.named("browseLogsTextRead").from(options.getBrowseLogSource()));
locateLogs = p.apply(TextIO.Read.named("locateLogsTextRead").from(options.getLocateLogSource()));

絶えず更新されるデータセットの課題に対処するため、Dataflow SDK ではウィンドウ化と呼ばれる手法が使用されます。ウィンドウ化は、PCollection 内のデータをその個別の要素のタイムスタンプに従って論理的に細分化することによって動作します。この場合はソースタイプが TextIO であるため、すべてのオブジェクトが最初に単一のグローバル ウィンドウに読み込まれます。これがデフォルトの動作です。

オブジェクトへのデータの収集

次の手順では、Flatten オペレーションを使用して、個別のマイクロサービス PCollection を単一の PCollection にまとめます。

PCollection<String> allLogs = PCollectionList
  .of(homeLogs)
  .and(browseLogs)
  .and(locateLogs)
  .apply(Flatten.<String>pCollections());

各ソース PCollection が同じデータ型を含み、同じグローバル ウィンドウ化戦略を使用するため、このオペレーションは便利です。このソリューションでは各ログのソースと構造が同じですが、このアプローチはソースと構造が異なるログにまで拡張することもできます。

これで、作成した単一の PCollection を使用することにより、ログエントリに対していくつかの手順を実施するカスタム変換を通して、個別の String を処理できます。手順は次のとおりです。

変換は文字列メッセージを処理してログメッセージを作成します

  • JSON 文字列を Stackdriver Logging LogEntry Java オブジェクトにシリアル化解除します。
  • LogEntry メタデータからタイムスタンプを抽出します。
  • 正規表現を使用して、ログメッセージから個別のフィールド(timestampresponseTimehttpStatusCodehttpMethodsource IP アドレス、destination エンドポイント)を抽出します。これらのフィールドを使用して、タイムスタンプ付きの LogMessage カスタム オブジェクトを作成します。
  • LogMessage オブジェクトを新しい PCollection に出力します。

次のコードで手順を実行します。

PCollection<LogMessage> allLogMessages = allLogs
  .apply(ParDo.named("allLogsToLogMessage").of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));

日単位のデータの集計

目標は、日単位で要素を処理し、日々のログに基づいて集計指標を生成することです。この集計を行うには、日単位でデータを細分化するウィンドウ化関数が必要です。これは、PCollection 内の各 LogMessage にタイムスタンプが付けられている場合に可能な処理です。Cloud Dataflow が 1 日の境界に沿って PCollection を分割したら、ウィンドウ化 PCollection をサポートするオペレーションがウィンドウ化スキームを引き受けます。

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
  .apply(Window.named("allLogMessageToDaily").<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));

単一のウィンドウ化 PCollection により、単一の Cloud Dataflow ジョブを実行するだけで、3 つすべての複数日ログソースを通して日単位集計指標を計算できるようになりました。

PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
  .apply(Combine.<String,Double,Double>perKey(new Max.MaxDoubleFn()));

PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
  .apply(Mean.<String,Double>perKey());

まず、変換が LogMessage オブジェクトを入力として取得してから、宛先エンドポイントをキーとしてレスポンス時間値にマッピングする、Key-Value ペアの PCollection を出力します。その PCollection を使用して、次の 2 つの集計指標を計算できます。1 つは宛先あたりの最大レスポンス時間で、もう 1 つは宛先あたりの平均レスポンス時間です。PCollection はまだ日単位で分割されているため、各計算の出力が 1 日分のログデータを表します。これは、2 つの最終 PCollection が作成されることを意味します。1 つは 1 日の宛先あたりの最大レスポンス時間を含み、もう 1 つは 1 日の宛先あたりの平均レスポンス時間を含みます。

コンピューティングは日単位指標を集計します

BigQuery へのデータの読み込み

パイプラインの最後の手順では、下流分析とデータ ウェアハウジングのために結果の PCollection を BigQuery に出力します。

まず、パイプラインがすべてのログソースの LogMessage オブジェクトを含む PCollection を BigQuery TableRow オブジェクトの PCollection に変換します。この手順は、Cloud Dataflow の組み込みサポートを利用して BigQuery をパイプライン用のシンクとして使用するために必要です。

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
  .apply(ParDo.named("logMessageToTableRow").of(new LogMessageTableRowFn()));

BigQuery テーブルには、定義済みのスキーマが必要です。このソリューションでは、スキーマがデフォルト値のアノテーションを使用して LogAnalyticsPipelineOptions.java で定義されます。たとえば、maximum-response-time テーブルのスキーマは次のように定義されます。

@Default.String("destination:STRING,aggResponseTime:FLOAT")

集計された応答時間値を含む PCollection に対するオペレーションは、それらに適切なスキーマを適用し、欠落しているテーブルを作成しながら、TableRow オブジェクトの PCollection に変換します。

logsAsTableRows.apply(BigQueryIO.Write
  .named("allLogsToBigQuery")
  .to(options.getAllLogsTableName())
  .withSchema(allLogsTableSchema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

このソリューションは、常に、新しいデータを既存のデータに追加します。このパイプラインは新しいログデータを分析するために定期的に実行されるため、これが適切な選択になります。ただし、これらのオプションのいずれかが別のシナリオにより適合する場合は、既存のテーブルデータが切り捨てられたり、空のテーブルにだけ書き込まれたりする可能性があります。

データへのクエリ

BigQuery コンソールを使用すると、出力データに対してクエリを実行し、TableauQlikView などのサードパーティ製ビジネス インテリジェンス ツールに接続してさらに分析を進めることができます。

BigQuery コンソールはログデータに対するクエリを実行します

ストリーミング パイプラインの使用

サンプルには、バッチモードとストリーミング モードのどちらかでパイプラインを実行するためのサポートが含まれています。パイプラインをバッチからストリーミングに変更するのに数手順しかかかりません。まず、Stackdriver Logging セットアップがロギング情報を Cloud Storage ではなく Cloud Pub/Sub にエクスポートします。その次の手順は、Cloud Dataflow パイプライン内の入力ソースを Cloud Storage から Cloud Pub/Sub トピック サブスクリプションに変更することです。入力ソースごとに 1 つずつのサブスクリプションが必要です。

Cloud Pub/Sub はサブスクリプションを使用します

logging.sh で、使用中の SDK コマンドを確認することができます。

Cloud Pub/Sub 入力データから作成された PCollection は無制限のグローバル ウィンドウを使用します。ただし、個別のエントリにはすでにタイムスタンプが押されています。これは、Cloud Stackdriver LogEntry オブジェクトからタイムスタンプ データを抽出する必要がないことを意味します。抽出するのは、カスタム LogMessage オブジェクトを作成するためのログ タイムスタンプだけです。

Cloud Pub/Sub パイプラインを使用すれば、ログからタイムスタンプを抽出することができます

パイプラインの残りの部分は、ダウンストリームのフラット化、変換、集計、出力オペレーションを含めて、現状のまま変更されません。

パイプラインの実行

この Cloud Dataflow パイプラインを設定、構築、デプロイする手順と、マイクロサービスをデプロイして Stackdriver Logging エクスポートを設定するために必要な手順については、チュートリアルをご覧ください。Cloud Dataflow ジョブを実行する場合は、Google Cloud Platform Console を使用して、進行状況をモニタリングしたり、パイプライン内の各ステージに関する情報を表示したりできます。

次の画像は、パイプライン例を実行中のコンソール ユーザー インターフェースを示しています。

GCP Console に実行中の Cloud Dataflow ジョブが表示される

ソリューションの拡張

このソリューションで説明したパイプラインと一連の操作は、さまざまな方法で拡張することができます。最も明白な拡張は、LogMessage データ全体で追加の集計を実行することです。たとえば、セッションまたは匿名ユーザー情報がログ出力に含まれている場合は、ユーザー アクティビティに基づいて集計を作成することができます。また、ApproximateQuantiles 変換を使用して、応答時間の分布を生成することもできます。

リソースと費用

このチュートリアルでは、次にような Google Cloud Platform の有料コンポーネントを使用します。

  • マイクロサービスをデプロイするための Kubernetes Engine
  • ログを受信してエクスポートするための Stackdriver Logging
  • エクスポートされたログをバッチモードで保存するための Cloud Storage
  • エクスポートされたログをストリーミング モードでストリーミングするための Cloud Pub/Sub
  • ログデータを処理するための Cloud Dataflow
  • 処理出力を保存してその出力に対する豊富なクエリをサポートするための BigQuery

このチュートリアルを実行するための費用は、実行時間によって異なります。料金計算ツールを使用すると、予想使用量に基づいて費用の見積もりを作成できます。Cloud Platform の新規ユーザーは無料トライアルをご利用いただけます。

チュートリアル

https://github.com/GoogleCloudPlatform/processing-logs-using-dataflow で GitHub から、セットアップ手順やソースコードを含むチュートリアルの内容をすべて入手できます。

次のステップ

  • Google Cloud Platform のその他の機能を試す。チュートリアルをご覧ください。
  • Google Cloud Platform プロダクトを使用してエンドツーエンドのソリューションを作成する方法を学習します。
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...