Dataflow を使用して本番環境対応のログのエクスポートを Splunk にデプロイする

このチュートリアルでは、Cloud LoggingPub/SubDataflow を使用してスケーラブルでフォールト トレラントなログ エクスポート メカニズムを作成します。

このチュートリアルは、IT オペレーションまたはセキュリティに関するユースケースに対して、Google Cloud のリソースから Splunk Enterprise または Splunk Cloud にログとイベントをストリーミングすることを必要とする管理者を対象としています。このチュートリアルでは、Google が提供する Splunk Dataflow テンプレートを使用して、Splunk HTTP Event Collector(HEC)へのログのストリーミングを高い信頼性を確保しつつ大規模に実施します。また、Dataflow パイプラインの容量計画と、サーバーまたはネットワークに一時的な問題が発生した際に生じる可能性のある配信エラーを処理する方法についても説明します。

このチュートリアルは、次の図のような組織のリソース階層を前提としています。この図は、Splunk にログをエクスポートする組織レベルの集約シンクを示しています。ログ エクスポート パイプラインは、Splunk Export Project という名前のサンプル プロジェクト内に作成します。ここでは、組織ノードのすべての Google Cloud プロジェクトのログが安全に収集、処理、配信されます。

Splunk にログがエクスポートされる組織の集約シンク。

アーキテクチャ

次のアーキテクチャ図は、このチュートリアルで構築するログのエクスポート プロセスを示しています。

Splunk へのログのエクスポート。

  • プロセスの開始時に、組織レベルのログシンクによって単一の Pub/Sub トピックとサブスクリプションにログが転送されます。
  • プロセスの中心にあるメインの Dataflow パイプラインは、Pub/Sub から Splunk へのストリーミング パイプラインであり、Pub/Sub サブスクリプションからログを取得して Splunk に配信します。
  • メインの Dataflow パイプラインと同様に、2 番目の Dataflow パイプラインは Pub/Sub 間のストリーミング パイプラインであり、配信が失敗した場合にメッセージを再生します。
  • プロセスの最後には、ログの宛先として Splunk Enterprise または Splunk Cloud の HEC エンドポイントが配置されています。

目標

  • 専用のプロジェクトに集約ログシンクを作成します。
  • 組織のログレートに合わせて、Splunk Dataflow パイプラインの容量を計画します。
  • Splunk Dataflow パイプラインをデプロイして、Splunk にログをエクスポートします。
  • Splunk Dataflow パイプライン内のユーザー定義関数(UDF)を使用して、処理中のログまたはイベントを変換します。
  • 配信エラーに対処して、構成ミスや一時的なネットワークの問題によるデータ損失を回避します。

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。新しい Google Cloud ユーザーは無料トライアルをご利用いただけます。

このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳しくは、クリーンアップをご覧ください。

始める前に

  1. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  2. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  3. Cloud Monitoring API, Cloud Key Management Service, Compute Engine, Pub/Sub, and Dataflow API を有効にします。

    API を有効にする

IAM 権限を取得する

  1. Cloud Console で、組織リソースとプロジェクト リソースに対して次の Identity and Access Management(IAM)権限が付与されていることを確認します。詳細については、リソースへのアクセス権の付与、変更、取り消しをご覧ください。
    権限 事前定義ロール リソース
    • logging.sinks.create
    • logging.sinks.get
    • logging.sinks.update
    • ログ構成書き込み(roles/logging.configWriter
    組織
    • cloudkms.keyRings.create
    • cloudkms.cryptoKeys.*
    • Cloud KMS 管理者(roles/cloudkms.admin
    プロジェクト
    • compute.networks.*
    • compute.routers.*
    • compute.firewalls.*
    • networkservices.*
    • Compute ネットワーク管理者(roles/compute.networkAdmin
    • Compute セキュリティ管理者(roles/compute.securityAdmin
    プロジェクト
  2. 適切な IAM 権限が付与されていない場合は、カスタムロールを作成してください。カスタムロールは必要なアクセス権を付与する一方で、最小権限の原則に従うことができるようサポートします。

環境設定

  1. Cloud Console で、Cloud Shell をアクティブにします。

    Cloud Shell をアクティブにする

  2. Cloud Shell で、プロジェクト ID と組織 ID の変数を作成します。このチュートリアルでは、以下の変数を使用します。

    export PROJECT_ID=project-id
    export ORG_ID=organization-id
    
    • project-id: プロジェクト ID
    • organization-id: 組織 ID
  3. このチュートリアルでは、us-central1 リージョンにリソースを作成します。

    export REGION_ID=us-central1
    
  4. アクティブな Cloud Shell セッションのプロジェクトを設定します。

    gcloud config set project $PROJECT_ID
    

安全なネットワークの設定

このステップでは、ログを処理して Splunk Enterprise にエクスポートする前に、安全なネットワークを設定します。

  1. VPC ネットワークとサブネットを作成します。

    gcloud compute networks create export-network --subnet-mode=custom
    gcloud compute networks subnets create export-network-us-central \
         --network=export-network \
         --region=$REGION_ID \
         --range=192.168.1.0/24
    
  2. Cloud NAT ゲートウェイを作成します。

    gcloud compute routers create nat-router \
           --network=export-network \
           --region=$REGION_ID
    
    gcloud compute routers nats create nat-config \
       --router=nat-router \
       --nat-custom-subnet-ip-ranges=export-network-us-central \
       --auto-allocate-nat-external-ips \
       --region=$REGION_ID
    

    セキュリティ上の理由から、パブリック IP アドレスを使用せずに Dataflow パイプライン ワーカー VM をデプロイします。続行コマンドで Dataflow ワーカー VM が外部 Splunk HEC サービスにアクセスできるようにするには、Dataflow VM(この場合は export-network-us-central)のサブネットにマッピングされた Cloud NAT を構成します。この構成により、Dataflow ワーカー VM はインターネットにアクセスし、各 Dataflow ワーカー VM で外部 IP アドレスを使用することなく、Splunk に HTTPS リクエストを発行できます。

    Cloud NAT ゲートウェイは、使用中の Dataflow VM の数に応じて IP アドレスを自動的に割り振ります。

    Splunk HEC へのトラフィックを既知の IP アドレスのサブセットに制限する場合は、静的 IP アドレスを予約して Cloud NAT ゲートウェイに手動で割り当てます。ただし、このチュートリアルでは説明しません。

    詳細については、Cloud NAT IP アドレスCloud NAT のポート予約のドキュメントをご覧ください。

  3. 限定公開の Google アクセスを有効にする

     gcloud compute networks subnets update export-network-us-central \
         --enable-private-ip-google-access \
         --region=$REGION_ID
    

    Cloud NAT ゲートウェイを作成すると、限定公開の Google アクセスが自動的に有効になります。ただし、プライベート IP アドレスを持つ Dataflow ワーカーが Google Cloud APIs とサービスが使用する外部 IP アドレスにアクセスできるようにするには、サブネットの限定公開の Google アクセスを手動で有効にする必要があります。

ログシンクの作成

このセクションでは、組織全体にわたるログシンクとその Pub/Sub の宛先、さらに必要な権限を作成します。

  1. Cloud Shell で、新しいログシンクのエクスポート先として Pub/Sub トピックと関連するサブスクリプションを作成します。

    gcloud pubsub topics create org-logs-all
    gcloud pubsub subscriptions create \
        --topic org-logs-all org-logs-all-sub
    
  2. 組織のログシンクを作成します。

    gcloud logging sinks create org-logs-all-sink \
      pubsub.googleapis.com/projects/$PROJECT_ID/topics/org-logs-all \
      --organization=$ORG_ID \
      --include-children \
      --log-filter='NOT logName:projects/$PROJECT_ID/logs/dataflow.googleapis.com'
    

    このコマンドは、次のオプションで構成されます。

    • --organization オプションは、これが組織レベルのログシンクであることを指定します。
    • 組織レベルのログシンクにすべてのサブフォルダとプロジェクトのすべてのログが含まれるようにするには、--include-children オプションが必要です。
    • --log-filter オプションは、転送するログを指定します。この例では、プロジェクト $PROJECT_ID に固有の Dataflow オペレーション ログを除外します。これは、ログ エクスポート Dataflow パイプラインがログを処理するときにそれよりも多くのログを生成するためです。フィルタは、パイプラインが独自のログをエクスポートしないようにして、負荷の急増を防ぎます。

      出力には、o#####-####@gcp-sa-logging.iam.gserviceaccount.com の形式のサービス アカウントが含まれます。

  3. サービス アカウントを以下の変数として $LOG_SINK_SA に保存します。

     export LOG_SINK_SA=[MY_SA]@gcp-sa-logging.iam.gserviceaccount.com
    
  4. ログシンクのサービス アカウントに権限を付与します。

    gcloud pubsub topics add-iam-policy-binding org-logs-all \
        --member=serviceAccount:$LOG_SINK_SA \
        --role=roles/pubsub.publisher
    

    このコマンドは、Pub/Sub トピック org-logs-all のログシンク サービス アカウントに Pub/Sub パブリッシャー IAM のロールを付与し、ログシンク サービス アカウントがトピックのメッセージをパブリッシュできるようにします。

Splunk の HEC エンドポイントの設定

このステップでは、Splunk の HEC エンドポイントを設定し、新しく作成された HEC トークンを暗号化します。

Splunk の HEC を構成する

  1. まだ Splunk の HEC エンドポイントを構成していない場合は、Splunk のドキュメントで Splunk の HEC の構成方法をご覧ください。Splunk の HEC は、Splunk Cloud サービスまたは独自の Splunk Enterprise インスタンスで実行できます。
  2. Cloud Shell セッションで、Splunk の HEC トークンが作成されたら、トークン値をコピーします。
  3. トークンの値を splunk-hec-token-plaintext という名前のファイルに保存します。

暗号化用の Cloud KMS 鍵を作成する

Splunk HEC URL とトークンは、デプロイする Splunk Dataflow パイプラインに必須のパラメータです。セキュリティを強化するため、Cloud KMS 鍵を使用してトークンを暗号化し、Dataflow ジョブの作成時に暗号化されたトークンのみを渡します。これにより、Dataflow Console またはジョブの詳細から Splunk HEC トークンが漏洩しなくなります。

  1. Cloud Shell で、Cloud KMS キーリングを作成します。

    # Create a key ring in same location
    gcloud kms keyrings create export-keys \
      --location=$REGION_ID
    
  2. 新しいキーリングに Cloud KMS 鍵を作成します。

    # Create a key on the new key ring
    gcloud kms keys create hec-token-key \
        --keyring=export-keys \
        --location=$REGION_ID \
        --purpose="encryption"
    

Splunk の HEC トークンの暗号化と復号を行う権限を追加する

Splunk の HEC トークンを暗号化するには、鍵を使用できる暗号化と復号の IAM ロールを割り当てられている必要があります。また、Dataflow パイプライン ワーカーは Splunk のトークン パラメータをローカルに復号する必要があるため、Dataflow コントローラ サービス アカウントの暗号化と復号の IAM ロールも必要です。

Dataflow パイプライン ワーカーは Compute Engine インスタンスであり、デフォルトでプロジェクトの Compute Engine サービス アカウント(project-number-compute@developer.gserviceaccount.com)を使用します。

プロジェクトで Compute Engine API を有効にすると、Compute Engine サービス アカウントが自動的に作成されます。Compute Engine サービス アカウントは、Dataflow がリソースにアクセスしてオペレーションを実行するために使用する Dataflow コントローラ サービス アカウントとして機能します。

  1. Cloud Console で、[セキュリティ] ページに移動します。

    [セキュリティ] ページに移動

  2. [暗号鍵] タブを選択します。

  3. 作成したキーリングの横にあるチェックボックスをオンにします。

  4. 権限を編集するパネルがまだ開いていない場合は、[情報パネルを表示] をクリックします。

  5. 情報パネルの [権限] タブで、[メンバーを追加] をクリックします。

  6. プロジェクト アカウントと project-number-compute@developer.gserviceaccount.com の両方をメンバーとして追加します。

  7. [Cloud KMS 暗号鍵の暗号化 / 復号] ロールを選択します。

  8. [保存] をクリックします。

暗号化と復号のロールを追加します。

Splunk の HEC トークンを暗号化する

Cloud Shell で、Splunk の HEC エンドポイントを設定したときに作成した Cloud KMS 鍵 hec-token-key を使用して Splunk の HEC トークンを暗号化します。

    gcloud kms encrypt \
        --key=hec-token-key \
        --keyring=export-keys \
        --location=$REGION_ID \
        --plaintext-file=./splunk-hec-token-plaintext \
        --ciphertext-file=./splunk-hec-token-encrypted

このコマンドは、暗号化された Splunk の HEC トークン splunk-hec-token-encrypted を持つ新しいファイルを作成します。これで、一時ファイル splunk-hec-token-plaintext を削除できます。

Dataflow パイプラインの容量を計画する

Dataflow パイプラインをデプロイする前に、最大サイズとスループットを決定する必要があります。これらの値を決定すると、次のいずれの問題も発生させることなく、パイプラインがアップストリームの Pub/Sub サブスクリプションの 1 日におけるピーク時のログボリューム(GB/日)とログメッセージのレート(1 秒あたりのイベント数である EPS)に対応できます。

  • メッセージ バックログまたはメッセージ スロットリングによる遅延。
  • パイプラインのオーバープロビジョニングに要する追加費用(詳細については、このセクションの最後にある注をご覧ください)。

このチュートリアルのサンプル値は、次の特性を持つ組織に基づいています。

  • 1 日あたり 1 TB のログを生成する。
  • メッセージの平均サイズは 1 KB である。
  • 継続メッセージのピーク時のレートが、平均レートの 2 倍である。

パイプラインの最大サイズを設定するレート管理パラメータを設定するの手順に沿って、サンプル値を組織の値に置き換えることができます。

パイプラインの最大サイズを設定する

  1. 次の式を使用して平均 EPS を決定します。

    \( {AverageEventsPerSecond}\simeq\frac{TotalDailyLogsInTB}{AverageMessageSizeInKB}\times\frac{10^9}{24\times3600} \)

    この例では、生成されるログの平均レートは 11,500 EPS です。

  2. 次の式を使用して継続ピーク EPS を特定します。乗数 N はロギングの急増する性質を表します。この例では、N=2 です。したがって、生成されるログのピークレートは 23,000 EPS です。

    \( {PeakEventsPerSecond = N \times\ AverageEventsPerSecond} \)

  3. 最大 EPS を算出した後は、次のサイズ設定のガイドラインを使用して、必要な vCPU の最大数を決定できます。この数を使用して、n1-standard-4 マシンタイプを想定した場合の Dataflow ワーカーの最大数、または maxNumWorkers を算出することもできます。

    \( {maxCPUs = ⌈PeakEventsPerSecond / 3k ⌉\\ maxNumWorkers = ⌈maxCPUs / 4 ⌉} \)

    この例では、⌈23 / 3⌉ = 8 により最大 8 個の vCPU コアが必要です。これは、デフォルトのマシンタイプ n1-standard-4 における VM の最大数 2 に相当します。

  4. Cloud Shell で、次の環境変数を使用してパイプラインのサイズを設定します。

    export DATAFLOW_MACHINE_TYPE="n1-standard-4"
    export DATAFLOW_MACHINE_COUNT=2
    

レート管理パラメータを設定する

Splunk Dataflow パイプラインには、レート管理パラメータがあります。これらのパラメータは出力 EPS レートを調整し、ダウンストリーム Splunk HEC エンドポイントの過負荷を防ぎます。

  1. 次のガイドラインを使用して、すべての VM ワーカーにおける Splunk の HEC への並列接続数を決定して、EPS レートを最大化します。

    \( {parallelism = maxCPUs * 2} \)

    parallelism 設定をオーバーライドして、vCPU あたり 2~4 の同時接続に、デプロイされた最大数のワーカーで対応します。デフォルトの parallelism 値が 1 の場合、並列処理は無効になり、出力レートは人為的に制限されます。

    この例では、並列接続数は 2 x 8 = 16 として算出されます。

  2. EPS を改善し、Splunk の HEC に対する負荷を軽減するには、イベントのバッチ処理を使用します。

    \( {batchCount >= 10} \)

    平均ログメッセージを 1 KB 前後にして、リクエストごとに少なくとも 10 個のイベントをバッチ処理することをおすすめします。最小イベント数を設定すると、Splunk の HEC に対して過度の負荷が生じることを回避しつつ、有効な EPS レートを改善できます。

  3. Cloud Shell で、parallelismbatchCount の計算値を使用して、次の環境変数をレートの制御用に設定します。

    export DATAFLOW_PARALLELISM=16
    export DATAFLOW_BATCH_COUNT=10
    

パイプライン容量パラメータの概要

次の表は、このチュートリアルの次のステップで使用されるパイプライン容量の値と、ジョブ パラメータを構成するためのベスト プラクティスをまとめたものです。

パラメータ チュートリアルの値 一般的なベスト プラクティス
DATAFLOW_MACHINE_TYPE n1-standard-4 コスト比に対する最適なパフォーマンスを実現するため、ベースライン マシンサイズ n1-standard-4 に設定
DATAFLOW_MACHINE_COUNT

2

上記の計算に基づく予想ピーク時の EPS を処理するために必要なワーカー数(maxNumWorkers)に設定
DATAFLOW_PARALLELISM

16

2 x ワーカー当たりの vCPU 数 x maxNumWorkers を、並列 HEC 接続数の最大値に設定
DATAFLOW_BATCH_COUNT

10

最大バッファリング遅延(2 秒)を許容できる場合は、ログのイベント / リクエストを 10~50 に設定

自動スケーリング パイプラインは、ワーカーの最大数または maxNumWorkers を想定して、可能性のあるストリーミング ワーカーごとに 1 つのデータ永続ディスク(デフォルトは 400 GB)をデプロイします。これらのディスクは、起動時を含め、実行中の任意の時点でワーカー間でマウントされます。

各ワーカー インスタンスは 15 個の永続ディスクに制限されているため、起動ワーカーの最小数は ⌈maxNumWorkers/15⌉ です。そのため、デフォルト値が maxNumWorkers=20 の場合は、パイプライン使用量(と費用)は次のようになります。

  • ストレージ: 20 個の永続ディスクを使用する静的ストレージ
  • コンピューティング: 少なくとも 2 つのワーカー インスタンス(⌈20/15⌉ = 2)以上、かつ最大 20 個のワーカー インスタンスを使用する動的コンピューティング。

この値は 8 TB の永続ディスクに相当します。ディスクが完全に使用されないと、特にほとんどの時間で 1 つまたは 2 つのワーカーしか稼働していない場合に、不必要なコストが発生する可能性があります。

Dataflow パイプラインを使用したログのエクスポート

このセクションでは、Google Cloud のログメッセージを Splunk の HEC に配信する Dataflow パイプラインをデプロイします。未処理のトピック(デッドレター トピック)やサブスクリプションなどの依存リソースもデプロイし、配信不能のメッセージを保持します。

Dataflow パイプラインをデプロイする

  1. Cloud Shell で、未処理のサブスクリプションとして使用する Pub/Sub トピックとサブスクリプションを作成します。

     gcloud pubsub topics create org-logs-all-dl
     gcloud pubsub subscriptions create --topic org-logs-all-dl org-logs-all-dl-sub
    
  2. 残りのパイプライン パラメータを構成するために必要な環境変数を設定します。YOUR_SPLUNK_HEC_URL を SPLUNK HEC エンドポイント(https://splunk-hec-host:8088など)に置き換えます。

     # Splunk HEC endpoint values
     export SPLUNK_HEC_URL="YOUR_SPLUNK_HEC_URL"
     export SPLUNK_HEC_TOKEN=`cat ./splunk-hec-token-encrypted | base64`
     # Dataflow pipeline input subscription and dead-letter topic
     export DATAFLOW_INPUT_SUB="org-logs-all-sub"
     export DATAFLOW_DEADLETTER_TOPIC="org-logs-all-dl"
    
  3. Dataflow パイプラインをデプロイします。

    # Set Dataflow pipeline job name
    JOB_NAME=pubsub-to-splunk-`date +"%Y%m%d-%H%M%S"`
    # Run Dataflow pipeline job
    gcloud beta dataflow jobs run ${JOB_NAME} \
       --gcs-location=gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk \
       --worker-machine-type=$DATAFLOW_MACHINE_TYPE \
       --max-workers=$DATAFLOW_MACHINE_COUNT \
       --region=$REGION_ID \
       --network=export-network \
       --subnetwork=regions/$REGION_ID/subnetworks/export-network-us-central \
       --disable-public-ips \
       --parameters \
    inputSubscription=projects/${PROJECT_ID}/subscriptions/${DATAFLOW_INPUT_SUB},\
    outputDeadletterTopic=projects/${PROJECT_ID}/topics/${DATAFLOW_DEADLETTER_TOPIC},\
    tokenKMSEncryptionKey=projects/${PROJECT_ID}/locations/${REGION_ID}/keyRings/export-keys/cryptoKeys/hec-token-key,\
    url=${SPLUNK_HEC_URL},\
    token=${SPLUNK_HEC_TOKEN},\
    batchCount=${DATAFLOW_BATCH_COUNT},\
    parallelism=${DATAFLOW_PARALLELISM},\
    javascriptTextTransformGcsPath=gs://splk-public/js/dataflow_udf_messages_replay.js,\
    javascriptTextTransformFunctionName=process
    

    出力に返された新しいジョブ ID をコピーします。

    デフォルトでは、Splunk Dataflow パイプラインは Splunk の HEC エンドポイントの SSL 証明書を検証します。開発とテスト用に自己署名証明書を使用する場合は、SSL 検証を無効にする必要があります。詳細については、Pub/Sub to Splunk Dataflow テンプレート パラメータ(disableCertificateValidation)をご覧ください。

  4. 新しいジョブ ID を DATAFLOW_JOB_ID 環境変数に保存します。この変数は後のステップで使用します。

    export DATAFLOW_JOB_ID="YOUR_DATAFLOW_JOB_ID"
    

Splunk でログを表示する

Dataflow パイプライン ワーカーがプロビジョニングされ、Splunk の HEC にログを配信できるようになるまでに数分を要します。ログが正常に受信され、Splunk Enterprise または Splunk Cloud Search インターフェースでインデックスに登録されていることを確認できます。

数分ほどで、Dataflow パイプライン ワーカーがプロビジョニングされ、Splunk HEC にログを提供できるようになります。ログが正常に受信され、Splunk Enterprise または Splunk Cloud Search インターフェースでインデックスに登録されていることを確認できます。モニタリング対象リソースの種類別のログ数を表示するには:

  1. Splunk で [Splunk Search & Reporting] を開きます。
  2. MY_INDEX インデックスが Splunk の HEC トークンに構成されている index=[MY_INDEX] | stats count by resource.type の検索を実行します。

    Splunk でログを表示します。

  3. イベントが表示されない場合は、配信エラーの処理をご覧ください。

UDF による処理中イベントの変換

Splunk Dataflow テンプレートは、カスタム イベント変換用の UDF をサポートしています。デプロイしたパイプラインは、オプションのパラメータ javascriptTextTransformGcsPathjavascriptTextTransformFunctionName で指定されたサンプル UDF を使用します。サンプルの UDF には、新しいフィールドの追加やイベントベースでの Splunk HEC メタデータの設定など、イベント拡張用のコードサンプルが含まれています。サンプル UDF には、失敗した配信をリプレイするデコード ロジックも含まれています。この方法については、サンプル UDF の変更をご覧ください。

このセクションでは、サンプルの UDF 関数を編集して、新しいイベント フィールドを追加します。この新しいフィールドでは、追加のコンテキスト情報として元の Pub/Sub サブスクリプションの値を指定します。

サンプル UDF を変更する

  1. Cloud Shell で、サンプル UDF 関数を含む JavaScript ファイルをダウンロードします。

    wget https://storage.googleapis.com/splk-public/js/dataflow_udf_messages_replay.js
    
  2. 任意のエディタで JavaScript ファイルを開きます。新しいフィールド inputSubscription をイベント ペイロードに追加する行のコメントを解除します。

    // event.inputSubscription = "splunk-dataflow-pipeline";
    
  3. 新しいイベント フィールド inputSubscription"org-logs-all-sub" に設定して、イベントの発生元である入力 Pub/Sub サブスクリプションをトラッキングします。

    event.inputSubscription = "org-logs-all-sub";
    
  4. ファイルを保存します。

  5. Cloud Shell で、新しい Cloud Storage バケットを作成します。

    # Create a new Cloud Storage bucket
    gsutil mb -b on gs://${PROJECT_ID}-dataflow/
    
  6. ファイルを Cloud Storage バケットにアップロードします。

    # Upload JavaScript file
    gsutil cp ./dataflow_udf_messages_replay.js gs://${PROJECT_ID}-dataflow/js/
    

新しい UDF で Dataflow パイプラインを更新する

  1. Cloud Shell で [ドレイン オプション] を使用してパイプラインを停止します。これにより、すでに Pub/Sub から pull されたログが失われないようにできます。

    gcloud dataflow jobs drain $DATAFLOW_JOB_ID --region=$REGION_ID
    
  2. 更新された UDF を使用して新しいパイプラインをデプロイします。

    # Set Dataflow pipeline job name
    JOB_NAME=pubsub-to-splunk-`date +"%Y%m%d-%H%M%S"`
    # Run Dataflow pipeline job
    gcloud beta dataflow jobs run ${JOB_NAME} \
       --gcs-location=gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk \
       --worker-machine-type=$DATAFLOW_MACHINE_TYPE \
       --max-workers=$DATAFLOW_MACHINE_COUNT \
       --region=$REGION_ID \
       --network=export-network \
       --subnetwork=regions/$REGION_ID/subnetworks/export-network-us-central \
       --disable-public-ips \
       --parameters \
    inputSubscription=projects/${PROJECT_ID}/subscriptions/${DATAFLOW_INPUT_SUB},\
    outputDeadletterTopic=projects/${PROJECT_ID}/topics/${DATAFLOW_DEADLETTER_TOPIC},\
    tokenKMSEncryptionKey=projects/${PROJECT_ID}/locations/${REGION_ID}/keyRings/export-keys/cryptoKeys/hec-token-key,\
    url=${SPLUNK_HEC_URL},\
    token=${SPLUNK_HEC_TOKEN},\
    batchCount=${DATAFLOW_BATCH_COUNT},\
    parallelism=${DATAFLOW_PARALLELISM},\
    javascriptTextTransformGcsPath=gs://${PROJECT_ID}-dataflow/js/dataflow_udf_messages_replay.js,\
    javascriptTextTransformFunctionName=process
    

    出力に返された新しいジョブ ID をコピーします。

  3. ジョブ ID を DATAFLOW_JOB_ID 環境変数に保存します。export DATAFLOW_JOB_ID="YOUR_DATAFLOW_JOB_ID"

配信エラーの処理

イベント処理または Splunk の HEC への接続に関するエラーにより、配信エラーが発生する場合があります。このセクションでは、エラー処理ワークフローのデモを行うための配信エラーを導入します。また、Splunk への配信に失敗したメッセージを表示して再配信をトリガーする方法についても説明します。

エラー処理の概要

次の図は、Splunk Dataflow パイプラインのエラー処理ワークフローを示しています。

Splunk へのログのエクスポート。

  1. Pub/Sub to Splunk Dataflow パイプライン(メイン パイプライン)は、ユーザーによる調査のため、配信不能のメッセージを自動的に未処理トピックに転送します。
  2. この演算子は、処理されていないサブスクリプションでの失敗したメッセージの調査とトラブルシューティングを行い、HEC トークンの構成ミスを修正するなど、配信エラーの根本原因を修正します。
  3. この演算子は Pub/Sub to Pub/Sub Dataflow パイプライン(セカンダリ パイプライン)をトリガーします。このパイプライン(前の図の点で強調表示された部分)は一時的なパイプラインで、失敗したメッセージを未処理のサブスクリプションから元のログシンクのトピックに戻します。
  4. メイン パイプラインは、以前失敗したメッセージを再処理します。この手順では、パイプラインでサンプル UDF を使用して、失敗したメッセージ ペイロードを正しく検出およびデコードする必要があります。この関数の次の部分では、この条件付きデコード ロジック(トラッキングのための配信試行の集計など)を実装します。

    // If message has already been converted to Splunk HEC object  with stringified
     // obj.event JSON payload, then it's a replay of a previously failed delivery:
     // Unnest and parse obj.event. Drop previously injected obj.attributes
     // such as errorMessage and timestamp
     if (obj.event) {
       try {
         event = JSON.parse(obj.event);
         redelivery = true;
       } catch(e) {
         event = obj;
       }
     } else {
       event = obj;
     }
    
     // Keep a tally of delivery attempts
     event.delivery_attempt = event.delivery_attempt || 1;
     if (redelivery) {
       event.delivery_attempt += 1;
     }
    

配信エラーをトリガーする

このセクションでは、配信エラーをトリガーします。次のいずれかの方法で、配信エラーを手動で導入できます。

  • Splunk サーバーを停止し(単一インスタンスの場合)、接続エラーを発生させます。
  • Splunk 入力構成から、関連する HEC トークンを無効にします。

失敗したメッセージに関するトラブルシューティング

失敗したメッセージを調べるには、Cloud Console を使用します。

  1. Cloud Console で、Pub/Sub の [サブスクリプション] ページを開きます。

    Pub/Sub の [サブスクリプション] に移動

  2. 作成した未処理のサブスクリプションをクリックします。上記の例を使用した場合、サブスクリプション名は projects/${PROJECT_ID}/subscriptions/org-logs-all-dl-sub です。

  3. メッセージ ビューアを開くには、[メッセージを表示] をクリックします。

  4. メッセージを表示するには、[PULL] をクリックします。[確認応答メッセージを有効にする] はオフのままにします。

  5. 以下のメッセージをはじめとする、失敗したメッセージの検査が可能になりました。

    • Message body 列の Splunk イベント ペイロード。
    • attribute.errorMessage 列のエラー メッセージ。
    • attribute.timestamp 列のエラー タイムスタンプ。

次のスクリーンショットは、Splunk の HEC エンドポイントが一時的に停止している、または到達できない場合に表示されるエラー メッセージの例です。errorMessage 属性 The target server failed to respond に注意してください。

失敗したメッセージの属性。

次の表に、考えられる Splunk 配信エラーと、それらのメッセージを未処理のトピックに転送する前にパイプラインで記録される errorMessage 属性を示します。

潜在的な処理または接続に関するエラー Dataflow テンプレートによる自動再試行の有無 errorMessage 属性の例
Splunk サーバー 5xx エラー Splunk write status code: 503
Splunk サーバー 4xx エラー × Splunk write status code: 403
Splunk サーバーの停止 × The target server failed to respond
Splunk SSL 証明書が無効 × Host name X does not match the certificate...
UDF JavaScript 構文エラー × ReferenceError: foo is not defined
一時的なネットワーク エラー × Read timed out

or

Connection reset

場合によっては、パイプラインは指数バックオフを使用して自動的に再試行を試みます。たとえば、Splunk サーバーの 5xx エラーが発生した場合、Splunk の HEC エンドポイントが過負荷状態になるとエラーが発生します。または、メッセージが HEC に送信されない永続的な問題が発生することがあります。この場合、パイプラインは再試行しません。次の問題は、Splunk 5xx エラートリガーの例です。

  • UDF 関数の構文エラー。
  • 無効な HEC トークンにより、Splunk サーバー 4xx による「Forbidden」サーバー レスポンスが発生しています。

失敗したメッセージを再生する

このセクションでは、配信エラーの根本原因が修正されたことを前提に、未処理のメッセージを再生します。トリガー配信エラーのセクションで Splunk の HEC エンドポイントを無効にした場合は、Splunk の HEC エンドポイントが動作するようになっていることを確認します。

  1. Cloud Shell では、未処理のサブスクリプションからのメッセージを再処理する前に、未処理のサブスクリプションのスナップショットを作成することをおすすめします。これにより、予期しない構成エラーが発生した場合にメッセージが失われないようにすることができます。

     gcloud pubsub snapshots create dlt-snapshot-`date +"%Y%m%d-%H%M%S"` \
         --subscription=org-logs-all-dl-sub
    
  2. Pub/Sub to Pub/Sub Dataflow テンプレートを使用し、別の Dataflow ジョブにより、未処理のサブスクリプションから入力トピックにメッセージを転送します。

      DATAFLOW_INPUT_TOPIC="org-logs-all"
      DATAFLOW_DEADLETTER_SUB="org-logs-all-dl-sub"
    
      JOB_NAME=splunk-dataflow-replay-`date +"%Y%m%d-%H%M%S"`
      gcloud dataflow jobs run $JOB_NAME \
           --gcs-location= gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
           --worker-machine-type=n1-standard-2 \
           --max-workers=1 \
           --region=$REGION_ID \
           --parameters \
      inputSubscription=projects/${PROJECT_ID}/subscriptions/${DATAFLOW_DEADLETTER_SUB},\
      outputTopic=projects/${PROJECT_ID}/topics/${DATAFLOW_INPUT_TOPIC}
    

    このコマンドが返す Dataflow ジョブ ID をコピーします。

  3. Dataflow ジョブ ID を DATAFLOW_JOB_ID 環境変数に保存します。

  4. Cloud Console で、Pub/Sub の [サブスクリプション] ページに移動します。

    Pub/Sub の [サブスクリプション] ページに移動

  5. 未処理のサブスクリプションを選択します。[ACK 処理されていないメッセージ数] が 0 に減少していることを確認します。

    失敗したメッセージ

  6. Cloud Shell で、作成した Dataflow ジョブをドレインします。

    gcloud dataflow jobs drain $DATAFLOW_JOB_ID --region=$REGION_ID
    

    メッセージが元の入力トピックに転送されると、メインの Dataflow パイプラインは自動的に失敗したメッセージを取得して Splunk に再配信します。

Splunk でメッセージを確認する

  1. メッセージが再配信されたことを確認するには、Splunk で [Splunk Search & Reporting] を開きます。

  2. delivery_attempts > 1 を検索します。これは、サンプル UDF が各イベントに追加され、配信試行回数を追跡する特別なフィールドです。検索の期間の範囲を拡張して、以前に発生した可能性のあるイベントが含まれるようにしてください。これは、イベントのタイムスタンプはインデックス登録の日時ではなく、イベントを最初に作成した日時であるためです。

次の画像の例では、最初に失敗した 2 つのメッセージが、数日前の正しいタイムスタンプを使用して Splunk に正常に配信されてインデックスに登録されています。insertId フィールド値は、未処理のサブスクリプションから手動で pull することにより失敗したメッセージを調べるときに見つかった値と同じになります。insertId は、Cloud Logging が割り当てる元のログエントリの一意の識別子です。

Splunk で失敗したメッセージ。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

組織レベルのシンクを削除する

gcloud logging sinks delete org-logs-all-sink --organization=$ORG_ID

プロジェクトの削除

ログシンクを削除したら、ログの受信とエクスポート用に作成されたリソースの削除を進めます。最も簡単な方法は、チュートリアル用に作成したプロジェクトを削除することです。

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

次のステップ