Cloud Dataflow を使用した大規模なログの処理

Google Cloud Platform(GCP)は、大規模で多様なログ分析に対応するのに必要なスケーラブルなインフラストラクチャを提供します。このチュートリアルでは、GCP を使用して、複数のソースからのログエントリを処理する分析パイプラインを構築する方法を説明します。意味のある情報を抽出し、そのデータから得た分析情報を存続させるように、ログデータを組み合わせます。そのデータは、分析、レビュー、レポート作成に使用できます。

概要

アプリケーションが複雑になるほど、ログにキャプチャされたデータから分析情報を得ることが難しくなります。ログは増え続けるソースから生成されるため、有益な情報を照合したり、問い合わせたりするのが難しい可能性があります。大規模なログデータを分析するためのインフラストラクチャの構築、運用、保守には、稼働中の分散システムとストレージに関する豊富な専門知識が必要です。この種の専用インフラストラクチャは 1 回限りの資本経費にされることが多く、その結果、容量が固定され、初期投資を超えてスケーリングすることが難しくなります。このような制限は、データから意味のある実現可能な分析情報を得ることの障害となり、ビジネスに影響を与える可能性があります。

次の図のように GCP プロダクトを使用することで、このような制限を解消できます。

ソリューションではいくつかの GCP コンポーネントを使用します

このソリューションでは、ウェブサイトを実装するために一連のサンプル マイクロサービスを Google Kubernetes Engine(GKE)で実行します。 Stackdriver Logging はこれらのサービスからログを収集し、そのログを Cloud Storage バケットに保存します。 次に、Cloud Dataflow によって、メタデータを抽出し基本集計を計算することによって、ログを処理します。Cloud Dataflow パイプラインは、ログ要素を毎日処理し日々のログに基づいてサーバーのレスポンス時間の集計指標を生成するように設計されています。最後に、Cloud Dataflow からの出力が BigQuery テーブルに読み込まれ分析されて、ビジネス インテリジェンスが提供されます。このソリューションでは、低レイテンシで非同期のログ処理のために、ストリーミング モードで実行するようにパイプラインを変更する方法についても説明します。

このチュートリアルでは、サンプル Cloud Dataflow パイプライン、サンプル ウェブ アプリケーション、構成情報、およびサンプルを実行する手順について説明します。

費用

このチュートリアルでは、以下の課金対象の Google Cloud Platform コンポーネントを使用します。

  • GKE(マイクロサービスのデプロイ用)。
  • Logging(ログを受信してエクスポートするため)。
  • Cloud Storage(エクスポートしたログのバッチモードでの保存用)。
  • Cloud Pub/Sub(エクスポートしたログのストリーミング モードでのストリーミング用)。
  • Cloud Dataflow(ログデータの処理用)。
  • BigQuery(処理出力の保存とその出力に対する豊富なクエリのサポート用)。

料金計算ツールを使用して、予測される使用量に基づき、費用の見積もりを出すことができます。 GCP を初めてご利用の場合は、無料トライアルをご利用いただけます。

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳しくは、クリーンアップをご覧ください。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. GCP プロジェクトを選択または作成します。

    [リソースの管理] ページに移動

  3. プロジェクトに対して課金が有効になっていることを確認します。

    課金を有効にする方法について

  4. BigQuery、Cloud Storage、Cloud Pub/Sub、Cloud Dataflow、GKE、Logging API を有効にします。

    APIを有効にする

  5. Stackdriver Workspace を作成します。ワークスペースの詳細については、ワークスペースの管理をご覧ください。

    Stackdriver に移動

環境設定

このチュートリアルでは、Cloud Shell を使用してコマンドを入力します。Cloud Shell では GCP Console のコマンドラインにアクセスできます。また、GCP で開発を行うために必要な Cloud SDK やその他のツールも含まれています。Cloud Shell は GCP Console の下部にウィンドウとして表示されます。初期化が完了するまでに数分かかることもありますが、ウィンドウはすぐに表示されます。

Cloud Shell を使用して環境をセットアップし、このチュートリアルで使用される git リポジトリのクローンを作成するには、以下の手順を実行します。

  1. GCP Console で、Cloud Shell を開きます。

    Cloud Shell を開く

  2. 作成したばかりのプロジェクトで作業していることを確認してください。[YOUR_PROJECT_ID] を新しく作成した GCP プロジェクトに置き換えます。

    gcloud config set project [YOUR_PROJECT_ID]
    
  3. デフォルトのコンピューティング ゾーンを設定します。このチュートリアルでは、us-east1 に設定します。本番環境にデプロイする場合は、選択したリージョンにデプロイします。

    export REGION=us-east1
    gcloud config set compute/region $REGION
    

サンプル リポジトリのクローニング

  • このチュートリアルで使用するスクリプトとアプリケーション ロジックを含むリポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/processing-logs-using-dataflow.git
    cd processing-logs-using-dataflow/services
    

環境変数の構成

# name your bucket
export PROJECT_ID=[YOUR_PROJECT_ID]
# name your GKE cluster
export CLUSTER_NAME=cluster-processing-logs-using-dataflow

# name the bucket for this tutorial
export BUCKET_NAME=${PROJECT_ID}-processing-logs-using-dataflow

# name the logging sink for this tutorial
export SINK_NAME=sink-processing-logs-using-dataflow

# name the logging sink for this tutorial
export DATASET_NAME=processing_logs_using_dataflow

新しい Google Kubernetes Engine クラスタにサンプル アプリケーションをデプロイする

# create the cluster and deploy sample services
./cluster.sh $PROJECT_ID $CLUSTER_NAME up

サンプル アプリケーションのデプロイについて

このサンプルのデプロイは、ショッピング アプリをモデル化します。このサンプルでは、ユーザーは小売サイトのホームページを訪問して、個別の商品を閲覧してから、近くの実店舗で商品を確認できます。アプリは、3 つのマイクロサービス(HomeServiceBrowseServiceLocateService)で構成されます。各サービスは、共有名前空間内の API エンドポイントから利用できます。ユーザーは、/home/browse/locate をベース URL に追加することによって、サービスにアクセスします。

アプリケーションは、受信 HTTP リクエストを stdout にログ出力するように構成されています。

Google Kubernetes Engine と Stackdriver Logging の使用

この例では、マイクロサービスは Kubernetes を実行する Compute Engine インスタンスまたはノードのグループである、Kubernetes Engine クラスタで実行されます。デフォルトでは、GKE は、モニタリング、ヘルスチェック、ログの一元管理を含むさまざまなサービスを提供するように各ノードが構成されます。このソリューションでは、Stackdriver Logging が各マイクロサービスから Cloud Storage にログを送信するためにこの組み込みサポートを使用します。このソリューションの対象外である、情報をファイルにログ出力するアプリケーションの代替手段として、Kubernetes を使用したクラスタレベルのロギングを構成できます。

各マイクロサービスは、クラスタ内の個々のポッド上で実行されます。各ポッドはノード上で実行され、GKE のサービスを使用して、単一の HTTP エンドポイントとして公開されます。

マイクロサービスは個別のノード上で動作します

クラスタ内の各ノードで、ログメッセージをキャプチャする Stackdriver Logging エージェントが実行されます。Logging でログが利用可能になると、Cloud SDK から利用可能な Logging サポートを使用して、スクリプトが Cloud Storage バケットに自動的にログをエクスポートします。

ログビューアを使用すると、Cloud Storage にエクスポートされるようにログを構成することもできます。このソリューションでは Cloud SDK を使用します。Cloud SDK は、複数のログをエクスポートするときに必要になるためです。

ログのエクスポート先として Cloud Storage を使用する場合は、タイプ LogEntry のログエントリが、1 時間ごとに一括で個々の JSON ファイルに保存されます。これらの構造化された Logging エントリには、各ログメッセージが作成されたとき、リソースとインスタンスのどちらがそれを生成したか、その重大度レベルなどを指定する追加のメタデータなどが含まれます。次の Logging エントリの例では、structPayload.log 要素にマイクロサービスが生成した元のログメッセージが表示されます。

 {
    "insertId": "ugjuig3j77zdi",
    "labels": {
        "compute.googleapis.com/resource_name": "fluentd-gcp-v3.2.0-9q4tr",
        "container.googleapis.com/namespace_name": "default",
        "container.googleapis.com/pod_name": "browse-service-rm7v9",
        "container.googleapis.com/stream": "stdout"
    },
    "logName": "projects/processing-logs-at-scale/logs/browse-service",
    "receiveTimestamp": "2019-03-09T00:33:30.489218596Z",
    "resource": {
        "labels": {
            "cluster_name": "cluster-processing-logs-using-dataflow",
            "container_name": "browse-service",
            "instance_id": "640697565266753757",
            "namespace_id": "default",
            "pod_id": "browse-service-rm7v9",
            "project_id": "processing-logs-at-scale",
            "zone": "us-east1-d"
        },
        "type": "container"
    },
    "severity": "INFO",
    "textPayload": "[GIN] 2019/03/09 - 00:33:23 | 200 |     190.726µs |      10.142.0.6 | GET      /browse/product/1\n",
    "timestamp": "2019-03-09T00:33:23.743466177Z"
 }

ロギングの設定

クラスタが実行しサービスがデプロイされたら、アプリケーションのロギングを構成できます。

最初にクラスタの認証情報を取得します。Stackdriver Logging のエクスポート シンクを構成するためのサービス名を取得するのに kubectl が使用されるためです。

gcloud container clusters get-credentials  $CLUSTER_NAME --region $REGION

コード リポジトリ内で、services/logging.sh によりバッチまたはストリーミング モードで必要なコンポーネントを設定します。スクリプトは次のパラメータを受け入れます。

logging.sh [YOUR_PROJECT_ID] [BUCKET_NAME] [streaming|batch] [up|down]

このチュートリアルの目的のために、バッチロギングを開始する

./logging.sh $PROJECT_ID $BUCKET_NAME batch up

下記の手順は、バッチモードで実行されるコマンドの例です。

  1. Cloud Storage バケットを作成します。

    gsutil -q mb gs://[BUCKET_NAME]

  2. バケットへの Stackdriver Logging のアクセス権を付与します。

    gsutil -q acl ch -g cloud-logs@google.com:O gs://[BUCKET_NAME]

  3. マイクロサービスごとに、シンクを使用して Stackdriver のエクスポートを設定します。

    gcloud logging sinks create [SINK_NAME] \ storage.googleapis.com/[BUCKET_NAME] \ --log-filter="kubernetes.home_service..." --project=[YOUR_PROJECT_ID]

エクスポート先の権限を更新する

宛先(この場合は Cloud Storage バケット)の権限は、シンクを作成しても変更されません。Cloud Storage バケットの権限の設定を変更して、シンクに書き込み権限を付与する必要があります。

Cloud Storage バケットの権限を更新するには、次の手順に従います。

  1. シンクの書き込み ID を特定します。

    1. [ログビューア] ページに移動します。

      [ログビューア] ページに移動

    2. シンクの概要(シンクの書き込み ID など)を表示するには、左側のメニューで [エクスポート] を選択します。

    3. 重要: 3 つのシンクのそれぞれに個別のサービス アカウントのメールアドレスがあります。それらのメール アカウントに、Cloud Storage バケットに対する権限を付与する必要があります。

  2. GCP Console で、[Storage] > [ブラウザ] をクリックします。

    [ブラウザ] に移動

  3. 詳細ビューを開くには、バケットの名前をクリックします。

  4. [権限] を選択し、[メンバーを追加] をクリックします。

  5. 役割Storage Object Creator に設定し、シンクの書き込み ID を入力します。

詳細については、エクスポート先の権限をご覧ください。

次のコマンドを使用してログ オブジェクトのパスを確認できます。

gsutil ls gs://processing-logs-at-scale-processing-logs-using-dataflow/ | grep service

出力に 3 つのエントリがすべて含まれている場合、データ パイプラインを実行するための手順を進めることができます。

 gs://processing-logs-at-scale-processing-logs-using-dataflow/browse-service/
 gs://processing-logs-at-scale-processing-logs-using-dataflow/home-service/
 gs://processing-logs-at-scale-processing-logs-using-dataflow/locate-service/

BigQuery データセットを作成する

bq mk $DATASET_NAME

アプリケーション サービスの負荷を生成する

Apache HTTP サーバー ユーティリティをインストールする

Apache HTTP サーバー ベンチマーク ツール(ab)を使用して、サービスの負荷を生成します。

sudo apt-get update

sudo apt-get install -y apache2-utils

load.sh シェル スクリプトは、HomeServiceBrowseServiceLocateService からのレスポンスをリクエストすることによって、マイクロサービスの負荷を生成します。

単一の負荷セットは、ホームサービスへの 1 つのリクエストと、ブラウザ サービスと位置情報サービスに対するそれぞれ 20 のリクエストで構成されます。

次のオプションは、1,000 個の負荷セットを生成します(リクエストの同時実行数は 3 つに設定されています)

cd ../services
./load.sh 1000 3

十分な量のログが作成されるように、これを数分間実行します。

Cloud Dataflow パイプラインを開始する

十分な量のトラフィックがサービスに到達するようにしたら、Dataflow パイプラインを開始できます。

このチュートリアルでは、Cloud Dataflow パイプラインはバッチモードで実行されます。pipeline.sh シェル スクリプトはパイプラインを手動で開始します。

cd ../dataflow
./pipeline.sh $PROJECT_ID $DATASET_NAME $BUCKET_NAME run

Cloud Dataflow パイプラインについて

Cloud Dataflow は、さまざまなデータ処理タスクに使用できます。Cloud Dataflow SDK は、任意のサイズのデータセットを表現できる統合データモデルを提供します。継続的に更新されるデータソースの上限のないデータセットも表現できるので、このソリューションのログデータの処理に最適です。Cloud Dataflow マネージド サービスは、バッチジョブとストリーミング ジョブの両方を実行できます。これは、非同期または同期、リアルタイム、イベント ドリブンのデータ処理に単一のコードベースが使用できることを意味します。

Cloud Dataflow SDK は、PCollection という名前の特殊なコレクション クラスを介した単純なデータ表現を提供します。この SDK は、PTransform クラスを介して組み込みとカスタムのデータ変換を提供します。Cloud Dataflow では、変換がパイプラインの処理ロジックを表します。変換は、データの結合、値の数学的計算、データ出力のフィルタリング、データの形式間の変換などのさまざまな処理オペレーションに使用できます。パイプライン、PCollection、変換、I/O ソースとシンクの詳細については、Dataflow プログラミング モデルをご覧ください。

次の図は、Cloud Storage に保存されたログデータのパイプライン オペレーションを示しています。

パイプラインの操作手順

図では複雑に見えるかもしれませんが、Cloud Dataflow がパイプラインの構築と使用を容易にします。次のセクションでは、パイプラインの各ステージでの特定のオペレーションについて説明します。

データの受信

パイプラインは、3 つのマイクロサービスからのログが含まれている Cloud Storage バケットからの入力を処理することによって開始します。ログのコレクションのそれぞれが String 要素の PCollection になります。また、それぞれの要素は単一の LogEntry オブジェクトに対応します。次のスニペットでは、homeLogsbrowseLogslocateLogsPCollection<String>: 型です。

homeLogs = p.apply("homeLogsTextRead", TextIO.read().from(options.getHomeLogSource()));
browseLogs = p.apply("browseLogsTextRead", TextIO.read().from(options.getBrowseLogSource()));
locateLogs = p.apply("locateLogsTextRead", TextIO.read().from(options.getLocateLogSource()));

絶えず更新されるデータセットの課題に対処するため、Dataflow SDK ではウィンドウ化と呼ばれる手法が使用されます。ウィンドウ化は、PCollection のデータをその個別の要素のタイムスタンプに基づいて、論理的に細分化することによって動作します。この場合はソースタイプが TextIO であるため、すべてのオブジェクトが最初に単一グローバル ウィンドウに読み込まれます。これがデフォルトの動作です。

オブジェクトへのデータの収集

次のステップでは、Flatten オペレーションを使用して、個々のマイクロサービス PCollection を単一の PCollection にまとめます。

PCollection<String> allLogs = PCollectionList
  .of(homeLogs)
  .and(browseLogs)
  .and(locateLogs)
  .apply(Flatten.<String>pCollections());

各ソース PCollection が同じデータ型を含み、同じグローバル ウィンドウ化戦略を使用するため、このオペレーションは便利です。このソリューションでは各ログのソースと構造が同じですが、このアプローチはソースと構造が異なるログにまで拡張することもできます。

これで、作成した単一の PCollection を使用することにより、ログエントリに対していくつかの手順を実施するカスタム変換を通して、個別の String を処理できます。次の図に手順を示します。

変換で文字列メッセージを処理して、ログメッセージを作成します

  • JSON 文字列を Stackdriver Logging LogEntry Java オブジェクトにシリアル化解除します。
  • LogEntry メタデータからタイムスタンプを抽出します。
  • 正規表現を使用して、ログメッセージから個別のフィールド(timestampresponseTimehttpStatusCodehttpMethodsource IP アドレス、destination エンドポイント)を抽出します。これらのフィールドを使用して、タイムスタンプ付きの LogMessage カスタム オブジェクトを作成します。
  • LogMessage オブジェクトを新しい PCollection に出力します。

次のコードで手順を実行します。

PCollection<LogMessage> allLogMessages = allLogs
  .apply("allLogsToLogMessage", ParDo.of(new EmitLogMessageFn(outputWithTimestamp, options.getLogRegexPattern())));

日単位のデータの集計

目標は、日単位で要素を処理し、日々のログに基づいて集計指標を生成することです。この集計を行うには、日単位でデータを細分化するウィンドウ化関数が必要です。これは、PCollection 内の各 LogMessage にタイムスタンプが付けられている場合に可能な処理です。Cloud Dataflow が 1 日の境界に沿って PCollection を分割したら、ウィンドウ化 PCollection をサポートするオペレーションがウィンドウ化スキームを引き受けます。

PCollection<LogMessage> allLogMessagesDaily = allLogMessages
  .apply("allLogMessageToDaily", Window.<LogMessage>into(FixedWindows.of(Duration.standardDays(1))));

単一のウィンドウ化 PCollection により、単一の Cloud Dataflow ジョブを実行するだけで、3 つすべての複数日ログソースを通して日単位集計指標を計算できるようになりました。

PCollection<KV<String,Double>> destMaxRespTime = destResponseTimeCollection
  .apply(Max.<String>doublesPerKey());
 // .apply(Combine.<String,Double,Double>perKey(new Max.doublesPerKey()));

PCollection<KV<String,Double>> destMeanRespTime = destResponseTimeCollection
  .apply(Mean.<String,Double>perKey());

次の図のように、まず、変換が LogMessage オブジェクトを入力として取得してから、宛先エンドポイントをキーとしてレスポンス時間値にマッピングする、Key-Value ペアの PCollection を出力します。

コンピューティングは日単位指標を集計します

その PCollection を使用して、次の 2 つの集計指標を計算できます。1 つは宛先あたりの最大レスポンス時間で、もう 1 つは宛先あたりの平均レスポンス時間です。PCollection はまだ日単位で分割されているため、各計算の出力が 1 日分のログデータを表します。これは、2 つの最終 PCollection が作成されることを意味します。1 つは 1 日の宛先あたりの最大レスポンス時間を含み、もう 1 つは 1 日の宛先あたりの平均レスポンス時間を含みます。

BigQuery へのデータの読み込み

パイプラインの最後の手順では、下流分析とデータ ウェアハウジングのために結果の PCollection を BigQuery に出力します。

まず、パイプラインがすべてのログソースの LogMessage オブジェクトを含む PCollection を BigQuery TableRow オブジェクトの PCollection に変換します。この手順は、Cloud Dataflow の組み込みサポートを利用して BigQuery をパイプライン用のシンクとして使用するために必要です。

PCollection<TableRow> logsAsTableRows = allLogMessagesDaily
  .apply("logMessageToTableRow", ParDo.of(new LogMessageTableRowFn()));

BigQuery テーブルには、定義済みのスキーマが必要です。このソリューションでは、スキーマがデフォルト値のアノテーションを使用して LogAnalyticsPipelineOptions.java で定義されます。たとえば、maximum-response-time テーブルのスキーマは次のように定義されます。

@Default.String("destination:STRING,aggResponseTime:FLOAT")

集計された応答時間値を含む PCollection に対するオペレーションは、それらに適切なスキーマを適用し、欠落しているテーブルを作成しながら、TableRow オブジェクトの PCollection に変換します。

logsAsTableRows.apply("allLogsToBigQuery", BigQueryIO.writeTableRows()
  .to(options.getAllLogsTableName())
  .withSchema(allLogsTableSchema)
  .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
  .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

このソリューションは、常に、新しいデータを既存のデータに追加します。このパイプラインは新しいログデータを分析するために定期的に実行されるため、これが適切な選択になります。ただし、これらのオプションうちのいずれかが、別のシナリオでより適切になる場合は、既存のテーブルデータを切り捨てることができます。また、テーブルが空の場合にのみ書き込むこともできます。

BigQuery からのデータのクエリ

BigQuery コンソールを使用すると、出力データに対してクエリを実行し、TableauQlikView などのサードパーティ ビジネス インテリジェンス ツールに接続してさらに分析を行うことができます。

  1. GCP Console で、BigQuery を開きます。

    BigQuery を開く

  2. プロジェクト processing-logs-at-scale をクリックし、次にデータセット processing_logs_using_dataflow をクリックします。

  3. all_logs_table を選択し、次にデータペインで [プレビュー] を選択して、all logs テーブルのデータのサンプルを表示します。

  4. クエリエディタで、次のクエリを入力します。

    SELECT *
    FROM `processing_logs_using_dataflow.max_response_time_table`
    ORDER BY aggResponseTime DESC
    LIMIT 100;
    
  5. クエリを実行するには、[実行] をクリックします。

    BigQuery コンソールはログデータに対するクエリを実行します。

ストリーミング パイプラインの使用

サンプルには、バッチモードとストリーミング モードのどちらかでパイプラインを実行するためのサポートが含まれています。パイプラインをバッチからストリーミングに変更するのに数手順しかかかりません。まず、Stackdriver Logging セットアップがロギング情報を Cloud Storage ではなく Cloud Pub/Sub にエクスポートします。その次の手順は、Cloud Dataflow パイプライン内の入力ソースを Cloud Storage から Cloud Pub/Sub トピック サブスクリプションに変更することです。入力ソースごとに 1 つずつのサブスクリプションが必要です。

Cloud Pub/Sub はサブスクリプションを使用します。

logging.sh で、使用中の SDK コマンドを確認することができます。

Cloud Pub/Sub 入力データから作成された PCollection は無制限のグローバル ウィンドウを使用します。ただし、個別のエントリにはすでにタイムスタンプが押されています。これは、Cloud Stackdriver LogEntry オブジェクトからタイムスタンプ データを抽出する必要がないことを意味します。抽出するのは、カスタム LogMessage オブジェクトを作成するためのログ タイムスタンプだけです。

Cloud Pub/Sub パイプラインを使用すると、ログからタイムスタンプを抽出できます

パイプラインの残りの部分は、ダウンストリームのフラット化、変換、集計、出力オペレーションを含めて、現状のまま変更されません。

パイプラインのモニタリング

Cloud Dataflow ジョブを実行する場合は、Google Cloud Platform Console を使用すると、進行状況のモニタリングや、パイプライン内の各ステージに関する情報の確認を行うことができます。

次の画像は、パイプラインの例を実行しているときの GCP Console を示しています。

GCP Console では実行中の Cloud Dataflow ジョブが表示されます。

クリーンアップ

プロジェクトの削除

  1. GCP Console で [プロジェクト] ページに移動します。

    プロジェクト ページに移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

すべてのコンポーネントの削除

特定の環境変数が、設定中に使用した値に設定されたままであることを確認します。

  1. BigQuery データセットを削除します。

    bq rm $DATASET_NAME
    
  2. Cloud Logging のエクスポートを無効にします。エクスポートおよび指定した Cloud Storage バケットが削除されます。

    cd ../services
    ./logging.sh $PROJECT_ID $BUCKET_NAME batch down
    
  3. サンプルウェブ アプリケーションの実行に使用した Compute Engine クラスタを削除します。

    /cluster.sh $PROJECT_ID $CLUSTER_NAME down
    

ソリューションの拡張

このソリューションで説明したパイプラインと一連の操作は、さまざまな方法で拡張することができます。最も明白な拡張は、LogMessage データ全体で追加の集計を実行することです。たとえば、セッションまたは匿名ユーザー情報がログ出力に含まれている場合は、ユーザー アクティビティに基づいて集計を作成することができます。また、ApproximateQuantiles 変換を使用して、レスポンス時間の分布を生成することもできます。

次のステップ

  • Google Cloud Platform のその他の機能を試す。チュートリアルをご覧ください。
  • Google Cloud Platform プロダクトを使用してエンドツーエンドのソリューションを作成する方法を学習する。
このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...